From 2bc6fbef2f850ce7d5b20659173ad6875dd6e237 Mon Sep 17 00:00:00 2001 From: Nex Date: Fri, 17 Jun 2022 22:30:46 +0200 Subject: [PATCH] Starting to add type hints --- mvt/android/cmd_check_adb.py | 5 +- mvt/android/cmd_check_backup.py | 5 +- mvt/android/cmd_check_bugreport.py | 5 +- mvt/android/cmd_download_apks.py | 2 +- mvt/android/modules/adb/base.py | 7 +- mvt/android/modules/adb/chrome_history.py | 11 +-- .../modules/adb/dumpsys_accessibility.py | 9 +-- mvt/android/modules/adb/dumpsys_activities.py | 9 +-- mvt/android/modules/adb/dumpsys_appops.py | 11 +-- .../modules/adb/dumpsys_battery_daily.py | 11 +-- .../modules/adb/dumpsys_battery_history.py | 9 +-- mvt/android/modules/adb/dumpsys_dbinfo.py | 9 +-- mvt/android/modules/adb/dumpsys_full.py | 7 +- mvt/android/modules/adb/dumpsys_receivers.py | 9 +-- mvt/android/modules/adb/files.py | 11 +-- mvt/android/modules/adb/getprop.py | 7 +- mvt/android/modules/adb/logcat.py | 7 +- mvt/android/modules/adb/packages.py | 11 +-- mvt/android/modules/adb/processes.py | 9 +-- mvt/android/modules/adb/root_binaries.py | 7 +- mvt/android/modules/adb/selinux_status.py | 7 +- mvt/android/modules/adb/settings.py | 9 +-- mvt/android/modules/adb/sms.py | 11 +-- mvt/android/modules/adb/whatsapp.py | 11 +-- mvt/android/modules/backup/sms.py | 11 +-- .../modules/bugreport/accessibility.py | 9 +-- mvt/android/modules/bugreport/activities.py | 9 +-- mvt/android/modules/bugreport/appops.py | 11 +-- .../modules/bugreport/battery_daily.py | 11 +-- .../modules/bugreport/battery_history.py | 9 +-- mvt/android/modules/bugreport/dbinfo.py | 9 +-- mvt/android/modules/bugreport/getprop.py | 7 +- mvt/android/modules/bugreport/packages.py | 11 +-- mvt/android/modules/bugreport/receivers.py | 9 +-- mvt/android/parsers/dumpsys.py | 14 ++-- mvt/android/parsers/getprop.py | 2 +- mvt/common/cmd_check_iocs.py | 7 +- mvt/common/command.py | 69 ++++++++++--------- mvt/common/indicators.py | 41 +++++------ mvt/common/logo.py | 2 +- mvt/common/module.py | 25 ++++--- mvt/common/updates.py | 2 +- mvt/common/url.py | 12 ++-- mvt/common/utils.py | 2 +- mvt/common/virustotal.py | 2 +- mvt/ios/cmd_check_backup.py | 5 +- mvt/ios/cmd_check_fs.py | 5 +- mvt/ios/decrypt.py | 17 ++--- mvt/ios/modules/backup/backup_info.py | 8 ++- .../modules/backup/configuration_profiles.py | 12 ++-- mvt/ios/modules/backup/manifest.py | 12 ++-- mvt/ios/modules/backup/profile_events.py | 11 +-- mvt/ios/modules/base.py | 6 +- mvt/ios/modules/fs/analytics.py | 12 ++-- mvt/ios/modules/fs/cache_files.py | 12 ++-- mvt/ios/modules/fs/filesystem.py | 12 ++-- mvt/ios/modules/fs/net_netusage.py | 8 ++- mvt/ios/modules/fs/safari_favicon.py | 12 ++-- mvt/ios/modules/fs/shutdownlog.py | 13 ++-- mvt/ios/modules/fs/version_history.py | 10 +-- mvt/ios/modules/fs/webkit_base.py | 2 +- mvt/ios/modules/fs/webkit_indexeddb.py | 11 +-- mvt/ios/modules/fs/webkit_localstorage.py | 11 +-- .../modules/fs/webkit_safariviewservice.py | 9 ++- mvt/ios/modules/mixed/calls.py | 10 +-- mvt/ios/modules/mixed/chrome_favicon.py | 12 ++-- mvt/ios/modules/mixed/chrome_history.py | 12 ++-- mvt/ios/modules/mixed/contacts.py | 8 ++- mvt/ios/modules/mixed/firefox_favicon.py | 12 ++-- mvt/ios/modules/mixed/firefox_history.py | 12 ++-- mvt/ios/modules/mixed/idstatuscache.py | 12 ++-- mvt/ios/modules/mixed/interactionc.py | 10 +-- mvt/ios/modules/mixed/locationd.py | 12 ++-- mvt/ios/modules/mixed/net_datausage.py | 9 ++- mvt/ios/modules/mixed/osanalytics_addaily.py | 12 ++-- mvt/ios/modules/mixed/safari_browserstate.py | 12 ++-- mvt/ios/modules/mixed/safari_history.py | 12 ++-- mvt/ios/modules/mixed/shortcuts.py | 12 ++-- mvt/ios/modules/mixed/sms.py | 12 ++-- mvt/ios/modules/mixed/sms_attachments.py | 10 +-- mvt/ios/modules/mixed/tcc.py | 12 ++-- .../mixed/webkit_resource_load_statistics.py | 10 +-- .../mixed/webkit_session_resource_log.py | 10 +-- mvt/ios/modules/mixed/whatsapp.py | 11 +-- mvt/ios/modules/net_base.py | 10 +-- mvt/ios/versions.py | 7 +- 86 files changed, 514 insertions(+), 386 deletions(-) diff --git a/mvt/android/cmd_check_adb.py b/mvt/android/cmd_check_adb.py index ca8aefe..0d4bc44 100644 --- a/mvt/android/cmd_check_adb.py +++ b/mvt/android/cmd_check_adb.py @@ -17,8 +17,9 @@ class CmdAndroidCheckADB(Command): name = "check-adb" modules = ADB_MODULES - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/android/cmd_check_backup.py b/mvt/android/cmd_check_backup.py index 82ba1cc..148fb1e 100644 --- a/mvt/android/cmd_check_backup.py +++ b/mvt/android/cmd_check_backup.py @@ -26,8 +26,9 @@ class CmdAndroidCheckBackup(Command): name = "check-backup" modules = BACKUP_MODULES - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/android/cmd_check_bugreport.py b/mvt/android/cmd_check_bugreport.py index 4f13b67..5d25b0a 100644 --- a/mvt/android/cmd_check_bugreport.py +++ b/mvt/android/cmd_check_bugreport.py @@ -20,8 +20,9 @@ class CmdAndroidCheckBugreport(Command): name = "check-bugreport" modules = BUGREPORT_MODULES - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/android/cmd_download_apks.py b/mvt/android/cmd_download_apks.py index 22d82bf..1b758dd 100644 --- a/mvt/android/cmd_download_apks.py +++ b/mvt/android/cmd_download_apks.py @@ -176,7 +176,7 @@ class DownloadAPKs(AndroidExtraction): with open(json_path, "w", encoding="utf-8") as handle: json.dump(self.packages, handle, indent=4) - def run(self): + def run(self) -> None: """Run all steps of fetch-apk.""" self.get_packages() self._adb_connect() diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index 5134325..ff8f306 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -33,8 +33,9 @@ ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub") class AndroidExtraction(MVTModule): """This class provides a base for all Android extraction modules.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -273,6 +274,6 @@ class AndroidExtraction(MVTModule): self.log.warn("All attempts to decrypt backup with password failed!") - def run(self): + def run(self) -> None: """Run the main procedure.""" raise NotImplementedError diff --git a/mvt/android/modules/adb/chrome_history.py b/mvt/android/modules/adb/chrome_history.py index f9315b3..71fc57c 100644 --- a/mvt/android/modules/adb/chrome_history.py +++ b/mvt/android/modules/adb/chrome_history.py @@ -20,13 +20,14 @@ CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History" class ChromeHistory(AndroidExtraction): """This module extracts records from Android's Chrome browsing history.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -34,7 +35,7 @@ class ChromeHistory(AndroidExtraction): "data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -77,7 +78,7 @@ class ChromeHistory(AndroidExtraction): log.info("Extracted a total of %d history items", len(self.results)) - def run(self): + def run(self) -> None: try: self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH), self._parse_db) diff --git a/mvt/android/modules/adb/dumpsys_accessibility.py b/mvt/android/modules/adb/dumpsys_accessibility.py index 20651b4..0ddacc6 100644 --- a/mvt/android/modules/adb/dumpsys_accessibility.py +++ b/mvt/android/modules/adb/dumpsys_accessibility.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class DumpsysAccessibility(AndroidExtraction): """This module extracts stats on accessibility.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -32,7 +33,7 @@ class DumpsysAccessibility(AndroidExtraction): self.detected.append(result) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys accessibility") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_activities.py b/mvt/android/modules/adb/dumpsys_activities.py index 5a0311e..60fd815 100644 --- a/mvt/android/modules/adb/dumpsys_activities.py +++ b/mvt/android/modules/adb/dumpsys_activities.py @@ -15,15 +15,16 @@ log = logging.getLogger(__name__) class DumpsysActivities(AndroidExtraction): """This module extracts details on receivers for risky activities.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = results if results else {} - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -35,7 +36,7 @@ class DumpsysActivities(AndroidExtraction): self.detected.append({intent: activity}) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys package") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_appops.py b/mvt/android/modules/adb/dumpsys_appops.py index 1019102..f6c6f49 100644 --- a/mvt/android/modules/adb/dumpsys_appops.py +++ b/mvt/android/modules/adb/dumpsys_appops.py @@ -17,13 +17,14 @@ class DumpsysAppOps(AndroidExtraction): slug = "dumpsys_appops" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] for perm in record["permissions"]: if "entries" not in perm: @@ -40,7 +41,7 @@ class DumpsysAppOps(AndroidExtraction): return records - def check_indicators(self): + def check_indicators(self) -> None: for result in self.results: if self.indicators: ioc = self.indicators.check_app_id(result.get("package_name")) @@ -54,7 +55,7 @@ class DumpsysAppOps(AndroidExtraction): self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", result["package_name"]) - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys appops") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_battery_daily.py b/mvt/android/modules/adb/dumpsys_battery_daily.py index 9b83c3d..eb889b1 100644 --- a/mvt/android/modules/adb/dumpsys_battery_daily.py +++ b/mvt/android/modules/adb/dumpsys_battery_daily.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class DumpsysBatteryDaily(AndroidExtraction): """This module extracts records from battery daily updates.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["from"], "module": self.__class__.__name__, @@ -29,7 +30,7 @@ class DumpsysBatteryDaily(AndroidExtraction): "data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -40,7 +41,7 @@ class DumpsysBatteryDaily(AndroidExtraction): self.detected.append(result) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys batterystats --daily") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_battery_history.py b/mvt/android/modules/adb/dumpsys_battery_history.py index c69d080..8adc25d 100644 --- a/mvt/android/modules/adb/dumpsys_battery_history.py +++ b/mvt/android/modules/adb/dumpsys_battery_history.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class DumpsysBatteryHistory(AndroidExtraction): """This module extracts records from battery history events.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -32,7 +33,7 @@ class DumpsysBatteryHistory(AndroidExtraction): self.detected.append(result) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys batterystats --history") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_dbinfo.py b/mvt/android/modules/adb/dumpsys_dbinfo.py index 44a78ce..d7e3877 100644 --- a/mvt/android/modules/adb/dumpsys_dbinfo.py +++ b/mvt/android/modules/adb/dumpsys_dbinfo.py @@ -17,13 +17,14 @@ class DumpsysDBInfo(AndroidExtraction): slug = "dumpsys_dbinfo" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -36,7 +37,7 @@ class DumpsysDBInfo(AndroidExtraction): self.detected.append(result) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys dbinfo") self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_full.py b/mvt/android/modules/adb/dumpsys_full.py index ba39e55..53a4964 100644 --- a/mvt/android/modules/adb/dumpsys_full.py +++ b/mvt/android/modules/adb/dumpsys_full.py @@ -14,13 +14,14 @@ log = logging.getLogger(__name__) class DumpsysFull(AndroidExtraction): """This module extracts stats on battery consumption by processes.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys") diff --git a/mvt/android/modules/adb/dumpsys_receivers.py b/mvt/android/modules/adb/dumpsys_receivers.py index c58c009..a3076ab 100644 --- a/mvt/android/modules/adb/dumpsys_receivers.py +++ b/mvt/android/modules/adb/dumpsys_receivers.py @@ -21,15 +21,16 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL" class DumpsysReceivers(AndroidExtraction): """This module extracts details on receivers for risky activities.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = results if results else {} - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -57,7 +58,7 @@ class DumpsysReceivers(AndroidExtraction): self.detected.append({intent: receiver}) continue - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("dumpsys package") diff --git a/mvt/android/modules/adb/files.py b/mvt/android/modules/adb/files.py index 08f4de5..bc326d0 100644 --- a/mvt/android/modules/adb/files.py +++ b/mvt/android/modules/adb/files.py @@ -17,8 +17,9 @@ log = logging.getLogger(__name__) class Files(AndroidExtraction): """This module extracts the list of files on the device.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -46,7 +47,7 @@ class Files(AndroidExtraction): for file_line in output.splitlines(): self.results.append({"path": file_line.rstrip()}) - def serialize(self, record): + def serialize(self, record: dict) -> None: if "modified_time" in record: return { "timestamp": record["modified_time"], @@ -63,7 +64,7 @@ class Files(AndroidExtraction): result["path"]) self.detected.append(result) - def check_indicators(self): + def check_indicators(self) -> None: """Check file list for known suspicious files or suspicious properties""" self.check_suspicious() @@ -75,7 +76,7 @@ class Files(AndroidExtraction): self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"]) self.detected.append(result) - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null") diff --git a/mvt/android/modules/adb/getprop.py b/mvt/android/modules/adb/getprop.py index 657b9b8..b32134f 100644 --- a/mvt/android/modules/adb/getprop.py +++ b/mvt/android/modules/adb/getprop.py @@ -16,15 +16,16 @@ log = logging.getLogger(__name__) class Getprop(AndroidExtraction): """This module extracts device properties from getprop command.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} if not results else results - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("getprop") self._adb_disconnect() diff --git a/mvt/android/modules/adb/logcat.py b/mvt/android/modules/adb/logcat.py index d06f80a..0458c73 100644 --- a/mvt/android/modules/adb/logcat.py +++ b/mvt/android/modules/adb/logcat.py @@ -14,13 +14,14 @@ log = logging.getLogger(__name__) class Logcat(AndroidExtraction): """This module extracts details on installed packages.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: self._adb_connect() # Get the current logcat. diff --git a/mvt/android/modules/adb/packages.py b/mvt/android/modules/adb/packages.py index ca9d7b3..4c488c1 100644 --- a/mvt/android/modules/adb/packages.py +++ b/mvt/android/modules/adb/packages.py @@ -72,13 +72,14 @@ ROOT_PACKAGES = [ class Packages(AndroidExtraction): """This module extracts the list of installed packages.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] timestamps = [ @@ -97,7 +98,7 @@ class Packages(AndroidExtraction): return records - def check_indicators(self): + def check_indicators(self) -> None: for result in self.results: if result["package_name"] in ROOT_PACKAGES: self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"", @@ -238,7 +239,7 @@ class Packages(AndroidExtraction): return package_files - def run(self): + def run(self) -> None: self._adb_connect() packages = self._adb_command("pm list packages -u -i -f") diff --git a/mvt/android/modules/adb/processes.py b/mvt/android/modules/adb/processes.py index 5843b61..ac2d3f0 100644 --- a/mvt/android/modules/adb/processes.py +++ b/mvt/android/modules/adb/processes.py @@ -13,13 +13,14 @@ log = logging.getLogger(__name__) class Processes(AndroidExtraction): """This module extracts details on running processes.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -29,7 +30,7 @@ class Processes(AndroidExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("ps -e") diff --git a/mvt/android/modules/adb/root_binaries.py b/mvt/android/modules/adb/root_binaries.py index a9dedec..6de7770 100644 --- a/mvt/android/modules/adb/root_binaries.py +++ b/mvt/android/modules/adb/root_binaries.py @@ -13,13 +13,14 @@ log = logging.getLogger(__name__) class RootBinaries(AndroidExtraction): """This module extracts the list of installed packages.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: root_binaries = [ "su", "busybox", diff --git a/mvt/android/modules/adb/selinux_status.py b/mvt/android/modules/adb/selinux_status.py index c0169ec..5a6a93e 100644 --- a/mvt/android/modules/adb/selinux_status.py +++ b/mvt/android/modules/adb/selinux_status.py @@ -15,15 +15,16 @@ class SELinuxStatus(AndroidExtraction): slug = "selinux_status" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} if not results else results - def run(self): + def run(self) -> None: self._adb_connect() output = self._adb_command("getenforce") self._adb_disconnect() diff --git a/mvt/android/modules/adb/settings.py b/mvt/android/modules/adb/settings.py index 9e6a5f6..57c3a20 100644 --- a/mvt/android/modules/adb/settings.py +++ b/mvt/android/modules/adb/settings.py @@ -57,15 +57,16 @@ ANDROID_DANGEROUS_SETTINGS = [ class Settings(AndroidExtraction): """This module extracts Android system settings.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} if not results else results - def check_indicators(self): + def check_indicators(self) -> None: for namespace, settings in self.results.items(): for key, value in settings.items(): for danger in ANDROID_DANGEROUS_SETTINGS: @@ -76,7 +77,7 @@ class Settings(AndroidExtraction): key, value, danger["description"]) break - def run(self): + def run(self) -> None: self._adb_connect() for namespace in ["system", "secure", "global"]: diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index aceffc9..7ebd728 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -46,13 +46,14 @@ FROM sms; class SMS(AndroidExtraction): """This module extracts all SMS messages containing links.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: body = record["body"].replace("\n", "\\n") return { "timestamp": record["isodate"], @@ -61,7 +62,7 @@ class SMS(AndroidExtraction): "data": f"{record['address']}: \"{body}\"" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -126,7 +127,7 @@ class SMS(AndroidExtraction): log.info("Extracted a total of %d SMS messages containing links", len(self.results)) - def run(self): + def run(self) -> None: try: if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): self.SMS_DB_TYPE = 1 diff --git a/mvt/android/modules/adb/whatsapp.py b/mvt/android/modules/adb/whatsapp.py index 9d4c544..41324b5 100644 --- a/mvt/android/modules/adb/whatsapp.py +++ b/mvt/android/modules/adb/whatsapp.py @@ -20,13 +20,14 @@ WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db" class Whatsapp(AndroidExtraction): """This module extracts all WhatsApp messages containing links.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: text = record["data"].replace("\n", "\\n") return { "timestamp": record["isodate"], @@ -35,7 +36,7 @@ class Whatsapp(AndroidExtraction): "data": f"\"{text}\"" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -84,7 +85,7 @@ class Whatsapp(AndroidExtraction): log.info("Extracted a total of %d WhatsApp messages containing links", len(messages)) self.results = messages - def run(self): + def run(self) -> None: try: self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db) except Exception as e: diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index e06dc53..294159d 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -3,19 +3,22 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from mvt.android.modules.backup.base import BackupExtraction from mvt.android.parsers.backup import parse_sms_file class SMS(BackupExtraction): - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = [] - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -26,7 +29,7 @@ class SMS(BackupExtraction): if self.indicators.check_domains(message["links"]): self.detected.append(message) - def run(self): + def run(self) -> None: for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"): self.log.info("Processing SMS backup file at %s", file) data = self._get_file_content(file) diff --git a/mvt/android/modules/bugreport/accessibility.py b/mvt/android/modules/bugreport/accessibility.py index cb001e8..2d004bd 100644 --- a/mvt/android/modules/bugreport/accessibility.py +++ b/mvt/android/modules/bugreport/accessibility.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class Accessibility(BugReportModule): """This module extracts stats on accessibility.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -32,7 +33,7 @@ class Accessibility(BugReportModule): self.detected.append(result) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/activities.py b/mvt/android/modules/bugreport/activities.py index 6bc0fdd..51a3fb0 100644 --- a/mvt/android/modules/bugreport/activities.py +++ b/mvt/android/modules/bugreport/activities.py @@ -15,15 +15,16 @@ log = logging.getLogger(__name__) class Activities(BugReportModule): """This module extracts details on receivers for risky activities.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = results if results else {} - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -35,7 +36,7 @@ class Activities(BugReportModule): self.detected.append({intent: activity}) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/appops.py b/mvt/android/modules/bugreport/appops.py index 63b381f..4cbb802 100644 --- a/mvt/android/modules/bugreport/appops.py +++ b/mvt/android/modules/bugreport/appops.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class Appops(BugReportModule): """This module extracts information on package from App-Ops Manager.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] for perm in record["permissions"]: if "entries" not in perm: @@ -38,7 +39,7 @@ class Appops(BugReportModule): return records - def check_indicators(self): + def check_indicators(self) -> None: for result in self.results: if self.indicators: ioc = self.indicators.check_app_id(result.get("package_name")) @@ -51,7 +52,7 @@ class Appops(BugReportModule): if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow": self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", result["package_name"]) - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/battery_daily.py b/mvt/android/modules/bugreport/battery_daily.py index 93eab5e..d3c8492 100644 --- a/mvt/android/modules/bugreport/battery_daily.py +++ b/mvt/android/modules/bugreport/battery_daily.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class BatteryDaily(BugReportModule): """This module extracts records from battery daily updates.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["from"], "module": self.__class__.__name__, @@ -29,7 +30,7 @@ class BatteryDaily(BugReportModule): "data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -40,7 +41,7 @@ class BatteryDaily(BugReportModule): self.detected.append(result) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/battery_history.py b/mvt/android/modules/bugreport/battery_history.py index 0dd0ff7..ca08d00 100644 --- a/mvt/android/modules/bugreport/battery_history.py +++ b/mvt/android/modules/bugreport/battery_history.py @@ -15,13 +15,14 @@ log = logging.getLogger(__name__) class BatteryHistory(BugReportModule): """This module extracts records from battery daily updates.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -32,7 +33,7 @@ class BatteryHistory(BugReportModule): self.detected.append(result) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/dbinfo.py b/mvt/android/modules/bugreport/dbinfo.py index 07d2d28..52ee858 100644 --- a/mvt/android/modules/bugreport/dbinfo.py +++ b/mvt/android/modules/bugreport/dbinfo.py @@ -17,13 +17,14 @@ class DBInfo(BugReportModule): slug = "dbinfo" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -36,7 +37,7 @@ class DBInfo(BugReportModule): self.detected.append(result) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/getprop.py b/mvt/android/modules/bugreport/getprop.py index 3783c59..bfa5d23 100644 --- a/mvt/android/modules/bugreport/getprop.py +++ b/mvt/android/modules/bugreport/getprop.py @@ -16,15 +16,16 @@ log = logging.getLogger(__name__) class Getprop(BugReportModule): """This module extracts device properties from getprop command.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} if not results else results - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/packages.py b/mvt/android/modules/bugreport/packages.py index 4137ae0..e51b291 100644 --- a/mvt/android/modules/bugreport/packages.py +++ b/mvt/android/modules/bugreport/packages.py @@ -14,13 +14,14 @@ log = logging.getLogger(__name__) class Packages(BugReportModule): """This module extracts details on receivers for risky activities.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] timestamps = [ @@ -39,7 +40,7 @@ class Packages(BugReportModule): return records - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -133,7 +134,7 @@ class Packages(BugReportModule): return results - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/modules/bugreport/receivers.py b/mvt/android/modules/bugreport/receivers.py index 3c2b069..97928eb 100644 --- a/mvt/android/modules/bugreport/receivers.py +++ b/mvt/android/modules/bugreport/receivers.py @@ -21,15 +21,16 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL" class Receivers(BugReportModule): """This module extracts details on receivers for risky activities.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - serial=None, fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = results if results else {} - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -57,7 +58,7 @@ class Receivers(BugReportModule): self.detected.append({intent: receiver}) continue - def run(self): + def run(self) -> None: content = self._get_dumpstate_file() if not content: self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") diff --git a/mvt/android/parsers/dumpsys.py b/mvt/android/parsers/dumpsys.py index 18b03af..3ff40be 100644 --- a/mvt/android/parsers/dumpsys.py +++ b/mvt/android/parsers/dumpsys.py @@ -9,7 +9,7 @@ from datetime import datetime from mvt.common.utils import convert_timestamp_to_iso -def parse_dumpsys_accessibility(output): +def parse_dumpsys_accessibility(output: str) -> list: results = [] in_services = False @@ -34,7 +34,7 @@ def parse_dumpsys_accessibility(output): return results -def parse_dumpsys_activity_resolver_table(output): +def parse_dumpsys_activity_resolver_table(output: str) -> dict: results = {} in_activity_resolver_table = False @@ -90,7 +90,7 @@ def parse_dumpsys_activity_resolver_table(output): return results -def parse_dumpsys_battery_daily(output): +def parse_dumpsys_battery_daily(output: str) -> list: results = [] daily = None daily_updates = [] @@ -136,7 +136,7 @@ def parse_dumpsys_battery_daily(output): return results -def parse_dumpsys_battery_history(output): +def parse_dumpsys_battery_history(output: str) -> list: results = [] for line in output.splitlines(): @@ -181,7 +181,7 @@ def parse_dumpsys_battery_history(output): return results -def parse_dumpsys_dbinfo(output): +def parse_dumpsys_dbinfo(output: str) -> list: results = [] rxp = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\].*\[Pid:\((\d+)\)\](\w+).*sql\=\"(.+?)\"') @@ -234,7 +234,7 @@ def parse_dumpsys_dbinfo(output): return results -def parse_dumpsys_receiver_resolver_table(output): +def parse_dumpsys_receiver_resolver_table(output: str) -> dict: results = {} in_receiver_resolver_table = False @@ -290,7 +290,7 @@ def parse_dumpsys_receiver_resolver_table(output): return results -def parse_dumpsys_appops(output): +def parse_dumpsys_appops(output: str) -> list: results = [] perm = {} package = {} diff --git a/mvt/android/parsers/getprop.py b/mvt/android/parsers/getprop.py index ab0a09c..c6ee4c0 100644 --- a/mvt/android/parsers/getprop.py +++ b/mvt/android/parsers/getprop.py @@ -6,7 +6,7 @@ import re -def parse_getprop(output): +def parse_getprop(output: str) -> dict: results = {} rxp = re.compile(r"\[(.+?)\]: \[(.+?)\]") diff --git a/mvt/common/cmd_check_iocs.py b/mvt/common/cmd_check_iocs.py index 10467fe..245ca71 100644 --- a/mvt/common/cmd_check_iocs.py +++ b/mvt/common/cmd_check_iocs.py @@ -16,13 +16,14 @@ class CmdCheckIOCS(Command): name = "check-iocs" modules = [] - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) - def run(self): + def run(self) -> None: all_modules = [] for entry in self.modules: if entry not in all_modules: diff --git a/mvt/common/command.py b/mvt/common/command.py index 462d09e..748803e 100644 --- a/mvt/common/command.py +++ b/mvt/common/command.py @@ -9,6 +9,7 @@ import logging import os import sys from datetime import datetime +from typing import Callable from mvt.common.indicators import Indicators from mvt.common.module import run_module, save_timeline @@ -18,9 +19,10 @@ from mvt.common.version import MVT_VERSION class Command(object): - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False, - log=logging.getLogger(__name__)): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False, + log: logging.Logger = logging.getLogger(__name__)): self.name = "" self.target_path = target_path @@ -37,7 +39,7 @@ class Command(object): self.timeline = [] self.timeline_detected = [] - def _create_storage(self): + def _create_storage(self) -> None: if self.results_path and not os.path.exists(self.results_path): try: os.makedirs(self.results_path) @@ -46,7 +48,7 @@ class Command(object): self.results_path, e) sys.exit(1) - def _add_log_file_handler(self, logger): + def _add_log_file_handler(self, logger: logging.Logger) -> None: if not self.results_path: return @@ -56,7 +58,7 @@ class Command(object): fh.setFormatter(formatter) logger.addHandler(fh) - def _store_timeline(self): + def _store_timeline(self) -> None: if not self.results_path: return @@ -68,12 +70,16 @@ class Command(object): save_timeline(self.timeline_detected, os.path.join(self.results_path, "timeline_detected.csv")) - def _store_info(self): + def _store_info(self) -> None: if not self.results_path: return + target_path = None + if self.target_path: + target_path = os.path.abspath(self.target_path) + info = { - "target_path": os.path.abspath(self.target_path), + "target_path": target_path, "mvt_version": MVT_VERSION, "date": convert_timestamp_to_iso(datetime.now()), "ioc_files": [], @@ -83,44 +89,45 @@ class Command(object): for coll in self.iocs.ioc_collections: info["ioc_files"].append(coll.get("stix2_file_path", "")) - if os.path.isfile(self.target_path): - h = hashlib.sha256() - with open(self.target_path, "rb") as handle: - h.update(handle.read()) + if self.target_path: + if os.path.isfile(self.target_path): + h = hashlib.sha256() + with open(self.target_path, "rb") as handle: + h.update(handle.read()) - info["hashes"].append({ - "file_path": self.target_path, - "sha256": h.hexdigest(), - }) - elif os.path.isdir(self.target_path): - for (root, dirs, files) in os.walk(self.target_path): - for file in files: - file_path = os.path.join(root, file) - h = hashlib.sha256() + info["hashes"].append({ + "file_path": self.target_path, + "sha256": h.hexdigest(), + }) + elif os.path.isdir(self.target_path): + for (root, dirs, files) in os.walk(self.target_path): + for file in files: + file_path = os.path.join(root, file) + h = hashlib.sha256() - with open(file_path, "rb") as handle: - h.update(handle.read()) + with open(file_path, "rb") as handle: + h.update(handle.read()) - info["hashes"].append({ - "file_path": file_path, - "sha256": h.hexdigest(), - }) + info["hashes"].append({ + "file_path": file_path, + "sha256": h.hexdigest(), + }) with open(os.path.join(self.results_path, "info.json"), "w+") as handle: json.dump(info, handle, indent=4) - def list_modules(self): + def list_modules(self) -> None: self.log.info("Following is the list of available %s modules:", self.name) for module in self.modules: self.log.info(" - %s", module.__name__) - def init(self): + def init(self) -> None: raise NotImplementedError - def module_init(self, module): + def module_init(self, module: Callable) -> None: raise NotImplementedError - def run(self): + def run(self) -> None: self._create_storage() self._add_log_file_handler(self.log) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 53f9980..e139ca6 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import json +import logging import os import requests @@ -17,13 +18,13 @@ class Indicators: functions to compare extracted artifacts to the indicators. """ - def __init__(self, log=None): + def __init__(self, log=logging.Logger) -> None: self.data_dir = user_data_dir("mvt") self.log = log self.ioc_collections = [] self.total_ioc_count = 0 - def _load_downloaded_indicators(self): + def _load_downloaded_indicators(self) -> None: if not os.path.isdir(self.data_dir): return @@ -31,7 +32,7 @@ class Indicators: if f.lower().endswith(".stix2"): self.parse_stix2(os.path.join(self.data_dir, f)) - def _check_stix2_env_variable(self): + def _check_stix2_env_variable(self) -> None: """ Checks if a variable MVT_STIX2 contains path to a STIX files. """ @@ -46,8 +47,8 @@ class Indicators: self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s", path) - def _new_collection(self, cid="", name="", description="", file_name="", - file_path=""): + def _new_collection(self, cid: str = "", name: str = "", description: str = "", + file_name: str = "", file_path: str = "") -> dict: return { "id": cid, "name": name, @@ -65,14 +66,14 @@ class Indicators: "count": 0, } - def _add_indicator(self, ioc, ioc_coll, ioc_coll_list): + def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None: ioc = ioc.strip("'") if ioc not in ioc_coll_list: ioc_coll_list.append(ioc) ioc_coll["count"] += 1 self.total_ioc_count += 1 - def parse_stix2(self, file_path): + def parse_stix2(self, file_path: str) -> None: """Extract indicators from a STIX2 file. :param file_path: Path to the STIX2 file to parse @@ -178,7 +179,7 @@ class Indicators: self.ioc_collections.extend(collections) - def load_indicators_files(self, files, load_default=True): + def load_indicators_files(self, files: list, load_default: bool = True) -> None: """ Load a list of indicators files. """ @@ -196,7 +197,7 @@ class Indicators: self._check_stix2_env_variable() self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def get_iocs(self, ioc_type): + def get_iocs(self, ioc_type: str) -> dict: for ioc_collection in self.ioc_collections: for ioc in ioc_collection.get(ioc_type, []): yield { @@ -206,7 +207,7 @@ class Indicators: "stix2_file_name": ioc_collection["stix2_file_name"], } - def check_domain(self, url): + def check_domain(self, url: str) -> dict: """Check if a given URL matches any of the provided domain indicators. :param url: URL to match against domain indicators @@ -278,7 +279,7 @@ class Indicators: return ioc - def check_domains(self, urls): + def check_domains(self, urls: list) -> dict: """Check a list of URLs against the provided list of domain indicators. :param urls: List of URLs to check against domain indicators @@ -294,7 +295,7 @@ class Indicators: if check: return check - def check_process(self, process): + def check_process(self, process: str) -> dict: """Check the provided process name against the list of process indicators. @@ -319,7 +320,7 @@ class Indicators: process, ioc["name"]) return ioc - def check_processes(self, processes): + def check_processes(self, processes: list) -> dict: """Check the provided list of processes against the list of process indicators. @@ -336,7 +337,7 @@ class Indicators: if check: return check - def check_email(self, email): + def check_email(self, email: str) -> dict: """Check the provided email against the list of email indicators. :param email: Email address to check against email indicators @@ -353,7 +354,7 @@ class Indicators: email, ioc["name"]) return ioc - def check_file_name(self, file_name): + def check_file_name(self, file_name: str) -> dict: """Check the provided file name against the list of file indicators. :param file_name: File name to check against file @@ -371,7 +372,7 @@ class Indicators: file_name, ioc["name"]) return ioc - def check_file_path(self, file_path): + def check_file_path(self, file_path: str) -> dict: """Check the provided file path against the list of file indicators (both path and name). :param file_path: File path or file name to check against file @@ -394,7 +395,7 @@ class Indicators: file_path, ioc["name"]) return ioc - def check_profile(self, profile_uuid): + def check_profile(self, profile_uuid: str) -> dict: """Check the provided configuration profile UUID against the list of indicators. :param profile_uuid: Profile UUID to check against configuration profile indicators @@ -411,7 +412,7 @@ class Indicators: profile_uuid, ioc["name"]) return ioc - def check_file_hash(self, file_hash): + def check_file_hash(self, file_hash: str) -> dict: """Check the provided SHA256 file hash against the list of indicators. :param file_hash: SHA256 hash to check @@ -428,7 +429,7 @@ class Indicators: file_hash, ioc["name"]) return ioc - def check_app_id(self, app_id): + def check_app_id(self, app_id: str) -> dict: """Check the provided app identifier (typically an Android package name) against the list of indicators. @@ -447,7 +448,7 @@ class Indicators: return ioc -def download_indicators_files(log): +def download_indicators_files(log: logging.Logger) -> None: """ Download indicators from repo into MVT app data directory. """ diff --git a/mvt/common/logo.py b/mvt/common/logo.py index c776af7..908ca3b 100644 --- a/mvt/common/logo.py +++ b/mvt/common/logo.py @@ -9,7 +9,7 @@ from .updates import check_for_updates from .version import MVT_VERSION -def logo(): +def logo() -> None: print("\n") print("\t[bold]MVT[/bold] - Mobile Verification Toolkit") print("\t\thttps://mvt.re") diff --git a/mvt/common/module.py b/mvt/common/module.py index 59855c9..2745a26 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -4,8 +4,10 @@ # https://license.mvt.re/1.1/ import csv +import logging import os import re +from typing import Callable import simplejson as json @@ -28,8 +30,9 @@ class MVTModule(object): enabled = True slug = None - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=None): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = None): """Initialize module. :param file_path: Path to the module's database file, if there is any @@ -56,7 +59,7 @@ class MVTModule(object): self.timeline_detected = [] @classmethod - def from_json(cls, json_path, log=None): + def from_json(cls, json_path: str, log: logging.Logger = None): with open(json_path, "r", encoding="utf-8") as handle: results = json.load(handle) if log: @@ -72,7 +75,7 @@ class MVTModule(object): sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__) return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower() - def check_indicators(self): + def check_indicators(self) -> None: """Check the results of this module against a provided list of indicators. @@ -80,7 +83,7 @@ class MVTModule(object): """ raise NotImplementedError - def save_to_json(self): + def save_to_json(self) -> None: """Save the collected results to a json file.""" if not self.results_path: return @@ -103,11 +106,11 @@ class MVTModule(object): with open(detected_json_path, "w", encoding="utf-8") as handle: json.dump(self.detected, handle, indent=4, default=str) - def serialize(self, record): + def serialize(self, record: dict) -> None: raise NotImplementedError @staticmethod - def _deduplicate_timeline(timeline): + def _deduplicate_timeline(timeline: list) -> list: """Serialize entry as JSON to deduplicate repeated entries :param timeline: List of entries from timeline to deduplicate @@ -118,7 +121,7 @@ class MVTModule(object): timeline_set.add(json.dumps(record, sort_keys=True)) return [json.loads(record) for record in timeline_set] - def to_timeline(self): + def to_timeline(self) -> None: """Convert results into a timeline.""" for result in self.results: record = self.serialize(result) @@ -140,12 +143,12 @@ class MVTModule(object): self.timeline = self._deduplicate_timeline(self.timeline) self.timeline_detected = self._deduplicate_timeline(self.timeline_detected) - def run(self): + def run(self) -> None: """Run the main module procedure.""" raise NotImplementedError -def run_module(module): +def run_module(module: Callable): module.log.info("Running module %s...", module.__class__.__name__) try: @@ -184,7 +187,7 @@ def run_module(module): module.save_to_json() -def save_timeline(timeline, timeline_path): +def save_timeline(timeline: list, timeline_path: str): """Save the timeline in a csv file. :param timeline: List of records to order and store diff --git a/mvt/common/updates.py b/mvt/common/updates.py index 8bdb267..907090f 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -9,7 +9,7 @@ from packaging import version from .version import MVT_VERSION -def check_for_updates(): +def check_for_updates() -> None: res = requests.get("https://pypi.org/pypi/mvt/json") data = res.json() latest_version = data.get("info", {}).get("version", "") diff --git a/mvt/common/url.py b/mvt/common/url.py index 8efa073..2e93c4e 100644 --- a/mvt/common/url.py +++ b/mvt/common/url.py @@ -253,7 +253,7 @@ SHORTENER_DOMAINS = [ class URL: - def __init__(self, url): + def __init__(self, url: str) -> None: if type(url) == bytes: url = url.decode() @@ -262,7 +262,7 @@ class URL: self.top_level = self.get_top_level() self.is_shortened = False - def get_domain(self): + def get_domain(self) -> None: """Get the domain from a URL. :param url: URL to parse @@ -273,11 +273,13 @@ class URL: """ # TODO: Properly handle exception. try: - return get_tld(self.url, as_object=True, fix_protocol=True).parsed_url.netloc.lower().lstrip("www.") + return get_tld(self.url, + as_object=True, + fix_protocol=True).parsed_url.netloc.lower().lstrip("www.") except Exception: return None - def get_top_level(self): + def get_top_level(self) -> None: """Get only the top-level domain from a URL. :param url: URL to parse @@ -306,7 +308,7 @@ class URL: return self.is_shortened - def unshorten(self): + def unshorten(self) -> None: """Unshorten the URL by requesting an HTTP HEAD response.""" res = requests.head(self.url) if str(res.status_code).startswith("30"): diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 4b26ee7..0f531d8 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -8,7 +8,7 @@ import hashlib import re -def convert_mactime_to_unix(timestamp, from_2001=True): +def convert_mactime_to_unix(timestamp, from_2001: bool = True): """Converts Mac Standard Time to a Unix timestamp. :param timestamp: MacTime timestamp (either int or float). diff --git a/mvt/common/virustotal.py b/mvt/common/virustotal.py index 4e392f8..16cb9ac 100644 --- a/mvt/common/virustotal.py +++ b/mvt/common/virustotal.py @@ -21,7 +21,7 @@ class VTQuotaExceeded(Exception): pass -def virustotal_lookup(file_hash): +def virustotal_lookup(file_hash: str): if MVT_VT_API_KEY not in os.environ: raise VTNoKey("No VirusTotal API key provided: to use VirusTotal lookups please provide your API key with `export MVT_VT_API_KEY=`") diff --git a/mvt/ios/cmd_check_backup.py b/mvt/ios/cmd_check_backup.py index 6b1f41d..72cd892 100644 --- a/mvt/ios/cmd_check_backup.py +++ b/mvt/ios/cmd_check_backup.py @@ -18,8 +18,9 @@ class CmdIOSCheckBackup(Command): name = "check-backup" modules = BACKUP_MODULES + MIXED_MODULES - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/ios/cmd_check_fs.py b/mvt/ios/cmd_check_fs.py index 46a2961..21dc9fe 100644 --- a/mvt/ios/cmd_check_fs.py +++ b/mvt/ios/cmd_check_fs.py @@ -18,8 +18,9 @@ class CmdIOSCheckFS(Command): name = "check-fs" modules = FS_MODULES + MIXED_MODULES - def __init__(self, target_path=None, results_path=None, ioc_files=[], - module_name=None, serial=None, fast_mode=False): + def __init__(self, target_path: str = None, results_path: str = None, + ioc_files: list = [], module_name: str = None, serial: str = None, + fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/ios/decrypt.py b/mvt/ios/decrypt.py index be47e95..c3eb285 100644 --- a/mvt/ios/decrypt.py +++ b/mvt/ios/decrypt.py @@ -24,7 +24,7 @@ class DecryptBackup: """ - def __init__(self, backup_path, dest_path=None): + def __init__(self, backup_path: str, dest_path: str = None) -> None: """Decrypts an encrypted iOS backup. :param backup_path: Path to the encrypted backup folder :param dest_path: Path to the folder where to store the decrypted backup @@ -38,7 +38,7 @@ class DecryptBackup: return self._backup is not None @staticmethod - def is_encrypted(backup_path) -> bool: + def is_encrypted(backup_path: str) -> bool: """Query Manifest.db file to see if it's encrypted or not. :param backup_path: Path to the backup to decrypt @@ -54,13 +54,14 @@ class DecryptBackup: log.critical("The backup does not seem encrypted!") return False - def _process_file(self, relative_path, domain, item, file_id, item_folder): + def _process_file(self, relative_path: str, domain: str, item, + file_id: str, item_folder: str) -> None: self._backup.getFileDecryptedCopy(manifestEntry=item, targetName=file_id, targetFolder=item_folder) log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain, item_folder, file_id) - def process_backup(self): + def process_backup(self) -> None: if not os.path.exists(self.dest_path): os.makedirs(self.dest_path) @@ -111,7 +112,7 @@ class DecryptBackup: shutil.copy(os.path.join(self.backup_path, file_name), self.dest_path) - def decrypt_with_password(self, password): + def decrypt_with_password(self, password: str) -> None: """Decrypts an encrypted iOS backup. :param password: Password to use to decrypt the original backup @@ -149,7 +150,7 @@ class DecryptBackup: log.exception(e) log.critical("Failed to decrypt backup. Did you provide the correct password? Did you point to the right backup path?") - def decrypt_with_key_file(self, key_file): + def decrypt_with_key_file(self, key_file: str) -> None: """Decrypts an encrypted iOS backup using a key file. :param key_file: File to read the key bytes to decrypt the backup @@ -179,7 +180,7 @@ class DecryptBackup: log.exception(e) log.critical("Failed to decrypt backup. Did you provide the correct key file?") - def get_key(self): + def get_key(self) -> None: """Retrieve and prints the encryption key.""" if not self._backup: return @@ -188,7 +189,7 @@ class DecryptBackup: log.info("Derived decryption key for backup at path %s is: \"%s\"", self.backup_path, self._decryption_key) - def write_key(self, key_path): + def write_key(self, key_path: str) -> None: """Save extracted key to file. :param key_path: Path to the file where to write the derived decryption key. diff --git a/mvt/ios/modules/backup/backup_info.py b/mvt/ios/modules/backup/backup_info.py index 1608d89..0be3090 100644 --- a/mvt/ios/modules/backup/backup_info.py +++ b/mvt/ios/modules/backup/backup_info.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os import plistlib @@ -15,15 +16,16 @@ from ..base import IOSExtraction class BackupInfo(IOSExtraction): """This module extracts information about the device and the backup.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} - def run(self): + def run(self) -> None: info_path = os.path.join(self.target_path, "Info.plist") if not os.path.exists(info_path): raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device information") diff --git a/mvt/ios/modules/backup/configuration_profiles.py b/mvt/ios/modules/backup/configuration_profiles.py index 98cf9e1..ae3472f 100644 --- a/mvt/ios/modules/backup/configuration_profiles.py +++ b/mvt/ios/modules/backup/configuration_profiles.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os import plistlib from base64 import b64encode @@ -17,13 +18,14 @@ CONF_PROFILES_DOMAIN = "SysSharedContainerDomain-systemgroup.com.apple.configura class ConfigurationProfiles(IOSExtraction): """This module extracts the full plist data from configuration profiles.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: if not record["install_date"]: return @@ -36,7 +38,7 @@ class ConfigurationProfiles(IOSExtraction): "data": f"{record['plist']['PayloadType']} installed: {record['plist']['PayloadUUID']} - {payload_name}: {payload_description}" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -58,7 +60,7 @@ class ConfigurationProfiles(IOSExtraction): self.detected.append(result) continue - def run(self): + def run(self) -> None: for conf_file in self._get_backup_files_from_manifest(domain=CONF_PROFILES_DOMAIN): conf_rel_path = conf_file["relative_path"] # Filter out all configuration files that are not configuration profiles. diff --git a/mvt/ios/modules/backup/manifest.py b/mvt/ios/modules/backup/manifest.py index fc7e75a..591fa01 100644 --- a/mvt/ios/modules/backup/manifest.py +++ b/mvt/ios/modules/backup/manifest.py @@ -5,6 +5,7 @@ import datetime import io +import logging import os import plistlib import sqlite3 @@ -18,8 +19,9 @@ from ..base import IOSExtraction class Manifest(IOSExtraction): """This module extracts information from a backup Manifest.db file.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -47,7 +49,7 @@ class Manifest(IOSExtraction): timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int) return convert_timestamp_to_iso(timestamp) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] if "modified" not in record or "status_changed" not in record: return @@ -67,7 +69,7 @@ class Manifest(IOSExtraction): return records - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -92,7 +94,7 @@ class Manifest(IOSExtraction): ioc["value"], rel_path) self.detected.append(result) - def run(self): + def run(self) -> None: manifest_db_path = os.path.join(self.target_path, "Manifest.db") if not os.path.isfile(manifest_db_path): raise DatabaseNotFoundError("unable to find backup's Manifest.db") diff --git a/mvt/ios/modules/backup/profile_events.py b/mvt/ios/modules/backup/profile_events.py index 81036d4..c23ed2d 100644 --- a/mvt/ios/modules/backup/profile_events.py +++ b/mvt/ios/modules/backup/profile_events.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import plistlib from mvt.common.utils import convert_timestamp_to_iso @@ -18,14 +19,14 @@ class ProfileEvents(IOSExtraction): """ - - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record.get("timestamp"), "module": self.__class__.__name__, @@ -33,7 +34,7 @@ class ProfileEvents(IOSExtraction): "data": f"Process {record.get('process')} started operation {record.get('operation')} of profile {record.get('profile_id')}" } - def run(self): + def run(self) -> None: for events_file in self._get_backup_files_from_manifest(relative_path=CONF_PROFILES_EVENTS_RELPATH): events_file_path = self._get_backup_file_from_id(events_file["file_id"]) if not events_file_path: diff --git a/mvt/ios/modules/base.py b/mvt/ios/modules/base.py index e542877..fbe0451 100644 --- a/mvt/ios/modules/base.py +++ b/mvt/ios/modules/base.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import glob +import logging import os import shutil import sqlite3 @@ -16,8 +17,9 @@ from mvt.common.module import (DatabaseCorruptedError, DatabaseNotFoundError, class IOSExtraction(MVTModule): """This class provides a base for all iOS filesystem/backup extraction modules.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) diff --git a/mvt/ios/modules/fs/analytics.py b/mvt/ios/modules/fs/analytics.py index 7761d60..d9bae40 100644 --- a/mvt/ios/modules/fs/analytics.py +++ b/mvt/ios/modules/fs/analytics.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import plistlib import sqlite3 @@ -18,13 +19,14 @@ ANALYTICS_DB_PATH = [ class Analytics(IOSExtraction): """This module extracts information from the private/var/Keychains/Analytics/*.db files.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["timestamp"], "module": self.__class__.__name__, @@ -32,7 +34,7 @@ class Analytics(IOSExtraction): "data": f"{record}", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -117,7 +119,7 @@ class Analytics(IOSExtraction): self.log.info("Extracted information on %d analytics data from %s", len(self.results), artifact) - def run(self): + def run(self) -> None: for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH): self.file_path = file_path self.log.info("Found Analytics database file at path: %s", file_path) diff --git a/mvt/ios/modules/fs/cache_files.py b/mvt/ios/modules/fs/cache_files.py index 0277449..00d9b46 100644 --- a/mvt/ios/modules/fs/cache_files.py +++ b/mvt/ios/modules/fs/cache_files.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os import sqlite3 @@ -11,13 +12,14 @@ from ..base import IOSExtraction class CacheFiles(IOSExtraction): - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] for item in self.results[record]: records.append({ @@ -29,7 +31,7 @@ class CacheFiles(IOSExtraction): return records - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -69,7 +71,7 @@ class CacheFiles(IOSExtraction): "isodate": row[5], }) - def run(self): + def run(self) -> None: self.results = {} for root, dirs, files in os.walk(self.target_path): for file_name in files: diff --git a/mvt/ios/modules/fs/filesystem.py b/mvt/ios/modules/fs/filesystem.py index 538fa30..e3c230d 100644 --- a/mvt/ios/modules/fs/filesystem.py +++ b/mvt/ios/modules/fs/filesystem.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import datetime +import logging import os from mvt.common.utils import convert_timestamp_to_iso @@ -18,13 +19,14 @@ class Filesystem(IOSExtraction): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["modified"], "module": self.__class__.__name__, @@ -32,7 +34,7 @@ class Filesystem(IOSExtraction): "data": record["path"], } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -57,7 +59,7 @@ class Filesystem(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: for root, dirs, files in os.walk(self.target_path): for dir_name in dirs: try: diff --git a/mvt/ios/modules/fs/net_netusage.py b/mvt/ios/modules/fs/net_netusage.py index c860a96..c0a8bc0 100644 --- a/mvt/ios/modules/fs/net_netusage.py +++ b/mvt/ios/modules/fs/net_netusage.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from ..net_base import NetBase @@ -20,13 +21,14 @@ class Netusage(NetBase): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: for netusage_path in self._get_fs_files_from_patterns(NETUSAGE_ROOT_PATHS): self.file_path = netusage_path self.log.info("Found NetUsage database at path: %s", self.file_path) diff --git a/mvt/ios/modules/fs/safari_favicon.py b/mvt/ios/modules/fs/safari_favicon.py index 941ab5d..8158fc0 100644 --- a/mvt/ios/modules/fs/safari_favicon.py +++ b/mvt/ios/modules/fs/safari_favicon.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso @@ -18,13 +19,14 @@ SAFARI_FAVICON_ROOT_PATHS = [ class SafariFavicon(IOSExtraction): """This module extracts all Safari favicon records.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -32,7 +34,7 @@ class SafariFavicon(IOSExtraction): "data": f"Safari favicon from {record['url']} with icon URL {record['icon_url']} ({record['type']})", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -92,7 +94,7 @@ class SafariFavicon(IOSExtraction): cur.close() conn.close() - def run(self): + def run(self) -> None: for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS): self.log.info("Found Safari favicon cache database at path: %s", file_path) self._process_favicon_db(file_path) diff --git a/mvt/ios/modules/fs/shutdownlog.py b/mvt/ios/modules/fs/shutdownlog.py index ef9d170..2bd648c 100644 --- a/mvt/ios/modules/fs/shutdownlog.py +++ b/mvt/ios/modules/fs/shutdownlog.py @@ -3,6 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso from ..base import IOSExtraction @@ -15,13 +17,14 @@ SHUTDOWN_LOG_PATH = [ class ShutdownLog(IOSExtraction): """This module extracts processes information from the shutdown log file.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -29,7 +32,7 @@ class ShutdownLog(IOSExtraction): "data": f"Client {record['client']} with PID {record['pid']} was running when the device was shut down", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -83,7 +86,7 @@ class ShutdownLog(IOSExtraction): self.results = sorted(self.results, key=lambda entry: entry["isodate"]) - def run(self): + def run(self) -> None: self._find_ios_database(root_paths=SHUTDOWN_LOG_PATH) self.log.info("Found shutdown log at path: %s", self.file_path) with open(self.file_path, "r", encoding="utf-8") as handle: diff --git a/mvt/ios/modules/fs/version_history.py b/mvt/ios/modules/fs/version_history.py index be5bd4c..9073e8d 100644 --- a/mvt/ios/modules/fs/version_history.py +++ b/mvt/ios/modules/fs/version_history.py @@ -5,6 +5,7 @@ import datetime import json +import logging from mvt.common.utils import convert_timestamp_to_iso @@ -18,13 +19,14 @@ IOS_ANALYTICS_JOURNAL_PATHS = [ class IOSVersionHistory(IOSExtraction): """This module extracts iOS update history from Analytics Journal log files.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -32,7 +34,7 @@ class IOSVersionHistory(IOSExtraction): "data": f"Recorded iOS version {record['os_version']}", } - def run(self): + def run(self) -> None: for found_path in self._get_fs_files_from_patterns(IOS_ANALYTICS_JOURNAL_PATHS): with open(found_path, "r", encoding="utf-8") as analytics_log: log_line = json.loads(analytics_log.readline().strip()) diff --git a/mvt/ios/modules/fs/webkit_base.py b/mvt/ios/modules/fs/webkit_base.py index 88b32d8..b3579df 100644 --- a/mvt/ios/modules/fs/webkit_base.py +++ b/mvt/ios/modules/fs/webkit_base.py @@ -14,7 +14,7 @@ from ..base import IOSExtraction class WebkitBase(IOSExtraction): """This class is a base for other WebKit-related modules.""" - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return diff --git a/mvt/ios/modules/fs/webkit_indexeddb.py b/mvt/ios/modules/fs/webkit_indexeddb.py index 4745e76..877593e 100644 --- a/mvt/ios/modules/fs/webkit_indexeddb.py +++ b/mvt/ios/modules/fs/webkit_indexeddb.py @@ -3,6 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from .webkit_base import WebkitBase WEBKIT_INDEXEDDB_ROOT_PATHS = [ @@ -19,13 +21,14 @@ class WebkitIndexedDB(WebkitBase): slug = "webkit_indexeddb" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -33,7 +36,7 @@ class WebkitIndexedDB(WebkitBase): "data": f"IndexedDB folder {record['folder']} containing file for URL {record['url']}", } - def run(self): + def run(self) -> None: self._process_webkit_folder(WEBKIT_INDEXEDDB_ROOT_PATHS) self.log.info("Extracted a total of %d WebKit IndexedDB records", len(self.results)) diff --git a/mvt/ios/modules/fs/webkit_localstorage.py b/mvt/ios/modules/fs/webkit_localstorage.py index 323a8bb..cbd1ca3 100644 --- a/mvt/ios/modules/fs/webkit_localstorage.py +++ b/mvt/ios/modules/fs/webkit_localstorage.py @@ -3,6 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from .webkit_base import WebkitBase WEBKIT_LOCALSTORAGE_ROOT_PATHS = [ @@ -17,13 +19,14 @@ class WebkitLocalStorage(WebkitBase): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -31,7 +34,7 @@ class WebkitLocalStorage(WebkitBase): "data": f"WebKit Local Storage folder {record['folder']} containing file for URL {record['url']}", } - def run(self): + def run(self) -> None: self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS) self.log.info("Extracted a total of %d records from WebKit Local Storages", len(self.results)) diff --git a/mvt/ios/modules/fs/webkit_safariviewservice.py b/mvt/ios/modules/fs/webkit_safariviewservice.py index 56a9863..a32e9a1 100644 --- a/mvt/ios/modules/fs/webkit_safariviewservice.py +++ b/mvt/ios/modules/fs/webkit_safariviewservice.py @@ -3,6 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from .webkit_base import WebkitBase WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS = [ @@ -17,13 +19,14 @@ class WebkitSafariViewService(WebkitBase): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS) self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData", len(self.results)) diff --git a/mvt/ios/modules/mixed/calls.py b/mvt/ios/modules/mixed/calls.py index 53688f2..03d50b4 100644 --- a/mvt/ios/modules/mixed/calls.py +++ b/mvt/ios/modules/mixed/calls.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso @@ -20,13 +21,14 @@ CALLS_ROOT_PATHS = [ class Calls(IOSExtraction): """This module extracts phone calls details""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -34,7 +36,7 @@ class Calls(IOSExtraction): "data": f"From {record['number']} using {record['provider']} during {record['duration']} seconds" } - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=CALLS_BACKUP_IDS, root_paths=CALLS_ROOT_PATHS) self.log.info("Found Calls database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/chrome_favicon.py b/mvt/ios/modules/mixed/chrome_favicon.py index 3941638..0ba2795 100644 --- a/mvt/ios/modules/mixed/chrome_favicon.py +++ b/mvt/ios/modules/mixed/chrome_favicon.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from mvt.common.utils import (convert_chrometime_to_unix, @@ -23,13 +24,14 @@ CHROME_FAVICON_ROOT_PATHS = [ class ChromeFavicon(IOSExtraction): """This module extracts all Chrome favicon records.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -37,7 +39,7 @@ class ChromeFavicon(IOSExtraction): "data": f"{record['icon_url']} from {record['url']}" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -50,7 +52,7 @@ class ChromeFavicon(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS, root_paths=CHROME_FAVICON_ROOT_PATHS) self.log.info("Found Chrome favicon cache database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/chrome_history.py b/mvt/ios/modules/mixed/chrome_history.py index c44d2c9..595de15 100644 --- a/mvt/ios/modules/mixed/chrome_history.py +++ b/mvt/ios/modules/mixed/chrome_history.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from mvt.common.utils import (convert_chrometime_to_unix, @@ -22,13 +23,14 @@ CHROME_HISTORY_ROOT_PATHS = [ class ChromeHistory(IOSExtraction): """This module extracts all Chome visits.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -36,7 +38,7 @@ class ChromeHistory(IOSExtraction): "data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -46,7 +48,7 @@ class ChromeHistory(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS, root_paths=CHROME_HISTORY_ROOT_PATHS) self.log.info("Found Chrome history database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/contacts.py b/mvt/ios/modules/mixed/contacts.py index dc2cf40..4585327 100644 --- a/mvt/ios/modules/mixed/contacts.py +++ b/mvt/ios/modules/mixed/contacts.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from ..base import IOSExtraction @@ -18,13 +19,14 @@ CONTACTS_ROOT_PATHS = [ class Contacts(IOSExtraction): """This module extracts all contact details from the phone's address book.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS, root_paths=CONTACTS_ROOT_PATHS) self.log.info("Found Contacts database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/firefox_favicon.py b/mvt/ios/modules/mixed/firefox_favicon.py index 7af9678..b8bdfa2 100644 --- a/mvt/ios/modules/mixed/firefox_favicon.py +++ b/mvt/ios/modules/mixed/firefox_favicon.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from datetime import datetime @@ -21,13 +22,14 @@ FIREFOX_HISTORY_ROOT_PATHS = [ class FirefoxFavicon(IOSExtraction): """This module extracts all Firefox favicon""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -35,7 +37,7 @@ class FirefoxFavicon(IOSExtraction): "data": f"Firefox favicon {record['url']} when visiting {record['history_url']}", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -48,7 +50,7 @@ class FirefoxFavicon(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS) self.log.info("Found Firefox favicon database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/firefox_history.py b/mvt/ios/modules/mixed/firefox_history.py index b6f50b9..803838c 100644 --- a/mvt/ios/modules/mixed/firefox_history.py +++ b/mvt/ios/modules/mixed/firefox_history.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from datetime import datetime @@ -25,13 +26,14 @@ class FirefoxHistory(IOSExtraction): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -39,7 +41,7 @@ class FirefoxHistory(IOSExtraction): "data": f"Firefox visit with ID {record['id']} to URL: {record['url']}", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -49,7 +51,7 @@ class FirefoxHistory(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS) self.log.info("Found Firefox history database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/idstatuscache.py b/mvt/ios/modules/mixed/idstatuscache.py index 8e9df3e..45d2208 100644 --- a/mvt/ios/modules/mixed/idstatuscache.py +++ b/mvt/ios/modules/mixed/idstatuscache.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import collections +import logging import plistlib from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso @@ -22,13 +23,14 @@ IDSTATUSCACHE_ROOT_PATHS = [ class IDStatusCache(IOSExtraction): """Extracts Apple Authentication information from idstatuscache.plist""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -36,7 +38,7 @@ class IDStatusCache(IOSExtraction): "data": f"Lookup of {record['user']} within {record['package']} (Status {record['idstatus']})" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -83,7 +85,7 @@ class IDStatusCache(IOSExtraction): entry["occurrences"] = entry_counter[entry["user"]] self.results.append(entry) - def run(self): + def run(self) -> None: if self.is_backup: self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS) diff --git a/mvt/ios/modules/mixed/interactionc.py b/mvt/ios/modules/mixed/interactionc.py index e16e58a..7f06dc7 100644 --- a/mvt/ios/modules/mixed/interactionc.py +++ b/mvt/ios/modules/mixed/interactionc.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso @@ -20,8 +21,9 @@ INTERACTIONC_ROOT_PATHS = [ class InteractionC(IOSExtraction): """This module extracts data from InteractionC db.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -39,7 +41,7 @@ class InteractionC(IOSExtraction): "last_outgoing_recipient_date", ] - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] processed = [] for ts in self.timestamps: @@ -63,7 +65,7 @@ class InteractionC(IOSExtraction): return records - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=INTERACTIONC_BACKUP_IDS, root_paths=INTERACTIONC_ROOT_PATHS) self.log.info("Found InteractionC database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/locationd.py b/mvt/ios/modules/mixed/locationd.py index c2576f3..89bffa4 100644 --- a/mvt/ios/modules/mixed/locationd.py +++ b/mvt/ios/modules/mixed/locationd.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import plistlib from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso @@ -21,8 +22,9 @@ LOCATIOND_ROOT_PATHS = [ class LocationdClients(IOSExtraction): """Extract information from apps who used geolocation.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -39,7 +41,7 @@ class LocationdClients(IOSExtraction): "BeaconRegionTimeStopped", ] - def serialize(self, record): + def serialize(self, record: dict) -> None: records = [] for timestamp in self.timestamps: if timestamp in record.keys(): @@ -52,7 +54,7 @@ class LocationdClients(IOSExtraction): return records - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -108,7 +110,7 @@ class LocationdClients(IOSExtraction): self.results.append(result) - def run(self): + def run(self) -> None: if self.is_backup: self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS) diff --git a/mvt/ios/modules/mixed/net_datausage.py b/mvt/ios/modules/mixed/net_datausage.py index 4b63842..873e4a2 100644 --- a/mvt/ios/modules/mixed/net_datausage.py +++ b/mvt/ios/modules/mixed/net_datausage.py @@ -3,6 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging + from ..net_base import NetBase DATAUSAGE_BACKUP_IDS = [ @@ -20,13 +22,14 @@ class Datausage(NetBase): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=DATAUSAGE_BACKUP_IDS, root_paths=DATAUSAGE_ROOT_PATHS) self.log.info("Found DataUsage database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/osanalytics_addaily.py b/mvt/ios/modules/mixed/osanalytics_addaily.py index 1c12e02..58d2cc4 100644 --- a/mvt/ios/modules/mixed/osanalytics_addaily.py +++ b/mvt/ios/modules/mixed/osanalytics_addaily.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import plistlib from mvt.common.utils import convert_timestamp_to_iso @@ -20,13 +21,14 @@ OSANALYTICS_ADDAILY_ROOT_PATHS = [ class OSAnalyticsADDaily(IOSExtraction): """Extract network usage information by process, from com.apple.osanalytics.addaily.plist""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" return { @@ -36,7 +38,7 @@ class OSAnalyticsADDaily(IOSExtraction): "data": record_data, } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -46,7 +48,7 @@ class OSAnalyticsADDaily(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=OSANALYTICS_ADDAILY_BACKUP_IDS, root_paths=OSANALYTICS_ADDAILY_ROOT_PATHS) self.log.info("Found com.apple.osanalytics.addaily plist at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/safari_browserstate.py b/mvt/ios/modules/mixed/safari_browserstate.py index d1a6a7f..320f03f 100644 --- a/mvt/ios/modules/mixed/safari_browserstate.py +++ b/mvt/ios/modules/mixed/safari_browserstate.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import io +import logging import os import plistlib import sqlite3 @@ -23,15 +24,16 @@ SAFARI_BROWSER_STATE_ROOT_PATHS = [ class SafariBrowserState(IOSExtraction): """This module extracts all Safari browser state records.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self._session_history_count = 0 - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["last_viewed_timestamp"], "module": self.__class__.__name__, @@ -39,7 +41,7 @@ class SafariBrowserState(IOSExtraction): "data": f"{record['tab_title']} - {record['tab_url']}" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -118,7 +120,7 @@ class SafariBrowserState(IOSExtraction): "safari_browser_state_db": os.path.relpath(db_path, self.target_path), }) - def run(self): + def run(self) -> None: if self.is_backup: for backup_file in self._get_backup_files_from_manifest(relative_path=SAFARI_BROWSER_STATE_BACKUP_RELPATH): self.file_path = self._get_backup_file_from_id(backup_file["file_id"]) diff --git a/mvt/ios/modules/mixed/safari_history.py b/mvt/ios/modules/mixed/safari_history.py index 64c39d5..60fdae6 100644 --- a/mvt/ios/modules/mixed/safari_history.py +++ b/mvt/ios/modules/mixed/safari_history.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os import sqlite3 @@ -25,13 +26,14 @@ class SafariHistory(IOSExtraction): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -73,7 +75,7 @@ class SafariHistory(IOSExtraction): if elapsed_time.seconds == 0: self.log.warning("Redirect took less than a second! (%d milliseconds)", elapsed_ms) - def check_indicators(self): + def check_indicators(self) -> None: self._find_injections() if not self.indicators: @@ -117,7 +119,7 @@ class SafariHistory(IOSExtraction): cur.close() conn.close() - def run(self): + def run(self) -> None: if self.is_backup: for history_file in self._get_backup_files_from_manifest(relative_path=SAFARI_HISTORY_BACKUP_RELPATH): history_path = self._get_backup_file_from_id(history_file["file_id"]) diff --git a/mvt/ios/modules/mixed/shortcuts.py b/mvt/ios/modules/mixed/shortcuts.py index b9c3655..b9558b3 100644 --- a/mvt/ios/modules/mixed/shortcuts.py +++ b/mvt/ios/modules/mixed/shortcuts.py @@ -5,6 +5,7 @@ import io import itertools +import logging import plistlib import sqlite3 @@ -24,13 +25,14 @@ SHORTCUT_ROOT_PATHS = [ class Shortcuts(IOSExtraction): """This module extracts all info about SMS/iMessage attachments.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: found_urls = "" if record["action_urls"]: found_urls = "- URLs in actions: {}".format(", ".join(record["action_urls"])) @@ -50,7 +52,7 @@ class Shortcuts(IOSExtraction): "data": f"iOS Shortcut '{record['shortcut_name'].decode('utf-8')}': {desc} {found_urls}" }] - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -60,7 +62,7 @@ class Shortcuts(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS, root_paths=SHORTCUT_ROOT_PATHS) self.log.info("Found Shortcuts database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/sms.py b/mvt/ios/modules/mixed/sms.py index 5a88256..01ebb85 100644 --- a/mvt/ios/modules/mixed/sms.py +++ b/mvt/ios/modules/mixed/sms.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from base64 import b64encode @@ -22,13 +23,14 @@ SMS_ROOT_PATHS = [ class SMS(IOSExtraction): """This module extracts all SMS messages containing links.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: text = record["text"].replace("\n", "\\n") return { "timestamp": record["isodate"], @@ -37,7 +39,7 @@ class SMS(IOSExtraction): "data": f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})" } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -48,7 +50,7 @@ class SMS(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS) self.log.info("Found SMS database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/sms_attachments.py b/mvt/ios/modules/mixed/sms_attachments.py index 0f6ba39..13a64ed 100644 --- a/mvt/ios/modules/mixed/sms_attachments.py +++ b/mvt/ios/modules/mixed/sms_attachments.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from base64 import b64encode @@ -21,13 +22,14 @@ SMS_ROOT_PATHS = [ class SMSAttachments(IOSExtraction): """This module extracts all info about SMS/iMessage attachments.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -36,7 +38,7 @@ class SMSAttachments(IOSExtraction): f"with {record['total_bytes']} bytes (is_sticker: {record['is_sticker']}, has_user_info: {record['has_user_info']})" } - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS) self.log.info("Found SMS database at path: %s", self.file_path) diff --git a/mvt/ios/modules/mixed/tcc.py b/mvt/ios/modules/mixed/tcc.py index 226a0d3..de1a565 100644 --- a/mvt/ios/modules/mixed/tcc.py +++ b/mvt/ios/modules/mixed/tcc.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import sqlite3 from datetime import datetime @@ -47,13 +48,14 @@ AUTH_REASONS = { class TCC(IOSExtraction): """This module extracts records from the TCC.db SQLite database.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: if "last_modified" in record: if "allowed_value" in record: msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}" @@ -66,7 +68,7 @@ class TCC(IOSExtraction): "data": msg } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -156,7 +158,7 @@ class TCC(IOSExtraction): cur.close() conn.close() - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=TCC_BACKUP_IDS, root_paths=TCC_ROOT_PATHS) self.log.info("Found TCC database at path: %s", self.file_path) self.process_db(self.file_path) diff --git a/mvt/ios/modules/mixed/webkit_resource_load_statistics.py b/mvt/ios/modules/mixed/webkit_resource_load_statistics.py index 12fc08c..405beef 100644 --- a/mvt/ios/modules/mixed/webkit_resource_load_statistics.py +++ b/mvt/ios/modules/mixed/webkit_resource_load_statistics.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import datetime +import logging import os import sqlite3 @@ -22,15 +23,16 @@ class WebkitResourceLoadStatistics(IOSExtraction): """This module extracts records from WebKit ResourceLoadStatistics observations.db.""" # TODO: Add serialize(). - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) self.results = {} if not results else results - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -73,7 +75,7 @@ class WebkitResourceLoadStatistics(IOSExtraction): if len(self.results[key]) > 0: self.log.info("Extracted a total of %d records from %s", len(self.results[key]), db_path) - def run(self): + def run(self) -> None: if self.is_backup: try: for backup_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH): diff --git a/mvt/ios/modules/mixed/webkit_session_resource_log.py b/mvt/ios/modules/mixed/webkit_session_resource_log.py index 5106d86..8f28ceb 100644 --- a/mvt/ios/modules/mixed/webkit_session_resource_log.py +++ b/mvt/ios/modules/mixed/webkit_session_resource_log.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os import plistlib @@ -29,8 +30,9 @@ class WebkitSessionResourceLog(IOSExtraction): """ - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -51,7 +53,7 @@ class WebkitSessionResourceLog(IOSExtraction): return domains - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -117,7 +119,7 @@ class WebkitSessionResourceLog(IOSExtraction): return items - def run(self): + def run(self) -> None: if self.is_backup: for log_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_SESSION_RESOURCE_LOG_BACKUP_RELPATH): log_path = self._get_backup_file_from_id(log_file["file_id"]) diff --git a/mvt/ios/modules/mixed/whatsapp.py b/mvt/ios/modules/mixed/whatsapp.py index 6e288dc..d98c3e3 100644 --- a/mvt/ios/modules/mixed/whatsapp.py +++ b/mvt/ios/modules/mixed/whatsapp.py @@ -24,13 +24,14 @@ WHATSAPP_ROOT_PATHS = [ class Whatsapp(IOSExtraction): """This module extracts all WhatsApp messages containing links.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record): + def serialize(self, record: dict) -> None: text = record.get("ZTEXT", "").replace("\n", "\\n") links_text = "" if record["links"]: @@ -43,7 +44,7 @@ class Whatsapp(IOSExtraction): "data": f"\'{text}\' from {record.get('ZFROMJID', 'Unknown')}{links_text}", } - def check_indicators(self): + def check_indicators(self) -> None: if not self.indicators: return @@ -53,7 +54,7 @@ class Whatsapp(IOSExtraction): result["matched_indicator"] = ioc self.detected.append(result) - def run(self): + def run(self) -> None: self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS, root_paths=WHATSAPP_ROOT_PATHS) self.log.info("Found WhatsApp database at path: %s", self.file_path) diff --git a/mvt/ios/modules/net_base.py b/mvt/ios/modules/net_base.py index 5cb53ed..3b775c1 100644 --- a/mvt/ios/modules/net_base.py +++ b/mvt/ios/modules/net_base.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import operator import sqlite3 from pathlib import Path @@ -15,8 +16,9 @@ from .base import IOSExtraction class NetBase(IOSExtraction): """This class provides a base for DataUsage and NetUsage extraction modules.""" - def __init__(self, file_path=None, target_path=None, results_path=None, - fast_mode=False, log=None, results=[]): + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = None, results: list = []) -> None: super().__init__(file_path=file_path, target_path=target_path, results_path=results_path, fast_mode=fast_mode, log=log, results=results) @@ -78,7 +80,7 @@ class NetBase(IOSExtraction): self.log.info("Extracted information on %d processes", len(self.results)) - def serialize(self, record): + def serialize(self, record: dict) -> None: record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})" record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" @@ -214,7 +216,7 @@ class NetBase(IOSExtraction): self.results = sorted(self.results, key=operator.itemgetter("first_isodate")) - def check_indicators(self): + def check_indicators(self) -> None: # Check for manipulated process records. # TODO: Catching KeyError for live_isodate for retro-compatibility. # This is not very good. diff --git a/mvt/ios/versions.py b/mvt/ios/versions.py index 537e360..a6b4e0b 100644 --- a/mvt/ios/versions.py +++ b/mvt/ios/versions.py @@ -244,18 +244,19 @@ IPHONE_IOS_VERSIONS = [ ] -def get_device_desc_from_id(identifier, devices_list=IPHONE_MODELS): +def get_device_desc_from_id(identifier: str, + devices_list: list = IPHONE_MODELS) -> str: for model in IPHONE_MODELS: if identifier == model["identifier"]: return model["description"] -def find_version_by_build(build): +def find_version_by_build(build: str) -> str: build = build.upper() for version in IPHONE_IOS_VERSIONS: if build == version["build"]: return version["version"] -def latest_ios_version(): +def latest_ios_version() -> str: return IPHONE_IOS_VERSIONS[-1]