diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index 32511c1..90283f8 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -58,11 +58,13 @@ def version(): @click.option("--destination", "-d", required=True, help="Path to the folder where to store the decrypted backup") @click.option("--password", "-p", cls=MutuallyExclusiveOption, - help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)", + help="Password to use to decrypt the backup (or, set " + f"{MVT_IOS_BACKUP_PASSWORD} environment variable)", mutually_exclusive=["key_file"]) @click.option("--key-file", "-k", cls=MutuallyExclusiveOption, type=click.Path(exists=True), - help="File containing raw encryption key to use to decrypt the backup", + help="File containing raw encryption key to use to decrypt " + "the backup", mutually_exclusive=["password"]) @click.argument("BACKUP_PATH", type=click.Path(exists=True)) @click.pass_context @@ -71,20 +73,22 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path): if key_file: if MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Ignoring %s environment variable, using --key-file '%s' instead", - MVT_IOS_BACKUP_PASSWORD, key_file) + log.info("Ignoring %s environment variable, using --key-file" + "'%s' instead", MVT_IOS_BACKUP_PASSWORD, key_file) backup.decrypt_with_key_file(key_file) elif password: - log.info("Your password may be visible in the process table because it was supplied on the command line!") + log.info("Your password may be visible in the process table because it " + "was supplied on the command line!") if MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Ignoring %s environment variable, using --password argument instead", - MVT_IOS_BACKUP_PASSWORD) + log.info("Ignoring %s environment variable, using --password" + "argument instead", MVT_IOS_BACKUP_PASSWORD) backup.decrypt_with_password(password) elif MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD) + log.info("Using password from %s environment variable", + MVT_IOS_BACKUP_PASSWORD) backup.decrypt_with_password(os.environ[MVT_IOS_BACKUP_PASSWORD]) else: sekrit = Prompt.ask("Enter backup password", password=True) @@ -101,23 +105,27 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path): #============================================================================== @cli.command("extract-key", help="Extract decryption key from an iTunes backup") @click.option("--password", "-p", - help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)") + help="Password to use to decrypt the backup (or, set " + f"{MVT_IOS_BACKUP_PASSWORD} environment variable)") @click.option("--key-file", "-k", help="Key file to be written (if unset, will print to STDOUT)", required=False, - type=click.Path(exists=False, file_okay=True, dir_okay=False, writable=True)) + type=click.Path(exists=False, file_okay=True, dir_okay=False, + writable=True)) @click.argument("BACKUP_PATH", type=click.Path(exists=True)) def extract_key(password, key_file, backup_path): backup = DecryptBackup(backup_path) if password: - log.info("Your password may be visible in the process table because it was supplied on the command line!") + log.info("Your password may be visible in the process table because it " + "was supplied on the command line!") if MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Ignoring %s environment variable, using --password argument instead", - MVT_IOS_BACKUP_PASSWORD) + log.info("Ignoring %s environment variable, using --password " + "argument instead", MVT_IOS_BACKUP_PASSWORD) elif MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD) + log.info("Using password from %s environment variable", + MVT_IOS_BACKUP_PASSWORD) password = os.environ[MVT_IOS_BACKUP_PASSWORD] else: password = Prompt.ask("Enter backup password", password=True) @@ -135,7 +143,8 @@ def extract_key(password, key_file, backup_path): @cli.command("check-backup", help="Extract artifacts from an iTunes backup") @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, default=[], help=HELP_MSG_IOC) -@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--output", "-o", type=click.Path(exists=False), + help=HELP_MSG_OUTPUT) @click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) @@ -164,7 +173,8 @@ def check_backup(ctx, iocs, output, fast, list_modules, module, backup_path): @cli.command("check-fs", help="Extract artifacts from a full filesystem dump") @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, default=[], help=HELP_MSG_IOC) -@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--output", "-o", type=click.Path(exists=False), + help=HELP_MSG_OUTPUT) @click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) diff --git a/mvt/ios/decrypt.py b/mvt/ios/decrypt.py index c3eb285..1ed277b 100644 --- a/mvt/ios/decrypt.py +++ b/mvt/ios/decrypt.py @@ -59,7 +59,8 @@ class DecryptBackup: self._backup.getFileDecryptedCopy(manifestEntry=item, targetName=file_id, targetFolder=item_folder) - log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain, item_folder, file_id) + log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain, + item_folder, file_id) def process_backup(self) -> None: if not os.path.exists(self.dest_path): @@ -79,8 +80,10 @@ class DecryptBackup: relative_path = item["relativePath"] domain = item["domain"] - # This may be a partial backup. Skip files from the manifest which do not exist locally. - source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id) + # This may be a partial backup. Skip files from the manifest + # which do not exist locally. + source_file_path = os.path.join(self.backup_path, file_id[0:2], + file_id) if not os.path.exists(source_file_path): log.debug("Skipping file %s. File not found in encrypted backup directory.", source_file_path) @@ -128,7 +131,8 @@ class DecryptBackup: self.backup_path, newpath) self.backup_path = newpath elif len(possible) > 1: - log.critical("No Manifest.plist in %s, and %d Manifest.plist files in subdirs. Please choose one!", + log.critical("No Manifest.plist in %s, and %d Manifest.plist " + "files in subdirs. Please choose one!", self.backup_path, len(possible)) return @@ -143,12 +147,16 @@ class DecryptBackup: except Exception as e: if isinstance(e, KeyError) and len(e.args) > 0 and e.args[0] == b"KEY": log.critical("Failed to decrypt backup. Password is probably wrong.") - elif isinstance(e, FileNotFoundError) and os.path.basename(e.filename) == "Manifest.plist": - log.critical("Failed to find a valid backup at %s. Did you point to the right backup path?", + elif (isinstance(e, FileNotFoundError) + and os.path.basename(e.filename) == "Manifest.plist"): + log.critical("Failed to find a valid backup at %s. " + "Did you point to the right backup path?", self.backup_path) else: log.exception(e) - log.critical("Failed to decrypt backup. Did you provide the correct password? Did you point to the right backup path?") + log.critical("Failed to decrypt backup. Did you provide the " + "correct password? Did you point to the right " + "backup path?") def decrypt_with_key_file(self, key_file: str) -> None: """Decrypts an encrypted iOS backup using a key file. @@ -168,7 +176,8 @@ class DecryptBackup: # Key should be 64 hex encoded characters (32 raw bytes) if len(key_bytes) != 64: - log.critical("Invalid key from key file. Did you provide the correct key file?") + log.critical("Invalid key from key file. Did you provide the " + "correct key file?") return try: @@ -178,7 +187,8 @@ class DecryptBackup: backuproot=os.path.dirname(self.backup_path)) except Exception as e: log.exception(e) - log.critical("Failed to decrypt backup. Did you provide the correct key file?") + log.critical("Failed to decrypt backup. Did you provide the " + "correct key file?") def get_key(self) -> None: """Retrieve and prints the encryption key.""" @@ -192,7 +202,8 @@ class DecryptBackup: def write_key(self, key_path: str) -> None: """Save extracted key to file. - :param key_path: Path to the file where to write the derived decryption key. + :param key_path: Path to the file where to write the derived decryption + key. """ if not self._decryption_key: @@ -206,5 +217,6 @@ class DecryptBackup: log.critical("Failed to write key to file: %s", key_path) return else: - log.info("Wrote decryption key to file: %s. This file is equivalent to a plaintext password. Keep it safe!", + log.info("Wrote decryption key to file: %s. This file is " + "equivalent to a plaintext password. Keep it safe!", key_path) diff --git a/mvt/ios/modules/base.py b/mvt/ios/modules/base.py index 6c6a3d4..e6fb243 100644 --- a/mvt/ios/modules/base.py +++ b/mvt/ios/modules/base.py @@ -15,7 +15,8 @@ from mvt.common.module import (DatabaseCorruptedError, DatabaseNotFoundError, class IOSExtraction(MVTModule): - """This class provides a base for all iOS filesystem/backup extraction modules.""" + """This class provides a base for all iOS filesystem/backup extraction + modules.""" def __init__(self, file_path: str = None, target_path: str = None, results_path: str = None, fast_mode: bool = False, @@ -52,12 +53,17 @@ class IOSExtraction(MVTModule): if not recover: return - self.log.info("Database at path %s is malformed. Trying to recover...", file_path) + self.log.info("Database at path %s is malformed. Trying to recover...", + file_path) if not shutil.which("sqlite3"): - raise DatabaseCorruptedError("failed to recover without sqlite3 binary: please install sqlite3!") + raise DatabaseCorruptedError("failed to recover without sqlite3 " + "binary: please install sqlite3!") if '"' in file_path: - raise DatabaseCorruptedError(f"database at path '{file_path}' is corrupted. unable to recover because it has a quotation mark (\") in its name") + raise DatabaseCorruptedError(f"database at path '{file_path}' is " + "corrupted. unable to recover because " + "it has a quotation mark (\") in its " + "name") bak_path = f"{file_path}.bak" shutil.move(file_path, bak_path) @@ -72,8 +78,10 @@ class IOSExtraction(MVTModule): def _get_backup_files_from_manifest(self, relative_path=None, domain=None): """Locate files from Manifest.db. - :param relative_path: Relative path to use as filter from Manifest.db. (Default value = None) - :param domain: Domain to use as filter from Manifest.db. (Default value = None) + :param relative_path: Relative path to use as filter from Manifest.db. + (Default value = None) + :param domain: Domain to use as filter from Manifest.db. + (Default value = None) """ manifest_db_path = os.path.join(self.target_path, "Manifest.db") @@ -90,7 +98,8 @@ class IOSExtraction(MVTModule): (relative_path, domain)) else: if relative_path: - cur.execute(f"{base_sql} relativePath = ?;", (relative_path,)) + cur.execute(f"{base_sql} relativePath = ?;", + (relative_path,)) elif domain: cur.execute(f"{base_sql} domain = ?;", (domain,)) except Exception as e: @@ -112,7 +121,8 @@ class IOSExtraction(MVTModule): def _get_fs_files_from_patterns(self, root_paths): for root_path in root_paths: - for found_path in glob.glob(os.path.join(self.target_path, root_path)): + for found_path in glob.glob(os.path.join(self.target_path, + root_path)): if not os.path.exists(found_path): continue @@ -126,7 +136,8 @@ class IOSExtraction(MVTModule): you should use the helper functions above. :param backup_id: iTunes backup database file's ID (or hash). - :param root_paths: Glob patterns for files to seek in filesystem dump. (Default value = []) + :param root_paths: Glob patterns for files to seek in filesystem dump. + (Default value = []) :param backup_ids: Default value = None) """ @@ -155,6 +166,7 @@ class IOSExtraction(MVTModule): if file_path: self.file_path = file_path else: - raise DatabaseNotFoundError("unable to find the module's database file") + raise DatabaseNotFoundError("unable to find the module's " + "database file") self._recover_sqlite_db_if_needed(self.file_path) diff --git a/mvt/ios/modules/net_base.py b/mvt/ios/modules/net_base.py index 741fcb2..84f6292 100644 --- a/mvt/ios/modules/net_base.py +++ b/mvt/ios/modules/net_base.py @@ -15,7 +15,8 @@ from .base import IOSExtraction class NetBase(IOSExtraction): - """This class provides a base for DataUsage and NetUsage extraction modules.""" + """This class provides a base for DataUsage and NetUsage extraction + modules.""" def __init__(self, file_path: str = None, target_path: str = None, results_path: str = None, fast_mode: bool = False, @@ -45,7 +46,10 @@ class NetBase(IOSExtraction): FROM ZLIVEUSAGE LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK UNION - SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK, NULL, NULL, NULL, NULL, NULL, NULL, NULL FROM ZPROCESS WHERE Z_PK NOT IN (SELECT ZHASPROCESS FROM ZLIVEUSAGE); + SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK, + NULL, NULL, NULL, NULL, NULL, NULL, NULL + FROM ZPROCESS WHERE Z_PK NOT IN + (SELECT ZHASPROCESS FROM ZLIVEUSAGE); """) for row in cur: @@ -83,9 +87,13 @@ class NetBase(IOSExtraction): self.log.info("Extracted information on %d processes", len(self.results)) def serialize(self, record: dict) -> Union[dict, list]: - record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})" - record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ - f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" + record_data = (f"{record['proc_name']} (Bundle ID: {record['bundle_id']}," + f" ID: {record['proc_id']})") + record_data_usage = (record_data + " " + f"WIFI IN: {record['wifi_in']}, " + f"WIFI OUT: {record['wifi_out']} - " + f"WWAN IN: {record['wwan_in']}, " + f"WWAN OUT: {record['wwan_out']}") records = [{ "timestamp": record["live_isodate"], @@ -94,8 +102,11 @@ class NetBase(IOSExtraction): "data": record_data_usage, }] - # Only included first_usage and current_usage records when a ZPROCESS entry exists. - if "MANIPULATED" not in record["proc_name"] and "MISSING" not in record["proc_name"] and record["live_proc_id"] is not None: + # Only included first_usage and current_usage records when a + # ZPROCESS entry exists. + if ("MANIPULATED" not in record["proc_name"] + and "MISSING" not in record["proc_name"] + and record["live_proc_id"] is not None): records.extend([ { "timestamp": record["first_isodate"], @@ -122,7 +133,8 @@ class NetBase(IOSExtraction): # If we are instructed to run fast, we skip this. if self.fast_mode: - self.log.info("Flag --fast was enabled: skipping extended search for suspicious processes") + self.log.info("Flag --fast was enabled: skipping extended " + "search for suspicious processes") return self.log.info("Extended search for suspicious processes ...") @@ -139,7 +151,8 @@ class NetBase(IOSExtraction): for proc in self.results: if not proc["bundle_id"]: - self.log.debug("Found process with no Bundle ID with name: %s", proc["proc_name"]) + self.log.debug("Found process with no Bundle ID with " + "name: %s", proc["proc_name"]) binary_path = None for file in files: @@ -150,15 +163,20 @@ class NetBase(IOSExtraction): if binary_path: self.log.debug("Located at %s", binary_path) else: - msg = f"Could not find the binary associated with the process with name {proc['proc_name']}" + msg = ("Could not find the binary associated with the " + f"process with name {proc['proc_name']}") if not proc["proc_name"]: - msg = f"Found process entry with empty 'proc_name': {proc['live_proc_id']} at {proc['live_isodate']}" + msg = ("Found process entry with empty 'proc_name': " + f"{proc['live_proc_id']} at {proc['live_isodate']}") elif len(proc["proc_name"]) == 16: - msg = msg + " (However, the process name might have been truncated in the database)" + msg += (" (However, the process name might have " + "been truncated in the database)") self.log.warning(msg) if not proc["live_proc_id"]: - self.log.info(f"Found process entry in ZPROCESS but not in ZLIVEUSAGE: {proc['proc_name']} at {proc['live_isodate']}") + self.log.info("Found process entry in ZPROCESS but not in " + "ZLIVEUSAGE: %s at %s", + proc['proc_name'], proc['live_isodate']) def check_manipulated(self): """Check for missing or manipulate DB entries""" @@ -171,8 +189,9 @@ class NetBase(IOSExtraction): # Avoid duplicate warnings for same process. if result["live_proc_id"] not in missing_process_cache: missing_process_cache.add(result["live_proc_id"]) - self.log.warning("Found manipulated process entry %s. Entry on %s", - result["live_proc_id"], result["live_isodate"]) + self.log.warning("Found manipulated process entry %s. " + "Entry on %s", result["live_proc_id"], + result["live_isodate"]) # Set manipulated proc timestamp so it appears in timeline. result["first_isodate"] = result["isodate"] = result["live_isodate"] @@ -193,7 +212,8 @@ class NetBase(IOSExtraction): if proc_id not in all_proc_id: previous_proc = results_by_proc[last_proc_id] self.log.info("Missing process %d. Previous process at \"%s\" (%s)", - proc_id, previous_proc["first_isodate"], previous_proc["proc_name"]) + proc_id, previous_proc["first_isodate"], + previous_proc["proc_name"]) missing_procs[proc_id] = { "proc_id": proc_id, @@ -216,7 +236,8 @@ class NetBase(IOSExtraction): self.results.append(result) - self.results = sorted(self.results, key=operator.itemgetter("first_isodate")) + self.results = sorted(self.results, + key=operator.itemgetter("first_isodate")) def check_indicators(self) -> None: # Check for manipulated process records.