Continuing enforcement of line length and simplifying date conversions

This commit is contained in:
Nex 2022-08-13 02:14:24 +02:00
parent 0f503f72b5
commit 271fe5fbee
60 changed files with 457 additions and 310 deletions

View File

@ -58,8 +58,8 @@ class CmdAndroidCheckBackup(Command):
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as e:
log.critical("Impossible to parse this backup file: %s", e)
except AndroidBackupParsingError as exc:
log.critical("Impossible to parse this backup file: %s", exc)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)

View File

@ -83,9 +83,9 @@ class DownloadAPKs(AndroidExtraction):
remote_path)
self._adb_reconnect()
return None
except Exception as e:
except Exception as exc:
log.exception("Failed to pull package file from %s: %s",
remote_path, e)
remote_path, exc)
self._adb_reconnect()
return None

View File

@ -101,8 +101,8 @@ class AndroidExtraction(MVTModule):
self.log.error("Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again.")
sys.exit(-1)
except OSError as e:
if e.errno == 113 and self.serial:
except OSError as exc:
if exc.errno == 113 and self.serial:
self.log.critical("Unable to connect to the device %s: "
"did you specify the correct IP addres?",
self.serial)
@ -185,12 +185,12 @@ class AndroidExtraction(MVTModule):
"""
try:
self.device.pull(remote_path, local_path, progress_callback)
except AdbCommandFailureException as e:
except AdbCommandFailureException as exc:
if retry_root:
self._adb_download_root(remote_path, local_path,
progress_callback)
else:
raise Exception(f"Unable to download file {remote_path}: {e}")
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_download_root(self, remote_path: str, local_path: str,
progress_callback: Callable = None) -> None:
@ -222,8 +222,8 @@ class AndroidExtraction(MVTModule):
# Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}")
except AdbCommandFailureException as e:
raise Exception(f"Unable to download file {remote_path}: {e}")
except AdbCommandFailureException as exc:
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None:

View File

@ -8,8 +8,8 @@ import os
import sqlite3
from typing import Union
from mvt.common.utils import (convert_chrometime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from .base import AndroidExtraction
@ -70,7 +70,7 @@ class ChromeHistory(AndroidExtraction):
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
"isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(item[3])),
"redirect_source": item[4],
})
@ -84,5 +84,5 @@ class ChromeHistory(AndroidExtraction):
try:
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
self._parse_db)
except Exception as e:
self.log.error(e)
except Exception as exc:
self.log.error(exc)

View File

@ -3,13 +3,12 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import datetime
import logging
import os
import stat
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_unix_to_iso
from .base import AndroidExtraction
@ -79,7 +78,7 @@ class Files(AndroidExtraction):
for file_line in output.splitlines():
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
mod_time = convert_unix_to_iso(unix_timestamp)
self.results.append({
"path": full_path,

View File

@ -137,8 +137,8 @@ class Packages(AndroidExtraction):
results = virustotal_lookup(hashes[i])
except VTNoKey:
return
except VTQuotaExceeded as e:
print("Unable to continue: %s", e)
except VTQuotaExceeded as exc:
print("Unable to continue: %s", exc)
break
if not results:

View File

@ -11,7 +11,7 @@ from typing import Union
from mvt.android.parsers.backup import (AndroidBackupParsingError,
parse_tar_for_sms)
from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction
@ -99,7 +99,7 @@ class SMS(AndroidExtraction):
message[names[index]] = value
message["direction"] = ("received" if message["incoming"] == 1 else "sent")
message["isodate"] = convert_timestamp_to_iso(message["timestamp"])
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add
# them to the list of results.
@ -137,11 +137,11 @@ class SMS(AndroidExtraction):
def run(self) -> None:
try:
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)):
self.sms_db_type = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH),
self._parse_db)
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))):
elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)):
self.sms_db_type = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH),
self._parse_db)

View File

@ -9,7 +9,7 @@ import os
import sqlite3
from typing import Union
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction
@ -71,7 +71,7 @@ class Whatsapp(AndroidExtraction):
continue
message["direction"] = ("send" if message["key_from_me"] == 1 else "received")
message["isodate"] = convert_timestamp_to_iso(message["timestamp"])
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add them
# to the list.
@ -92,5 +92,5 @@ class Whatsapp(AndroidExtraction):
try:
self._adb_process_file(os.path.join("/", WHATSAPP_PATH),
self._parse_db)
except Exception as e:
self.log.error(e)
except Exception as exc:
self.log.error(exc)

View File

@ -13,7 +13,7 @@ from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.utils import check_for_links, convert_unix_to_iso
PBKDF2_KEY_SIZE = 32
@ -94,8 +94,8 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds,
master_key_checksum_length = ord(key_blob.read(1))
master_key_checksum = key_blob.read(master_key_checksum_length)
except TypeError:
raise InvalidBackupPassword()
except TypeError as exc:
raise InvalidBackupPassword() from exc
# Handle quirky encoding of master key bytes in Android original Java crypto code.
if format_version > 1:
@ -174,8 +174,8 @@ def parse_backup_file(data, password=None):
if is_compressed:
try:
tar_data = zlib.decompress(tar_data)
except zlib.error:
raise AndroidBackupParsingError("Impossible to decompress the backup file")
except zlib.error as exc:
raise AndroidBackupParsingError("Impossible to decompress the backup file") from exc
return tar_data
@ -216,8 +216,7 @@ def parse_sms_file(data):
message_links = check_for_links(entry["body"])
utc_timestamp = datetime.datetime.utcfromtimestamp(int(entry["date"]) / 1000)
entry["isodate"] = convert_timestamp_to_iso(utc_timestamp)
entry["isodate"] = convert_unix_to_iso(int(entry["date"]) / 1000)
entry["direction"] = ("sent" if int(entry["date_sent"]) else "received")
# If we find links in the messages or if they are empty we add them to

View File

@ -6,7 +6,7 @@
import re
from datetime import datetime
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
def parse_dumpsys_accessibility(output: str) -> list:
@ -357,7 +357,7 @@ def parse_dumpsys_appops(output: str) -> list:
entry["type"] = line[line.find("[")+1:line.find("]")]
try:
entry["timestamp"] = convert_timestamp_to_iso(
entry["timestamp"] = convert_datetime_to_iso(
datetime.strptime(
line[line.find("]")+1:line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f"))

View File

@ -13,7 +13,7 @@ from typing import Callable
from mvt.common.indicators import Indicators
from mvt.common.module import run_module, save_timeline
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.version import MVT_VERSION
@ -48,9 +48,9 @@ class Command:
if self.results_path and not os.path.exists(self.results_path):
try:
os.makedirs(self.results_path)
except Exception as e:
except Exception as exc:
self.log.critical("Unable to create output folder %s: %s",
self.results_path, e)
self.results_path, exc)
sys.exit(1)
def _add_log_file_handler(self, logger: logging.Logger) -> None:
@ -88,7 +88,7 @@ class Command:
info = {
"target_path": target_path,
"mvt_version": MVT_VERSION,
"date": convert_timestamp_to_iso(datetime.now()),
"date": convert_datetime_to_iso(datetime.now()),
"ioc_files": [],
"hashes": [],
}

View File

@ -98,9 +98,10 @@ class MVTModule:
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, default=str)
except Exception as e:
self.log.error("Unable to store results of module %s to file %s: %s",
self.__class__.__name__, results_file_name, e)
except Exception as exc:
self.log.error("Unable to store results of module %s "
"to file %s: %s", self.__class__.__name__,
results_file_name, exc)
if self.detected:
detected_file_name = f"{name}_detected.json"
@ -159,18 +160,18 @@ def run_module(module: Callable) -> None:
except NotImplementedError:
module.log.exception("The run() procedure of module %s was not implemented yet!",
module.__class__.__name__)
except InsufficientPrivileges as e:
except InsufficientPrivileges as exc:
module.log.info("Insufficient privileges for module %s: %s",
module.__class__.__name__, e)
except DatabaseNotFoundError as e:
module.__class__.__name__, exc)
except DatabaseNotFoundError as exc:
module.log.info("There might be no data to extract by module %s: %s",
module.__class__.__name__, e)
except DatabaseCorruptedError as e:
module.__class__.__name__, exc)
except DatabaseCorruptedError as exc:
module.log.error("The %s module database seems to be corrupted: %s",
module.__class__.__name__, e)
except Exception as e:
module.__class__.__name__, exc)
except Exception as exc:
module.log.exception("Error in running extraction from module %s: %s",
module.__class__.__name__, e)
module.__class__.__name__, exc)
else:
try:
module.check_indicators()

View File

@ -6,16 +6,18 @@
import datetime
import hashlib
import re
from typing import Union
def convert_mactime_to_unix(timestamp, from_2001: bool = True):
"""Converts Mac Standard Time to a Unix timestamp.
def convert_mactime_to_datetime(timestamp: Union[int, float],
from_2001: bool = True):
"""Converts Mac Standard Time to a datetime.
:param timestamp: MacTime timestamp (either int or float).
:type timestamp: int
:param from_2001: bool: Whether to (Default value = True)
:param from_2001: Default value = True)
:returns: Unix epoch timestamp.
:returns: datetime.
"""
if not timestamp:
@ -37,12 +39,12 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True):
return None
def convert_chrometime_to_unix(timestamp: int) -> int:
"""Converts Chrome timestamp to a Unix timestamp.
def convert_chrometime_to_datetime(timestamp: int) -> int:
"""Converts Chrome timestamp to a datetime.
:param timestamp: Chrome timestamp as int.
:type timestamp: int
:returns: Unix epoch timestamp.
:returns: datetime.
"""
epoch_start = datetime.datetime(1601, 1, 1)
@ -50,19 +52,50 @@ def convert_chrometime_to_unix(timestamp: int) -> int:
return epoch_start + delta
def convert_timestamp_to_iso(timestamp: str) -> str:
"""Converts Unix timestamp to ISO string.
def convert_datetime_to_iso(datetime: datetime.datetime) -> str:
"""Converts datetime to ISO string.
:param timestamp: Unix timestamp.
:type timestamp: int
:returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format.
:param datetime: datetime.
:type datetime: datetime.datetime
:returns: ISO datetime string in YYYY-mm-dd HH:MM:SS.ms format.
:rtype: str
"""
try:
return timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
return datetime.strftime("%Y-%m-%d %H:%M:%S.%f")
except Exception:
return None
return ""
def convert_unix_to_iso(timestamp: int) -> str:
"""Converts a unix epoch to ISO string.
:param timestamp: Epoc timestamp to convert.
:type timestamp: int
:returns: ISO datetime string in YYYY-mm-dd HH:MM:SS.ms format.
:rtype: str
"""
try:
return convert_datetime_to_iso(datetime.datetime.utcfromtimestamp(int(timestamp)))
except Exception:
return ""
def convert_mactime_to_iso(timestamp: int, from_2001: bool = True):
"""Wraps two conversions from mactime to iso date.
:param timestamp: MacTime timestamp (either int or float).
:type timestamp: int
:param from_2001: bool: Whether to (Default value = True)
:param from_2001: Default value = True)
:returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format.
:rtype: str
"""
return convert_datetime_to_iso(convert_mactime_to_datetime(timestamp,
from_2001))
def check_for_links(text: str) -> list:

