diff --git a/mvt/android/cmd_check_backup.py b/mvt/android/cmd_check_backup.py index c0a200b..df4f845 100644 --- a/mvt/android/cmd_check_backup.py +++ b/mvt/android/cmd_check_backup.py @@ -58,8 +58,8 @@ class CmdAndroidCheckBackup(Command): except InvalidBackupPassword: log.critical("Invalid backup password") sys.exit(1) - except AndroidBackupParsingError as e: - log.critical("Impossible to parse this backup file: %s", e) + except AndroidBackupParsingError as exc: + log.critical("Impossible to parse this backup file: %s", exc) log.critical("Please use Android Backup Extractor (ABE) instead") sys.exit(1) diff --git a/mvt/android/cmd_download_apks.py b/mvt/android/cmd_download_apks.py index 3c20c36..dcea520 100644 --- a/mvt/android/cmd_download_apks.py +++ b/mvt/android/cmd_download_apks.py @@ -83,9 +83,9 @@ class DownloadAPKs(AndroidExtraction): remote_path) self._adb_reconnect() return None - except Exception as e: + except Exception as exc: log.exception("Failed to pull package file from %s: %s", - remote_path, e) + remote_path, exc) self._adb_reconnect() return None diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index 5721d36..1fb3c36 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -101,8 +101,8 @@ class AndroidExtraction(MVTModule): self.log.error("Unable to connect to the device over USB. " "Try to unplug, plug the device and start again.") sys.exit(-1) - except OSError as e: - if e.errno == 113 and self.serial: + except OSError as exc: + if exc.errno == 113 and self.serial: self.log.critical("Unable to connect to the device %s: " "did you specify the correct IP addres?", self.serial) @@ -185,12 +185,12 @@ class AndroidExtraction(MVTModule): """ try: self.device.pull(remote_path, local_path, progress_callback) - except AdbCommandFailureException as e: + except AdbCommandFailureException as exc: if retry_root: self._adb_download_root(remote_path, local_path, progress_callback) else: - raise Exception(f"Unable to download file {remote_path}: {e}") + raise Exception(f"Unable to download file {remote_path}: {exc}") from exc def _adb_download_root(self, remote_path: str, local_path: str, progress_callback: Callable = None) -> None: @@ -222,8 +222,8 @@ class AndroidExtraction(MVTModule): # Delete the copy on /sdcard/. self._adb_command(f"rm -rf {new_remote_path}") - except AdbCommandFailureException as e: - raise Exception(f"Unable to download file {remote_path}: {e}") + except AdbCommandFailureException as exc: + raise Exception(f"Unable to download file {remote_path}: {exc}") from exc def _adb_process_file(self, remote_path: str, process_routine: Callable) -> None: diff --git a/mvt/android/modules/adb/chrome_history.py b/mvt/android/modules/adb/chrome_history.py index 8a51bbb..d104713 100644 --- a/mvt/android/modules/adb/chrome_history.py +++ b/mvt/android/modules/adb/chrome_history.py @@ -8,8 +8,8 @@ import os import sqlite3 from typing import Union -from mvt.common.utils import (convert_chrometime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import (convert_chrometime_to_datetime, + convert_datetime_to_iso) from .base import AndroidExtraction @@ -70,7 +70,7 @@ class ChromeHistory(AndroidExtraction): "url": item[1], "visit_id": item[2], "timestamp": item[3], - "isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])), + "isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(item[3])), "redirect_source": item[4], }) @@ -84,5 +84,5 @@ class ChromeHistory(AndroidExtraction): try: self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH), self._parse_db) - except Exception as e: - self.log.error(e) + except Exception as exc: + self.log.error(exc) diff --git a/mvt/android/modules/adb/files.py b/mvt/android/modules/adb/files.py index eb43175..090e740 100644 --- a/mvt/android/modules/adb/files.py +++ b/mvt/android/modules/adb/files.py @@ -3,13 +3,12 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import datetime import logging import os import stat from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_unix_to_iso from .base import AndroidExtraction @@ -79,7 +78,7 @@ class Files(AndroidExtraction): for file_line in output.splitlines(): [unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5) - mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp)))) + mod_time = convert_unix_to_iso(unix_timestamp) self.results.append({ "path": full_path, diff --git a/mvt/android/modules/adb/packages.py b/mvt/android/modules/adb/packages.py index abf206b..628a442 100644 --- a/mvt/android/modules/adb/packages.py +++ b/mvt/android/modules/adb/packages.py @@ -137,8 +137,8 @@ class Packages(AndroidExtraction): results = virustotal_lookup(hashes[i]) except VTNoKey: return - except VTQuotaExceeded as e: - print("Unable to continue: %s", e) + except VTQuotaExceeded as exc: + print("Unable to continue: %s", exc) break if not results: diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 59e757d..9ddf79e 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -11,7 +11,7 @@ from typing import Union from mvt.android.parsers.backup import (AndroidBackupParsingError, parse_tar_for_sms) from mvt.common.module import InsufficientPrivileges -from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.common.utils import check_for_links, convert_unix_to_iso from .base import AndroidExtraction @@ -99,7 +99,7 @@ class SMS(AndroidExtraction): message[names[index]] = value message["direction"] = ("received" if message["incoming"] == 1 else "sent") - message["isodate"] = convert_timestamp_to_iso(message["timestamp"]) + message["isodate"] = convert_unix_to_iso(message["timestamp"]) # If we find links in the messages or if they are empty we add # them to the list of results. @@ -137,11 +137,11 @@ class SMS(AndroidExtraction): def run(self) -> None: try: - if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): + if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)): self.sms_db_type = 1 self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) - elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): + elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)): self.sms_db_type = 2 self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) diff --git a/mvt/android/modules/adb/whatsapp.py b/mvt/android/modules/adb/whatsapp.py index ec8e5ee..a6cde26 100644 --- a/mvt/android/modules/adb/whatsapp.py +++ b/mvt/android/modules/adb/whatsapp.py @@ -9,7 +9,7 @@ import os import sqlite3 from typing import Union -from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.common.utils import check_for_links, convert_unix_to_iso from .base import AndroidExtraction @@ -71,7 +71,7 @@ class Whatsapp(AndroidExtraction): continue message["direction"] = ("send" if message["key_from_me"] == 1 else "received") - message["isodate"] = convert_timestamp_to_iso(message["timestamp"]) + message["isodate"] = convert_unix_to_iso(message["timestamp"]) # If we find links in the messages or if they are empty we add them # to the list. @@ -92,5 +92,5 @@ class Whatsapp(AndroidExtraction): try: self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db) - except Exception as e: - self.log.error(e) + except Exception as exc: + self.log.error(exc) diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index a1f65fe..a5a4098 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -13,7 +13,7 @@ from cryptography.hazmat.primitives import hashes, padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC -from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.common.utils import check_for_links, convert_unix_to_iso PBKDF2_KEY_SIZE = 32 @@ -94,8 +94,8 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_checksum_length = ord(key_blob.read(1)) master_key_checksum = key_blob.read(master_key_checksum_length) - except TypeError: - raise InvalidBackupPassword() + except TypeError as exc: + raise InvalidBackupPassword() from exc # Handle quirky encoding of master key bytes in Android original Java crypto code. if format_version > 1: @@ -174,8 +174,8 @@ def parse_backup_file(data, password=None): if is_compressed: try: tar_data = zlib.decompress(tar_data) - except zlib.error: - raise AndroidBackupParsingError("Impossible to decompress the backup file") + except zlib.error as exc: + raise AndroidBackupParsingError("Impossible to decompress the backup file") from exc return tar_data @@ -216,8 +216,7 @@ def parse_sms_file(data): message_links = check_for_links(entry["body"]) - utc_timestamp = datetime.datetime.utcfromtimestamp(int(entry["date"]) / 1000) - entry["isodate"] = convert_timestamp_to_iso(utc_timestamp) + entry["isodate"] = convert_unix_to_iso(int(entry["date"]) / 1000) entry["direction"] = ("sent" if int(entry["date_sent"]) else "received") # If we find links in the messages or if they are empty we add them to diff --git a/mvt/android/parsers/dumpsys.py b/mvt/android/parsers/dumpsys.py index 4e5dd5e..deea1ab 100644 --- a/mvt/android/parsers/dumpsys.py +++ b/mvt/android/parsers/dumpsys.py @@ -6,7 +6,7 @@ import re from datetime import datetime -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso def parse_dumpsys_accessibility(output: str) -> list: @@ -357,7 +357,7 @@ def parse_dumpsys_appops(output: str) -> list: entry["type"] = line[line.find("[")+1:line.find("]")] try: - entry["timestamp"] = convert_timestamp_to_iso( + entry["timestamp"] = convert_datetime_to_iso( datetime.strptime( line[line.find("]")+1:line.find("(")].strip(), "%Y-%m-%d %H:%M:%S.%f")) diff --git a/mvt/common/command.py b/mvt/common/command.py index 518ab46..024dae3 100644 --- a/mvt/common/command.py +++ b/mvt/common/command.py @@ -13,7 +13,7 @@ from typing import Callable from mvt.common.indicators import Indicators from mvt.common.module import run_module, save_timeline -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from mvt.common.version import MVT_VERSION @@ -48,9 +48,9 @@ class Command: if self.results_path and not os.path.exists(self.results_path): try: os.makedirs(self.results_path) - except Exception as e: + except Exception as exc: self.log.critical("Unable to create output folder %s: %s", - self.results_path, e) + self.results_path, exc) sys.exit(1) def _add_log_file_handler(self, logger: logging.Logger) -> None: @@ -88,7 +88,7 @@ class Command: info = { "target_path": target_path, "mvt_version": MVT_VERSION, - "date": convert_timestamp_to_iso(datetime.now()), + "date": convert_datetime_to_iso(datetime.now()), "ioc_files": [], "hashes": [], } diff --git a/mvt/common/module.py b/mvt/common/module.py index 754ece6..b28e157 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -98,9 +98,10 @@ class MVTModule: with open(results_json_path, "w", encoding="utf-8") as handle: try: json.dump(self.results, handle, indent=4, default=str) - except Exception as e: - self.log.error("Unable to store results of module %s to file %s: %s", - self.__class__.__name__, results_file_name, e) + except Exception as exc: + self.log.error("Unable to store results of module %s " + "to file %s: %s", self.__class__.__name__, + results_file_name, exc) if self.detected: detected_file_name = f"{name}_detected.json" @@ -159,18 +160,18 @@ def run_module(module: Callable) -> None: except NotImplementedError: module.log.exception("The run() procedure of module %s was not implemented yet!", module.__class__.__name__) - except InsufficientPrivileges as e: + except InsufficientPrivileges as exc: module.log.info("Insufficient privileges for module %s: %s", - module.__class__.__name__, e) - except DatabaseNotFoundError as e: + module.__class__.__name__, exc) + except DatabaseNotFoundError as exc: module.log.info("There might be no data to extract by module %s: %s", - module.__class__.__name__, e) - except DatabaseCorruptedError as e: + module.__class__.__name__, exc) + except DatabaseCorruptedError as exc: module.log.error("The %s module database seems to be corrupted: %s", - module.__class__.__name__, e) - except Exception as e: + module.__class__.__name__, exc) + except Exception as exc: module.log.exception("Error in running extraction from module %s: %s", - module.__class__.__name__, e) + module.__class__.__name__, exc) else: try: module.check_indicators() diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 0313769..025408c 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -6,16 +6,18 @@ import datetime import hashlib import re +from typing import Union -def convert_mactime_to_unix(timestamp, from_2001: bool = True): - """Converts Mac Standard Time to a Unix timestamp. +def convert_mactime_to_datetime(timestamp: Union[int, float], + from_2001: bool = True): + """Converts Mac Standard Time to a datetime. :param timestamp: MacTime timestamp (either int or float). :type timestamp: int :param from_2001: bool: Whether to (Default value = True) :param from_2001: Default value = True) - :returns: Unix epoch timestamp. + :returns: datetime. """ if not timestamp: @@ -37,12 +39,12 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True): return None -def convert_chrometime_to_unix(timestamp: int) -> int: - """Converts Chrome timestamp to a Unix timestamp. +def convert_chrometime_to_datetime(timestamp: int) -> int: + """Converts Chrome timestamp to a datetime. :param timestamp: Chrome timestamp as int. :type timestamp: int - :returns: Unix epoch timestamp. + :returns: datetime. """ epoch_start = datetime.datetime(1601, 1, 1) @@ -50,19 +52,50 @@ def convert_chrometime_to_unix(timestamp: int) -> int: return epoch_start + delta -def convert_timestamp_to_iso(timestamp: str) -> str: - """Converts Unix timestamp to ISO string. +def convert_datetime_to_iso(datetime: datetime.datetime) -> str: + """Converts datetime to ISO string. - :param timestamp: Unix timestamp. - :type timestamp: int - :returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format. + :param datetime: datetime. + :type datetime: datetime.datetime + :returns: ISO datetime string in YYYY-mm-dd HH:MM:SS.ms format. :rtype: str """ try: - return timestamp.strftime("%Y-%m-%d %H:%M:%S.%f") + return datetime.strftime("%Y-%m-%d %H:%M:%S.%f") except Exception: - return None + return "" + + +def convert_unix_to_iso(timestamp: int) -> str: + """Converts a unix epoch to ISO string. + + :param timestamp: Epoc timestamp to convert. + :type timestamp: int + :returns: ISO datetime string in YYYY-mm-dd HH:MM:SS.ms format. + :rtype: str + + """ + try: + return convert_datetime_to_iso(datetime.datetime.utcfromtimestamp(int(timestamp))) + except Exception: + return "" + + +def convert_mactime_to_iso(timestamp: int, from_2001: bool = True): + """Wraps two conversions from mactime to iso date. + + :param timestamp: MacTime timestamp (either int or float). + :type timestamp: int + :param from_2001: bool: Whether to (Default value = True) + :param from_2001: Default value = True) + :returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format. + :rtype: str + + """ + + return convert_datetime_to_iso(convert_mactime_to_datetime(timestamp, + from_2001)) def check_for_links(text: str) -> list: diff --git a/mvt/ios/decrypt.py b/mvt/ios/decrypt.py index 1ed277b..eb65662 100644 --- a/mvt/ios/decrypt.py +++ b/mvt/ios/decrypt.py @@ -102,8 +102,8 @@ class DecryptBackup: domain, item, file_id, item_folder)) - except Exception as e: - log.error("Failed to decrypt file %s: %s", relative_path, e) + except Exception as exc: + log.error("Failed to decrypt file %s: %s", relative_path, exc) pool.close() pool.join() @@ -144,16 +144,16 @@ class DecryptBackup: self._backup = iOSbackup(udid=os.path.basename(self.backup_path), cleartextpassword=password, backuproot=os.path.dirname(self.backup_path)) - except Exception as e: - if isinstance(e, KeyError) and len(e.args) > 0 and e.args[0] == b"KEY": + except Exception as exc: + if isinstance(exc, KeyError) and len(exc.args) > 0 and exc.args[0] == b"KEY": log.critical("Failed to decrypt backup. Password is probably wrong.") - elif (isinstance(e, FileNotFoundError) - and os.path.basename(e.filename) == "Manifest.plist"): + elif (isinstance(exc, FileNotFoundError) + and os.path.basename(exc.filename) == "Manifest.plist"): log.critical("Failed to find a valid backup at %s. " "Did you point to the right backup path?", self.backup_path) else: - log.exception(e) + log.exception(exc) log.critical("Failed to decrypt backup. Did you provide the " "correct password? Did you point to the right " "backup path?") @@ -185,8 +185,8 @@ class DecryptBackup: self._backup = iOSbackup(udid=os.path.basename(self.backup_path), derivedkey=key_bytes_raw, backuproot=os.path.dirname(self.backup_path)) - except Exception as e: - log.exception(e) + except Exception as exc: + log.exception(exc) log.critical("Failed to decrypt backup. Did you provide the " "correct key file?") @@ -212,8 +212,8 @@ class DecryptBackup: try: with open(key_path, 'w', encoding="utf-8") as handle: handle.write(self._decryption_key) - except Exception as e: - log.exception(e) + except Exception as exc: + log.exception(exc) log.critical("Failed to write key to file: %s", key_path) return else: diff --git a/mvt/ios/modules/backup/backup_info.py b/mvt/ios/modules/backup/backup_info.py index bc994ef..e09bbf3 100644 --- a/mvt/ios/modules/backup/backup_info.py +++ b/mvt/ios/modules/backup/backup_info.py @@ -29,7 +29,8 @@ class BackupInfo(IOSExtraction): def run(self) -> None: info_path = os.path.join(self.target_path, "Info.plist") if not os.path.exists(info_path): - raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device information") + raise DatabaseNotFoundError("No Info.plist at backup path, unable " + "to extract device information") with open(info_path, "rb") as handle: info = plistlib.load(handle) @@ -57,5 +58,6 @@ class BackupInfo(IOSExtraction): if "Product Version" in info: latest = latest_ios_version() if info["Product Version"] != latest["version"]: - self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)", + self.log.warning("This phone is running an outdated iOS " + "version: %s (latest is %s)", info["Product Version"], latest['version']) diff --git a/mvt/ios/modules/backup/configuration_profiles.py b/mvt/ios/modules/backup/configuration_profiles.py index 26b9360..3c1660f 100644 --- a/mvt/ios/modules/backup/configuration_profiles.py +++ b/mvt/ios/modules/backup/configuration_profiles.py @@ -9,7 +9,7 @@ import plistlib from base64 import b64encode from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -119,7 +119,7 @@ class ConfigurationProfiles(IOSExtraction): "relative_path": conf_file["relative_path"], "domain": conf_file["domain"], "plist": conf_plist, - "install_date": convert_timestamp_to_iso(conf_plist.get("InstallDate")), + "install_date": convert_datetime_to_iso(conf_plist.get("InstallDate")), }) self.log.info("Extracted details about %d configuration profiles", len(self.results)) diff --git a/mvt/ios/modules/backup/manifest.py b/mvt/ios/modules/backup/manifest.py index d2c9304..b40da84 100644 --- a/mvt/ios/modules/backup/manifest.py +++ b/mvt/ios/modules/backup/manifest.py @@ -11,7 +11,7 @@ import plistlib import sqlite3 from mvt.common.module import DatabaseNotFoundError -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso from ..base import IOSExtraction @@ -35,7 +35,8 @@ class Manifest(IOSExtraction): :param key: """ - return dictionary.get(key.encode("utf-8"), None) or dictionary.get(key, None) + return (dictionary.get(key.encode("utf-8"), None) + or dictionary.get(key, None)) @staticmethod def _convert_timestamp(timestamp_or_unix_time_int): @@ -45,17 +46,17 @@ class Manifest(IOSExtraction): """ if isinstance(timestamp_or_unix_time_int, datetime.datetime): - return convert_timestamp_to_iso(timestamp_or_unix_time_int) + return convert_datetime_to_iso(timestamp_or_unix_time_int) - timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int) - return convert_timestamp_to_iso(timestamp) + return convert_unix_to_iso(timestamp_or_unix_time_int) def serialize(self, record: dict) -> []: records = [] if "modified" not in record or "status_changed" not in record: return records - for timestamp in set([record["created"], record["modified"], record["status_changed"]]): + for timestamp in set([record["created"], record["modified"], + record["status_changed"]]): macb = "" macb += "M" if timestamp == record["modified"] else "-" macb += "-" @@ -80,8 +81,11 @@ class Manifest(IOSExtraction): continue if result["domain"]: - if os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist" and result["domain"] == "RootDomain": - self.log.warning("Found a potentially suspicious \"com.apple.CrashReporter.plist\" file created in RootDomain") + if (os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist" + and result["domain"] == "RootDomain"): + self.log.warning("Found a potentially suspicious " + "\"com.apple.CrashReporter.plist\" " + "file created in RootDomain") self.detected.append(result) continue @@ -92,7 +96,8 @@ class Manifest(IOSExtraction): rel_path = result["relative_path"].lower() for ioc in self.indicators.get_iocs("domains"): if ioc["value"].lower() in rel_path: - self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s", + self.log.warning("Found mention of domain \"%s\" in a " + "backup file with path: %s", ioc["value"], rel_path) self.detected.append(result) @@ -101,7 +106,8 @@ class Manifest(IOSExtraction): if not os.path.isfile(manifest_db_path): raise DatabaseNotFoundError("unable to find backup's Manifest.db") - self.log.info("Found Manifest.db database at path: %s", manifest_db_path) + self.log.info("Found Manifest.db database at path: %s", + manifest_db_path) conn = sqlite3.connect(manifest_db_path) cur = conn.cursor() @@ -128,19 +134,24 @@ class Manifest(IOSExtraction): file_metadata = self._get_key(file_plist, "$objects")[1] cleaned_metadata.update({ "created": self._convert_timestamp(self._get_key(file_metadata, "Birth")), - "modified": self._convert_timestamp(self._get_key(file_metadata, "LastModified")), - "status_changed": self._convert_timestamp(self._get_key(file_metadata, "LastStatusChange")), + "modified": self._convert_timestamp(self._get_key(file_metadata, + "LastModified")), + "status_changed": self._convert_timestamp(self._get_key(file_metadata, + "LastStatusChange")), "mode": oct(self._get_key(file_metadata, "Mode")), "owner": self._get_key(file_metadata, "UserID"), "size": self._get_key(file_metadata, "Size"), }) except Exception: - self.log.exception("Error reading manifest file metadata for file with ID %s and relative path %s", - file_data["fileID"], file_data["relativePath"]) + self.log.exception("Error reading manifest file metadata " + "for file with ID %s and relative path %s", + file_data["fileID"], + file_data["relativePath"]) self.results.append(cleaned_metadata) cur.close() conn.close() - self.log.info("Extracted a total of %d file metadata items", len(self.results)) + self.log.info("Extracted a total of %d file metadata items", + len(self.results)) diff --git a/mvt/ios/modules/backup/profile_events.py b/mvt/ios/modules/backup/profile_events.py index 0325460..f27f71d 100644 --- a/mvt/ios/modules/backup/profile_events.py +++ b/mvt/ios/modules/backup/profile_events.py @@ -7,7 +7,7 @@ import logging import plistlib from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -34,7 +34,8 @@ class ProfileEvents(IOSExtraction): "module": self.__class__.__name__, "event": "profile_operation", "data": f"Process {record.get('process')} started operation " - f"{record.get('operation')} of profile {record.get('profile_id')}" + f"{record.get('operation')} of profile " + f"{record.get('profile_id')}" } def check_indicators(self) -> None: @@ -75,7 +76,7 @@ class ProfileEvents(IOSExtraction): for key, value in event[key].items(): key = key.lower() if key == "timestamp": - result["timestamp"] = str(convert_timestamp_to_iso(value)) + result["timestamp"] = str(convert_datetime_to_iso(value)) else: result[key] = value @@ -89,13 +90,15 @@ class ProfileEvents(IOSExtraction): if not events_file_path: continue - self.log.info("Found MCProfileEvents.plist file at %s", events_file_path) + self.log.info("Found MCProfileEvents.plist file at %s", + events_file_path) with open(events_file_path, "rb") as handle: self.results.extend(self.parse_profile_events(handle.read())) for result in self.results: - self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"", + self.log.info("On %s process \"%s\" started operation \"%s\" " + "of profile \"%s\"", result.get("timestamp"), result.get("process"), result.get("operation"), result.get("profile_id")) diff --git a/mvt/ios/modules/base.py b/mvt/ios/modules/base.py index e6fb243..8609507 100644 --- a/mvt/ios/modules/base.py +++ b/mvt/ios/modules/base.py @@ -44,8 +44,8 @@ class IOSExtraction(MVTModule): try: recover = False cur.execute("SELECT name FROM sqlite_master WHERE type='table';") - except sqlite3.DatabaseError as e: - if "database disk image is malformed" in str(e): + except sqlite3.DatabaseError as exc: + if "database disk image is malformed" in str(exc): recover = True finally: conn.close() @@ -102,8 +102,8 @@ class IOSExtraction(MVTModule): (relative_path,)) elif domain: cur.execute(f"{base_sql} domain = ?;", (domain,)) - except Exception as e: - raise DatabaseCorruptedError(f"failed to query Manifest.db: {e}") + except Exception as exc: + raise DatabaseCorruptedError(f"failed to query Manifest.db: {exc}") from exc for row in cur: yield { diff --git a/mvt/ios/modules/fs/analytics.py b/mvt/ios/modules/fs/analytics.py index a27056c..1c4af19 100644 --- a/mvt/ios/modules/fs/analytics.py +++ b/mvt/ios/modules/fs/analytics.py @@ -8,7 +8,7 @@ import plistlib import sqlite3 from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -18,7 +18,8 @@ ANALYTICS_DB_PATH = [ class Analytics(IOSExtraction): - """This module extracts information from the private/var/Keychains/Analytics/*.db files.""" + """This module extracts information from the + private/var/Keychains/Analytics/*.db files.""" def __init__(self, file_path: str = None, target_path: str = None, results_path: str = None, fast_mode: bool = False, @@ -47,16 +48,20 @@ class Analytics(IOSExtraction): ioc = self.indicators.check_process(value) if ioc: - self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s", - value, result["artifact"], result["timestamp"]) + self.log.warning("Found mention of a malicious process " + "\"%s\" in %s file at %s", + value, result["artifact"], + result["timestamp"]) result["matched_indicator"] = ioc self.detected.append(result) continue ioc = self.indicators.check_domain(value) if ioc: - self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s", - value, result["artifact"], result["timestamp"]) + self.log.warning("Found mention of a malicious domain " + "\"%s\" in %s file at %s", + value, result["artifact"], + result["timestamp"]) result["matched_indicator"] = ioc self.detected.append(result) @@ -98,11 +103,11 @@ class Analytics(IOSExtraction): for row in cur: if row[0] and row[1]: - isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False)) + isodate = convert_mactime_to_iso(row[0], False) data = plistlib.loads(row[1]) data["isodate"] = isodate elif row[0]: - isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False)) + isodate = convert_mactime_to_iso(row[0], False) data = {} data["isodate"] = isodate elif row[1]: @@ -120,7 +125,8 @@ class Analytics(IOSExtraction): def process_analytics_dbs(self): for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH): self.file_path = file_path - self.log.info("Found Analytics database file at path: %s", file_path) + self.log.info("Found Analytics database file at path: %s", + file_path) self._extract_analytics_data() def run(self) -> None: diff --git a/mvt/ios/modules/fs/filesystem.py b/mvt/ios/modules/fs/filesystem.py index 1d7fc21..4cca600 100644 --- a/mvt/ios/modules/fs/filesystem.py +++ b/mvt/ios/modules/fs/filesystem.py @@ -8,7 +8,7 @@ import logging import os from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -56,7 +56,9 @@ class Filesystem(IOSExtraction): for ioc in self.indicators.get_iocs("processes"): parts = result["path"].split("/") if ioc["value"] in parts: - self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"", + self.log.warning("Found known suspicious process name " + "mentioned in file at path \"%s\" " + "matching indicators from \"%s\"", result["path"], ioc["name"]) result["matched_indicator"] = ioc self.detected.append(result) @@ -68,7 +70,7 @@ class Filesystem(IOSExtraction): dir_path = os.path.join(root, dir_name) result = { "path": os.path.relpath(dir_path, self.target_path), - "modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(dir_path).st_mtime)), + "modified": convert_unix_to_iso(os.stat(dir_path).st_mtime), } except Exception: continue @@ -80,7 +82,7 @@ class Filesystem(IOSExtraction): file_path = os.path.join(root, file_name) result = { "path": os.path.relpath(file_path, self.target_path), - "modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(file_path).st_mtime)), + "modified": convert_unix_to_iso(os.stat(file_path).st_mtime), } except Exception: continue diff --git a/mvt/ios/modules/fs/net_netusage.py b/mvt/ios/modules/fs/net_netusage.py index 9e6a016..9aa5576 100644 --- a/mvt/ios/modules/fs/net_netusage.py +++ b/mvt/ios/modules/fs/net_netusage.py @@ -35,8 +35,9 @@ class Netusage(NetBase): self.log.info("Found NetUsage database at path: %s", self.file_path) try: self._extract_net_data() - except sqlite3.OperationalError as e: - self.log.info("Skipping this NetUsage database because it seems empty or malformed: %s", e) + except sqlite3.OperationalError as exc: + self.log.info("Skipping this NetUsage database because " + "it seems empty or malformed: %s", exc) continue self._find_suspicious_processes() diff --git a/mvt/ios/modules/fs/safari_favicon.py b/mvt/ios/modules/fs/safari_favicon.py index 8023fa4..a68665a 100644 --- a/mvt/ios/modules/fs/safari_favicon.py +++ b/mvt/ios/modules/fs/safari_favicon.py @@ -7,7 +7,7 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -33,7 +33,8 @@ class SafariFavicon(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "safari_favicon", - "data": f"Safari favicon from {record['url']} with icon URL {record['icon_url']} ({record['type']})", + "data": f"Safari favicon from {record['url']} with icon URL " + f"{record['icon_url']} ({record['type']})", } def check_indicators(self) -> None: @@ -69,7 +70,7 @@ class SafariFavicon(IOSExtraction): "url": row[0], "icon_url": row[1], "timestamp": row[2], - "isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])), + "isodate": convert_mactime_to_iso(row[2]), "type": "valid", "safari_favicon_db_path": file_path, }) @@ -88,7 +89,7 @@ class SafariFavicon(IOSExtraction): "url": row[0], "icon_url": row[1], "timestamp": row[2], - "isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])), + "isodate": convert_mactime_to_iso(row[2]), "type": "rejected", "safari_favicon_db_path": file_path, }) @@ -98,8 +99,10 @@ class SafariFavicon(IOSExtraction): def run(self) -> None: for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS): - self.log.info("Found Safari favicon cache database at path: %s", file_path) + self.log.info("Found Safari favicon cache database at path: %s", + file_path) self._process_favicon_db(file_path) - self.log.info("Extracted a total of %d favicon records", len(self.results)) + self.log.info("Extracted a total of %d favicon records", + len(self.results)) self.results = sorted(self.results, key=lambda x: x["isodate"]) diff --git a/mvt/ios/modules/fs/shutdownlog.py b/mvt/ios/modules/fs/shutdownlog.py index 06f509d..8771b18 100644 --- a/mvt/ios/modules/fs/shutdownlog.py +++ b/mvt/ios/modules/fs/shutdownlog.py @@ -6,7 +6,7 @@ import logging from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -31,7 +31,8 @@ class ShutdownLog(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "shutdown", - "data": f"Client {record['client']} with PID {record['pid']} was running when the device was shut down", + "data": f"Client {record['client']} with PID {record['pid']} " + "was running when the device was shut down", } def check_indicators(self) -> None: @@ -48,7 +49,8 @@ class ShutdownLog(IOSExtraction): for ioc in self.indicators.get_iocs("processes"): parts = result["client"].split("/") if ioc in parts: - self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log", + self.log.warning("Found mention of a known malicious " + "process \"%s\" in shutdown.log", ioc) result["matched_indicator"] = ioc self.detected.append(result) @@ -74,8 +76,7 @@ class ShutdownLog(IOSExtraction): except Exception: mac_timestamp = 0 - timestamp = convert_mactime_to_unix(mac_timestamp, from_2001=False) - isodate = convert_timestamp_to_iso(timestamp) + isodate = convert_mactime_to_iso(mac_timestamp, from_2001=False) for current_process in current_processes: self.results.append({ diff --git a/mvt/ios/modules/fs/version_history.py b/mvt/ios/modules/fs/version_history.py index 670c0cd..4b57e8f 100644 --- a/mvt/ios/modules/fs/version_history.py +++ b/mvt/ios/modules/fs/version_history.py @@ -8,7 +8,7 @@ import json import logging from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -45,7 +45,7 @@ class IOSVersionHistory(IOSExtraction): "%Y-%m-%d %H:%M:%S.%f %z") timestamp_utc = timestamp.astimezone(datetime.timezone.utc) self.results.append({ - "isodate": convert_timestamp_to_iso(timestamp_utc), + "isodate": convert_datetime_to_iso(timestamp_utc), "os_version": log_line["os_version"], }) diff --git a/mvt/ios/modules/fs/webkit_base.py b/mvt/ios/modules/fs/webkit_base.py index b3579df..b944664 100644 --- a/mvt/ios/modules/fs/webkit_base.py +++ b/mvt/ios/modules/fs/webkit_base.py @@ -6,7 +6,7 @@ import datetime import os -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -35,9 +35,10 @@ class WebkitBase(IOSExtraction): name = name.replace("http_", "http://") name = name.replace("https_", "https://") url = name.split("_")[0] + utc_timestamp = datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime) self.results.append({ "folder": key, "url": url, - "isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime)), + "isodate": convert_datetime_to_iso(utc_timestamp), }) diff --git a/mvt/ios/modules/fs/webkit_indexeddb.py b/mvt/ios/modules/fs/webkit_indexeddb.py index ab5d821..591ffae 100644 --- a/mvt/ios/modules/fs/webkit_indexeddb.py +++ b/mvt/ios/modules/fs/webkit_indexeddb.py @@ -35,7 +35,8 @@ class WebkitIndexedDB(WebkitBase): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "webkit_indexeddb", - "data": f"IndexedDB folder {record['folder']} containing file for URL {record['url']}", + "data": f"IndexedDB folder {record['folder']} containing " + f"file for URL {record['url']}", } def run(self) -> None: diff --git a/mvt/ios/modules/fs/webkit_localstorage.py b/mvt/ios/modules/fs/webkit_localstorage.py index e9f3ea7..6a0a354 100644 --- a/mvt/ios/modules/fs/webkit_localstorage.py +++ b/mvt/ios/modules/fs/webkit_localstorage.py @@ -33,10 +33,12 @@ class WebkitLocalStorage(WebkitBase): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "webkit_local_storage", - "data": f"WebKit Local Storage folder {record['folder']} containing file for URL {record['url']}", + "data": f"WebKit Local Storage folder {record['folder']} " + f"containing file for URL {record['url']}", } def run(self) -> None: self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS) - self.log.info("Extracted a total of %d records from WebKit Local Storages", + self.log.info("Extracted a total of %d records from WebKit " + "Local Storages", len(self.results)) diff --git a/mvt/ios/modules/fs/webkit_safariviewservice.py b/mvt/ios/modules/fs/webkit_safariviewservice.py index ac204ac..c118f54 100644 --- a/mvt/ios/modules/fs/webkit_safariviewservice.py +++ b/mvt/ios/modules/fs/webkit_safariviewservice.py @@ -29,5 +29,6 @@ class WebkitSafariViewService(WebkitBase): def run(self) -> None: self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS) - self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData", + self.log.info("Extracted a total of %d records from WebKit " + "SafariViewService WebsiteData", len(self.results)) diff --git a/mvt/ios/modules/mixed/calls.py b/mvt/ios/modules/mixed/calls.py index 15d634e..2c9b906 100644 --- a/mvt/ios/modules/mixed/calls.py +++ b/mvt/ios/modules/mixed/calls.py @@ -7,7 +7,7 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -35,7 +35,8 @@ class Calls(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "call", - "data": f"From {record['number']} using {record['provider']} during {record['duration']} seconds" + "data": f"From {record['number']} using {record['provider']} " + f"during {record['duration']} seconds" } def run(self) -> None: @@ -54,7 +55,7 @@ class Calls(IOSExtraction): for row in cur: self.results.append({ - "isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])), + "isodate": convert_mactime_to_iso(row[0]), "duration": row[1], "location": row[2], "number": row[3].decode("utf-8") if row[3] and row[3] is bytes else row[3], diff --git a/mvt/ios/modules/mixed/chrome_favicon.py b/mvt/ios/modules/mixed/chrome_favicon.py index 1f53da6..c28ab77 100644 --- a/mvt/ios/modules/mixed/chrome_favicon.py +++ b/mvt/ios/modules/mixed/chrome_favicon.py @@ -7,8 +7,8 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import (convert_chrometime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import (convert_chrometime_to_datetime, + convert_datetime_to_iso) from ..base import IOSExtraction @@ -57,7 +57,8 @@ class ChromeFavicon(IOSExtraction): def run(self) -> None: self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS, root_paths=CHROME_FAVICON_ROOT_PATHS) - self.log.info("Found Chrome favicon cache database at path: %s", self.file_path) + self.log.info("Found Chrome favicon cache database at path: %s", + self.file_path) conn = sqlite3.connect(self.file_path) @@ -82,7 +83,7 @@ class ChromeFavicon(IOSExtraction): "url": row[0], "icon_url": row[1], "timestamp": last_timestamp, - "isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(last_timestamp)), + "isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(last_timestamp)), }) cur.close() diff --git a/mvt/ios/modules/mixed/chrome_history.py b/mvt/ios/modules/mixed/chrome_history.py index 8c721dc..1ea68fc 100644 --- a/mvt/ios/modules/mixed/chrome_history.py +++ b/mvt/ios/modules/mixed/chrome_history.py @@ -7,8 +7,8 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import (convert_chrometime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import (convert_chrometime_to_datetime, + convert_datetime_to_iso) from ..base import IOSExtraction @@ -37,7 +37,9 @@ class ChromeHistory(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "visit", - "data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})" + "data": f"{record['id']} - {record['url']} " + f"(visit ID: {record['visit_id']}, " + f"redirect source: {record['redirect_source']})" } def check_indicators(self) -> None: @@ -53,7 +55,8 @@ class ChromeHistory(IOSExtraction): def run(self) -> None: self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS, root_paths=CHROME_HISTORY_ROOT_PATHS) - self.log.info("Found Chrome history database at path: %s", self.file_path) + self.log.info("Found Chrome history database at path: %s", + self.file_path) conn = sqlite3.connect(self.file_path) cur = conn.cursor() @@ -75,11 +78,12 @@ class ChromeHistory(IOSExtraction): "url": item[1], "visit_id": item[2], "timestamp": item[3], - "isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])), + "isodate": convert_datetime_to_iso(convert_chrometime_to_datetime(item[3])), "redirect_source": item[4], }) cur.close() conn.close() - self.log.info("Extracted a total of %d history items", len(self.results)) + self.log.info("Extracted a total of %d history items", + len(self.results)) diff --git a/mvt/ios/modules/mixed/contacts.py b/mvt/ios/modules/mixed/contacts.py index 4b54451..8514884 100644 --- a/mvt/ios/modules/mixed/contacts.py +++ b/mvt/ios/modules/mixed/contacts.py @@ -28,7 +28,8 @@ class Contacts(IOSExtraction): log=log, results=results) def run(self) -> None: - self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS, root_paths=CONTACTS_ROOT_PATHS) + self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS, + root_paths=CONTACTS_ROOT_PATHS) self.log.info("Found Contacts database at path: %s", self.file_path) conn = sqlite3.connect(self.file_path) diff --git a/mvt/ios/modules/mixed/firefox_favicon.py b/mvt/ios/modules/mixed/firefox_favicon.py index 858a532..8e4b87b 100644 --- a/mvt/ios/modules/mixed/firefox_favicon.py +++ b/mvt/ios/modules/mixed/firefox_favicon.py @@ -8,7 +8,7 @@ import sqlite3 from datetime import datetime from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -36,7 +36,8 @@ class FirefoxFavicon(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "firefox_history", - "data": f"Firefox favicon {record['url']} when visiting {record['history_url']}", + "data": f"Firefox favicon {record['url']} " + f"when visiting {record['history_url']}", } def check_indicators(self) -> None: @@ -55,7 +56,8 @@ class FirefoxFavicon(IOSExtraction): def run(self) -> None: self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS) - self.log.info("Found Firefox favicon database at path: %s", self.file_path) + self.log.info("Found Firefox favicon database at path: %s", + self.file_path) conn = sqlite3.connect(self.file_path) cur = conn.cursor() @@ -81,7 +83,7 @@ class FirefoxFavicon(IOSExtraction): "width": item[2], "height": item[3], "type": item[4], - "isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(item[5])), + "isodate": convert_unix_to_iso(item[5]), "history_id": item[6], "history_url": item[7] }) @@ -89,4 +91,5 @@ class FirefoxFavicon(IOSExtraction): cur.close() conn.close() - self.log.info("Extracted a total of %d history items", len(self.results)) + self.log.info("Extracted a total of %d history items", + len(self.results)) diff --git a/mvt/ios/modules/mixed/firefox_history.py b/mvt/ios/modules/mixed/firefox_history.py index 8a457b4..0b7b1d3 100644 --- a/mvt/ios/modules/mixed/firefox_history.py +++ b/mvt/ios/modules/mixed/firefox_history.py @@ -5,10 +5,9 @@ import logging import sqlite3 -from datetime import datetime from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -40,7 +39,8 @@ class FirefoxHistory(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "firefox_history", - "data": f"Firefox visit with ID {record['id']} to URL: {record['url']}", + "data": f"Firefox visit with ID {record['id']} " + f"to URL: {record['url']}", } def check_indicators(self) -> None: @@ -54,8 +54,10 @@ class FirefoxHistory(IOSExtraction): self.detected.append(result) def run(self) -> None: - self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS) - self.log.info("Found Firefox history database at path: %s", self.file_path) + self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, + root_paths=FIREFOX_HISTORY_ROOT_PATHS) + self.log.info("Found Firefox history database at path: %s", + self.file_path) conn = sqlite3.connect(self.file_path) cur = conn.cursor() @@ -74,7 +76,7 @@ class FirefoxHistory(IOSExtraction): for row in cur: self.results.append({ "id": row[0], - "isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(row[1])), + "isodate": convert_unix_to_iso(row[1]), "url": row[2], "title": row[3], "i1000000s_local": row[4], @@ -84,4 +86,5 @@ class FirefoxHistory(IOSExtraction): cur.close() conn.close() - self.log.info("Extracted a total of %d history items", len(self.results)) + self.log.info("Extracted a total of %d history items", + len(self.results)) diff --git a/mvt/ios/modules/mixed/idstatuscache.py b/mvt/ios/modules/mixed/idstatuscache.py index bb76498..91f2fee 100644 --- a/mvt/ios/modules/mixed/idstatuscache.py +++ b/mvt/ios/modules/mixed/idstatuscache.py @@ -8,7 +8,7 @@ import logging import plistlib from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -37,7 +37,8 @@ class IDStatusCache(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "lookup", - "data": f"Lookup of {record['user']} within {record['package']} (Status {record['idstatus']})" + "data": f"Lookup of {record['user']} within {record['package']} " + f"(Status {record['idstatus']})" } def check_indicators(self) -> None: @@ -54,7 +55,8 @@ class IDStatusCache(IOSExtraction): continue if "\\x00\\x00" in result.get("user", ""): - self.log.warning("Found an ID Status Cache entry with suspicious patterns: %s", + self.log.warning("Found an ID Status Cache entry with " + "suspicious patterns: %s", result.get("user")) self.detected.append(result) @@ -77,7 +79,7 @@ class IDStatusCache(IOSExtraction): id_status_cache_entries.append({ "package": app, "user": entry.replace("\x00", "\\x00"), - "isodate": convert_timestamp_to_iso(convert_mactime_to_unix(lookup_date)), + "isodate": convert_mactime_to_iso(lookup_date), "idstatus": id_status, }) @@ -91,12 +93,15 @@ class IDStatusCache(IOSExtraction): if self.is_backup: self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS) - self.log.info("Found IDStatusCache plist at path: %s", self.file_path) + self.log.info("Found IDStatusCache plist at path: %s", + self.file_path) self._extract_idstatuscache_entries(self.file_path) elif self.is_fs_dump: for idstatuscache_path in self._get_fs_files_from_patterns(IDSTATUSCACHE_ROOT_PATHS): self.file_path = idstatuscache_path - self.log.info("Found IDStatusCache plist at path: %s", self.file_path) + self.log.info("Found IDStatusCache plist at path: %s", + self.file_path) self._extract_idstatuscache_entries(self.file_path) - self.log.info("Extracted a total of %d ID Status Cache entries", len(self.results)) + self.log.info("Extracted a total of %d ID Status Cache entries", + len(self.results)) diff --git a/mvt/ios/modules/mixed/interactionc.py b/mvt/ios/modules/mixed/interactionc.py index 2f9393a..b307776 100644 --- a/mvt/ios/modules/mixed/interactionc.py +++ b/mvt/ios/modules/mixed/interactionc.py @@ -7,7 +7,7 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -59,9 +59,12 @@ class InteractionC(IOSExtraction): "timestamp": record[timestamp], "module": self.__class__.__name__, "event": timestamp, - "data": f"[{record['bundle_id']}] {record['account']} - from {record['sender_display_name']} " - f"({record['sender_identifier']}) to {record['recipient_display_name']} " - f"({record['recipient_identifier']}): {record['content']}" + "data": f"[{record['bundle_id']}] {record['account']} - " + f"from {record['sender_display_name']} " + f"({record['sender_identifier']}) " + f"to {record['recipient_display_name']} " + f"({record['recipient_identifier']}): " + f"{record['content']}" }) processed.append(record[timestamp]) @@ -133,8 +136,8 @@ class InteractionC(IOSExtraction): for row in cur: self.results.append({ - "start_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])), - "end_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[1])), + "start_date": convert_mactime_to_iso(row[0]), + "end_date": convert_mactime_to_iso(row[1]), "bundle_id": row[2], "account": row[3], "target_bundle_id": row[4], @@ -158,14 +161,14 @@ class InteractionC(IOSExtraction): "incoming_recipient_count": row[22], "incoming_sender_count": row[23], "outgoing_recipient_count": row[24], - "interactions_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[25])) if row[25] else None, - "contacts_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[26])) if row[26] else None, - "first_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[27])) if row[27] else None, - "first_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[28])) if row[28] else None, - "first_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[29])) if row[29] else None, - "last_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[30])) if row[30] else None, - "last_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[31])) if row[31] else None, - "last_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[32])) if row[32] else None, + "interactions_creation_date": convert_mactime_to_iso(row[25]) if row[25] else None, + "contacts_creation_date": convert_mactime_to_iso(row[26]) if row[26] else None, + "first_incoming_recipient_date": convert_mactime_to_iso(row[27]) if row[27] else None, + "first_incoming_sender_date": convert_mactime_to_iso(row[28]) if row[28] else None, + "first_outgoing_recipient_date": convert_mactime_to_iso(row[29]) if row[29] else None, + "last_incoming_sender_date": convert_mactime_to_iso(row[30]) if row[30] else None, + "last_incoming_recipient_date": convert_mactime_to_iso(row[31]) if row[31] else None, + "last_outgoing_recipient_date": convert_mactime_to_iso(row[32]) if row[32] else None, "custom_id": row[33], "location_uuid": row[35], "group_name": row[36], @@ -176,4 +179,5 @@ class InteractionC(IOSExtraction): cur.close() conn.close() - self.log.info("Extracted a total of %d InteractionC events", len(self.results)) + self.log.info("Extracted a total of %d InteractionC events", + len(self.results)) diff --git a/mvt/ios/modules/mixed/locationd.py b/mvt/ios/modules/mixed/locationd.py index d353723..a50bf7a 100644 --- a/mvt/ios/modules/mixed/locationd.py +++ b/mvt/ios/modules/mixed/locationd.py @@ -7,7 +7,7 @@ import logging import plistlib from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -66,8 +66,8 @@ class LocationdClients(IOSExtraction): ioc = self.indicators.check_process(proc_name) if ioc: - self.log.warning("Found a suspicious process name in LocationD entry %s", - result["package"]) + self.log.warning("Found a suspicious process name in " + "LocationD entry %s", result["package"]) result["matched_indicator"] = ioc self.detected.append(result) continue @@ -75,8 +75,8 @@ class LocationdClients(IOSExtraction): if "BundlePath" in result: ioc = self.indicators.check_file_path(result["BundlePath"]) if ioc: - self.log.warning("Found a suspicious file path in Location D: %s", - result["BundlePath"]) + self.log.warning("Found a suspicious file path in " + "Location D: %s", result["BundlePath"]) result["matched_indicator"] = ioc self.detected.append(result) continue @@ -84,8 +84,8 @@ class LocationdClients(IOSExtraction): if "Executable" in result: ioc = self.indicators.check_file_path(result["Executable"]) if ioc: - self.log.warning("Found a suspicious file path in Location D: %s", - result["Executable"]) + self.log.warning("Found a suspicious file path in " + "Location D: %s", result["Executable"]) result["matched_indicator"] = ioc self.detected.append(result) continue @@ -93,8 +93,8 @@ class LocationdClients(IOSExtraction): if "Registered" in result: ioc = self.indicators.check_file_path(result["Registered"]) if ioc: - self.log.warning("Found a suspicious file path in Location D: %s", - result["Registered"]) + self.log.warning("Found a suspicious file path in " + "Location D: %s", result["Registered"]) result["matched_indicator"] = ioc self.detected.append(result) continue @@ -108,19 +108,22 @@ class LocationdClients(IOSExtraction): result["package"] = key for timestamp in self.timestamps: if timestamp in result.keys(): - result[timestamp] = convert_timestamp_to_iso(convert_mactime_to_unix(result[timestamp])) + result[timestamp] = convert_mactime_to_iso(result[timestamp]) self.results.append(result) def run(self) -> None: if self.is_backup: self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS) - self.log.info("Found Locationd Clients plist at path: %s", self.file_path) + self.log.info("Found Locationd Clients plist at path: %s", + self.file_path) self._extract_locationd_entries(self.file_path) elif self.is_fs_dump: for locationd_path in self._get_fs_files_from_patterns(LOCATIOND_ROOT_PATHS): self.file_path = locationd_path - self.log.info("Found Locationd Clients plist at path: %s", self.file_path) + self.log.info("Found Locationd Clients plist at path: %s", + self.file_path) self._extract_locationd_entries(self.file_path) - self.log.info("Extracted a total of %d Locationd Clients entries", len(self.results)) + self.log.info("Extracted a total of %d Locationd Clients entries", + len(self.results)) diff --git a/mvt/ios/modules/mixed/osanalytics_addaily.py b/mvt/ios/modules/mixed/osanalytics_addaily.py index e17ee3b..6cba43b 100644 --- a/mvt/ios/modules/mixed/osanalytics_addaily.py +++ b/mvt/ios/modules/mixed/osanalytics_addaily.py @@ -7,7 +7,7 @@ import logging import plistlib from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -20,7 +20,8 @@ OSANALYTICS_ADDAILY_ROOT_PATHS = [ class OSAnalyticsADDaily(IOSExtraction): - """Extract network usage information by process, from com.apple.osanalytics.addaily.plist""" + """Extract network usage information by process, + from com.apple.osanalytics.addaily.plist""" def __init__(self, file_path: str = None, target_path: str = None, results_path: str = None, fast_mode: bool = False, @@ -31,13 +32,14 @@ class OSAnalyticsADDaily(IOSExtraction): log=log, results=results) def serialize(self, record: dict) -> Union[dict, list]: - record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ - f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" return { "timestamp": record["ts"], "module": self.__class__.__name__, "event": "osanalytics_addaily", - "data": record_data, + "data": f"{record['package']} WIFI IN: {record['wifi_in']}, " + f"WIFI OUT: {record['wifi_out']} - " + f"WWAN IN: {record['wwan_in']}, " + f"WWAN OUT: {record['wwan_out']}", } def check_indicators(self) -> None: @@ -53,7 +55,8 @@ class OSAnalyticsADDaily(IOSExtraction): def run(self) -> None: self._find_ios_database(backup_ids=OSANALYTICS_ADDAILY_BACKUP_IDS, root_paths=OSANALYTICS_ADDAILY_ROOT_PATHS) - self.log.info("Found com.apple.osanalytics.addaily plist at path: %s", self.file_path) + self.log.info("Found com.apple.osanalytics.addaily plist at path: %s", + self.file_path) with open(self.file_path, "rb") as handle: file_plist = plistlib.load(handle) @@ -61,11 +64,12 @@ class OSAnalyticsADDaily(IOSExtraction): for app, values in file_plist.get("netUsageBaseline", {}).items(): self.results.append({ "package": app, - "ts": convert_timestamp_to_iso(values[0]), + "ts": convert_datetime_to_iso(values[0]), "wifi_in": values[1], "wifi_out": values[2], "wwan_in": values[3], "wwan_out": values[4], }) - self.log.info("Extracted a total of %d com.apple.osanalytics.addaily entries", len(self.results)) + self.log.info("Extracted a total of %d com.apple.osanalytics.addaily " + "entries", len(self.results)) diff --git a/mvt/ios/modules/mixed/safari_browserstate.py b/mvt/ios/modules/mixed/safari_browserstate.py index 38ae07d..a3b3796 100644 --- a/mvt/ios/modules/mixed/safari_browserstate.py +++ b/mvt/ios/modules/mixed/safari_browserstate.py @@ -10,8 +10,7 @@ import plistlib import sqlite3 from typing import Union -from mvt.common.utils import (convert_mactime_to_unix, - convert_timestamp_to_iso, keys_bytes_to_string) +from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string from ..base import IOSExtraction @@ -117,9 +116,10 @@ class SafariBrowserState(IOSExtraction): "tab_title": row[0], "tab_url": row[1], "tab_visible_url": row[2], - "last_viewed_timestamp": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])), + "last_viewed_timestamp": convert_mactime_to_iso(row[3]), "session_data": session_entries, - "safari_browser_state_db": os.path.relpath(db_path, self.target_path), + "safari_browser_state_db": os.path.relpath(db_path, + self.target_path), }) def run(self) -> None: @@ -129,12 +129,15 @@ class SafariBrowserState(IOSExtraction): if not browserstate_path: continue - self.log.info("Found Safari browser state database at path: %s", browserstate_path) + self.log.info("Found Safari browser state database at path: %s", + browserstate_path) self._process_browser_state_db(browserstate_path) elif self.is_fs_dump: for browserstate_path in self._get_fs_files_from_patterns(SAFARI_BROWSER_STATE_ROOT_PATHS): - self.log.info("Found Safari browser state database at path: %s", browserstate_path) + self.log.info("Found Safari browser state database at path: %s", + browserstate_path) self._process_browser_state_db(browserstate_path) - self.log.info("Extracted a total of %d tab records and %d session history entries", - len(self.results), self._session_history_count) + self.log.info("Extracted a total of %d tab records and %d session " + "history entries", len(self.results), + self._session_history_count) diff --git a/mvt/ios/modules/mixed/safari_history.py b/mvt/ios/modules/mixed/safari_history.py index 64d2944..e27adea 100644 --- a/mvt/ios/modules/mixed/safari_history.py +++ b/mvt/ios/modules/mixed/safari_history.py @@ -9,7 +9,8 @@ import sqlite3 from typing import Union from mvt.common.url import URL -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import (convert_mactime_to_datetime, + convert_mactime_to_iso) from ..base import IOSExtraction @@ -69,8 +70,8 @@ class SafariHistory(IOSExtraction): self.log.info("Found HTTP redirect to different domain: \"%s\" -> \"%s\"", origin_domain, redirect_domain) - redirect_time = convert_mactime_to_unix(redirect["timestamp"]) - origin_time = convert_mactime_to_unix(result["timestamp"]) + redirect_time = convert_mactime_to_datetime(redirect["timestamp"]) + origin_time = convert_mactime_to_datetime(result["timestamp"]) elapsed_time = redirect_time - origin_time elapsed_ms = elapsed_time.microseconds / 1000 @@ -112,7 +113,7 @@ class SafariHistory(IOSExtraction): "url": row[1], "visit_id": row[2], "timestamp": row[3], - "isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])), + "isodate": convert_mactime_to_iso(row[3]), "redirect_source": row[4], "redirect_destination": row[5], "safari_history_db": os.path.relpath(history_path, self.target_path), diff --git a/mvt/ios/modules/mixed/shortcuts.py b/mvt/ios/modules/mixed/shortcuts.py index 1fae12d..b32a6c2 100644 --- a/mvt/ios/modules/mixed/shortcuts.py +++ b/mvt/ios/modules/mixed/shortcuts.py @@ -10,8 +10,7 @@ import plistlib import sqlite3 from typing import Union -from mvt.common.utils import (check_for_links, convert_mactime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -114,8 +113,8 @@ class Shortcuts(IOSExtraction): action["urls"] = [url.rstrip("',") for url in extracted_urls] actions.append(action) - shortcut["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut.pop("created_date"))) - shortcut["modified_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut["modified_date"])) + shortcut["isodate"] = convert_mactime_to_iso(shortcut.pop("created_date")) + shortcut["modified_date"] = convert_mactime_to_iso(shortcut["modified_date"]) shortcut["parsed_actions"] = len(actions) shortcut["action_urls"] = list(itertools.chain(*[action["urls"] for action in actions])) self.results.append(shortcut) diff --git a/mvt/ios/modules/mixed/sms.py b/mvt/ios/modules/mixed/sms.py index 2ed20bc..21b367a 100644 --- a/mvt/ios/modules/mixed/sms.py +++ b/mvt/ios/modules/mixed/sms.py @@ -8,8 +8,7 @@ import sqlite3 from base64 import b64encode from typing import Union -from mvt.common.utils import (check_for_links, convert_mactime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -38,7 +37,8 @@ class SMS(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "sms_received", - "data": f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})" + "data": f"{record['service']}: {record['guid']} \"{text}\" " + f"from {record['phone_number']} ({record['account']})" } def check_indicators(self) -> None: @@ -69,9 +69,9 @@ class SMS(IOSExtraction): """) # Force the query early to catch database issues items = list(cur) - except sqlite3.DatabaseError as e: + except sqlite3.DatabaseError as exc: conn.close() - if "database disk image is malformed" in str(e): + if "database disk image is malformed" in str(exc): self._recover_sqlite_db_if_needed(self.file_path, forced=True) conn = sqlite3.connect(self.file_path) cur = conn.cursor() @@ -84,7 +84,7 @@ class SMS(IOSExtraction): """) items = list(cur) else: - raise e + raise exc names = [description[0] for description in cur.description] for item in items: @@ -100,25 +100,29 @@ class SMS(IOSExtraction): message[names[index]] = value # We convert Mac's ridiculous timestamp format. - message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message["date"])) - message["direction"] = ("sent" if message.get("is_from_me", 0) == 1 else "received") + message["isodate"] = convert_mactime_to_iso(message["date"]) + message["direction"] = ("sent" if message.get("is_from_me", 0) == 1 + else "received") # Sometimes "text" is None instead of empty string. if not message.get("text", None): message["text"] = "" if message.get("text", "").startswith("ALERT: State-sponsored attackers may be targeting your iPhone"): - self.log.warn("Apple warning about state-sponsored attack received on the %s", message["isodate"]) + self.log.warn("Apple warning about state-sponsored attack " + "received on the %s", message["isodate"]) self.results.append(message) else: # Extract links from the SMS message. message_links = check_for_links(message.get("text", "")) - # If we find links in the messages or if they are empty we add them to the list. + # If we find links in the messages or if they are empty we add + # them to the list. if message_links or message.get("text", "").strip() == "": self.results.append(message) cur.close() conn.close() - self.log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + self.log.info("Extracted a total of %d SMS messages containing links", + len(self.results)) diff --git a/mvt/ios/modules/mixed/sms_attachments.py b/mvt/ios/modules/mixed/sms_attachments.py index 4f864b0..1a05e90 100644 --- a/mvt/ios/modules/mixed/sms_attachments.py +++ b/mvt/ios/modules/mixed/sms_attachments.py @@ -8,7 +8,7 @@ import sqlite3 from base64 import b64encode from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -36,8 +36,12 @@ class SMSAttachments(IOSExtraction): "timestamp": record["isodate"], "module": self.__class__.__name__, "event": "sms_attachment", - "data": f"{record['service']}: Attachment '{record['transfer_name']}' {record['direction']} from {record['phone_number']} " - f"with {record['total_bytes']} bytes (is_sticker: {record['is_sticker']}, has_user_info: {record['has_user_info']})" + "data": f"{record['service']}: Attachment " + f"'{record['transfer_name']}' {record['direction']} " + f"from {record['phone_number']} " + f"with {record['total_bytes']} bytes " + f"(is_sticker: {record['is_sticker']}, " + f"has_user_info: {record['has_user_info']})" } def run(self) -> None: @@ -70,16 +74,18 @@ class SMSAttachments(IOSExtraction): value = b64encode(value).decode() attachment[names[index]] = value - attachment["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["created_date"])) - attachment["start_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["start_date"])) + attachment["isodate"] = convert_mactime_to_iso(attachment["created_date"]) + attachment["start_date"] = convert_mactime_to_iso(attachment["start_date"]) attachment["direction"] = ("sent" if attachment["is_outgoing"] == 1 else "received") attachment["has_user_info"] = attachment["user_info"] is not None attachment["service"] = attachment["service"] or "Unknown" attachment["filename"] = attachment["filename"] or "NULL" - if (attachment["filename"].startswith("/var/tmp/") and attachment["filename"].endswith("-1") + if (attachment["filename"].startswith("/var/tmp/") + and attachment["filename"].endswith("-1") and attachment["direction"] == "received"): - self.log.warn(f"Suspicious iMessage attachment '{attachment['filename']}' on {attachment['isodate']}") + self.log.warn("Suspicious iMessage attachment %s on %s", + attachment['filename'], attachment['isodate']) self.detected.append(attachment) self.results.append(attachment) diff --git a/mvt/ios/modules/mixed/tcc.py b/mvt/ios/modules/mixed/tcc.py index e84a923..f8efa4f 100644 --- a/mvt/ios/modules/mixed/tcc.py +++ b/mvt/ios/modules/mixed/tcc.py @@ -5,10 +5,9 @@ import logging import sqlite3 -from datetime import datetime from typing import Union -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -23,7 +22,6 @@ AUTH_VALUE_OLD = { 0: "denied", 1: "allowed" } - AUTH_VALUES = { 0: "denied", 1: "unknown", @@ -60,9 +58,11 @@ class TCC(IOSExtraction): def serialize(self, record: dict) -> Union[dict, list]: if "last_modified" in record: if "allowed_value" in record: - msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}" + msg = (f"Access to {record['service']} by {record['client']} " + f"{record['allowed_value']}") else: - msg = f"Access to {record['service']} by {record['client']} {record['auth_value']}" + msg = (f"Access to {record['service']} by {record['client']} " + f"{record['auth_value']}") return { "timestamp": record["last_modified"], @@ -89,18 +89,21 @@ class TCC(IOSExtraction): db_version = "v3" try: cur.execute("""SELECT - service, client, client_type, auth_value, auth_reason, last_modified + service, client, client_type, auth_value, + auth_reason, last_modified FROM access;""") except sqlite3.OperationalError: # v2 version try: cur.execute("""SELECT - service, client, client_type, allowed, prompt_count, last_modified + service, client, client_type, allowed, + prompt_count, last_modified FROM access;""") db_version = "v2" except sqlite3.OperationalError: cur.execute("""SELECT - service, client, client_type, allowed, prompt_count + service, client, client_type, allowed, + prompt_count FROM access;""") db_version = "v1" @@ -108,18 +111,20 @@ class TCC(IOSExtraction): service = row[0] client = row[1] client_type = row[2] - client_type_desc = "bundle_id" if client_type == 0 else "absolute_path" + client_type_desc = ("bundle_id" if client_type == 0 + else "absolute_path") if db_version == "v3": auth_value = row[3] auth_value_desc = AUTH_VALUES.get(auth_value, "") auth_reason = row[4] auth_reason_desc = AUTH_REASONS.get(auth_reason, "unknown") - last_modified = convert_timestamp_to_iso(datetime.utcfromtimestamp((row[5]))) + last_modified = convert_unix_to_iso(row[5]) if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]: device = "microphone" if service == "kTCCServiceMicrophone" else "camera" - self.log.info("Found client \"%s\" with access %s to %s on %s by %s", - client, auth_value_desc, device, last_modified, auth_reason_desc) + self.log.info("Found client \"%s\" with access %s to %s " + "on %s by %s", client, auth_value_desc, + device, last_modified, auth_reason_desc) self.results.append({ "service": service, @@ -134,11 +139,13 @@ class TCC(IOSExtraction): allowed_desc = AUTH_VALUE_OLD.get(allowed_value, "") prompt_count = row[4] if db_version == "v2": - last_modified = convert_timestamp_to_iso(datetime.utcfromtimestamp((row[5]))) + last_modified = convert_unix_to_iso(row[5]) if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]: device = "microphone" if service == "kTCCServiceMicrophone" else "camera" - self.log.info("Found client \"%s\" with access %s to %s at %s", - client, allowed_desc, device, last_modified) + self.log.info("Found client \"%s\" with access %s to " + "%s at %s", client, allowed_desc, device, + last_modified) + self.results.append({ "service": service, "client": client, @@ -152,6 +159,7 @@ class TCC(IOSExtraction): device = "microphone" if service == "kTCCServiceMicrophone" else "camera" self.log.info("Found client \"%s\" with access %s to %s", client, allowed_desc, device) + self.results.append({ "service": service, "client": client, @@ -164,7 +172,8 @@ class TCC(IOSExtraction): conn.close() def run(self) -> None: - self._find_ios_database(backup_ids=TCC_BACKUP_IDS, root_paths=TCC_ROOT_PATHS) + self._find_ios_database(backup_ids=TCC_BACKUP_IDS, + root_paths=TCC_ROOT_PATHS) self.log.info("Found TCC database at path: %s", self.file_path) self.process_db(self.file_path) diff --git a/mvt/ios/modules/mixed/webkit_resource_load_statistics.py b/mvt/ios/modules/mixed/webkit_resource_load_statistics.py index 17e6962..83764d9 100644 --- a/mvt/ios/modules/mixed/webkit_resource_load_statistics.py +++ b/mvt/ios/modules/mixed/webkit_resource_load_statistics.py @@ -8,7 +8,7 @@ import logging import os import sqlite3 -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -20,7 +20,8 @@ WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS = [ class WebkitResourceLoadStatistics(IOSExtraction): - """This module extracts records from WebKit ResourceLoadStatistics observations.db.""" + """This module extracts records from WebKit ResourceLoadStatistics + observations.db.""" # TODO: Add serialize(). def __init__(self, file_path: str = None, target_path: str = None, @@ -49,7 +50,8 @@ class WebkitResourceLoadStatistics(IOSExtraction): self.detected[key].append(item) def _process_observations_db(self, db_path, key): - self.log.info("Found WebKit ResourceLoadStatistics observations.db file at path %s", db_path) + self.log.info("Found WebKit ResourceLoadStatistics observations.db " + "file at path %s", db_path) self._recover_sqlite_db_if_needed(db_path) @@ -70,7 +72,7 @@ class WebkitResourceLoadStatistics(IOSExtraction): "registrable_domain": row[1], "last_seen": row[2], "had_user_interaction": bool(row[3]), - "last_seen_isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))), + "last_seen_isodate": convert_datetime_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))), }) if len(self.results[key]) > 0: @@ -84,8 +86,10 @@ class WebkitResourceLoadStatistics(IOSExtraction): key = f"{backup_file['domain']}/{WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH}" if db_path: self._process_observations_db(db_path=db_path, key=key) - except Exception as e: - self.log.info("Unable to search for WebKit observations.db: %s", e) + except Exception as exc: + self.log.info("Unable to search for WebKit observations.db: %s", + exc) elif self.is_fs_dump: for db_path in self._get_fs_files_from_patterns(WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS): - self._process_observations_db(db_path=db_path, key=os.path.relpath(db_path, self.target_path)) + self._process_observations_db(db_path=db_path, + key=os.path.relpath(db_path, self.target_path)) diff --git a/mvt/ios/modules/mixed/webkit_session_resource_log.py b/mvt/ios/modules/mixed/webkit_session_resource_log.py index 8646234..ff68af1 100644 --- a/mvt/ios/modules/mixed/webkit_session_resource_log.py +++ b/mvt/ios/modules/mixed/webkit_session_resource_log.py @@ -7,7 +7,7 @@ import logging import os import plistlib -from mvt.common.utils import convert_timestamp_to_iso +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -93,7 +93,8 @@ class WebkitSessionResourceLog(IOSExtraction): redirect_path += ", ".join(destination_domains) - self.log.warning("Found HTTP redirect between suspicious domains: %s", redirect_path) + self.log.warning("Found HTTP redirect between suspicious " + "domains: %s", redirect_path) def _extract_browsing_stats(self, log_path): items = [] @@ -114,8 +115,8 @@ class WebkitSessionResourceLog(IOSExtraction): "subframe_under_origin": item.get("subframeUnderTopFrameOrigins", ""), "subresource_under_origin": item.get("subresourceUnderTopFrameOrigins", ""), "user_interaction": item.get("hadUserInteraction"), - "most_recent_interaction": convert_timestamp_to_iso(item["mostRecentUserInteraction"]), - "last_seen": convert_timestamp_to_iso(item["lastSeen"]), + "most_recent_interaction": convert_datetime_to_iso(item["mostRecentUserInteraction"]), + "last_seen": convert_datetime_to_iso(item["lastSeen"]), }) return items @@ -127,13 +128,15 @@ class WebkitSessionResourceLog(IOSExtraction): if not log_path: continue - self.log.info("Found Safari browsing session resource log at path: %s", log_path) + self.log.info("Found Safari browsing session resource log at " + "path: %s", log_path) self.results[log_path] = self._extract_browsing_stats(log_path) elif self.is_fs_dump: for log_path in self._get_fs_files_from_patterns(WEBKIT_SESSION_RESOURCE_LOG_ROOT_PATHS): - self.log.info("Found Safari browsing session resource log at path: %s", log_path) + self.log.info("Found Safari browsing session resource log at " + "path: %s", log_path) key = os.path.relpath(log_path, self.target_path) self.results[key] = self._extract_browsing_stats(log_path) - self.log.info("Extracted records from %d Safari browsing session resource logs", - len(self.results)) + self.log.info("Extracted records from %d Safari browsing session " + "resource logs", len(self.results)) diff --git a/mvt/ios/modules/mixed/whatsapp.py b/mvt/ios/modules/mixed/whatsapp.py index d8912c4..5589f85 100644 --- a/mvt/ios/modules/mixed/whatsapp.py +++ b/mvt/ios/modules/mixed/whatsapp.py @@ -7,8 +7,7 @@ import logging import sqlite3 from typing import Union -from mvt.common.utils import (check_for_links, convert_mactime_to_unix, - convert_timestamp_to_iso) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -62,7 +61,8 @@ class Whatsapp(IOSExtraction): conn = sqlite3.connect(self.file_path) cur = conn.cursor() - # Query all messages and join tables which can contain media attachments and links + # Query all messages and join tables which can contain media attachments + # and links. cur.execute(""" SELECT ZWAMESSAGE.*, @@ -84,13 +84,15 @@ class Whatsapp(IOSExtraction): for index, value in enumerate(message_row): message[names[index]] = value - message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE"))) + message["isodate"] = convert_mactime_to_iso(message.get("ZMESSAGEDATE")) message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else "" - # Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. + # Extract links from the WhatsApp message. URLs can be stored in + # multiple fields/columns. # Check each of them! message_links = [] - fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"] + fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", + "ZCONTENT1", "ZCONTENT2"] for field in fields_with_links: if message.get(field): message_links.extend(check_for_links(message.get(field, ""))) @@ -98,10 +100,12 @@ class Whatsapp(IOSExtraction): # Remove WhatsApp internal media URLs. filtered_links = [] for link in message_links: - if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")): + if not (link.startswith("https://mmg-fna.whatsapp.net/") + or link.startswith("https://mmg.whatsapp.net/")): filtered_links.append(link) - # If we find messages with links, or if there's an empty message we add it to the results list. + # If we find messages with links, or if there's an empty message + # we add it to the results list. if filtered_links or (message.get("ZTEXT") or "").strip() == "": message["links"] = list(set(filtered_links)) self.results.append(message) @@ -109,4 +113,5 @@ class Whatsapp(IOSExtraction): cur.close() conn.close() - self.log.info("Extracted a total of %d WhatsApp messages containing links", len(self.results)) + self.log.info("Extracted a total of %d WhatsApp messages containing " + "links", len(self.results)) diff --git a/mvt/ios/modules/net_base.py b/mvt/ios/modules/net_base.py index 84f6292..83de2ba 100644 --- a/mvt/ios/modules/net_base.py +++ b/mvt/ios/modules/net_base.py @@ -9,7 +9,7 @@ import sqlite3 from pathlib import Path from typing import Union -from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso +from mvt.common.utils import convert_mactime_to_iso from .base import IOSExtraction @@ -55,14 +55,14 @@ class NetBase(IOSExtraction): for row in cur: # ZPROCESS records can be missing after the JOIN. Handle NULL timestamps. if row[0] and row[1]: - first_isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0])) - isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[1])) + first_isodate = convert_mactime_to_iso(row[0]) + isodate = convert_mactime_to_iso(row[1]) else: first_isodate = row[0] isodate = row[1] if row[11]: - live_timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(row[11])) + live_timestamp = convert_mactime_to_iso(row[11]) else: live_timestamp = "" diff --git a/tests/android/test_backup_module.py b/tests/android/test_backup_module.py index 2bdb0c6..a8c0a63 100644 --- a/tests/android/test_backup_module.py +++ b/tests/android/test_backup_module.py @@ -4,7 +4,6 @@ # https://license.mvt.re/1.1/ import io -import logging import os import tarfile @@ -19,7 +18,7 @@ class TestBackupModule: def test_module_folder(self): backup_path = get_android_backup_folder() - mod = SMS(target_path=backup_path, log=logging) + mod = SMS(target_path=backup_path) files = [] for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): for fname in subfiles: @@ -32,7 +31,7 @@ class TestBackupModule: def test_module_file(self): fpath = os.path.join(get_android_backup_folder(), "backup.ab") - mod = SMS(target_path=fpath, log=logging) + mod = SMS(target_path=fpath) with open(fpath, "rb") as f: data = f.read() tardata = parse_backup_file(data) @@ -48,7 +47,7 @@ class TestBackupModule: def test_module_file2(self): fpath = os.path.join(get_android_backup_folder(), "backup2.ab") - mod = SMS(target_path=fpath, log=logging) + mod = SMS(target_path=fpath) with open(fpath, "rb") as f: data = f.read() tardata = parse_backup_file(data, password="123456") @@ -64,7 +63,7 @@ class TestBackupModule: def test_module_file3(self): fpath = os.path.join(get_android_backup_folder(), "backup3.ab") - mod = SMS(target_path=fpath, log=logging) + mod = SMS(target_path=fpath) with open(fpath, "rb") as f: data = f.read() tardata = parse_backup_file(data) diff --git a/tests/android/test_backup_parser.py b/tests/android/test_backup_parser.py index 743fc0b..9a26406 100644 --- a/tests/android/test_backup_parser.py +++ b/tests/android/test_backup_parser.py @@ -52,6 +52,7 @@ class TestBackupParsing: m.update(ddata) assert m.hexdigest() == "33e73df2ede9798dcb3a85c06200ee41c8f52dd2f2e50ffafcceb0407bc13e3a" sms = parse_tar_for_sms(ddata) + print(sms) assert isinstance(sms, list) assert len(sms) == 1 assert len(sms[0]["links"]) == 1 diff --git a/tests/android/test_bugreport_appops.py b/tests/android/test_bugreport_appops.py index 0c6c19c..98bb278 100644 --- a/tests/android/test_bugreport_appops.py +++ b/tests/android/test_bugreport_appops.py @@ -3,7 +3,6 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import logging import os from pathlib import Path @@ -17,7 +16,7 @@ class TestAppopsModule: def test_appops_parsing(self): fpath = os.path.join(get_artifact_folder(), "android_data/bugreport/") - m = Appops(target_path=fpath, log=logging, results=[]) + m = Appops(target_path=fpath) folder_files = [] parent_path = Path(fpath).absolute().as_posix() for root, subdirs, subfiles in os.walk(os.path.abspath(fpath)): diff --git a/tests/android/test_dumpsys_parser.py b/tests/android/test_dumpsys_parser.py index 097d938..83d804b 100644 --- a/tests/android/test_dumpsys_parser.py +++ b/tests/android/test_dumpsys_parser.py @@ -14,6 +14,7 @@ class TestDumpsysParsing: file = get_artifact("android_data/dumpsys_appops.txt") with open(file) as f: data = f.read() + res = parse_dumpsys_appops(data) assert len(res) == 12 diff --git a/tests/common/test_date_conversions.py b/tests/common/test_date_conversions.py new file mode 100644 index 0000000..a292e52 --- /dev/null +++ b/tests/common/test_date_conversions.py @@ -0,0 +1,19 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 Claudio Guarnieri. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from mvt.common.utils import convert_mactime_to_iso, convert_unix_to_iso + +TEST_DATE_EPOCH = 1626566400 +TEST_DATE_ISO = "2021-07-18 00:00:00.000000" +TEST_DATE_MAC = TEST_DATE_EPOCH - 978307200 + + +class TestDateConversions: + + def test_convert_unix_to_iso(self): + assert convert_unix_to_iso(TEST_DATE_EPOCH) == TEST_DATE_ISO + + def test_convert_mactime_to_iso(self): + assert convert_mactime_to_iso(TEST_DATE_MAC) == TEST_DATE_ISO diff --git a/tests/ios_backup/test_backup_info.py b/tests/ios_backup/test_backup_info.py index fa2e014..9099175 100644 --- a/tests/ios_backup/test_backup_info.py +++ b/tests/ios_backup/test_backup_info.py @@ -3,8 +3,6 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import logging - from mvt.common.module import run_module from mvt.ios.modules.backup.backup_info import BackupInfo @@ -14,7 +12,7 @@ from ..utils import get_ios_backup_folder class TestBackupInfoModule: def test_manifest(self): - m = BackupInfo(target_path=get_ios_backup_folder(), log=logging) + m = BackupInfo(target_path=get_ios_backup_folder()) run_module(m) assert m.results["Build Version"] == "18C66" assert m.results["IMEI"] == "42" diff --git a/tests/ios_backup/test_datausage.py b/tests/ios_backup/test_datausage.py index 5ce92bd..63fec23 100644 --- a/tests/ios_backup/test_datausage.py +++ b/tests/ios_backup/test_datausage.py @@ -15,15 +15,16 @@ from ..utils import get_ios_backup_folder class TestDatausageModule: def test_datausage(self): - m = Datausage(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = Datausage(target_path=get_ios_backup_folder()) run_module(m) + assert m.results[0]["isodate"][0:19] == "2019-08-27 15:08:09" assert len(m.results) == 42 assert len(m.timeline) == 60 assert len(m.detected) == 0 def test_detection(self, indicator_file): - m = Datausage(target_path=get_ios_backup_folder(), log=logging, results=[]) - ind = Indicators(log=logging) + m = Datausage(target_path=get_ios_backup_folder()) + ind = Indicators(log=logging.getLogger()) ind.parse_stix2(indicator_file) # Adds a file that exists in the manifest. ind.ioc_collections[0]["processes"].append("CumulativeUsageTracker") diff --git a/tests/ios_backup/test_manifest.py b/tests/ios_backup/test_manifest.py index d5a76c3..1a64b3e 100644 --- a/tests/ios_backup/test_manifest.py +++ b/tests/ios_backup/test_manifest.py @@ -15,15 +15,15 @@ from ..utils import get_ios_backup_folder class TestManifestModule: def test_manifest(self): - m = Manifest(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = Manifest(target_path=get_ios_backup_folder()) run_module(m) assert len(m.results) == 3721 assert len(m.timeline) == 5881 assert len(m.detected) == 0 def test_detection(self, indicator_file): - m = Manifest(target_path=get_ios_backup_folder(), log=logging, results=[]) - ind = Indicators(log=logging) + m = Manifest(target_path=get_ios_backup_folder()) + ind = Indicators(log=logging.getLogger()) ind.parse_stix2(indicator_file) ind.ioc_collections[0]["file_names"].append("com.apple.CoreBrightness.plist") m.indicators = ind diff --git a/tests/ios_backup/test_safari_browserstate.py b/tests/ios_backup/test_safari_browserstate.py index 76bdc44..7110e92 100644 --- a/tests/ios_backup/test_safari_browserstate.py +++ b/tests/ios_backup/test_safari_browserstate.py @@ -15,7 +15,7 @@ from ..utils import get_ios_backup_folder class TestSafariBrowserStateModule: def test_parsing(self): - m = SafariBrowserState(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = SafariBrowserState(target_path=get_ios_backup_folder()) m.is_backup = True run_module(m) assert len(m.results) == 1 @@ -23,9 +23,9 @@ class TestSafariBrowserStateModule: assert len(m.detected) == 0 def test_detection(self, indicator_file): - m = SafariBrowserState(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = SafariBrowserState(target_path=get_ios_backup_folder()) m.is_backup = True - ind = Indicators(log=logging) + ind = Indicators(log=logging.getLogger()) ind.parse_stix2(indicator_file) # Adds a file that exists in the manifest. ind.ioc_collections[0]["domains"].append("en.wikipedia.org") diff --git a/tests/ios_backup/test_sms.py b/tests/ios_backup/test_sms.py index 4eb274d..78e90e3 100644 --- a/tests/ios_backup/test_sms.py +++ b/tests/ios_backup/test_sms.py @@ -15,15 +15,15 @@ from ..utils import get_ios_backup_folder class TestSMSModule: def test_sms(self): - m = SMS(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = SMS(target_path=get_ios_backup_folder()) run_module(m) assert len(m.results) == 1 assert len(m.timeline) == 1 assert len(m.detected) == 0 def test_detection(self, indicator_file): - m = SMS(target_path=get_ios_backup_folder(), log=logging, results=[]) - ind = Indicators(log=logging) + m = SMS(target_path=get_ios_backup_folder()) + ind = Indicators(log=logging.getLogger()) ind.parse_stix2(indicator_file) # Adds a file that exists in the manifest. ind.ioc_collections[0]["domains"].append("badbadbad.example.org") diff --git a/tests/ios_backup/test_tcc.py b/tests/ios_backup/test_tcc.py index 99ce2f3..d4ca363 100644 --- a/tests/ios_backup/test_tcc.py +++ b/tests/ios_backup/test_tcc.py @@ -15,7 +15,7 @@ from ..utils import get_ios_backup_folder class TestTCCtModule: def test_tcc(self): - m = TCC(target_path=get_ios_backup_folder(), log=logging, results=[]) + m = TCC(target_path=get_ios_backup_folder()) run_module(m) assert len(m.results) == 11 assert len(m.timeline) == 11 @@ -25,8 +25,8 @@ class TestTCCtModule: assert m.results[0]["auth_value"] == "allowed" def test_tcc_detection(self, indicator_file): - m = TCC(target_path=get_ios_backup_folder(), log=logging, results=[]) - ind = Indicators(log=logging) + m = TCC(target_path=get_ios_backup_folder()) + ind = Indicators(log=logging.getLogger()) ind.parse_stix2(indicator_file) m.indicators = ind run_module(m)