Starting to enforce line lengths on mvt-ios

This commit is contained in:
Nex 2022-08-12 19:38:57 +02:00
parent 424b86a261
commit 0f503f72b5
4 changed files with 109 additions and 54 deletions

View File

@ -58,11 +58,13 @@ def version():
@click.option("--destination", "-d", required=True,
help="Path to the folder where to store the decrypted backup")
@click.option("--password", "-p", cls=MutuallyExclusiveOption,
help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)",
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)",
mutually_exclusive=["key_file"])
@click.option("--key-file", "-k", cls=MutuallyExclusiveOption,
type=click.Path(exists=True),
help="File containing raw encryption key to use to decrypt the backup",
help="File containing raw encryption key to use to decrypt "
"the backup",
mutually_exclusive=["password"])
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
@ -71,20 +73,22 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
if key_file:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --key-file '%s' instead",
MVT_IOS_BACKUP_PASSWORD, key_file)
log.info("Ignoring %s environment variable, using --key-file"
"'%s' instead", MVT_IOS_BACKUP_PASSWORD, key_file)
backup.decrypt_with_key_file(key_file)
elif password:
log.info("Your password may be visible in the process table because it was supplied on the command line!")
log.info("Your password may be visible in the process table because it "
"was supplied on the command line!")
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password argument instead",
MVT_IOS_BACKUP_PASSWORD)
log.info("Ignoring %s environment variable, using --password"
"argument instead", MVT_IOS_BACKUP_PASSWORD)
backup.decrypt_with_password(password)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
log.info("Using password from %s environment variable",
MVT_IOS_BACKUP_PASSWORD)
backup.decrypt_with_password(os.environ[MVT_IOS_BACKUP_PASSWORD])
else:
sekrit = Prompt.ask("Enter backup password", password=True)
@ -101,23 +105,27 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
#==============================================================================
@cli.command("extract-key", help="Extract decryption key from an iTunes backup")
@click.option("--password", "-p",
help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)")
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)")
@click.option("--key-file", "-k",
help="Key file to be written (if unset, will print to STDOUT)",
required=False,
type=click.Path(exists=False, file_okay=True, dir_okay=False, writable=True))
type=click.Path(exists=False, file_okay=True, dir_okay=False,
writable=True))
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
def extract_key(password, key_file, backup_path):
backup = DecryptBackup(backup_path)
if password:
log.info("Your password may be visible in the process table because it was supplied on the command line!")
log.info("Your password may be visible in the process table because it "
"was supplied on the command line!")
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password argument instead",
MVT_IOS_BACKUP_PASSWORD)
log.info("Ignoring %s environment variable, using --password "
"argument instead", MVT_IOS_BACKUP_PASSWORD)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
log.info("Using password from %s environment variable",
MVT_IOS_BACKUP_PASSWORD)
password = os.environ[MVT_IOS_BACKUP_PASSWORD]
else:
password = Prompt.ask("Enter backup password", password=True)
@ -135,7 +143,8 @@ def extract_key(password, key_file, backup_path):
@cli.command("check-backup", help="Extract artifacts from an iTunes backup")
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@ -164,7 +173,8 @@ def check_backup(ctx, iocs, output, fast, list_modules, module, backup_path):
@cli.command("check-fs", help="Extract artifacts from a full filesystem dump")
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)

View File