View File

@ -102,8 +102,8 @@ class DecryptBackup:
domain, item,
file_id,
item_folder))
except Exception as e:
log.error("Failed to decrypt file %s: %s", relative_path, e)
except Exception as exc:
log.error("Failed to decrypt file %s: %s", relative_path, exc)
pool.close()
pool.join()
@ -144,16 +144,16 @@ class DecryptBackup:
self._backup = iOSbackup(udid=os.path.basename(self.backup_path),
cleartextpassword=password,
backuproot=os.path.dirname(self.backup_path))
except Exception as e:
if isinstance(e, KeyError) and len(e.args) > 0 and e.args[0] == b"KEY":
except Exception as exc:
if isinstance(exc, KeyError) and len(exc.args) > 0 and exc.args[0] == b"KEY":
log.critical("Failed to decrypt backup. Password is probably wrong.")
elif (isinstance(e, FileNotFoundError)
and os.path.basename(e.filename) == "Manifest.plist"):
elif (isinstance(exc, FileNotFoundError)
and os.path.basename(exc.filename) == "Manifest.plist"):
log.critical("Failed to find a valid backup at %s. "
"Did you point to the right backup path?",
self.backup_path)
else:
log.exception(e)
log.exception(exc)
log.critical("Failed to decrypt backup. Did you provide the "
"correct password? Did you point to the right "
"backup path?")
@ -185,8 +185,8 @@ class DecryptBackup:
self._backup = iOSbackup(udid=os.path.basename(self.backup_path),
derivedkey=key_bytes_raw,
backuproot=os.path.dirname(self.backup_path))
except Exception as e:
log.exception(e)
except Exception as exc:
log.exception(exc)
log.critical("Failed to decrypt backup. Did you provide the "
"correct key file?")
@ -212,8 +212,8 @@ class DecryptBackup:
try:
with open(key_path, 'w', encoding="utf-8") as handle:
handle.write(self._decryption_key)
except Exception as e:
log.exception(e)
except Exception as exc:
log.exception(exc)
log.critical("Failed to write key to file: %s", key_path)
return
else:

View File

@ -29,7 +29,8 @@ class BackupInfo(IOSExtraction):
def run(self) -> None:
info_path = os.path.join(self.target_path, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device information")
raise DatabaseNotFoundError("No Info.plist at backup path, unable "
"to extract device information")
with open(info_path, "rb") as handle:
info = plistlib.load(handle)
@ -57,5 +58,6 @@ class BackupInfo(IOSExtraction):
if "Product Version" in info:
latest = latest_ios_version()
if info["Product Version"] != latest["version"]:
self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)",
self.log.warning("This phone is running an outdated iOS "
"version: %s (latest is %s)",
info["Product Version"], latest['version'])

View File

@ -9,7 +9,7 @@ import plistlib
from base64 import b64encode
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -119,7 +119,7 @@ class ConfigurationProfiles(IOSExtraction):
"relative_path": conf_file["relative_path"],
"domain": conf_file["domain"],
"plist": conf_plist,
"install_date": convert_timestamp_to_iso(conf_plist.get("InstallDate")),
"install_date": convert_datetime_to_iso(conf_plist.get("InstallDate")),
})
self.log.info("Extracted details about %d configuration profiles", len(self.results))

View File

