From 7024909e05e1bf5f864f1221f2e4a7b5e5fd0762 Mon Sep 17 00:00:00 2001 From: Nex Date: Wed, 22 Jun 2022 16:53:29 +0200 Subject: [PATCH 1/3] Adding more type hints --- mvt/android/modules/adb/base.py | 29 +++++++++++++---------- mvt/android/modules/adb/chrome_history.py | 2 +- mvt/android/modules/adb/files.py | 4 ++-- mvt/android/modules/adb/packages.py | 6 ++--- mvt/android/modules/adb/sms.py | 4 ++-- mvt/android/modules/adb/whatsapp.py | 2 +- mvt/android/modules/backup/base.py | 10 ++++---- mvt/android/modules/bugreport/base.py | 13 +++++----- mvt/android/modules/bugreport/packages.py | 4 ++-- mvt/common/module.py | 6 ++--- mvt/common/updates.py | 4 ++-- mvt/common/utils.py | 10 ++++---- 12 files changed, 51 insertions(+), 43 deletions(-) diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index ff8f306..5dafc9d 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -12,6 +12,7 @@ import string import sys import tempfile import time +from typing import Callable from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb from adb_shell.auth.keygen import keygen, write_public_keyfile @@ -44,7 +45,7 @@ class AndroidExtraction(MVTModule): self.serial = None @staticmethod - def _adb_check_keys(): + def _adb_check_keys() -> None: """Make sure Android adb keys exist.""" if not os.path.isdir(os.path.dirname(ADB_KEY_PATH)): os.makedirs(os.path.dirname(ADB_KEY_PATH)) @@ -55,7 +56,7 @@ class AndroidExtraction(MVTModule): if not os.path.exists(ADB_PUB_KEY_PATH): write_public_keyfile(ADB_KEY_PATH, ADB_PUB_KEY_PATH) - def _adb_connect(self): + def _adb_connect(self) -> None: """Connect to the device over adb.""" self._adb_check_keys() @@ -104,17 +105,17 @@ class AndroidExtraction(MVTModule): else: break - def _adb_disconnect(self): + def _adb_disconnect(self) -> None: """Close adb connection to the device.""" self.device.close() - def _adb_reconnect(self): + def _adb_reconnect(self) -> None: """Reconnect to device using adb.""" log.info("Reconnecting ...") self._adb_disconnect() self._adb_connect() - def _adb_command(self, command): + def _adb_command(self, command: str) -> str: """Execute an adb shell command. :param command: Shell command to execute @@ -123,7 +124,7 @@ class AndroidExtraction(MVTModule): """ return self.device.shell(command, read_timeout_s=200.0) - def _adb_check_if_root(self): + def _adb_check_if_root(self) -> bool: """Check if we have a `su` binary on the Android device. @@ -132,7 +133,7 @@ class AndroidExtraction(MVTModule): """ return bool(self._adb_command("command -v su")) - def _adb_root_or_die(self): + def _adb_root_or_die(self) -> None: """Check if we have a `su` binary, otherwise raise an Exception.""" if not self._adb_check_if_root(): raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!") @@ -146,7 +147,7 @@ class AndroidExtraction(MVTModule): """ return self._adb_command(f"su -c {command}") - def _adb_check_file_exists(self, file): + def _adb_check_file_exists(self, file: str) -> bool: """Verify that a file exists. :param file: Path of the file @@ -163,7 +164,9 @@ class AndroidExtraction(MVTModule): return bool(self._adb_command_as_root(f"[ ! -f {file} ] || echo 1")) - def _adb_download(self, remote_path, local_path, progress_callback=None, retry_root=True): + def _adb_download(self, remote_path: str, local_path: str, + progress_callback: Callable = None, + retry_root: bool = True) -> None: """Download a file form the device. :param remote_path: Path to download from the device @@ -180,7 +183,8 @@ class AndroidExtraction(MVTModule): else: raise Exception(f"Unable to download file {remote_path}: {e}") - def _adb_download_root(self, remote_path, local_path, progress_callback=None): + def _adb_download_root(self, remote_path: str, local_path: str, + progress_callback: Callable = None) -> None: try: # Check if we have root, if not raise an Exception. self._adb_root_or_die() @@ -208,7 +212,8 @@ class AndroidExtraction(MVTModule): except AdbCommandFailureException as e: raise Exception(f"Unable to download file {remote_path}: {e}") - def _adb_process_file(self, remote_path, process_routine): + def _adb_process_file(self, remote_path: str, + process_routine: Callable) -> None: """Download a local copy of a file which is only accessible as root. This is a wrapper around process_routine. @@ -248,7 +253,7 @@ class AndroidExtraction(MVTModule): # Disconnect from the device. self._adb_disconnect() - def _generate_backup(self, package_name): + def _generate_backup(self, package_name: str) -> bytes: self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... diff --git a/mvt/android/modules/adb/chrome_history.py b/mvt/android/modules/adb/chrome_history.py index 71fc57c..25ce7b8 100644 --- a/mvt/android/modules/adb/chrome_history.py +++ b/mvt/android/modules/adb/chrome_history.py @@ -43,7 +43,7 @@ class ChromeHistory(AndroidExtraction): if self.indicators.check_domain(result["url"]): self.detected.append(result) - def _parse_db(self, db_path): + def _parse_db(self, db_path: str) -> None: """Parse a Chrome History database file. :param db_path: Path to the History database to process. diff --git a/mvt/android/modules/adb/files.py b/mvt/android/modules/adb/files.py index bc326d0..b61f677 100644 --- a/mvt/android/modules/adb/files.py +++ b/mvt/android/modules/adb/files.py @@ -25,7 +25,7 @@ class Files(AndroidExtraction): log=log, results=results) self.full_find = False - def find_files(self, folder): + def find_files(self, folder: str) -> None: if self.full_find: output = self._adb_command(f"find '{folder}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null") @@ -56,7 +56,7 @@ class Files(AndroidExtraction): "data": record["path"], } - def check_suspicious(self): + def check_suspicious(self) -> None: """Check for files with suspicious permissions""" for result in sorted(self.results, key=lambda item: item["path"]): if result.get("is_suid"): diff --git a/mvt/android/modules/adb/packages.py b/mvt/android/modules/adb/packages.py index 4c488c1..ea7284e 100644 --- a/mvt/android/modules/adb/packages.py +++ b/mvt/android/modules/adb/packages.py @@ -122,7 +122,7 @@ class Packages(AndroidExtraction): self.detected.append(result) @staticmethod - def check_virustotal(packages): + def check_virustotal(packages: list) -> None: hashes = [] for package in packages: for file in package.get("files", []): @@ -175,7 +175,7 @@ class Packages(AndroidExtraction): console.print(table) @staticmethod - def parse_package_for_details(output): + def parse_package_for_details(output: str) -> dict: details = { "uid": "", "version_name": "", @@ -214,7 +214,7 @@ class Packages(AndroidExtraction): return details - def _get_files_for_package(self, package_name): + def _get_files_for_package(self, package_name: str) -> list: output = self._adb_command(f"pm path {package_name}") output = output.strip().replace("package:", "") if not output: diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 7ebd728..4a13de7 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -75,7 +75,7 @@ class SMS(AndroidExtraction): if self.indicators.check_domains(message_links): self.detected.append(message) - def _parse_db(self, db_path): + def _parse_db(self, db_path: str) -> None: """Parse an Android bugle_db SMS database file. :param db_path: Path to the Android SMS database file to process @@ -109,7 +109,7 @@ class SMS(AndroidExtraction): log.info("Extracted a total of %d SMS messages containing links", len(self.results)) - def _extract_sms_adb(self): + def _extract_sms_adb(self) -> None: """Use the Android backup command to extract SMS data from the native SMS app It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression diff --git a/mvt/android/modules/adb/whatsapp.py b/mvt/android/modules/adb/whatsapp.py index 41324b5..9dba664 100644 --- a/mvt/android/modules/adb/whatsapp.py +++ b/mvt/android/modules/adb/whatsapp.py @@ -48,7 +48,7 @@ class Whatsapp(AndroidExtraction): if self.indicators.check_domains(message_links): self.detected.append(message) - def _parse_db(self, db_path): + def _parse_db(self, db_path: str) -> None: """Parse an Android msgstore.db WhatsApp database file. :param db_path: Path to the Android WhatsApp database file to process diff --git a/mvt/android/modules/backup/base.py b/mvt/android/modules/backup/base.py index c8e983c..67256da 100644 --- a/mvt/android/modules/backup/base.py +++ b/mvt/android/modules/backup/base.py @@ -5,6 +5,7 @@ import fnmatch import os +from tarfile import TarFile from mvt.common.module import MVTModule @@ -13,14 +14,14 @@ class BackupExtraction(MVTModule): """This class provides a base for all backup extractios modules""" ab = None - def from_folder(self, backup_path, files): + def from_folder(self, backup_path: str, files: list) -> None: """ Get all the files and list them """ self.backup_path = backup_path self.files = files - def from_ab(self, file_path, tar, files): + def from_ab(self, file_path: str, tar: TarFile, files: list) -> None: """ Extract the files """ @@ -28,10 +29,10 @@ class BackupExtraction(MVTModule): self.tar = tar self.files = files - def _get_files_by_pattern(self, pattern): + def _get_files_by_pattern(self, pattern: str) -> list: return fnmatch.filter(self.files, pattern) - def _get_file_content(self, file_path): + def _get_file_content(self, file_path: str) -> bytes: if self.ab: try: member = self.tar.getmember(file_path) @@ -43,4 +44,5 @@ class BackupExtraction(MVTModule): data = handle.read() handle.close() + return data diff --git a/mvt/android/modules/bugreport/base.py b/mvt/android/modules/bugreport/base.py index 76425e8..50cec46 100644 --- a/mvt/android/modules/bugreport/base.py +++ b/mvt/android/modules/bugreport/base.py @@ -6,6 +6,7 @@ import fnmatch import logging import os +from zipfile import ZipFile from mvt.common.module import MVTModule @@ -17,15 +18,15 @@ class BugReportModule(MVTModule): zip_archive = None - def from_folder(self, extract_path, extract_files): + def from_folder(self, extract_path: str, extract_files: str) -> None: self.extract_path = extract_path self.extract_files = extract_files - def from_zip(self, zip_archive, zip_files): + def from_zip(self, zip_archive: ZipFile, zip_files: list) -> None: self.zip_archive = zip_archive self.zip_files = zip_files - def _get_files_by_pattern(self, pattern): + def _get_files_by_pattern(self, pattern: str) -> list: file_names = [] if self.zip_archive: for zip_file in self.zip_files: @@ -35,13 +36,13 @@ class BugReportModule(MVTModule): return fnmatch.filter(file_names, pattern) - def _get_files_by_patterns(self, patterns): + def _get_files_by_patterns(self, patterns: list) -> list: for pattern in patterns: matches = self._get_files_by_pattern(pattern) if matches: return matches - def _get_file_content(self, file_path): + def _get_file_content(self, file_path: str) -> bytes: if self.zip_archive: handle = self.zip_archive.open(file_path) else: @@ -52,7 +53,7 @@ class BugReportModule(MVTModule): return data - def _get_dumpstate_file(self): + def _get_dumpstate_file(self) -> bytes: main = self._get_files_by_pattern("main_entry.txt") if main: main_content = self._get_file_content(main[0]) diff --git a/mvt/android/modules/bugreport/packages.py b/mvt/android/modules/bugreport/packages.py index e51b291..72e39c7 100644 --- a/mvt/android/modules/bugreport/packages.py +++ b/mvt/android/modules/bugreport/packages.py @@ -52,7 +52,7 @@ class Packages(BugReportModule): continue @staticmethod - def parse_package_for_details(output): + def parse_package_for_details(output: str) -> dict: details = { "uid": "", "version_name": "", @@ -103,7 +103,7 @@ class Packages(BugReportModule): return details - def parse_packages_list(self, output): + def parse_packages_list(self, output: str) -> list: pkg_rxp = re.compile(r" Package \[(.+?)\].*") results = [] diff --git a/mvt/common/module.py b/mvt/common/module.py index 2745a26..23115b7 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -67,7 +67,7 @@ class MVTModule(object): len(results), json_path) return cls(results=results, log=log) - def get_slug(self): + def get_slug(self) -> str: """Use the module's class name to retrieve a slug""" if self.slug: return self.slug @@ -148,7 +148,7 @@ class MVTModule(object): raise NotImplementedError -def run_module(module: Callable): +def run_module(module: Callable) -> None: module.log.info("Running module %s...", module.__class__.__name__) try: @@ -187,7 +187,7 @@ def run_module(module: Callable): module.save_to_json() -def save_timeline(timeline: list, timeline_path: str): +def save_timeline(timeline: list, timeline_path: str) -> None: """Save the timeline in a csv file. :param timeline: List of records to order and store diff --git a/mvt/common/updates.py b/mvt/common/updates.py index 907090f..4057bef 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -9,7 +9,7 @@ from packaging import version from .version import MVT_VERSION -def check_for_updates() -> None: +def check_for_updates() -> str: res = requests.get("https://pypi.org/pypi/mvt/json") data = res.json() latest_version = data.get("info", {}).get("version", "") @@ -17,4 +17,4 @@ def check_for_updates() -> None: if version.parse(latest_version) > version.parse(MVT_VERSION): return latest_version - return None + return "" diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 0f531d8..19e82f0 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -37,7 +37,7 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True): return None -def convert_chrometime_to_unix(timestamp): +def convert_chrometime_to_unix(timestamp: int) -> int: """Converts Chrome timestamp to a Unix timestamp. :param timestamp: Chrome timestamp as int. @@ -50,7 +50,7 @@ def convert_chrometime_to_unix(timestamp): return epoch_start + delta -def convert_timestamp_to_iso(timestamp): +def convert_timestamp_to_iso(timestamp: str) -> str: """Converts Unix timestamp to ISO string. :param timestamp: Unix timestamp. @@ -65,7 +65,7 @@ def convert_timestamp_to_iso(timestamp): return None -def check_for_links(text): +def check_for_links(text: str) -> list: """Checks if a given text contains HTTP links. :param text: Any provided text. @@ -76,7 +76,7 @@ def check_for_links(text): return re.findall(r"(?Phttps?://[^\s]+)", text, re.IGNORECASE) -def get_sha256_from_file_path(file_path): +def get_sha256_from_file_path(file_path: str) -> str: """Calculate the SHA256 hash of a file from a file path. :param file_path: Path to the file to hash @@ -93,7 +93,7 @@ def get_sha256_from_file_path(file_path): # Note: taken from here: # https://stackoverflow.com/questions/57014259/json-dumps-on-dictionary-with-bytes-for-keys -def keys_bytes_to_string(obj): +def keys_bytes_to_string(obj) -> str: """Convert object keys from bytes to string. :param obj: Object to convert from bytes to string. From 3f4ddaaa0c45d25e8efc1907301711b5389da84b Mon Sep 17 00:00:00 2001 From: Nex Date: Wed, 22 Jun 2022 17:53:53 +0200 Subject: [PATCH 2/3] Minor code style fixes --- mvt/ios/modules/mixed/interactionc.py | 3 ++- mvt/ios/modules/mixed/locationd.py | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mvt/ios/modules/mixed/interactionc.py b/mvt/ios/modules/mixed/interactionc.py index 7f06dc7..cf25b72 100644 --- a/mvt/ios/modules/mixed/interactionc.py +++ b/mvt/ios/modules/mixed/interactionc.py @@ -66,7 +66,8 @@ class InteractionC(IOSExtraction): return records def run(self) -> None: - self._find_ios_database(backup_ids=INTERACTIONC_BACKUP_IDS, root_paths=INTERACTIONC_ROOT_PATHS) + self._find_ios_database(backup_ids=INTERACTIONC_BACKUP_IDS, + root_paths=INTERACTIONC_ROOT_PATHS) self.log.info("Found InteractionC database at path: %s", self.file_path) conn = sqlite3.connect(self.file_path) diff --git a/mvt/ios/modules/mixed/locationd.py b/mvt/ios/modules/mixed/locationd.py index 89bffa4..528bb6d 100644 --- a/mvt/ios/modules/mixed/locationd.py +++ b/mvt/ios/modules/mixed/locationd.py @@ -111,7 +111,6 @@ class LocationdClients(IOSExtraction): self.results.append(result) def run(self) -> None: - if self.is_backup: self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS) self.log.info("Found Locationd Clients plist at path: %s", self.file_path) From cca9083dff5d9ec1404b9cdb89b0e24f0483ee99 Mon Sep 17 00:00:00 2001 From: Nex Date: Wed, 22 Jun 2022 17:54:03 +0200 Subject: [PATCH 3/3] Reintroduced is_backup and is_fs_dump --- mvt/ios/cmd_check_backup.py | 3 +++ mvt/ios/cmd_check_fs.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/mvt/ios/cmd_check_backup.py b/mvt/ios/cmd_check_backup.py index 72cd892..c3036ae 100644 --- a/mvt/ios/cmd_check_backup.py +++ b/mvt/ios/cmd_check_backup.py @@ -24,3 +24,6 @@ class CmdIOSCheckBackup(Command): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + + def module_init(self, module): + module.is_backup = True diff --git a/mvt/ios/cmd_check_fs.py b/mvt/ios/cmd_check_fs.py index 21dc9fe..021d65c 100644 --- a/mvt/ios/cmd_check_fs.py +++ b/mvt/ios/cmd_check_fs.py @@ -24,3 +24,6 @@ class CmdIOSCheckFS(Command): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + + def module_init(self, module): + module.is_fs_dump = True