@ -59,7 +59,8 @@ class DecryptBackup:
self._backup.getFileDecryptedCopy(manifestEntry=item,
targetName=file_id,
targetFolder=item_folder)
log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain, item_folder, file_id)
log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain,
item_folder, file_id)
def process_backup(self) -> None:
if not os.path.exists(self.dest_path):
@ -79,8 +80,10 @@ class DecryptBackup:
relative_path = item["relativePath"]
domain = item["domain"]
# This may be a partial backup. Skip files from the manifest which do not exist locally.
source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id)
# This may be a partial backup. Skip files from the manifest
# which do not exist locally.
source_file_path = os.path.join(self.backup_path, file_id[0:2],
file_id)
if not os.path.exists(source_file_path):
log.debug("Skipping file %s. File not found in encrypted backup directory.",
source_file_path)
@ -128,7 +131,8 @@ class DecryptBackup:
self.backup_path, newpath)
self.backup_path = newpath
elif len(possible) > 1:
log.critical("No Manifest.plist in %s, and %d Manifest.plist files in subdirs. Please choose one!",
log.critical("No Manifest.plist in %s, and %d Manifest.plist "
"files in subdirs. Please choose one!",
self.backup_path, len(possible))
return
@ -143,12 +147,16 @@ class DecryptBackup:
except Exception as e:
if isinstance(e, KeyError) and len(e.args) > 0 and e.args[0] == b"KEY":
log.critical("Failed to decrypt backup. Password is probably wrong.")
elif isinstance(e, FileNotFoundError) and os.path.basename(e.filename) == "Manifest.plist":
log.critical("Failed to find a valid backup at %s. Did you point to the right backup path?",
elif (isinstance(e, FileNotFoundError)
and os.path.basename(e.filename) == "Manifest.plist"):
log.critical("Failed to find a valid backup at %s. "
"Did you point to the right backup path?",
self.backup_path)
else:
log.exception(e)
log.critical("Failed to decrypt backup. Did you provide the correct password? Did you point to the right backup path?")
log.critical("Failed to decrypt backup. Did you provide the "
"correct password? Did you point to the right "
"backup path?")
def decrypt_with_key_file(self, key_file: str) -> None:
"""Decrypts an encrypted iOS backup using a key file.
@ -168,7 +176,8 @@ class DecryptBackup:
# Key should be 64 hex encoded characters (32 raw bytes)
if len(key_bytes) != 64:
log.critical("Invalid key from key file. Did you provide the correct key file?")
log.critical("Invalid key from key file. Did you provide the "
"correct key file?")
return
try:
@ -178,7 +187,8 @@ class DecryptBackup:
backuproot=os.path.dirname(self.backup_path))
except Exception as e:
log.exception(e)
log.critical("Failed to decrypt backup. Did you provide the correct key file?")
log.critical("Failed to decrypt backup. Did you provide the "
"correct key file?")
def get_key(self) -> None:
"""Retrieve and prints the encryption key."""
@ -192,7 +202,8 @@ class DecryptBackup:
def write_key(self, key_path: str) -> None:
"""Save extracted key to file.
:param key_path: Path to the file where to write the derived decryption key.
:param key_path: Path to the file where to write the derived decryption
key.
"""
if not self._decryption_key:
@ -206,5 +217,6 @@ class DecryptBackup:
log.critical("Failed to write key to file: %s", key_path)
return
else:
log.info("Wrote decryption key to file: %s. This file is equivalent to a plaintext password. Keep it safe!",
log.info("Wrote decryption key to file: %s. This file is "
"equivalent to a plaintext password. Keep it safe!",
key_path)

View File

@ -15,7 +15,8 @@ from mvt.common.module import (DatabaseCorruptedError, DatabaseNotFoundError,
class IOSExtraction(MVTModule):
"""This class provides a base for all iOS filesystem/backup extraction modules."""
"""This class provides a base for all iOS filesystem/backup extraction
modules."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
@ -52,12 +53,17 @@ class IOSExtraction(MVTModule):
if not recover:
return
self.log.info("Database at path %s is malformed. Trying to recover...", file_path)
self.log.info("Database at path %s is malformed. Trying to recover...",
file_path)
if not shutil.which("sqlite3"):
raise DatabaseCorruptedError("failed to recover without sqlite3 binary: please install sqlite3!")
raise DatabaseCorruptedError("failed to recover without sqlite3 "
"binary: please install sqlite3!")
if '"' in file_path:
raise DatabaseCorruptedError(f"database at path '{file_path}' is corrupted. unable to recover because it has a quotation mark (\") in its name")
raise DatabaseCorruptedError(f"database at path '{file_path}' is "
"corrupted. unable to recover because "
"it has a quotation mark (\") in its "
"name")
bak_path = f"{file_path}.bak"
shutil.move(file_path, bak_path)
@ -72,8 +78,10 @@ class IOSExtraction(MVTModule):
def _get_backup_files_from_manifest(self, relative_path=None, domain=None):
"""Locate files from Manifest.db.
:param relative_path: Relative path to use as filter from Manifest.db. (Default value = None)
:param domain: Domain to use as filter from Manifest.db. (Default value = None)
:param relative_path: Relative path to use as filter from Manifest.db.
(Default value = None)
:param domain: Domain to use as filter from Manifest.db.
(Default value = None)
"""
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
@ -90,7 +98,8 @@ class IOSExtraction(MVTModule):
(relative_path, domain))
else:
if relative_path:
cur.execute(f"{base_sql} relativePath = ?;", (relative_path,))
cur.execute(f"{base_sql} relativePath = ?;",
(relative_path,))
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as e:
@ -112,7 +121,8 @@ class IOSExtraction(MVTModule):
def _get_fs_files_from_patterns(self, root_paths):
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.target_path, root_path)):
for found_path in glob.glob(os.path.join(self.target_path,
root_path)):
if not os.path.exists(found_path):
continue
@ -126,7 +136,8 @@ class IOSExtraction(MVTModule):
you should use the helper functions above.
:param backup_id: iTunes backup database file's ID (or hash).
:param root_paths: Glob patterns for files to seek in filesystem dump. (Default value = [])
:param root_paths: Glob patterns for files to seek in filesystem dump.
(Default value = [])
:param backup_ids: Default value = None)
"""
@ -155,6 +166,7 @@ class IOSExtraction(MVTModule):
if file_path:
self.file_path = file_path
else:
raise DatabaseNotFoundError("unable to find the module's database file")
raise DatabaseNotFoundError("unable to find the module's "
"database file")
self._recover_sqlite_db_if_needed(self.file_path)

View File

@ -15,7 +15,8 @@ from .base import IOSExtraction
class NetBase(IOSExtraction):
"""This class provides a base for DataUsage and NetUsage extraction modules."""
"""This class provides a base for DataUsage and NetUsage extraction
modules."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
@ -45,7 +46,10 @@ class NetBase(IOSExtraction):
FROM ZLIVEUSAGE
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
UNION
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK, NULL, NULL, NULL, NULL, NULL, NULL, NULL FROM ZPROCESS WHERE Z_PK NOT IN (SELECT ZHASPROCESS FROM ZLIVEUSAGE);
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
NULL, NULL, NULL, NULL, NULL, NULL, NULL
FROM ZPROCESS WHERE Z_PK NOT IN
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
""")
for row in cur:
@ -83,9 +87,13 @@ class NetBase(IOSExtraction):
self.log.info("Extracted information on %d processes", len(self.results))
def serialize(self, record: dict) -> Union[dict, list]:
record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})"
record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
record_data = (f"{record['proc_name']} (Bundle ID: {record['bundle_id']},"
f" ID: {record['proc_id']})")
record_data_usage = (record_data + " "
f"WIFI IN: {record['wifi_in']}, "
f"WIFI OUT: {record['wifi_out']} - "
f"WWAN IN: {record['wwan_in']}, "
f"WWAN OUT: {record['wwan_out']}")
records = [{
"timestamp": record["live_isodate"],
@ -94,8 +102,11 @@ class NetBase(IOSExtraction):
"data": record_data_usage,
}]
# Only included first_usage and current_usage records when a ZPROCESS entry exists.
if "MANIPULATED" not in record["proc_name"] and "MISSING" not in record["proc_name"] and record["live_proc_id"] is not None:
# Only included first_usage and current_usage records when a
# ZPROCESS entry exists.
if ("MANIPULATED" not in record["proc_name"]
and "MISSING" not in record["proc_name"]
and record["live_proc_id"] is not None):
records.extend([
{
"timestamp": record["first_isodate"],
@ -122,7 +133,8 @@ class NetBase(IOSExtraction):
# If we are instructed to run fast, we skip this.
if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping extended search for suspicious processes")
self.log.info("Flag --fast was enabled: skipping extended "
"search for suspicious processes")
return
self.log.info("Extended search for suspicious processes ...")
@ -139,7 +151,8 @@ class NetBase(IOSExtraction):
for proc in self.results:
if not proc["bundle_id"]:
self.log.debug("Found process with no Bundle ID with name: %s", proc["proc_name"])
self.log.debug("Found process with no Bundle ID with "
"name: %s", proc["proc_name"])
binary_path = None
for file in files:
@ -150,15 +163,20 @@ class NetBase(IOSExtraction):
if binary_path:
self.log.debug("Located at %s", binary_path)
else:
msg = f"Could not find the binary associated with the process with name {proc['proc_name']}"
msg = ("Could not find the binary associated with the "
f"process with name {proc['proc_name']}")
if not proc["proc_name"]:
msg = f"Found process entry with empty 'proc_name': {proc['live_proc_id']} at {proc['live_isodate']}"
msg = ("Found process entry with empty 'proc_name': "
f"{proc['live_proc_id']} at {proc['live_isodate']}")
elif len(proc["proc_name"]) == 16:
msg = msg + " (However, the process name might have been truncated in the database)"
msg += (" (However, the process name might have "
"been truncated in the database)")
self.log.warning(msg)
if not proc["live_proc_id"]:
self.log.info(f"Found process entry in ZPROCESS but not in ZLIVEUSAGE: {proc['proc_name']} at {proc['live_isodate']}")
self.log.info("Found process entry in ZPROCESS but not in "
"ZLIVEUSAGE: %s at %s",
proc['proc_name'], proc['live_isodate'])
def check_manipulated(self):
"""Check for missing or manipulate DB entries"""
@ -171,8 +189,9 @@ class NetBase(IOSExtraction):
# Avoid duplicate warnings for same process.
if result["live_proc_id"] not in missing_process_cache:
missing_process_cache.add(result["live_proc_id"])
self.log.warning("Found manipulated process entry %s. Entry on %s",
result["live_proc_id"], result["live_isodate"])
self.log.warning("Found manipulated process entry %s. "
"Entry on %s", result["live_proc_id"],
result["live_isodate"])
# Set manipulated proc timestamp so it appears in timeline.
result["first_isodate"] = result["isodate"] = result["live_isodate"]
@ -193,7 +212,8 @@ class NetBase(IOSExtraction):
if proc_id not in all_proc_id:
previous_proc = results_by_proc[last_proc_id]
self.log.info("Missing process %d. Previous process at \"%s\" (%s)",
proc_id, previous_proc["first_isodate"], previous_proc["proc_name"])
proc_id, previous_proc["first_isodate"],
previous_proc["proc_name"])
missing_procs[proc_id] = {
"proc_id": proc_id,
@ -216,7 +236,8 @@ class NetBase(IOSExtraction):
self.results.append(result)
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
self.results = sorted(self.results,
key=operator.itemgetter("first_isodate"))
def check_indicators(self) -> None:
# Check for manipulated process records.