@ -11,7 +11,7 @@ import plistlib
import sqlite3
from mvt.common.module import DatabaseNotFoundError
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
from ..base import IOSExtraction
@ -35,7 +35,8 @@ class Manifest(IOSExtraction):
:param key:
"""
return dictionary.get(key.encode("utf-8"), None) or dictionary.get(key, None)
return (dictionary.get(key.encode("utf-8"), None)
or dictionary.get(key, None))
@staticmethod
def _convert_timestamp(timestamp_or_unix_time_int):
@ -45,17 +46,17 @@ class Manifest(IOSExtraction):
"""
if isinstance(timestamp_or_unix_time_int, datetime.datetime):
return convert_timestamp_to_iso(timestamp_or_unix_time_int)
return convert_datetime_to_iso(timestamp_or_unix_time_int)
timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int)
return convert_timestamp_to_iso(timestamp)
return convert_unix_to_iso(timestamp_or_unix_time_int)
def serialize(self, record: dict) -> []:
records = []
if "modified" not in record or "status_changed" not in record:
return records
for timestamp in set([record["created"], record["modified"], record["status_changed"]]):
for timestamp in set([record["created"], record["modified"],
record["status_changed"]]):
macb = ""
macb += "M" if timestamp == record["modified"] else "-"
macb += "-"
@ -80,8 +81,11 @@ class Manifest(IOSExtraction):
continue
if result["domain"]:
if os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist" and result["domain"] == "RootDomain":
self.log.warning("Found a potentially suspicious \"com.apple.CrashReporter.plist\" file created in RootDomain")
if (os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist"
and result["domain"] == "RootDomain"):
self.log.warning("Found a potentially suspicious "
"\"com.apple.CrashReporter.plist\" "
"file created in RootDomain")
self.detected.append(result)
continue
@ -92,7 +96,8 @@ class Manifest(IOSExtraction):
rel_path = result["relative_path"].lower()
for ioc in self.indicators.get_iocs("domains"):
if ioc["value"].lower() in rel_path:
self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s",
self.log.warning("Found mention of domain \"%s\" in a "
"backup file with path: %s",
ioc["value"], rel_path)
self.detected.append(result)
@ -101,7 +106,8 @@ class Manifest(IOSExtraction):
if not os.path.isfile(manifest_db_path):
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
self.log.info("Found Manifest.db database at path: %s", manifest_db_path)
self.log.info("Found Manifest.db database at path: %s",
manifest_db_path)
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
@ -128,19 +134,24 @@ class Manifest(IOSExtraction):
file_metadata = self._get_key(file_plist, "$objects")[1]
cleaned_metadata.update({
"created": self._convert_timestamp(self._get_key(file_metadata, "Birth")),
"modified": self._convert_timestamp(self._get_key(file_metadata, "LastModified")),
"status_changed": self._convert_timestamp(self._get_key(file_metadata, "LastStatusChange")),
"modified": self._convert_timestamp(self._get_key(file_metadata,
"LastModified")),
"status_changed": self._convert_timestamp(self._get_key(file_metadata,
"LastStatusChange")),
"mode": oct(self._get_key(file_metadata, "Mode")),
"owner": self._get_key(file_metadata, "UserID"),
"size": self._get_key(file_metadata, "Size"),
})
except Exception:
self.log.exception("Error reading manifest file metadata for file with ID %s and relative path %s",
file_data["fileID"], file_data["relativePath"])
self.log.exception("Error reading manifest file metadata "
"for file with ID %s and relative path %s",
file_data["fileID"],
file_data["relativePath"])
self.results.append(cleaned_metadata)
cur.close()
conn.close()
self.log.info("Extracted a total of %d file metadata items", len(self.results))
self.log.info("Extracted a total of %d file metadata items",
len(self.results))

View File

@ -7,7 +7,7 @@ import logging
import plistlib
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -34,7 +34,8 @@ class ProfileEvents(IOSExtraction):
"module": self.__class__.__name__,
"event": "profile_operation",
"data": f"Process {record.get('process')} started operation "
f"{record.get('operation')} of profile {record.get('profile_id')}"
f"{record.get('operation')} of profile "
f"{record.get('profile_id')}"
}
def check_indicators(self) -> None:
@ -75,7 +76,7 @@ class ProfileEvents(IOSExtraction):
for key, value in event[key].items():
key = key.lower()
if key == "timestamp":
result["timestamp"] = str(convert_timestamp_to_iso(value))
result["timestamp"] = str(convert_datetime_to_iso(value))
else:
result[key] = value
@ -89,13 +90,15 @@ class ProfileEvents(IOSExtraction):
if not events_file_path:
continue
self.log.info("Found MCProfileEvents.plist file at %s", events_file_path)
self.log.info("Found MCProfileEvents.plist file at %s",
events_file_path)
with open(events_file_path, "rb") as handle:
self.results.extend(self.parse_profile_events(handle.read()))
for result in self.results:
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
self.log.info("On %s process \"%s\" started operation \"%s\" "
"of profile \"%s\"",
result.get("timestamp"), result.get("process"),
result.get("operation"), result.get("profile_id"))

View File

