Started linting the code

This commit is contained in:
Nex 2022-08-12 16:20:16 +02:00
parent 146b9245ab
commit 63ff5fd334
68 changed files with 357 additions and 268 deletions

View File

@ -11,3 +11,6 @@ upload:
test-upload:
python3 -m twine upload --repository testpypi dist/*
pylint:
pylint --rcfile=setup.cfg mvt

View File

@ -156,14 +156,13 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
# Command: check-backup
#==============================================================================
@cli.command("check-backup", help="Check an Android Backup")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_backup(ctx, serial, iocs, output, list_modules, backup_path):
def check_backup(ctx, iocs, output, list_modules, backup_path):
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs)

View File

@ -14,12 +14,12 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command):
name = "check-adb"
modules = ADB_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-adb"
self.modules = ADB_MODULES

View File

@ -25,9 +25,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
@ -35,6 +32,9 @@ class CmdAndroidCheckBackup(Command):
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-backup"
self.modules = BACKUP_MODULES
self.backup_type = None
self.backup_archive = None
self.backup_files = []

View File

@ -18,9 +18,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command):
name = "check-bugreport"
modules = BUGREPORT_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
@ -28,6 +25,9 @@ class CmdAndroidCheckBugreport(Command):
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
self.bugreport_format = None
self.bugreport_archive = None
self.bugreport_files = []
@ -41,7 +41,7 @@ class CmdAndroidCheckBugreport(Command):
elif os.path.isdir(self.target_path):
self.bugreport_format = "dir"
parent_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))

View File

@ -25,8 +25,6 @@ from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.common.module import InsufficientPrivileges, MVTModule
log = logging.getLogger(__name__)
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
@ -75,7 +73,7 @@ class AndroidExtraction(MVTModule):
try:
self.device = AdbDeviceUsb(serial=self.serial)
except UsbDeviceNotFoundError:
log.critical("No device found. Make sure it is connected and unlocked.")
self.log.critical("No device found. Make sure it is connected and unlocked.")
sys.exit(-1)
# Otherwise we try to use the TCP transport.
else:
@ -90,18 +88,18 @@ class AndroidExtraction(MVTModule):
try:
self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
except (USBErrorBusy, USBErrorAccess):
log.critical("Device is busy, maybe run `adb kill-server` and try again.")
self.log.critical("Device is busy, maybe run `adb kill-server` and try again.")
sys.exit(-1)
except DeviceAuthError:
log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...")
self.log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...")
time.sleep(5)
except UsbReadFailedError:
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
self.log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
sys.exit(-1)
except OSError as e:
if e.errno == 113 and self.serial:
log.critical("Unable to connect to the device %s: did you specify the correct IP addres?",
self.serial)
self.log.critical("Unable to connect to the device %s: did you specify the correct IP addres?",
self.serial)
sys.exit(-1)
else:
break
@ -112,7 +110,7 @@ class AndroidExtraction(MVTModule):
def _adb_reconnect(self) -> None:
"""Reconnect to device using adb."""
log.info("Reconnecting ...")
self.log.info("Reconnecting ...")
self._adb_disconnect()
self._adb_connect()
@ -197,15 +195,15 @@ class AndroidExtraction(MVTModule):
new_remote_path = f"/sdcard/{tmp_filename}"
# We copy the file from the data folder to /sdcard/.
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp.startswith("cp: ") and "No such file or directory" in cp:
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
raise Exception(f"Unable to process file {remote_path}: File not found")
elif cp.startswith("cp: ") and "Permission denied" in cp:
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(f"Unable to process file {remote_path}: Permission denied")
# We download from /sdcard/ to the local temporary file.
# If it doesn't work now, don't try again (retry_root=False)
self._adb_download(new_remote_path, local_path, retry_root=False)
self._adb_download(new_remote_path, local_path, progress_callback, retry_root=False)
# Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}")
@ -235,10 +233,10 @@ class AndroidExtraction(MVTModule):
new_remote_path = f"/sdcard/Download/{local_name}"
# We copy the file from the data folder to /sdcard/.
cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp.startswith("cp: ") and "No such file or directory" in cp:
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
raise Exception(f"Unable to process file {remote_path}: File not found")
elif cp.startswith("cp: ") and "Permission denied" in cp:
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(f"Unable to process file {remote_path}: Permission denied")
# We download from /sdcard/ to the local temporary file.
@ -258,19 +256,18 @@ class AndroidExtraction(MVTModule):
self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a")
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport...
backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format(
package_name))
backup_output_b64 = self._adb_command(f"/system/bin/bu backup -nocompress '{package_name}' | base64")
backup_output = base64.b64decode(backup_output_b64)
header = parse_ab_header(backup_output)
if not header["backup"]:
self.log.error("Extracting SMS via Android backup failed. No valid backup data found.")
return
return None
if header["encryption"] == "none":
return parse_backup_file(backup_output, password=None)
for password_retry in range(0, 3):
for _ in range(0, 3):
backup_password = Prompt.ask("Enter backup password", password=True)
try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
@ -280,6 +277,8 @@ class AndroidExtraction(MVTModule):
self.log.warn("All attempts to decrypt backup with password failed!")
return None
def run(self) -> None:
"""Run the main procedure."""
raise NotImplementedError

View File

@ -26,7 +26,7 @@ class ChromeHistory(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -23,7 +23,7 @@ class DumpsysAppOps(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
for perm in record["permissions"]:
if "entries" not in perm:

View File

@ -21,7 +21,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,

View File

@ -51,11 +51,11 @@ class DumpsysReceivers(AndroidExtraction):
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def run(self) -> None:
self._adb_connect()

View File

@ -34,7 +34,7 @@ class Files(AndroidExtraction):
log=log, results=results)
self.full_find = False
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
@ -43,6 +43,8 @@ class Files(AndroidExtraction):
"data": record["path"],
}
return None
def check_indicators(self) -> None:
for result in self.results:
if result.get("is_suid"):

View File

@ -78,7 +78,7 @@ class Packages(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
timestamps = [
@ -87,11 +87,11 @@ class Packages(AndroidExtraction):
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for ts in timestamps:
for timestamp in timestamps:
records.append({
"timestamp": ts["timestamp"],
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": ts["event"],
"event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']}, third party: {record['third_party']})",
})
@ -222,10 +222,10 @@ class Packages(AndroidExtraction):
for file_path in output.splitlines():
file_path = file_path.strip()
md5 = self._adb_command(f"md5sum {file_path}").split(" ")[0]
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ")[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ")[0]
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ")[0]
md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
package_files.append({
"path": file_path,

View File

@ -70,7 +70,7 @@ class Settings(AndroidExtraction):
self.results = {} if not results else results
def check_indicators(self) -> None:
for namespace, settings in self.results.items():
for _, settings in self.results.items():
for key, value in settings.items():
for danger in ANDROID_DANGEROUS_SETTINGS:
# Check if one of the dangerous settings is using an unsafe

View File

@ -52,7 +52,9 @@ class SMS(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
self.sms_db_type = 0
def serialize(self, record: dict) -> dict | list:
body = record["body"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@ -83,9 +85,9 @@ class SMS(AndroidExtraction):
conn = sqlite3.connect(db_path)
cur = conn.cursor()
if self.SMS_DB_TYPE == 1:
if self.sms_db_type == 1:
cur.execute(SMS_BUGLE_QUERY)
elif self.SMS_DB_TYPE == 2:
elif self.sms_db_type == 2:
cur.execute(SMS_MMSMS_QUERY)
names = [description[0] for description in cur.description]
@ -133,10 +135,10 @@ class SMS(AndroidExtraction):
def run(self) -> None:
try:
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
self.SMS_DB_TYPE = 1
self.sms_db_type = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db)
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))):
self.SMS_DB_TYPE = 2
self.sms_db_type = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db)
return
except InsufficientPrivileges:

View File

@ -26,7 +26,7 @@ class Whatsapp(AndroidExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
text = record["data"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@ -74,8 +74,9 @@ class Whatsapp(AndroidExtraction):
# If we find links in the messages or if they are empty we add them to the list.
if check_for_links(message["data"]) or message["data"].strip() == "":
if (message.get('thumb_image') is not None):
if message.get('thumb_image'):
message['thumb_image'] = base64.b64encode(message['thumb_image'])
messages.append(message)
cur.close()

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import fnmatch
import logging
import os
from tarfile import TarFile
@ -12,7 +13,19 @@ from mvt.common.module import MVTModule
class BackupExtraction(MVTModule):
"""This class provides a base for all backup extractios modules"""
ab = None
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.ab = None
self.backup_path = None
self.tar = None
self.files = []
def from_folder(self, backup_path: str, files: list) -> None:
"""

View File

@ -21,7 +21,7 @@ class Appops(BugReportModule):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
for perm in record["permissions"]:
if "entries" not in perm:

View File

@ -4,6 +4,7 @@
# https://github.com/mvt-project/mvt/blob/main/LICENSE
import fnmatch
import logging
import os
from zipfile import ZipFile
@ -13,7 +14,18 @@ from mvt.common.module import MVTModule
class BugReportModule(MVTModule):
"""This class provides a base for all Android Bug Report modules."""
zip_archive = None
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.zip_archive = None
self.extract_path = None
self.extract_files = []
self.zip_files = []
def from_folder(self, extract_path: str, extract_files: str) -> None:
self.extract_path = extract_path

View File

@ -21,7 +21,7 @@ class BatteryDaily(BugReportModule):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,

View File

@ -24,7 +24,7 @@ class Packages(BugReportModule):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
timestamps = [
@ -33,11 +33,11 @@ class Packages(BugReportModule):
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for ts in timestamps:
for timestamp in timestamps:
records.append({
"timestamp": ts["timestamp"],
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": ts["event"],
"event": timestamp["event"],
"data": f"Install or update of package {record['package_name']}",
})

View File

@ -51,11 +51,11 @@ class Receivers(BugReportModule):
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def run(self) -> None:
content = self._get_dumpstate_file()

View File

@ -49,7 +49,7 @@ def parse_ab_header(data):
'encryption': "none", 'version': 4}
"""
if data.startswith(b"ANDROID BACKUP"):
[magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4)
[_, version, is_compressed, encryption, _] = data.split(b"\n", 4)
return {
"backup": True,
"compression": (is_compressed == b"1"),
@ -149,7 +149,7 @@ def parse_backup_file(data, password=None):
if not data.startswith(b"ANDROID BACKUP"):
raise AndroidBackupParsingError("Invalid file header")
[magic_header, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
[_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
version = int(version)
is_compressed = int(is_compressed)
@ -171,13 +171,14 @@ def parse_tar_for_sms(data):
Returns an array of SMS
"""
dbytes = io.BytesIO(data)
tar = tarfile.open(fileobj=dbytes)
res = []
for member in tar.getmembers():
if member.name.startswith("apps/com.android.providers.telephony/d_f/") and \
(member.name.endswith("_sms_backup") or member.name.endswith("_mms_backup")):
dhandler = tar.extractfile(member)
res.extend(parse_sms_file(dhandler.read()))
with tarfile.open(fileobj=dbytes) as tar:
for member in tar.getmembers():
if member.name.startswith("apps/com.android.providers.telephony/d_f/") and \
(member.name.endswith("_sms_backup") or member.name.endswith("_mms_backup")):
dhandler = tar.extractfile(member)
res.extend(parse_sms_file(dhandler.read()))
return res
@ -192,7 +193,7 @@ def parse_sms_file(data):
json_data = json.loads(data)
for entry in json_data:
# Adapt MMS format to SMS format
# Adapt MMS format to SMS format.
if "mms_body" in entry:
entry["body"] = entry["mms_body"]
entry.pop("mms_body")

View File

@ -213,14 +213,14 @@ def parse_dumpsys_dbinfo(output: str) -> list:
matches = rxp_no_pid.findall(line)
if not matches:
continue
else:
match = matches[0]
results.append({
"isodate": match[0],
"action": match[1],
"sql": match[2],
"path": pool,
})
match = matches[0]
results.append({
"isodate": match[0],
"action": match[1],
"sql": match[2],
"path": pool,
})
else:
match = matches[0]
results.append({

View File

@ -13,9 +13,6 @@ log = logging.getLogger(__name__)
class CmdCheckIOCS(Command):
name = "check-iocs"
modules = []
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
@ -23,6 +20,8 @@ class CmdCheckIOCS(Command):
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-iocs"
def run(self) -> None:
all_modules = []
for entry in self.modules:
@ -33,7 +32,7 @@ class CmdCheckIOCS(Command):
total_detections = 0
for file_name in os.listdir(self.target_path):
name_only, ext = os.path.splitext(file_name)
name_only, _ = os.path.splitext(file_name)
file_path = os.path.join(self.target_path, file_name)
for iocs_module in all_modules:

View File

@ -17,13 +17,14 @@ from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.version import MVT_VERSION
class Command(object):
class Command:
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__)):
self.name = ""
self.modules = []
self.target_path = target_path
self.results_path = results_path
@ -56,11 +57,11 @@ class Command(object):
if not self.results_path:
return
fh = logging.FileHandler(os.path.join(self.results_path, "command.log"))
file_handler = logging.FileHandler(os.path.join(self.results_path, "command.log"))
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def _store_timeline(self) -> None:
if not self.results_path:
@ -99,23 +100,23 @@ class Command(object):
# enough.
if self.target_path and os.environ.get("MVT_HASH_FILES"):
if os.path.isfile(self.target_path):
h = hashlib.sha256()
sha256 = hashlib.sha256()
with open(self.target_path, "rb") as handle:
h.update(handle.read())
sha256.update(handle.read())
info["hashes"].append({
"file_path": self.target_path,
"sha256": h.hexdigest(),
"sha256": sha256.hexdigest(),
})
elif os.path.isdir(self.target_path):
for (root, dirs, files) in os.walk(self.target_path):
for (root, _, files) in os.walk(self.target_path):
for file in files:
file_path = os.path.join(root, file)
h = hashlib.sha256()
sha256 = hashlib.sha256()
try:
with open(file_path, "rb") as handle:
h.update(handle.read())
sha256.update(handle.read())
except FileNotFoundError:
self.log.error("Failed to hash the file %s: might be a symlink", file_path)
continue
@ -125,10 +126,10 @@ class Command(object):
info["hashes"].append({
"file_path": file_path,
"sha256": h.hexdigest(),
"sha256": sha256.hexdigest(),
})
with open(os.path.join(self.results_path, "info.json"), "w+") as handle:
with open(os.path.join(self.results_path, "info.json"), "w+", encoding="utf-8") as handle:
json.dump(info, handle, indent=4)
def list_modules(self) -> None:

View File

@ -74,6 +74,44 @@ class Indicators:
ioc_coll["count"] += 1
self.total_ioc_count += 1
def _process_indicator(self, indicator: dict, collection: dict) -> None:
key, value = indicator.get("pattern", "").strip("[]").split("=")
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"])
def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
@ -132,47 +170,9 @@ class Indicators:
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] != malware_id:
continue
key, value = indicator.get("pattern", "").strip("[]").split("=")
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"])
break
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
for coll in collections:
self.log.info("Extracted %d indicators for collection with name \"%s\"",
@ -198,7 +198,7 @@ class Indicators:
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type: str) -> dict:
def get_iocs(self, ioc_type: str) -> dict | None:
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
@ -208,7 +208,7 @@ class Indicators:
"stix2_file_name": ioc_collection["stix2_file_name"],
}
def check_domain(self, url: str) -> dict:
def check_domain(self, url: str) -> dict | None:
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
@ -280,7 +280,9 @@ class Indicators:
return ioc
def check_domains(self, urls: list) -> dict:
return None
def check_domains(self, urls: list) -> dict | None:
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
@ -296,7 +298,9 @@ class Indicators:
if check:
return check
def check_process(self, process: str) -> dict:
return None
def check_process(self, process: str) -> dict | None:
"""Check the provided process name against the list of process
indicators.
@ -321,7 +325,9 @@ class Indicators:
process, ioc["name"])
return ioc
def check_processes(self, processes: list) -> dict:
return None
def check_processes(self, processes: list) -> dict | None:
"""Check the provided list of processes against the list of
process indicators.
@ -338,7 +344,9 @@ class Indicators:
if check:
return check
def check_email(self, email: str) -> dict:
return None
def check_email(self, email: str) -> dict | None:
"""Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators
@ -355,7 +363,9 @@ class Indicators:
email, ioc["name"])
return ioc
def check_file_name(self, file_name: str) -> dict:
return None
def check_file_name(self, file_name: str) -> dict | None:
"""Check the provided file name against the list of file indicators.
:param file_name: File name to check against file
@ -373,7 +383,9 @@ class Indicators:
file_name, ioc["name"])
return ioc
def check_file_path(self, file_path: str) -> dict:
return None
def check_file_path(self, file_path: str) -> dict | None:
"""Check the provided file path against the list of file indicators (both path and name).
:param file_path: File path or file name to check against file
@ -396,7 +408,9 @@ class Indicators:
file_path, ioc["name"])
return ioc
def check_profile(self, profile_uuid: str) -> dict:
return None
def check_profile(self, profile_uuid: str) -> dict | None:
"""Check the provided configuration profile UUID against the list of indicators.
:param profile_uuid: Profile UUID to check against configuration profile indicators
@ -413,7 +427,9 @@ class Indicators:
profile_uuid, ioc["name"])
return ioc
def check_file_hash(self, file_hash: str) -> dict:
return None
def check_file_hash(self, file_hash: str) -> dict | None:
"""Check the provided SHA256 file hash against the list of indicators.
:param file_hash: SHA256 hash to check
@ -430,7 +446,9 @@ class Indicators:
file_hash, ioc["name"])
return ioc
def check_app_id(self, app_id: str) -> dict:
return None
def check_app_id(self, app_id: str) -> dict | None:
"""Check the provided app identifier (typically an Android package name)
against the list of indicators.
@ -447,3 +465,5 @@ class Indicators:
self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"",
app_id, ioc["name"])
return ioc
return None

View File

@ -24,7 +24,7 @@ class InsufficientPrivileges(Exception):
pass
class MVTModule(object):
class MVTModule:
"""This class provides a base for all extraction modules."""
enabled = True
@ -106,7 +106,7 @@ class MVTModule(object):
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
raise NotImplementedError
@staticmethod
@ -126,7 +126,7 @@ class MVTModule(object):
for result in self.results:
record = self.serialize(result)
if record:
if type(record) == list:
if isinstance(record, list):
self.timeline.extend(record)
else:
self.timeline.append(record)
@ -134,7 +134,7 @@ class MVTModule(object):
for detected in self.detected:
record = self.serialize(detected)
if record:
if type(record) == list:
if isinstance(record, list):
self.timeline_detected.extend(record)
else:
self.timeline_detected.append(record)
@ -173,7 +173,6 @@ def run_module(module: Callable) -> None:
except NotImplementedError:
module.log.info("The %s module does not support checking for indicators",
module.__class__.__name__)
pass
else:
if module.indicators and not module.detected:
module.log.info("The %s module produced no detections!",

View File

@ -13,27 +13,21 @@ class MutuallyExclusiveOption(Option):
def __init__(self, *args, **kwargs):
self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", []))
help = kwargs.get("help", "")
help_msg = kwargs.get("help", "")
if self.mutually_exclusive:
ex_str = ", ".join(self.mutually_exclusive)
kwargs["help"] = help + (
kwargs["help"] = help_msg + (
" NOTE: This argument is mutually exclusive with "
"arguments: [" + ex_str + "]."
)
super(MutuallyExclusiveOption, self).__init__(*args, **kwargs)
super().__init__(*args, **kwargs)
def handle_parse_result(self, ctx, opts, args):
if self.mutually_exclusive.intersection(opts) and self.name in opts:
raise UsageError(
"Illegal usage: `{}` is mutually exclusive with "
"arguments `{}`.".format(
self.name,
", ".join(self.mutually_exclusive)
)
f"Illegal usage: `{self.name}` is mutually exclusive with "
f"arguments `{', '.join(self.mutually_exclusive)}`."
)
return super(MutuallyExclusiveOption, self).handle_parse_result(
ctx,
opts,
args
)
return super().handle_parse_result(ctx, opts, args)

View File

@ -55,7 +55,7 @@ class IndicatorsUpdates:
if not os.path.exists(self.latest_check_path):
return 0
with open(self.latest_check_path, "r") as handle:
with open(self.latest_check_path, "r", encoding="utf-8") as handle:
data = handle.read().strip()
if data:
return int(data)
@ -64,14 +64,14 @@ class IndicatorsUpdates:
def set_latest_check(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
with open(self.latest_check_path, "w") as handle:
with open(self.latest_check_path, "w", encoding="utf-8") as handle:
handle.write(str(timestamp))
def get_latest_update(self) -> int:
if not os.path.exists(self.latest_update_path):
return 0
with open(self.latest_update_path, "r") as handle:
with open(self.latest_update_path, "r", encoding="utf-8") as handle:
data = handle.read().strip()
if data:
return int(data)
@ -80,7 +80,7 @@ class IndicatorsUpdates:
def set_latest_update(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
with open(self.latest_update_path, "w") as handle:
with open(self.latest_update_path, "w", encoding="utf-8") as handle:
handle.write(str(timestamp))
def get_remote_index(self) -> dict:
@ -145,23 +145,25 @@ class IndicatorsUpdates:
self.set_latest_update()
def _get_remote_file_latest_commit(self, owner: str, repo: str,
branch: str, path: str) -> bool:
file_commit_url = f"https://api.github.com/repos/{self.index_owner}/{self.index_repo}/commits?path={self.index_path}"
branch: str, path: str) -> int:
# TODO: The branch is currently not taken into consideration.
# How do we specify which branch to look up to the API?
file_commit_url = f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
res = requests.get(file_commit_url)
if res.status_code != 200:
log.error("Failed to get details about file %s (error %d)",
file_commit_url, res.status_code)
return False
return -1
details = res.json()
if len(details) == 0:
return False
return -1
latest_commit = details[0]
latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None)
if not latest_commit_date:
log.error("Failed to retrieve date of latest update to indicators index file")
return False
return -1
latest_commit_dt = datetime.strptime(latest_commit_date, '%Y-%m-%dT%H:%M:%SZ')
latest_commit_ts = int(latest_commit_dt.timestamp())

View File

@ -256,7 +256,7 @@ SHORTENER_DOMAINS = [
class URL:
def __init__(self, url: str) -> None:
if type(url) == bytes:
if isinstance(url, bytes):
url = url.decode()
self.url = url
@ -315,3 +315,5 @@ class URL:
res = requests.head(self.url)
if str(res.status_code).startswith("30"):
return res.headers["Location"]
return ""

View File

@ -23,7 +23,7 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True):
# This is to fix formats in case of, for example, SMS messages database
# timestamp format.
if type(timestamp) == int and len(str(timestamp)) == 18:
if isinstance(timestamp, int) and len(str(timestamp)) == 18:
timestamp = int(str(timestamp)[:9])
# MacTime counts from 2001-01-01.
@ -106,8 +106,8 @@ def keys_bytes_to_string(obj) -> str:
if isinstance(obj, (tuple, list, set)):
value = [keys_bytes_to_string(x) for x in obj]
return value
else:
return obj
return obj
for key, value in obj.items():
if isinstance(key, bytes):

View File

@ -35,11 +35,12 @@ def virustotal_lookup(file_hash: str):
if res.status_code == 200:
report = res.json()
return report["data"]
elif res.status_code == 404:
if res.status_code == 404:
log.info("Could not find results for file with hash %s", file_hash)
elif res.status_code == 429:
raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key")
else:
raise Exception("Unexpected response from VirusTotal: %s", res.status_code)
raise Exception(f"Unexpected response from VirusTotal: {res.status_code}")
return None

View File

@ -71,7 +71,7 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
if key_file:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring environment variable, using --key-file '%s' instead",
log.info("Ignoring %s environment variable, using --key-file '%s' instead",
MVT_IOS_BACKUP_PASSWORD, key_file)
backup.decrypt_with_key_file(key_file)

View File

@ -15,9 +15,6 @@ log = logging.getLogger(__name__)
class CmdIOSCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES + MIXED_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
@ -25,5 +22,8 @@ class CmdIOSCheckBackup(Command):
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-backup"
self.modules = BACKUP_MODULES + MIXED_MODULES
def module_init(self, module):
module.is_backup = True

View File

@ -15,9 +15,6 @@ log = logging.getLogger(__name__)
class CmdIOSCheckFS(Command):
name = "check-fs"
modules = FS_MODULES + MIXED_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
@ -25,5 +22,8 @@ class CmdIOSCheckFS(Command):
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-fs"
self.modules = FS_MODULES + MIXED_MODULES
def module_init(self, module):
module.is_fs_dump = True

View File

@ -26,9 +26,9 @@ class ConfigurationProfiles(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
if not record["install_date"]:
return
return {}
payload_name = record['plist'].get('PayloadDisplayName')
payload_description = record['plist'].get('PayloadDescription')
@ -106,12 +106,12 @@ class ConfigurationProfiles(IOSExtraction):
conf_plist["LastPushTokenHash"] = b64encode(conf_plist["LastPushTokenHash"])
if "PayloadContent" in conf_plist:
for x in range(len(conf_plist["PayloadContent"])):
if "PERSISTENT_REF" in conf_plist["PayloadContent"][x]:
conf_plist["PayloadContent"][x]["PERSISTENT_REF"] = b64encode(conf_plist["PayloadContent"][x]["PERSISTENT_REF"])
for content_entry in range(len(conf_plist["PayloadContent"])):
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"] = b64encode(conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"])
if "IdentityPersistentRef" in conf_plist["PayloadContent"][x]:
conf_plist["PayloadContent"][x]["IdentityPersistentRef"] = b64encode(conf_plist["PayloadContent"][x]["IdentityPersistentRef"])
if "IdentityPersistentRef" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"] = b64encode(conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"])
self.results.append({
"file_id": conf_file["file_id"],

View File

@ -46,23 +46,24 @@ class Manifest(IOSExtraction):
"""
if isinstance(timestamp_or_unix_time_int, datetime.datetime):
return convert_timestamp_to_iso(timestamp_or_unix_time_int)
else:
timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int)
return convert_timestamp_to_iso(timestamp)
def serialize(self, record: dict) -> None:
timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int)
return convert_timestamp_to_iso(timestamp)
def serialize(self, record: dict) -> []:
records = []
if "modified" not in record or "status_changed" not in record:
return
for ts in set([record["created"], record["modified"], record["status_changed"]]):
return records
for timestamp in set([record["created"], record["modified"], record["status_changed"]]):
macb = ""
macb += "M" if ts == record["modified"] else "-"
macb += "M" if timestamp == record["modified"] else "-"
macb += "-"
macb += "C" if ts == record["status_changed"] else "-"
macb += "B" if ts == record["created"] else "-"
macb += "C" if timestamp == record["status_changed"] else "-"
macb += "B" if timestamp == record["created"] else "-"
records.append({
"timestamp": ts,
"timestamp": timestamp,
"module": self.__class__.__name__,
"event": macb,
"data": f"{record['relative_path']} - {record['domain']}"
@ -136,7 +137,6 @@ class Manifest(IOSExtraction):
except Exception:
self.log.exception("Error reading manifest file metadata for file with ID %s and relative path %s",
file_data["fileID"], file_data["relativePath"])
pass
self.results.append(cleaned_metadata)

View File

@ -27,7 +27,7 @@ class ProfileEvents(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,

View File

@ -94,7 +94,7 @@ class IOSExtraction(MVTModule):
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as e:
raise DatabaseCorruptedError("failed to query Manifest.db: %s", e)
raise DatabaseCorruptedError(f"failed to query Manifest.db: {e}")
for row in cur:
yield {

View File

@ -27,7 +27,7 @@ class Analytics(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -25,7 +25,7 @@ class AnalyticsIOSVersions(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -45,25 +45,25 @@ class AnalyticsIOSVersions(IOSExtraction):
if not build:
continue
ts = result.get("isodate", None)
if not ts:
isodate = result.get("isodate", None)
if not isodate:
continue
if build not in builds.keys():
builds[build] = ts
builds[build] = isodate
continue
result_dt = datetime.strptime(ts, dt_format)
result_dt = datetime.strptime(isodate, dt_format)
cur_dt = datetime.strptime(builds[build], dt_format)
if result_dt < cur_dt:
builds[build] = ts
builds[build] = isodate
for build, ts in builds.items():
for build, isodate in builds.items():
version = find_version_by_build(build)
self.results.append({
"isodate": ts,
"isodate": isodate,
"build": build,
"version": version,
})

View File

@ -20,7 +20,7 @@ class CacheFiles(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
for item in self.results[record]:
records.append({
@ -74,7 +74,7 @@ class CacheFiles(IOSExtraction):
def run(self) -> None:
self.results = {}
for root, dirs, files in os.walk(self.target_path):
for root, _, files in os.walk(self.target_path):
for file_name in files:
if file_name != "Cache.db":
continue

View File

@ -27,7 +27,7 @@ class Filesystem(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["modified"],
"module": self.__class__.__name__,

View File

@ -27,7 +27,7 @@ class SafariFavicon(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -25,7 +25,7 @@ class ShutdownLog(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -27,7 +27,7 @@ class IOSVersionHistory(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -29,7 +29,7 @@ class WebkitIndexedDB(WebkitBase):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -27,7 +27,7 @@ class WebkitLocalStorage(WebkitBase):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -29,7 +29,7 @@ class Calls(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -32,7 +32,7 @@ class ChromeFavicon(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -31,7 +31,7 @@ class ChromeHistory(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -30,7 +30,7 @@ class FirefoxFavicon(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -34,7 +34,7 @@ class FirefoxHistory(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -31,7 +31,7 @@ class IDStatusCache(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -42,27 +42,27 @@ class InteractionC(IOSExtraction):
"last_outgoing_recipient_date",
]
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
processed = []
for ts in self.timestamps:
for timestamp in self.timestamps:
# Check if the record has the current timestamp.
if ts not in record or not record[ts]:
if timestamp not in record or not record[timestamp]:
continue
# Check if the timestamp was already processed.
if record[ts] in processed:
if record[timestamp] in processed:
continue
records.append({
"timestamp": record[ts],
"timestamp": record[timestamp],
"module": self.__class__.__name__,
"event": ts,
"event": timestamp,
"data": f"[{record['bundle_id']}] {record['account']} - from {record['sender_display_name']} "
f"({record['sender_identifier']}) to {record['recipient_display_name']} "
f"({record['recipient_identifier']}): {record['content']}"
})
processed.append(record[ts])
processed.append(record[timestamp])
return records

View File

@ -42,7 +42,7 @@ class LocationdClients(IOSExtraction):
"BeaconRegionTimeStopped",
]
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
records = []
for timestamp in self.timestamps:
if timestamp in record.keys():
@ -102,12 +102,12 @@ class LocationdClients(IOSExtraction):
with open(file_path, "rb") as handle:
file_plist = plistlib.load(handle)
for key, values in file_plist.items():
for key, _ in file_plist.items():
result = file_plist[key]
result["package"] = key
for ts in self.timestamps:
if ts in result.keys():
result[ts] = convert_timestamp_to_iso(convert_mactime_to_unix(result[ts]))
for timestamp in self.timestamps:
if timestamp in result.keys():
result[timestamp] = convert_timestamp_to_iso(convert_mactime_to_unix(result[timestamp]))
self.results.append(result)

View File

@ -29,7 +29,7 @@ class OSAnalyticsADDaily(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
return {

View File

@ -34,7 +34,7 @@ class SafariBrowserState(IOSExtraction):
self._session_history_count = 0
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["last_viewed_timestamp"],
"module": self.__class__.__name__,

View File

@ -34,7 +34,7 @@ class SafariHistory(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -33,13 +33,14 @@ class Shortcuts(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
found_urls = ""
if record["action_urls"]:
found_urls = "- URLs in actions: {}".format(", ".join(record["action_urls"]))
found_urls = f"- URLs in actions: {', '.join(record['action_urls'])}"
desc = ""
if record["description"]:
desc = record["description"].decode('utf-8', errors='ignore')
desc = record["description"].decode("utf-8", errors="ignore")
return [{
"timestamp": record["isodate"],

View File

@ -31,7 +31,7 @@ class SMS(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
text = record["text"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],

View File

@ -30,7 +30,7 @@ class SMSAttachments(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,

View File

@ -56,12 +56,13 @@ class TCC(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
if "last_modified" in record:
if "allowed_value" in record:
msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}"
else:
msg = f"Access to {record['service']} by {record['client']} {record['auth_value']}"
return {
"timestamp": record["last_modified"],
"module": self.__class__.__name__,
@ -69,6 +70,8 @@ class TCC(IOSExtraction):
"data": msg
}
return {}
def check_indicators(self) -> None:
if not self.indicators:
return

View File

@ -58,7 +58,7 @@ class WebkitSessionResourceLog(IOSExtraction):
if not self.indicators:
return
for key, entries in self.results.items():
for _, entries in self.results.items():
for entry in entries:
source_domains = self._extract_domains(entry["redirect_source"])
destination_domains = self._extract_domains(entry["redirect_destination"])

View File

@ -30,7 +30,7 @@ class Whatsapp(IOSExtraction):
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
text = record.get("ZTEXT", "").replace("\n", "\\n")
links_text = ""
if record["links"]:

View File

@ -81,7 +81,7 @@ class NetBase(IOSExtraction):
self.log.info("Extracted information on %d processes", len(self.results))
def serialize(self, record: dict) -> None:
def serialize(self, record: dict) -> dict | list:
record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})"
record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
@ -134,7 +134,7 @@ class NetBase(IOSExtraction):
except PermissionError:
continue
files.append([posix_path.name, posix_path.__str__()])
files.append([posix_path.name, str(posix_path)])
for proc in self.results:
if not proc["bundle_id"]:
@ -150,7 +150,7 @@ class NetBase(IOSExtraction):
self.log.debug("Located at %s", binary_path)
else:
msg = f"Could not find the binary associated with the process with name {proc['proc_name']}"
if (proc["proc_name"] is None):
if not proc["proc_name"]:
msg = f"Found process entry with empty 'proc_name': {proc['live_proc_id']} at {proc['live_isodate']}"
elif len(proc["proc_name"]) == 16:
msg = msg + " (However, the process name might have been truncated in the database)"
@ -209,7 +209,7 @@ class NetBase(IOSExtraction):
# Set default DataUsage keys.
result = {key: None for key in self.results[0].keys()}
result["first_isodate"] = result["isodate"] = result["live_isodate"] = proc["prev_proc_first"]
result["proc_name"] = "MISSING [follows {}]".format(proc["prev_proc_name"])
result["proc_name"] = f"MISSING [follows {proc['prev_proc_name']}]"
result["proc_id"] = result["live_proc_id"] = proc["proc_id"]
result["bundle_id"] = None

View File

@ -247,10 +247,12 @@ IPHONE_IOS_VERSIONS = [
def get_device_desc_from_id(identifier: str,
devices_list: list = IPHONE_MODELS) -> str:
for model in IPHONE_MODELS:
for model in devices_list:
if identifier == model["identifier"]:
return model["description"]
return ""
def find_version_by_build(build: str) -> str:
build = build.upper()
@ -258,6 +260,8 @@ def find_version_by_build(build: str) -> str:
if build == version["build"]:
return version["version"]
return ""
def latest_ios_version() -> str:
return IPHONE_IOS_VERSIONS[-1]

View File

@ -43,7 +43,7 @@ console_scripts =
mvt-android = mvt.android:cli
[flake8]
max-complexit = 10
max-complexity = 10
max-line-length = 1000
ignore =
C901,
@ -52,3 +52,34 @@ ignore =
E127,
W503,
E226
[pylint]
score = no
reports = no
output-format = colorized
max-locals = 25
max-args = 10
good-names = i,e,m
min-similarity-lines=10
ignore-comments=yes
ignore-docstrings=yes
ignore-imports=yes
ignored-argument-names=args|kwargs
# https://pylint.pycqa.org/en/stable/technical_reference/features.html
disable =
too-many-instance-attributes,
broad-except,
abstract-method,
dangerous-default-value,
#duplicate-code,
line-too-long,
too-few-public-methods,
missing-docstring,
missing-module-docstring,
missing-class-docstring,
missing-function-docstring,