@ -44,8 +44,8 @@ class IOSExtraction(MVTModule):
try:
recover = False
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
except sqlite3.DatabaseError as e:
if "database disk image is malformed" in str(e):
except sqlite3.DatabaseError as exc:
if "database disk image is malformed" in str(exc):
recover = True
finally:
conn.close()
@ -102,8 +102,8 @@ class IOSExtraction(MVTModule):
(relative_path,))
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as e:
raise DatabaseCorruptedError(f"failed to query Manifest.db: {e}")
except Exception as exc:
raise DatabaseCorruptedError(f"failed to query Manifest.db: {exc}") from exc
for row in cur:
yield {

View File

@ -8,7 +8,7 @@ import plistlib
import sqlite3
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -18,7 +18,8 @@ ANALYTICS_DB_PATH = [
class Analytics(IOSExtraction):
"""This module extracts information from the private/var/Keychains/Analytics/*.db files."""
"""This module extracts information from the
private/var/Keychains/Analytics/*.db files."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
@ -47,16 +48,20 @@ class Analytics(IOSExtraction):
ioc = self.indicators.check_process(value)
if ioc:
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"])
self.log.warning("Found mention of a malicious process "
"\"%s\" in %s file at %s",
value, result["artifact"],
result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_domain(value)
if ioc:
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"])
self.log.warning("Found mention of a malicious domain "
"\"%s\" in %s file at %s",
value, result["artifact"],
result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result)
@ -98,11 +103,11 @@ class Analytics(IOSExtraction):
for row in cur:
if row[0] and row[1]:
isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False))
isodate = convert_mactime_to_iso(row[0], False)
data = plistlib.loads(row[1])
data["isodate"] = isodate
elif row[0]:
isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False))
isodate = convert_mactime_to_iso(row[0], False)
data = {}
data["isodate"] = isodate
elif row[1]:
@ -120,7 +125,8 @@ class Analytics(IOSExtraction):
def process_analytics_dbs(self):
for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH):
self.file_path = file_path
self.log.info("Found Analytics database file at path: %s", file_path)
self.log.info("Found Analytics database file at path: %s",
file_path)
self._extract_analytics_data()
def run(self) -> None:

View File

@ -8,7 +8,7 @@ import logging
import os
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@ -56,7 +56,9 @@ class Filesystem(IOSExtraction):
for ioc in self.indicators.get_iocs("processes"):
parts = result["path"].split("/")
if ioc["value"] in parts:
self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"",
self.log.warning("Found known suspicious process name "
"mentioned in file at path \"%s\" "
"matching indicators from \"%s\"",
result["path"], ioc["name"])
result["matched_indicator"] = ioc
self.detected.append(result)
@ -68,7 +70,7 @@ class Filesystem(IOSExtraction):
dir_path = os.path.join(root, dir_name)
result = {
"path": os.path.relpath(dir_path, self.target_path),
"modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(dir_path).st_mtime)),
"modified": convert_unix_to_iso(os.stat(dir_path).st_mtime),
}
except Exception:
continue
@ -80,7 +82,7 @@ class Filesystem(IOSExtraction):
file_path = os.path.join(root, file_name)
result = {
"path": os.path.relpath(file_path, self.target_path),
"modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(file_path).st_mtime)),
"modified": convert_unix_to_iso(os.stat(file_path).st_mtime),
}
except Exception:
continue

View File

@ -35,8 +35,9 @@ class Netusage(NetBase):
self.log.info("Found NetUsage database at path: %s", self.file_path)
try:
self._extract_net_data()
except sqlite3.OperationalError as e:
self.log.info("Skipping this NetUsage database because it seems empty or malformed: %s", e)
except sqlite3.OperationalError as exc:
self.log.info("Skipping this NetUsage database because "
"it seems empty or malformed: %s", exc)
continue
self._find_suspicious_processes()

View File

@ -7,7 +7,7 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -33,7 +33,8 @@ class SafariFavicon(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "safari_favicon",
"data": f"Safari favicon from {record['url']} with icon URL {record['icon_url']} ({record['type']})",
"data": f"Safari favicon from {record['url']} with icon URL "
f"{record['icon_url']} ({record['type']})",
}
def check_indicators(self) -> None:
@ -69,7 +70,7 @@ class SafariFavicon(IOSExtraction):
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])),
"isodate": convert_mactime_to_iso(row[2]),
"type": "valid",
"safari_favicon_db_path": file_path,
})
@ -88,7 +89,7 @@ class SafariFavicon(IOSExtraction):
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])),
"isodate": convert_mactime_to_iso(row[2]),
"type": "rejected",
"safari_favicon_db_path": file_path,
})
@ -98,8 +99,10 @@ class SafariFavicon(IOSExtraction):
def run(self) -> None:
for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS):
self.log.info("Found Safari favicon cache database at path: %s", file_path)
self.log.info("Found Safari favicon cache database at path: %s",
file_path)
self._process_favicon_db(file_path)
self.log.info("Extracted a total of %d favicon records", len(self.results))
self.log.info("Extracted a total of %d favicon records",
len(self.results))
self.results = sorted(self.results, key=lambda x: x["isodate"])

View File

@ -6,7 +6,7 @@
import logging
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -31,7 +31,8 @@ class ShutdownLog(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "shutdown",
"data": f"Client {record['client']} with PID {record['pid']} was running when the device was shut down",
"data": f"Client {record['client']} with PID {record['pid']} "
"was running when the device was shut down",
}
def check_indicators(self) -> None:
@ -48,7 +49,8 @@ class ShutdownLog(IOSExtraction):
for ioc in self.indicators.get_iocs("processes"):
parts = result["client"].split("/")
if ioc in parts:
self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log",
self.log.warning("Found mention of a known malicious "
"process \"%s\" in shutdown.log",
ioc)
result["matched_indicator"] = ioc
self.detected.append(result)
@ -74,8 +76,7 @@ class ShutdownLog(IOSExtraction):
except Exception:
mac_timestamp = 0
timestamp = convert_mactime_to_unix(mac_timestamp, from_2001=False)
isodate = convert_timestamp_to_iso(timestamp)
isodate = convert_mactime_to_iso(mac_timestamp, from_2001=False)
for current_process in current_processes:
self.results.append({

View File

@ -8,7 +8,7 @@ import json
import logging
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -45,7 +45,7 @@ class IOSVersionHistory(IOSExtraction):
"%Y-%m-%d %H:%M:%S.%f %z")
timestamp_utc = timestamp.astimezone(datetime.timezone.utc)
self.results.append({
"isodate": convert_timestamp_to_iso(timestamp_utc),
"isodate": convert_datetime_to_iso(timestamp_utc),
"os_version": log_line["os_version"],
})

View File

@ -6,7 +6,7 @@
import datetime
import os
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -35,9 +35,10 @@ class WebkitBase(IOSExtraction):
name = name.replace("http_", "http://")
name = name.replace("https_", "https://")
url = name.split("_")[0]
utc_timestamp = datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime)
self.results.append({
"folder": key,
"url": url,
"isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime)),
"isodate": convert_datetime_to_iso(utc_timestamp),
})

View File

@ -35,7 +35,8 @@ class WebkitIndexedDB(WebkitBase):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "webkit_indexeddb",
"data": f"IndexedDB folder {record['folder']} containing file for URL {record['url']}",
"data": f"IndexedDB folder {record['folder']} containing "
f"file for URL {record['url']}",
}
def run(self) -> None:

View File

@ -33,10 +33,12 @@ class WebkitLocalStorage(WebkitBase):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "webkit_local_storage",
"data": f"WebKit Local Storage folder {record['folder']} containing file for URL {record['url']}",
"data": f"WebKit Local Storage folder {record['folder']} "
f"containing file for URL {record['url']}",
}
def run(self) -> None:
self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit Local Storages",
self.log.info("Extracted a total of %d records from WebKit "
"Local Storages",
len(self.results))

View File

@ -29,5 +29,6 @@ class WebkitSafariViewService(WebkitBase):
def run(self) -> None:
self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData",
self.log.info("Extracted a total of %d records from WebKit "
"SafariViewService WebsiteData",
len(self.results))

View File

@ -7,7 +7,7 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -35,7 +35,8 @@ class Calls(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "call",
"data": f"From {record['number']} using {record['provider']} during {record['duration']} seconds"
"data": f"From {record['number']} using {record['provider']} "
f"during {record['duration']} seconds"
}
def run(self) -> None:
@ -54,7 +55,7 @@ class Calls(IOSExtraction):
for row in cur:
self.results.append({
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])),
"isodate": convert_mactime_to_iso(row[0]),
"duration": row[1],
"location": row[2],
"number": row[3].decode("utf-8") if row[3] and row[3] is bytes else row[3],

View File

@ -7,8 +7,8 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import (convert_chrometime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from ..base import IOSExtraction
@ -57,7 +57,8 @@ class ChromeFavicon(IOSExtraction):
def run(self) -> None:
self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS,
root_paths=CHROME_FAVICON_ROOT_PATHS)
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)
self.log.info("Found Chrome favicon cache database at path: %s",
self.file_path)
conn = sqlite3.connect(self.file_path)
@ -82,7 +83,7 @@ class ChromeFavicon(IOSExtraction):
"url": row[0],
"icon_url": row[1],
"timestamp": last_timestamp,
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(last_timestamp)),
"isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(last_timestamp)),
})
cur.close()

View File

@ -7,8 +7,8 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import (convert_chrometime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from ..base import IOSExtraction
@ -37,7 +37,9 @@ class ChromeHistory(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "visit",
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})"
"data": f"{record['id']} - {record['url']} "
f"(visit ID: {record['visit_id']}, "
f"redirect source: {record['redirect_source']})"
}
def check_indicators(self) -> None:
@ -53,7 +55,8 @@ class ChromeHistory(IOSExtraction):
def run(self) -> None:
self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS,
root_paths=CHROME_HISTORY_ROOT_PATHS)
self.log.info("Found Chrome history database at path: %s", self.file_path)
self.log.info("Found Chrome history database at path: %s",
self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
@ -75,11 +78,12 @@ class ChromeHistory(IOSExtraction):
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
"isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(item[3])),
"redirect_source": item[4],
})
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items", len(self.results))
self.log.info("Extracted a total of %d history items",
len(self.results))

View File

@ -28,7 +28,8 @@ class Contacts(IOSExtraction):
log=log, results=results)
def run(self) -> None:
self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS, root_paths=CONTACTS_ROOT_PATHS)
self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS,
root_paths=CONTACTS_ROOT_PATHS)
self.log.info("Found Contacts database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)

View File

@ -8,7 +8,7 @@ import sqlite3
from datetime import datetime
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@ -36,7 +36,8 @@ class FirefoxFavicon(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "firefox_history",
"data": f"Firefox favicon {record['url']} when visiting {record['history_url']}",
"data": f"Firefox favicon {record['url']} "
f"when visiting {record['history_url']}",
}
def check_indicators(self) -> None:
@ -55,7 +56,8 @@ class FirefoxFavicon(IOSExtraction):
def run(self) -> None:
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS,
root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox favicon database at path: %s", self.file_path)
self.log.info("Found Firefox favicon database at path: %s",
self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
@ -81,7 +83,7 @@ class FirefoxFavicon(IOSExtraction):
"width": item[2],
"height": item[3],
"type": item[4],
"isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(item[5])),
"isodate": convert_unix_to_iso(item[5]),
"history_id": item[6],
"history_url": item[7]
})
@ -89,4 +91,5 @@ class FirefoxFavicon(IOSExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items", len(self.results))
self.log.info("Extracted a total of %d history items",
len(self.results))

View File

@ -5,10 +5,9 @@
import logging
import sqlite3
from datetime import datetime
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@ -40,7 +39,8 @@ class FirefoxHistory(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "firefox_history",
"data": f"Firefox visit with ID {record['id']} to URL: {record['url']}",
"data": f"Firefox visit with ID {record['id']} "
f"to URL: {record['url']}",
}
def check_indicators(self) -> None:
@ -54,8 +54,10 @@ class FirefoxHistory(IOSExtraction):
self.detected.append(result)
def run(self) -> None:
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox history database at path: %s", self.file_path)
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS,
root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox history database at path: %s",
self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
@ -74,7 +76,7 @@ class FirefoxHistory(IOSExtraction):
for row in cur:
self.results.append({
"id": row[0],
"isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(row[1])),
"isodate": convert_unix_to_iso(row[1]),
"url": row[2],
"title": row[3],
"i1000000s_local": row[4],
@ -84,4 +86,5 @@ class FirefoxHistory(IOSExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items", len(self.results))
self.log.info("Extracted a total of %d history items",
len(self.results))

View File

@ -8,7 +8,7 @@ import logging
import plistlib
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -37,7 +37,8 @@ class IDStatusCache(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "lookup",
"data": f"Lookup of {record['user']} within {record['package']} (Status {record['idstatus']})"
"data": f"Lookup of {record['user']} within {record['package']} "
f"(Status {record['idstatus']})"
}
def check_indicators(self) -> None:
@ -54,7 +55,8 @@ class IDStatusCache(IOSExtraction):
continue
if "\\x00\\x00" in result.get("user", ""):
self.log.warning("Found an ID Status Cache entry with suspicious patterns: %s",
self.log.warning("Found an ID Status Cache entry with "
"suspicious patterns: %s",
result.get("user"))
self.detected.append(result)
@ -77,7 +79,7 @@ class IDStatusCache(IOSExtraction):
id_status_cache_entries.append({
"package": app,
"user": entry.replace("\x00", "\\x00"),
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(lookup_date)),
"isodate": convert_mactime_to_iso(lookup_date),
"idstatus": id_status,
})
@ -91,12 +93,15 @@ class IDStatusCache(IOSExtraction):
if self.is_backup:
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS)
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
self.log.info("Found IDStatusCache plist at path: %s",
self.file_path)
self._extract_idstatuscache_entries(self.file_path)
elif self.is_fs_dump:
for idstatuscache_path in self._get_fs_files_from_patterns(IDSTATUSCACHE_ROOT_PATHS):
self.file_path = idstatuscache_path
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
self.log.info("Found IDStatusCache plist at path: %s",
self.file_path)
self._extract_idstatuscache_entries(self.file_path)
self.log.info("Extracted a total of %d ID Status Cache entries", len(self.results))
self.log.info("Extracted a total of %d ID Status Cache entries",
len(self.results))

View File

@ -7,7 +7,7 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -59,9 +59,12 @@ class InteractionC(IOSExtraction):
"timestamp": record[timestamp],
"module": self.__class__.__name__,
"event": timestamp,
"data": f"[{record['bundle_id']}] {record['account']} - from {record['sender_display_name']} "
f"({record['sender_identifier']}) to {record['recipient_display_name']} "
f"({record['recipient_identifier']}): {record['content']}"
"data": f"[{record['bundle_id']}] {record['account']} - "
f"from {record['sender_display_name']} "
f"({record['sender_identifier']}) "
f"to {record['recipient_display_name']} "
f"({record['recipient_identifier']}): "
f"{record['content']}"
})
processed.append(record[timestamp])
@ -133,8 +136,8 @@ class InteractionC(IOSExtraction):
for row in cur:
self.results.append({
"start_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])),
"end_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[1])),
"start_date": convert_mactime_to_iso(row[0]),
"end_date": convert_mactime_to_iso(row[1]),
"bundle_id": row[2],
"account": row[3],
"target_bundle_id": row[4],
@ -158,14 +161,14 @@ class InteractionC(IOSExtraction):
"incoming_recipient_count": row[22],
"incoming_sender_count": row[23],
"outgoing_recipient_count": row[24],
"interactions_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[25])) if row[25] else None,
"contacts_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[26])) if row[26] else None,
"first_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[27])) if row[27] else None,
"first_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[28])) if row[28] else None,
"first_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[29])) if row[29] else None,
"last_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[30])) if row[30] else None,
"last_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[31])) if row[31] else None,
"last_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[32])) if row[32] else None,
"interactions_creation_date": convert_mactime_to_iso(row[25]) if row[25] else None,
"contacts_creation_date": convert_mactime_to_iso(row[26]) if row[26] else None,
"first_incoming_recipient_date": convert_mactime_to_iso(row[27]) if row[27] else None,
"first_incoming_sender_date": convert_mactime_to_iso(row[28]) if row[28] else None,
"first_outgoing_recipient_date": convert_mactime_to_iso(row[29]) if row[29] else None,
"last_incoming_sender_date": convert_mactime_to_iso(row[30]) if row[30] else None,
"last_incoming_recipient_date": convert_mactime_to_iso(row[31]) if row[31] else None,
"last_outgoing_recipient_date": convert_mactime_to_iso(row[32]) if row[32] else None,
"custom_id": row[33],
"location_uuid": row[35],
"group_name": row[36],
@ -176,4 +179,5 @@ class InteractionC(IOSExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d InteractionC events", len(self.results))
self.log.info("Extracted a total of %d InteractionC events",
len(self.results))

View File

@ -7,7 +7,7 @@ import logging
import plistlib
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -66,8 +66,8 @@ class LocationdClients(IOSExtraction):
ioc = self.indicators.check_process(proc_name)
if ioc:
self.log.warning("Found a suspicious process name in LocationD entry %s",
result["package"])
self.log.warning("Found a suspicious process name in "
"LocationD entry %s", result["package"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@ -75,8 +75,8 @@ class LocationdClients(IOSExtraction):
if "BundlePath" in result:
ioc = self.indicators.check_file_path(result["BundlePath"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["BundlePath"])
self.log.warning("Found a suspicious file path in "
"Location D: %s", result["BundlePath"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@ -84,8 +84,8 @@ class LocationdClients(IOSExtraction):
if "Executable" in result:
ioc = self.indicators.check_file_path(result["Executable"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["Executable"])
self.log.warning("Found a suspicious file path in "
"Location D: %s", result["Executable"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@ -93,8 +93,8 @@ class LocationdClients(IOSExtraction):
if "Registered" in result:
ioc = self.indicators.check_file_path(result["Registered"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["Registered"])
self.log.warning("Found a suspicious file path in "
"Location D: %s", result["Registered"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@ -108,19 +108,22 @@ class LocationdClients(IOSExtraction):
result["package"] = key
for timestamp in self.timestamps:
if timestamp in result.keys():
result[timestamp] = convert_timestamp_to_iso(convert_mactime_to_unix(result[timestamp]))
result[timestamp] = convert_mactime_to_iso(result[timestamp])
self.results.append(result)
def run(self) -> None:
if self.is_backup:
self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS)
self.log.info("Found Locationd Clients plist at path: %s", self.file_path)
self.log.info("Found Locationd Clients plist at path: %s",
self.file_path)
self._extract_locationd_entries(self.file_path)
elif self.is_fs_dump:
for locationd_path in self._get_fs_files_from_patterns(LOCATIOND_ROOT_PATHS):
self.file_path = locationd_path
self.log.info("Found Locationd Clients plist at path: %s", self.file_path)
self.log.info("Found Locationd Clients plist at path: %s",
self.file_path)
self._extract_locationd_entries(self.file_path)
self.log.info("Extracted a total of %d Locationd Clients entries", len(self.results))
self.log.info("Extracted a total of %d Locationd Clients entries",
len(self.results))

View File

@ -7,7 +7,7 @@ import logging
import plistlib
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -20,7 +20,8 @@ OSANALYTICS_ADDAILY_ROOT_PATHS = [
class OSAnalyticsADDaily(IOSExtraction):
"""Extract network usage information by process, from com.apple.osanalytics.addaily.plist"""
"""Extract network usage information by process,
from com.apple.osanalytics.addaily.plist"""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
@ -31,13 +32,14 @@ class OSAnalyticsADDaily(IOSExtraction):
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
return {
"timestamp": record["ts"],
"module": self.__class__.__name__,
"event": "osanalytics_addaily",
"data": record_data,
"data": f"{record['package']} WIFI IN: {record['wifi_in']}, "
f"WIFI OUT: {record['wifi_out']} - "
f"WWAN IN: {record['wwan_in']}, "
f"WWAN OUT: {record['wwan_out']}",
}
def check_indicators(self) -> None:
@ -53,7 +55,8 @@ class OSAnalyticsADDaily(IOSExtraction):
def run(self) -> None:
self._find_ios_database(backup_ids=OSANALYTICS_ADDAILY_BACKUP_IDS,
root_paths=OSANALYTICS_ADDAILY_ROOT_PATHS)
self.log.info("Found com.apple.osanalytics.addaily plist at path: %s", self.file_path)
self.log.info("Found com.apple.osanalytics.addaily plist at path: %s",
self.file_path)
with open(self.file_path, "rb") as handle:
file_plist = plistlib.load(handle)
@ -61,11 +64,12 @@ class OSAnalyticsADDaily(IOSExtraction):
for app, values in file_plist.get("netUsageBaseline", {}).items():
self.results.append({
"package": app,
"ts": convert_timestamp_to_iso(values[0]),
"ts": convert_datetime_to_iso(values[0]),
"wifi_in": values[1],
"wifi_out": values[2],
"wwan_in": values[3],
"wwan_out": values[4],
})
self.log.info("Extracted a total of %d com.apple.osanalytics.addaily entries", len(self.results))
self.log.info("Extracted a total of %d com.apple.osanalytics.addaily "
"entries", len(self.results))

View File

@ -10,8 +10,7 @@ import plistlib
import sqlite3
from typing import Union
from mvt.common.utils import (convert_mactime_to_unix,
convert_timestamp_to_iso, keys_bytes_to_string)
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
from ..base import IOSExtraction
@ -117,9 +116,10 @@ class SafariBrowserState(IOSExtraction):
"tab_title": row[0],
"tab_url": row[1],
"tab_visible_url": row[2],
"last_viewed_timestamp": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])),
"last_viewed_timestamp": convert_mactime_to_iso(row[3]),
"session_data": session_entries,
"safari_browser_state_db": os.path.relpath(db_path, self.target_path),
"safari_browser_state_db": os.path.relpath(db_path,
self.target_path),
})
def run(self) -> None:
@ -129,12 +129,15 @@ class SafariBrowserState(IOSExtraction):
if not browserstate_path:
continue
self.log.info("Found Safari browser state database at path: %s", browserstate_path)
self.log.info("Found Safari browser state database at path: %s",
browserstate_path)
self._process_browser_state_db(browserstate_path)
elif self.is_fs_dump:
for browserstate_path in self._get_fs_files_from_patterns(SAFARI_BROWSER_STATE_ROOT_PATHS):
self.log.info("Found Safari browser state database at path: %s", browserstate_path)
self.log.info("Found Safari browser state database at path: %s",
browserstate_path)
self._process_browser_state_db(browserstate_path)
self.log.info("Extracted a total of %d tab records and %d session history entries",
len(self.results), self._session_history_count)
self.log.info("Extracted a total of %d tab records and %d session "
"history entries", len(self.results),
self._session_history_count)

View File

@ -9,7 +9,8 @@ import sqlite3
from typing import Union
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import (convert_mactime_to_datetime,
convert_mactime_to_iso)
from ..base import IOSExtraction
@ -69,8 +70,8 @@ class SafariHistory(IOSExtraction):
self.log.info("Found HTTP redirect to different domain: \"%s\" -> \"%s\"",
origin_domain, redirect_domain)
redirect_time = convert_mactime_to_unix(redirect["timestamp"])
origin_time = convert_mactime_to_unix(result["timestamp"])
redirect_time = convert_mactime_to_datetime(redirect["timestamp"])
origin_time = convert_mactime_to_datetime(result["timestamp"])
elapsed_time = redirect_time - origin_time
elapsed_ms = elapsed_time.microseconds / 1000
@ -112,7 +113,7 @@ class SafariHistory(IOSExtraction):
"url": row[1],
"visit_id": row[2],
"timestamp": row[3],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])),
"isodate": convert_mactime_to_iso(row[3]),
"redirect_source": row[4],
"redirect_destination": row[5],
"safari_history_db": os.path.relpath(history_path, self.target_path),

View File

@ -10,8 +10,7 @@ import plistlib
import sqlite3
from typing import Union
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@ -114,8 +113,8 @@ class Shortcuts(IOSExtraction):
action["urls"] = [url.rstrip("',") for url in extracted_urls]
actions.append(action)
shortcut["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut.pop("created_date")))
shortcut["modified_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut["modified_date"]))
shortcut["isodate"] = convert_mactime_to_iso(shortcut.pop("created_date"))
shortcut["modified_date"] = convert_mactime_to_iso(shortcut["modified_date"])
shortcut["parsed_actions"] = len(actions)
shortcut["action_urls"] = list(itertools.chain(*[action["urls"] for action in actions]))
self.results.append(shortcut)

View File

@ -8,8 +8,7 @@ import sqlite3
from base64 import b64encode
from typing import Union
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@ -38,7 +37,8 @@ class SMS(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "sms_received",
"data": f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})"
"data": f"{record['service']}: {record['guid']} \"{text}\" "
f"from {record['phone_number']} ({record['account']})"
}
def check_indicators(self) -> None:
@ -69,9 +69,9 @@ class SMS(IOSExtraction):
""")
# Force the query early to catch database issues
items = list(cur)
except sqlite3.DatabaseError as e:
except sqlite3.DatabaseError as exc:
conn.close()
if "database disk image is malformed" in str(e):
if "database disk image is malformed" in str(exc):
self._recover_sqlite_db_if_needed(self.file_path, forced=True)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
@ -84,7 +84,7 @@ class SMS(IOSExtraction):
""")
items = list(cur)
else:
raise e
raise exc
names = [description[0] for description in cur.description]
for item in items:
@ -100,25 +100,29 @@ class SMS(IOSExtraction):
message[names[index]] = value
# We convert Mac's ridiculous timestamp format.
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message["date"]))
message["direction"] = ("sent" if message.get("is_from_me", 0) == 1 else "received")
message["isodate"] = convert_mactime_to_iso(message["date"])
message["direction"] = ("sent" if message.get("is_from_me", 0) == 1
else "received")
# Sometimes "text" is None instead of empty string.
if not message.get("text", None):
message["text"] = ""
if message.get("text", "").startswith("ALERT: State-sponsored attackers may be targeting your iPhone"):
self.log.warn("Apple warning about state-sponsored attack received on the %s", message["isodate"])
self.log.warn("Apple warning about state-sponsored attack "
"received on the %s", message["isodate"])
self.results.append(message)
else:
# Extract links from the SMS message.
message_links = check_for_links(message.get("text", ""))
# If we find links in the messages or if they are empty we add them to the list.
# If we find links in the messages or if they are empty we add
# them to the list.
if message_links or message.get("text", "").strip() == "":
self.results.append(message)
cur.close()
conn.close()
self.log.info("Extracted a total of %d SMS messages containing links", len(self.results))
self.log.info("Extracted a total of %d SMS messages containing links",
len(self.results))

View File

@ -8,7 +8,7 @@ import sqlite3
from base64 import b64encode
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from ..base import IOSExtraction
@ -36,8 +36,12 @@ class SMSAttachments(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "sms_attachment",
"data": f"{record['service']}: Attachment '{record['transfer_name']}' {record['direction']} from {record['phone_number']} "
f"with {record['total_bytes']} bytes (is_sticker: {record['is_sticker']}, has_user_info: {record['has_user_info']})"
"data": f"{record['service']}: Attachment "
f"'{record['transfer_name']}' {record['direction']} "
f"from {record['phone_number']} "
f"with {record['total_bytes']} bytes "
f"(is_sticker: {record['is_sticker']}, "
f"has_user_info: {record['has_user_info']})"
}
def run(self) -> None:
@ -70,16 +74,18 @@ class SMSAttachments(IOSExtraction):
value = b64encode(value).decode()
attachment[names[index]] = value
attachment["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["created_date"]))
attachment["start_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["start_date"]))
attachment["isodate"] = convert_mactime_to_iso(attachment["created_date"])
attachment["start_date"] = convert_mactime_to_iso(attachment["start_date"])
attachment["direction"] = ("sent" if attachment["is_outgoing"] == 1 else "received")
attachment["has_user_info"] = attachment["user_info"] is not None
attachment["service"] = attachment["service"] or "Unknown"
attachment["filename"] = attachment["filename"] or "NULL"
if (attachment["filename"].startswith("/var/tmp/") and attachment["filename"].endswith("-1")
if (attachment["filename"].startswith("/var/tmp/")
and attachment["filename"].endswith("-1")
and attachment["direction"] == "received"):
self.log.warn(f"Suspicious iMessage attachment '{attachment['filename']}' on {attachment['isodate']}")
self.log.warn("Suspicious iMessage attachment %s on %s",
attachment['filename'], attachment['isodate'])
self.detected.append(attachment)
self.results.append(attachment)

View File

@ -5,10 +5,9 @@
import logging
import sqlite3
from datetime import datetime
from typing import Union
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_unix_to_iso
from ..base import IOSExtraction
@ -23,7 +22,6 @@ AUTH_VALUE_OLD = {
0: "denied",
1: "allowed"
}
AUTH_VALUES = {
0: "denied",
1: "unknown",
@ -60,9 +58,11 @@ class TCC(IOSExtraction):
def serialize(self, record: dict) -> Union[dict, list]:
if "last_modified" in record:
if "allowed_value" in record:
msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}"
msg = (f"Access to {record['service']} by {record['client']} "
f"{record['allowed_value']}")
else:
msg = f"Access to {record['service']} by {record['client']} {record['auth_value']}"
msg = (f"Access to {record['service']} by {record['client']} "
f"{record['auth_value']}")
return {
"timestamp": record["last_modified"],
@ -89,18 +89,21 @@ class TCC(IOSExtraction):
db_version = "v3"
try:
cur.execute("""SELECT
service, client, client_type, auth_value, auth_reason, last_modified
service, client, client_type, auth_value,
auth_reason, last_modified
FROM access;""")
except sqlite3.OperationalError:
# v2 version
try:
cur.execute("""SELECT
service, client, client_type, allowed, prompt_count, last_modified
service, client, client_type, allowed,
prompt_count, last_modified
FROM access;""")
db_version = "v2"
except sqlite3.OperationalError:
cur.execute("""SELECT
service, client, client_type, allowed, prompt_count
service, client, client_type, allowed,
prompt_count
FROM access;""")
db_version = "v1"
@ -108,18 +111,20 @@ class TCC(IOSExtraction):
service = row[0]
client = row[1]
client_type = row[2]
client_type_desc = "bundle_id" if client_type == 0 else "absolute_path"
client_type_desc = ("bundle_id" if client_type == 0
else "absolute_path")
if db_version == "v3":
auth_value = row[3]
auth_value_desc = AUTH_VALUES.get(auth_value, "")
auth_reason = row[4]
auth_reason_desc = AUTH_REASONS.get(auth_reason, "unknown")
last_modified = convert_timestamp_to_iso(datetime.utcfromtimestamp((row[5])))
last_modified = convert_unix_to_iso(row[5])
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
self.log.info("Found client \"%s\" with access %s to %s on %s by %s",
client, auth_value_desc, device, last_modified, auth_reason_desc)
self.log.info("Found client \"%s\" with access %s to %s "
"on %s by %s", client, auth_value_desc,
device, last_modified, auth_reason_desc)
self.results.append({
"service": service,
@ -134,11 +139,13 @@ class TCC(IOSExtraction):
allowed_desc = AUTH_VALUE_OLD.get(allowed_value, "")
prompt_count = row[4]
if db_version == "v2":
last_modified = convert_timestamp_to_iso(datetime.utcfromtimestamp((row[5])))
last_modified = convert_unix_to_iso(row[5])
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
self.log.info("Found client \"%s\" with access %s to %s at %s",
client, allowed_desc, device, last_modified)
self.log.info("Found client \"%s\" with access %s to "
"%s at %s", client, allowed_desc, device,
last_modified)
self.results.append({
"service": service,
"client": client,
@ -152,6 +159,7 @@ class TCC(IOSExtraction):
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
self.log.info("Found client \"%s\" with access %s to %s",
client, allowed_desc, device)
self.results.append({
"service": service,
"client": client,
@ -164,7 +172,8 @@ class TCC(IOSExtraction):
conn.close()
def run(self) -> None:
self._find_ios_database(backup_ids=TCC_BACKUP_IDS, root_paths=TCC_ROOT_PATHS)
self._find_ios_database(backup_ids=TCC_BACKUP_IDS,
root_paths=TCC_ROOT_PATHS)
self.log.info("Found TCC database at path: %s", self.file_path)
self.process_db(self.file_path)

View File

@ -8,7 +8,7 @@ import logging
import os
import sqlite3
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -20,7 +20,8 @@ WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS = [
class WebkitResourceLoadStatistics(IOSExtraction):
"""This module extracts records from WebKit ResourceLoadStatistics observations.db."""
"""This module extracts records from WebKit ResourceLoadStatistics
observations.db."""
# TODO: Add serialize().
def __init__(self, file_path: str = None, target_path: str = None,
@ -49,7 +50,8 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.detected[key].append(item)
def _process_observations_db(self, db_path, key):
self.log.info("Found WebKit ResourceLoadStatistics observations.db file at path %s", db_path)
self.log.info("Found WebKit ResourceLoadStatistics observations.db "
"file at path %s", db_path)
self._recover_sqlite_db_if_needed(db_path)
@ -70,7 +72,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))),
"last_seen_isodate": convert_datetime_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))),
})
if len(self.results[key]) > 0:
@ -84,8 +86,10 @@ class WebkitResourceLoadStatistics(IOSExtraction):
key = f"{backup_file['domain']}/{WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH}"
if db_path:
self._process_observations_db(db_path=db_path, key=key)
except Exception as e:
self.log.info("Unable to search for WebKit observations.db: %s", e)
except Exception as exc:
self.log.info("Unable to search for WebKit observations.db: %s",
exc)
elif self.is_fs_dump:
for db_path in self._get_fs_files_from_patterns(WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS):
self._process_observations_db(db_path=db_path, key=os.path.relpath(db_path, self.target_path))
self._process_observations_db(db_path=db_path,
key=os.path.relpath(db_path, self.target_path))

View File

@ -7,7 +7,7 @@ import logging
import os
import plistlib
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
@ -93,7 +93,8 @@ class WebkitSessionResourceLog(IOSExtraction):
redirect_path += ", ".join(destination_domains)
self.log.warning("Found HTTP redirect between suspicious domains: %s", redirect_path)
self.log.warning("Found HTTP redirect between suspicious "
"domains: %s", redirect_path)
def _extract_browsing_stats(self, log_path):
items = []
@ -114,8 +115,8 @@ class WebkitSessionResourceLog(IOSExtraction):
"subframe_under_origin": item.get("subframeUnderTopFrameOrigins", ""),
"subresource_under_origin": item.get("subresourceUnderTopFrameOrigins", ""),
"user_interaction": item.get("hadUserInteraction"),
"most_recent_interaction": convert_timestamp_to_iso(item["mostRecentUserInteraction"]),
"last_seen": convert_timestamp_to_iso(item["lastSeen"]),
"most_recent_interaction": convert_datetime_to_iso(item["mostRecentUserInteraction"]),
"last_seen": convert_datetime_to_iso(item["lastSeen"]),
})
return items
@ -127,13 +128,15 @@ class WebkitSessionResourceLog(IOSExtraction):
if not log_path:
continue
self.log.info("Found Safari browsing session resource log at path: %s", log_path)
self.log.info("Found Safari browsing session resource log at "
"path: %s", log_path)
self.results[log_path] = self._extract_browsing_stats(log_path)
elif self.is_fs_dump:
for log_path in self._get_fs_files_from_patterns(WEBKIT_SESSION_RESOURCE_LOG_ROOT_PATHS):
self.log.info("Found Safari browsing session resource log at path: %s", log_path)
self.log.info("Found Safari browsing session resource log at "
"path: %s", log_path)
key = os.path.relpath(log_path, self.target_path)
self.results[key] = self._extract_browsing_stats(log_path)
self.log.info("Extracted records from %d Safari browsing session resource logs",
len(self.results))
self.log.info("Extracted records from %d Safari browsing session "
"resource logs", len(self.results))

View File

@ -7,8 +7,7 @@ import logging
import sqlite3
from typing import Union
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from ..base import IOSExtraction
@ -62,7 +61,8 @@ class Whatsapp(IOSExtraction):
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
# Query all messages and join tables which can contain media attachments and links
# Query all messages and join tables which can contain media attachments
# and links.
cur.execute("""
SELECT
ZWAMESSAGE.*,
@ -84,13 +84,15 @@ class Whatsapp(IOSExtraction):
for index, value in enumerate(message_row):
message[names[index]] = value
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE")))
message["isodate"] = convert_mactime_to_iso(message.get("ZMESSAGEDATE"))
message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else ""
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns.
# Extract links from the WhatsApp message. URLs can be stored in
# multiple fields/columns.
# Check each of them!
message_links = []
fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"]
fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL",
"ZCONTENT1", "ZCONTENT2"]
for field in fields_with_links:
if message.get(field):
message_links.extend(check_for_links(message.get(field, "")))
@ -98,10 +100,12 @@ class Whatsapp(IOSExtraction):
# Remove WhatsApp internal media URLs.
filtered_links = []
for link in message_links:
if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")):
if not (link.startswith("https://mmg-fna.whatsapp.net/")
or link.startswith("https://mmg.whatsapp.net/")):
filtered_links.append(link)
# If we find messages with links, or if there's an empty message we add it to the results list.
# If we find messages with links, or if there's an empty message
# we add it to the results list.
if filtered_links or (message.get("ZTEXT") or "").strip() == "":
message["links"] = list(set(filtered_links))
self.results.append(message)
@ -109,4 +113,5 @@ class Whatsapp(IOSExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d WhatsApp messages containing links", len(self.results))
self.log.info("Extracted a total of %d WhatsApp messages containing "
"links", len(self.results))

View File

@ -9,7 +9,7 @@ import sqlite3
from pathlib import Path
from typing import Union
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_mactime_to_iso
from .base import IOSExtraction
@ -55,14 +55,14 @@ class NetBase(IOSExtraction):
for row in cur:
# ZPROCESS records can be missing after the JOIN. Handle NULL timestamps.
if row[0] and row[1]:
first_isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0]))
isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[1]))
first_isodate = convert_mactime_to_iso(row[0])
isodate = convert_mactime_to_iso(row[1])
else:
first_isodate = row[0]
isodate = row[1]
if row[11]:
live_timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(row[11]))
live_timestamp = convert_mactime_to_iso(row[11])
else:
live_timestamp = ""

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import io
import logging
import os
import tarfile
@ -19,7 +18,7 @@ class TestBackupModule:
def test_module_folder(self):
backup_path = get_android_backup_folder()
mod = SMS(target_path=backup_path, log=logging)
mod = SMS(target_path=backup_path)
files = []
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
for fname in subfiles:
@ -32,7 +31,7 @@ class TestBackupModule:
def test_module_file(self):
fpath = os.path.join(get_android_backup_folder(), "backup.ab")
mod = SMS(target_path=fpath, log=logging)
mod = SMS(target_path=fpath)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data)
@ -48,7 +47,7 @@ class TestBackupModule:
def test_module_file2(self):
fpath = os.path.join(get_android_backup_folder(), "backup2.ab")
mod = SMS(target_path=fpath, log=logging)
mod = SMS(target_path=fpath)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data, password="123456")
@ -64,7 +63,7 @@ class TestBackupModule:
def test_module_file3(self):
fpath = os.path.join(get_android_backup_folder(), "backup3.ab")
mod = SMS(target_path=fpath, log=logging)
mod = SMS(target_path=fpath)
with open(fpath, "rb") as f:
data = f.read()
tardata = parse_backup_file(data)

View File

@ -52,6 +52,7 @@ class TestBackupParsing:
m.update(ddata)
assert m.hexdigest() == "33e73df2ede9798dcb3a85c06200ee41c8f52dd2f2e50ffafcceb0407bc13e3a"
sms = parse_tar_for_sms(ddata)
print(sms)
assert isinstance(sms, list)
assert len(sms) == 1
assert len(sms[0]["links"]) == 1

View File

@ -3,7 +3,6 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from pathlib import Path
@ -17,7 +16,7 @@ class TestAppopsModule:
def test_appops_parsing(self):
fpath = os.path.join(get_artifact_folder(), "android_data/bugreport/")
m = Appops(target_path=fpath, log=logging, results=[])
m = Appops(target_path=fpath)
folder_files = []
parent_path = Path(fpath).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(fpath)):

View File

@ -14,6 +14,7 @@ class TestDumpsysParsing:
file = get_artifact("android_data/dumpsys_appops.txt")
with open(file) as f:
data = f.read()
res = parse_dumpsys_appops(data)
assert len(res) == 12

View File

@ -0,0 +1,19 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from mvt.common.utils import convert_mactime_to_iso, convert_unix_to_iso
TEST_DATE_EPOCH = 1626566400
TEST_DATE_ISO = "2021-07-18 00:00:00.000000"
TEST_DATE_MAC = TEST_DATE_EPOCH - 978307200
class TestDateConversions:
def test_convert_unix_to_iso(self):
assert convert_unix_to_iso(TEST_DATE_EPOCH) == TEST_DATE_ISO
def test_convert_mactime_to_iso(self):
assert convert_mactime_to_iso(TEST_DATE_MAC) == TEST_DATE_ISO

View File

@ -3,8 +3,6 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.module import run_module
from mvt.ios.modules.backup.backup_info import BackupInfo
@ -14,7 +12,7 @@ from ..utils import get_ios_backup_folder
class TestBackupInfoModule:
def test_manifest(self):
m = BackupInfo(target_path=get_ios_backup_folder(), log=logging)
m = BackupInfo(target_path=get_ios_backup_folder())
run_module(m)
assert m.results["Build Version"] == "18C66"
assert m.results["IMEI"] == "42"

View File

@ -15,15 +15,16 @@ from ..utils import get_ios_backup_folder
class TestDatausageModule:
def test_datausage(self):
m = Datausage(target_path=get_ios_backup_folder(), log=logging, results=[])
m = Datausage(target_path=get_ios_backup_folder())
run_module(m)
assert m.results[0]["isodate"][0:19] == "2019-08-27 15:08:09"
assert len(m.results) == 42
assert len(m.timeline) == 60
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = Datausage(target_path=get_ios_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
m = Datausage(target_path=get_ios_backup_folder())
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
# Adds a file that exists in the manifest.
ind.ioc_collections[0]["processes"].append("CumulativeUsageTracker")

View File

@ -15,15 +15,15 @@ from ..utils import get_ios_backup_folder
class TestManifestModule:
def test_manifest(self):
m = Manifest(target_path=get_ios_backup_folder(), log=logging, results=[])
m = Manifest(target_path=get_ios_backup_folder())
run_module(m)
assert len(m.results) == 3721
assert len(m.timeline) == 5881
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = Manifest(target_path=get_ios_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
m = Manifest(target_path=get_ios_backup_folder())
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["file_names"].append("com.apple.CoreBrightness.plist")
m.indicators = ind

View File

@ -15,7 +15,7 @@ from ..utils import get_ios_backup_folder
class TestSafariBrowserStateModule:
def test_parsing(self):
m = SafariBrowserState(target_path=get_ios_backup_folder(), log=logging, results=[])
m = SafariBrowserState(target_path=get_ios_backup_folder())
m.is_backup = True
run_module(m)
assert len(m.results) == 1
@ -23,9 +23,9 @@ class TestSafariBrowserStateModule:
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = SafariBrowserState(target_path=get_ios_backup_folder(), log=logging, results=[])
m = SafariBrowserState(target_path=get_ios_backup_folder())
m.is_backup = True
ind = Indicators(log=logging)
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
# Adds a file that exists in the manifest.
ind.ioc_collections[0]["domains"].append("en.wikipedia.org")

View File

@ -15,15 +15,15 @@ from ..utils import get_ios_backup_folder
class TestSMSModule:
def test_sms(self):
m = SMS(target_path=get_ios_backup_folder(), log=logging, results=[])
m = SMS(target_path=get_ios_backup_folder())
run_module(m)
assert len(m.results) == 1
assert len(m.timeline) == 1
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = SMS(target_path=get_ios_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
m = SMS(target_path=get_ios_backup_folder())
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
# Adds a file that exists in the manifest.
ind.ioc_collections[0]["domains"].append("badbadbad.example.org")

View File

@ -15,7 +15,7 @@ from ..utils import get_ios_backup_folder
class TestTCCtModule:
def test_tcc(self):
m = TCC(target_path=get_ios_backup_folder(), log=logging, results=[])
m = TCC(target_path=get_ios_backup_folder())
run_module(m)
assert len(m.results) == 11
assert len(m.timeline) == 11
@ -25,8 +25,8 @@ class TestTCCtModule:
assert m.results[0]["auth_value"] == "allowed"
def test_tcc_detection(self, indicator_file):
m = TCC(target_path=get_ios_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
m = TCC(target_path=get_ios_backup_folder())
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
m.indicators = ind
run_module(m)