Starting to add type hints

This commit is contained in:
Nex 2022-06-17 22:30:46 +02:00
parent b77749e6ba
commit 2bc6fbef2f
86 changed files with 514 additions and 386 deletions

View File

@ -17,8 +17,9 @@ class CmdAndroidCheckADB(Command):
name = "check-adb"
modules = ADB_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)

View File

@ -26,8 +26,9 @@ class CmdAndroidCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)

View File

@ -20,8 +20,9 @@ class CmdAndroidCheckBugreport(Command):
name = "check-bugreport"
modules = BUGREPORT_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)

View File

@ -176,7 +176,7 @@ class DownloadAPKs(AndroidExtraction):
with open(json_path, "w", encoding="utf-8") as handle:
json.dump(self.packages, handle, indent=4)
def run(self):
def run(self) -> None:
"""Run all steps of fetch-apk."""
self.get_packages()
self._adb_connect()

View File

@ -33,8 +33,9 @@ ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
class AndroidExtraction(MVTModule):
"""This class provides a base for all Android extraction modules."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -273,6 +274,6 @@ class AndroidExtraction(MVTModule):
self.log.warn("All attempts to decrypt backup with password failed!")
def run(self):
def run(self) -> None:
"""Run the main procedure."""
raise NotImplementedError

View File

@ -20,13 +20,14 @@ CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History"
class ChromeHistory(AndroidExtraction):
"""This module extracts records from Android's Chrome browsing history."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -34,7 +35,7 @@ class ChromeHistory(AndroidExtraction):
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -77,7 +78,7 @@ class ChromeHistory(AndroidExtraction):
log.info("Extracted a total of %d history items", len(self.results))
def run(self):
def run(self) -> None:
try:
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
self._parse_db)

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class DumpsysAccessibility(AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -32,7 +33,7 @@ class DumpsysAccessibility(AndroidExtraction):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys accessibility")
self._adb_disconnect()

View File

@ -15,15 +15,16 @@ log = logging.getLogger(__name__)
class DumpsysActivities(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -35,7 +36,7 @@ class DumpsysActivities(AndroidExtraction):
self.detected.append({intent: activity})
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self._adb_disconnect()

View File

@ -17,13 +17,14 @@ class DumpsysAppOps(AndroidExtraction):
slug = "dumpsys_appops"
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
@ -40,7 +41,7 @@ class DumpsysAppOps(AndroidExtraction):
return records
def check_indicators(self):
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
@ -54,7 +55,7 @@ class DumpsysAppOps(AndroidExtraction):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"])
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys appops")
self._adb_disconnect()

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class DumpsysBatteryDaily(AndroidExtraction):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
@ -29,7 +30,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
"data": f"Recorded update of package {record['package_name']} with vers {record['vers']}"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -40,7 +41,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --daily")
self._adb_disconnect()

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class DumpsysBatteryHistory(AndroidExtraction):
"""This module extracts records from battery history events."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -32,7 +33,7 @@ class DumpsysBatteryHistory(AndroidExtraction):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --history")
self._adb_disconnect()

View File

@ -17,13 +17,14 @@ class DumpsysDBInfo(AndroidExtraction):
slug = "dumpsys_dbinfo"
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -36,7 +37,7 @@ class DumpsysDBInfo(AndroidExtraction):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys dbinfo")
self._adb_disconnect()

View File

@ -14,13 +14,14 @@ log = logging.getLogger(__name__)
class DumpsysFull(AndroidExtraction):
"""This module extracts stats on battery consumption by processes."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys")

View File

@ -21,15 +21,16 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class DumpsysReceivers(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -57,7 +58,7 @@ class DumpsysReceivers(AndroidExtraction):
self.detected.append({intent: receiver})
continue
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")

View File

@ -17,8 +17,9 @@ log = logging.getLogger(__name__)
class Files(AndroidExtraction):
"""This module extracts the list of files on the device."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -46,7 +47,7 @@ class Files(AndroidExtraction):
for file_line in output.splitlines():
self.results.append({"path": file_line.rstrip()})
def serialize(self, record):
def serialize(self, record: dict) -> None:
if "modified_time" in record:
return {
"timestamp": record["modified_time"],
@ -63,7 +64,7 @@ class Files(AndroidExtraction):
result["path"])
self.detected.append(result)
def check_indicators(self):
def check_indicators(self) -> None:
"""Check file list for known suspicious files or suspicious properties"""
self.check_suspicious()
@ -75,7 +76,7 @@ class Files(AndroidExtraction):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")

View File

@ -16,15 +16,16 @@ log = logging.getLogger(__name__)
class Getprop(AndroidExtraction):
"""This module extracts device properties from getprop command."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("getprop")
self._adb_disconnect()

View File

@ -14,13 +14,14 @@ log = logging.getLogger(__name__)
class Logcat(AndroidExtraction):
"""This module extracts details on installed packages."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
self._adb_connect()
# Get the current logcat.

View File

@ -72,13 +72,14 @@ ROOT_PACKAGES = [
class Packages(AndroidExtraction):
"""This module extracts the list of installed packages."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
timestamps = [
@ -97,7 +98,7 @@ class Packages(AndroidExtraction):
return records
def check_indicators(self):
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"",
@ -238,7 +239,7 @@ class Packages(AndroidExtraction):
return package_files
def run(self):
def run(self) -> None:
self._adb_connect()
packages = self._adb_command("pm list packages -u -i -f")

View File

@ -13,13 +13,14 @@ log = logging.getLogger(__name__)
class Processes(AndroidExtraction):
"""This module extracts details on running processes."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -29,7 +30,7 @@ class Processes(AndroidExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("ps -e")

View File

@ -13,13 +13,14 @@ log = logging.getLogger(__name__)
class RootBinaries(AndroidExtraction):
"""This module extracts the list of installed packages."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
root_binaries = [
"su",
"busybox",

View File

@ -15,15 +15,16 @@ class SELinuxStatus(AndroidExtraction):
slug = "selinux_status"
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def run(self):
def run(self) -> None:
self._adb_connect()
output = self._adb_command("getenforce")
self._adb_disconnect()

View File

@ -57,15 +57,16 @@ ANDROID_DANGEROUS_SETTINGS = [
class Settings(AndroidExtraction):
"""This module extracts Android system settings."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def check_indicators(self):
def check_indicators(self) -> None:
for namespace, settings in self.results.items():
for key, value in settings.items():
for danger in ANDROID_DANGEROUS_SETTINGS:
@ -76,7 +77,7 @@ class Settings(AndroidExtraction):
key, value, danger["description"])
break
def run(self):
def run(self) -> None:
self._adb_connect()
for namespace in ["system", "secure", "global"]:

View File

@ -46,13 +46,14 @@ FROM sms;
class SMS(AndroidExtraction):
"""This module extracts all SMS messages containing links."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
body = record["body"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@ -61,7 +62,7 @@ class SMS(AndroidExtraction):
"data": f"{record['address']}: \"{body}\""
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -126,7 +127,7 @@ class SMS(AndroidExtraction):
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
def run(self):
def run(self) -> None:
try:
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
self.SMS_DB_TYPE = 1

View File

@ -20,13 +20,14 @@ WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
class Whatsapp(AndroidExtraction):
"""This module extracts all WhatsApp messages containing links."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
text = record["data"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@ -35,7 +36,7 @@ class Whatsapp(AndroidExtraction):
"data": f"\"{text}\""
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -84,7 +85,7 @@ class Whatsapp(AndroidExtraction):
log.info("Extracted a total of %d WhatsApp messages containing links", len(messages))
self.results = messages
def run(self):
def run(self) -> None:
try:
self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db)
except Exception as e:

View File

@ -3,19 +3,22 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.parsers.backup import parse_sms_file
class SMS(BackupExtraction):
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = []
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -26,7 +29,7 @@ class SMS(BackupExtraction):
if self.indicators.check_domains(message["links"]):
self.detected.append(message)
def run(self):
def run(self) -> None:
for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"):
self.log.info("Processing SMS backup file at %s", file)
data = self._get_file_content(file)

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class Accessibility(BugReportModule):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -32,7 +33,7 @@ class Accessibility(BugReportModule):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -15,15 +15,16 @@ log = logging.getLogger(__name__)
class Activities(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -35,7 +36,7 @@ class Activities(BugReportModule):
self.detected.append({intent: activity})
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class Appops(BugReportModule):
"""This module extracts information on package from App-Ops Manager."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
@ -38,7 +39,7 @@ class Appops(BugReportModule):
return records
def check_indicators(self):
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
@ -51,7 +52,7 @@ class Appops(BugReportModule):
if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow":
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", result["package_name"])
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class BatteryDaily(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
@ -29,7 +30,7 @@ class BatteryDaily(BugReportModule):
"data": f"Recorded update of package {record['package_name']} with vers {record['vers']}"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -40,7 +41,7 @@ class BatteryDaily(BugReportModule):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -15,13 +15,14 @@ log = logging.getLogger(__name__)
class BatteryHistory(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -32,7 +33,7 @@ class BatteryHistory(BugReportModule):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -17,13 +17,14 @@ class DBInfo(BugReportModule):
slug = "dbinfo"
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -36,7 +37,7 @@ class DBInfo(BugReportModule):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -16,15 +16,16 @@ log = logging.getLogger(__name__)
class Getprop(BugReportModule):
"""This module extracts device properties from getprop command."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -14,13 +14,14 @@ log = logging.getLogger(__name__)
class Packages(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
timestamps = [
@ -39,7 +40,7 @@ class Packages(BugReportModule):
return records
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -133,7 +134,7 @@ class Packages(BugReportModule):
return results
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -21,15 +21,16 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class Receivers(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -57,7 +58,7 @@ class Receivers(BugReportModule):
self.detected.append({intent: receiver})
continue
def run(self):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?")

View File

@ -9,7 +9,7 @@ from datetime import datetime
from mvt.common.utils import convert_timestamp_to_iso
def parse_dumpsys_accessibility(output):
def parse_dumpsys_accessibility(output: str) -> list:
results = []
in_services = False
@ -34,7 +34,7 @@ def parse_dumpsys_accessibility(output):
return results
def parse_dumpsys_activity_resolver_table(output):
def parse_dumpsys_activity_resolver_table(output: str) -> dict:
results = {}
in_activity_resolver_table = False
@ -90,7 +90,7 @@ def parse_dumpsys_activity_resolver_table(output):
return results
def parse_dumpsys_battery_daily(output):
def parse_dumpsys_battery_daily(output: str) -> list:
results = []
daily = None
daily_updates = []
@ -136,7 +136,7 @@ def parse_dumpsys_battery_daily(output):
return results
def parse_dumpsys_battery_history(output):
def parse_dumpsys_battery_history(output: str) -> list:
results = []
for line in output.splitlines():
@ -181,7 +181,7 @@ def parse_dumpsys_battery_history(output):
return results
def parse_dumpsys_dbinfo(output):
def parse_dumpsys_dbinfo(output: str) -> list:
results = []
rxp = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\].*\[Pid:\((\d+)\)\](\w+).*sql\=\"(.+?)\"')
@ -234,7 +234,7 @@ def parse_dumpsys_dbinfo(output):
return results
def parse_dumpsys_receiver_resolver_table(output):
def parse_dumpsys_receiver_resolver_table(output: str) -> dict:
results = {}
in_receiver_resolver_table = False
@ -290,7 +290,7 @@ def parse_dumpsys_receiver_resolver_table(output):
return results
def parse_dumpsys_appops(output):
def parse_dumpsys_appops(output: str) -> list:
results = []
perm = {}
package = {}

View File

@ -6,7 +6,7 @@
import re
def parse_getprop(output):
def parse_getprop(output: str) -> dict:
results = {}
rxp = re.compile(r"\[(.+?)\]: \[(.+?)\]")

View File

@ -16,13 +16,14 @@ class CmdCheckIOCS(Command):
name = "check-iocs"
modules = []
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
def run(self):
def run(self) -> None:
all_modules = []
for entry in self.modules:
if entry not in all_modules:

View File

@ -9,6 +9,7 @@ import logging
import os
import sys
from datetime import datetime
from typing import Callable
from mvt.common.indicators import Indicators
from mvt.common.module import run_module, save_timeline
@ -18,9 +19,10 @@ from mvt.common.version import MVT_VERSION
class Command(object):
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False,
log=logging.getLogger(__name__)):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__)):
self.name = ""
self.target_path = target_path
@ -37,7 +39,7 @@ class Command(object):
self.timeline = []
self.timeline_detected = []
def _create_storage(self):
def _create_storage(self) -> None:
if self.results_path and not os.path.exists(self.results_path):
try:
os.makedirs(self.results_path)
@ -46,7 +48,7 @@ class Command(object):
self.results_path, e)
sys.exit(1)
def _add_log_file_handler(self, logger):
def _add_log_file_handler(self, logger: logging.Logger) -> None:
if not self.results_path:
return
@ -56,7 +58,7 @@ class Command(object):
fh.setFormatter(formatter)
logger.addHandler(fh)
def _store_timeline(self):
def _store_timeline(self) -> None:
if not self.results_path:
return
@ -68,12 +70,16 @@ class Command(object):
save_timeline(self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv"))
def _store_info(self):
def _store_info(self) -> None:
if not self.results_path:
return
target_path = None
if self.target_path:
target_path = os.path.abspath(self.target_path)
info = {
"target_path": os.path.abspath(self.target_path),
"target_path": target_path,
"mvt_version": MVT_VERSION,
"date": convert_timestamp_to_iso(datetime.now()),
"ioc_files": [],
@ -83,44 +89,45 @@ class Command(object):
for coll in self.iocs.ioc_collections:
info["ioc_files"].append(coll.get("stix2_file_path", ""))
if os.path.isfile(self.target_path):
h = hashlib.sha256()
with open(self.target_path, "rb") as handle:
h.update(handle.read())
if self.target_path:
if os.path.isfile(self.target_path):
h = hashlib.sha256()
with open(self.target_path, "rb") as handle:
h.update(handle.read())
info["hashes"].append({
"file_path": self.target_path,
"sha256": h.hexdigest(),
})
elif os.path.isdir(self.target_path):
for (root, dirs, files) in os.walk(self.target_path):
for file in files:
file_path = os.path.join(root, file)
h = hashlib.sha256()
info["hashes"].append({
"file_path": self.target_path,
"sha256": h.hexdigest(),
})
elif os.path.isdir(self.target_path):
for (root, dirs, files) in os.walk(self.target_path):
for file in files:
file_path = os.path.join(root, file)
h = hashlib.sha256()
with open(file_path, "rb") as handle:
h.update(handle.read())
with open(file_path, "rb") as handle:
h.update(handle.read())
info["hashes"].append({
"file_path": file_path,
"sha256": h.hexdigest(),
})
info["hashes"].append({
"file_path": file_path,
"sha256": h.hexdigest(),
})
with open(os.path.join(self.results_path, "info.json"), "w+") as handle:
json.dump(info, handle, indent=4)
def list_modules(self):
def list_modules(self) -> None:
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info(" - %s", module.__name__)
def init(self):
def init(self) -> None:
raise NotImplementedError
def module_init(self, module):
def module_init(self, module: Callable) -> None:
raise NotImplementedError
def run(self):
def run(self) -> None:
self._create_storage()
self._add_log_file_handler(self.log)

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import json
import logging
import os
import requests
@ -17,13 +18,13 @@ class Indicators:
functions to compare extracted artifacts to the indicators.
"""
def __init__(self, log=None):
def __init__(self, log=logging.Logger) -> None:
self.data_dir = user_data_dir("mvt")
self.log = log
self.ioc_collections = []
self.total_ioc_count = 0
def _load_downloaded_indicators(self):
def _load_downloaded_indicators(self) -> None:
if not os.path.isdir(self.data_dir):
return
@ -31,7 +32,7 @@ class Indicators:
if f.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(self.data_dir, f))
def _check_stix2_env_variable(self):
def _check_stix2_env_variable(self) -> None:
"""
Checks if a variable MVT_STIX2 contains path to a STIX files.
"""
@ -46,8 +47,8 @@ class Indicators:
self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s",
path)
def _new_collection(self, cid="", name="", description="", file_name="",
file_path=""):
def _new_collection(self, cid: str = "", name: str = "", description: str = "",
file_name: str = "", file_path: str = "") -> dict:
return {
"id": cid,
"name": name,
@ -65,14 +66,14 @@ class Indicators:
"count": 0,
}
def _add_indicator(self, ioc, ioc_coll, ioc_coll_list):
def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None:
ioc = ioc.strip("'")
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
ioc_coll["count"] += 1
self.total_ioc_count += 1
def parse_stix2(self, file_path):
def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
:param file_path: Path to the STIX2 file to parse
@ -178,7 +179,7 @@ class Indicators:
self.ioc_collections.extend(collections)
def load_indicators_files(self, files, load_default=True):
def load_indicators_files(self, files: list, load_default: bool = True) -> None:
"""
Load a list of indicators files.
"""
@ -196,7 +197,7 @@ class Indicators:
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type):
def get_iocs(self, ioc_type: str) -> dict:
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
@ -206,7 +207,7 @@ class Indicators:
"stix2_file_name": ioc_collection["stix2_file_name"],
}
def check_domain(self, url):
def check_domain(self, url: str) -> dict:
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
@ -278,7 +279,7 @@ class Indicators:
return ioc
def check_domains(self, urls):
def check_domains(self, urls: list) -> dict:
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
@ -294,7 +295,7 @@ class Indicators:
if check:
return check
def check_process(self, process):
def check_process(self, process: str) -> dict:
"""Check the provided process name against the list of process
indicators.
@ -319,7 +320,7 @@ class Indicators:
process, ioc["name"])
return ioc
def check_processes(self, processes):
def check_processes(self, processes: list) -> dict:
"""Check the provided list of processes against the list of
process indicators.
@ -336,7 +337,7 @@ class Indicators:
if check:
return check
def check_email(self, email):
def check_email(self, email: str) -> dict:
"""Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators
@ -353,7 +354,7 @@ class Indicators:
email, ioc["name"])
return ioc
def check_file_name(self, file_name):
def check_file_name(self, file_name: str) -> dict:
"""Check the provided file name against the list of file indicators.
:param file_name: File name to check against file
@ -371,7 +372,7 @@ class Indicators:
file_name, ioc["name"])
return ioc
def check_file_path(self, file_path):
def check_file_path(self, file_path: str) -> dict:
"""Check the provided file path against the list of file indicators (both path and name).
:param file_path: File path or file name to check against file
@ -394,7 +395,7 @@ class Indicators:
file_path, ioc["name"])
return ioc
def check_profile(self, profile_uuid):
def check_profile(self, profile_uuid: str) -> dict:
"""Check the provided configuration profile UUID against the list of indicators.
:param profile_uuid: Profile UUID to check against configuration profile indicators
@ -411,7 +412,7 @@ class Indicators:
profile_uuid, ioc["name"])
return ioc
def check_file_hash(self, file_hash):
def check_file_hash(self, file_hash: str) -> dict:
"""Check the provided SHA256 file hash against the list of indicators.
:param file_hash: SHA256 hash to check
@ -428,7 +429,7 @@ class Indicators:
file_hash, ioc["name"])
return ioc
def check_app_id(self, app_id):
def check_app_id(self, app_id: str) -> dict:
"""Check the provided app identifier (typically an Android package name)
against the list of indicators.
@ -447,7 +448,7 @@ class Indicators:
return ioc
def download_indicators_files(log):
def download_indicators_files(log: logging.Logger) -> None:
"""
Download indicators from repo into MVT app data directory.
"""

View File

@ -9,7 +9,7 @@ from .updates import check_for_updates
from .version import MVT_VERSION
def logo():
def logo() -> None:
print("\n")
print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
print("\t\thttps://mvt.re")

View File

@ -4,8 +4,10 @@
# https://license.mvt.re/1.1/
import csv
import logging
import os
import re
from typing import Callable
import simplejson as json
@ -28,8 +30,9 @@ class MVTModule(object):
enabled = True
slug = None
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=None):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = None):
"""Initialize module.
:param file_path: Path to the module's database file, if there is any
@ -56,7 +59,7 @@ class MVTModule(object):
self.timeline_detected = []
@classmethod
def from_json(cls, json_path, log=None):
def from_json(cls, json_path: str, log: logging.Logger = None):
with open(json_path, "r", encoding="utf-8") as handle:
results = json.load(handle)
if log:
@ -72,7 +75,7 @@ class MVTModule(object):
sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
def check_indicators(self):
def check_indicators(self) -> None:
"""Check the results of this module against a provided list of
indicators.
@ -80,7 +83,7 @@ class MVTModule(object):
"""
raise NotImplementedError
def save_to_json(self):
def save_to_json(self) -> None:
"""Save the collected results to a json file."""
if not self.results_path:
return
@ -103,11 +106,11 @@ class MVTModule(object):
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)
def serialize(self, record):
def serialize(self, record: dict) -> None:
raise NotImplementedError
@staticmethod
def _deduplicate_timeline(timeline):
def _deduplicate_timeline(timeline: list) -> list:
"""Serialize entry as JSON to deduplicate repeated entries
:param timeline: List of entries from timeline to deduplicate
@ -118,7 +121,7 @@ class MVTModule(object):
timeline_set.add(json.dumps(record, sort_keys=True))
return [json.loads(record) for record in timeline_set]
def to_timeline(self):
def to_timeline(self) -> None:
"""Convert results into a timeline."""
for result in self.results:
record = self.serialize(result)
@ -140,12 +143,12 @@ class MVTModule(object):
self.timeline = self._deduplicate_timeline(self.timeline)
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
def run(self):
def run(self) -> None:
"""Run the main module procedure."""
raise NotImplementedError
def run_module(module):
def run_module(module: Callable):
module.log.info("Running module %s...", module.__class__.__name__)
try:
@ -184,7 +187,7 @@ def run_module(module):
module.save_to_json()
def save_timeline(timeline, timeline_path):
def save_timeline(timeline: list, timeline_path: str):
"""Save the timeline in a csv file.
:param timeline: List of records to order and store

View File

@ -9,7 +9,7 @@ from packaging import version
from .version import MVT_VERSION
def check_for_updates():
def check_for_updates() -> None:
res = requests.get("https://pypi.org/pypi/mvt/json")
data = res.json()
latest_version = data.get("info", {}).get("version", "")

View File

@ -253,7 +253,7 @@ SHORTENER_DOMAINS = [
class URL:
def __init__(self, url):
def __init__(self, url: str) -> None:
if type(url) == bytes:
url = url.decode()
@ -262,7 +262,7 @@ class URL:
self.top_level = self.get_top_level()
self.is_shortened = False
def get_domain(self):
def get_domain(self) -> None:
"""Get the domain from a URL.
:param url: URL to parse
@ -273,11 +273,13 @@ class URL:
"""
# TODO: Properly handle exception.
try:
return get_tld(self.url, as_object=True, fix_protocol=True).parsed_url.netloc.lower().lstrip("www.")
return get_tld(self.url,
as_object=True,
fix_protocol=True).parsed_url.netloc.lower().lstrip("www.")
except Exception:
return None
def get_top_level(self):
def get_top_level(self) -> None:
"""Get only the top-level domain from a URL.
:param url: URL to parse
@ -306,7 +308,7 @@ class URL:
return self.is_shortened
def unshorten(self):
def unshorten(self) -> None:
"""Unshorten the URL by requesting an HTTP HEAD response."""
res = requests.head(self.url)
if str(res.status_code).startswith("30"):

View File

@ -8,7 +8,7 @@ import hashlib
import re
def convert_mactime_to_unix(timestamp, from_2001=True):
def convert_mactime_to_unix(timestamp, from_2001: bool = True):
"""Converts Mac Standard Time to a Unix timestamp.
:param timestamp: MacTime timestamp (either int or float).

View File

@ -21,7 +21,7 @@ class VTQuotaExceeded(Exception):
pass
def virustotal_lookup(file_hash):
def virustotal_lookup(file_hash: str):
if MVT_VT_API_KEY not in os.environ:
raise VTNoKey("No VirusTotal API key provided: to use VirusTotal lookups please provide your API key with `export MVT_VT_API_KEY=<key>`")

View File

@ -18,8 +18,9 @@ class CmdIOSCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES + MIXED_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)

View File

@ -18,8 +18,9 @@ class CmdIOSCheckFS(Command):
name = "check-fs"
modules = FS_MODULES + MIXED_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[],
module_name=None, serial=None, fast_mode=False):
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)

View File

@ -24,7 +24,7 @@ class DecryptBackup:
"""
def __init__(self, backup_path, dest_path=None):
def __init__(self, backup_path: str, dest_path: str = None) -> None:
"""Decrypts an encrypted iOS backup.
:param backup_path: Path to the encrypted backup folder
:param dest_path: Path to the folder where to store the decrypted backup
@ -38,7 +38,7 @@ class DecryptBackup:
return self._backup is not None
@staticmethod
def is_encrypted(backup_path) -> bool:
def is_encrypted(backup_path: str) -> bool:
"""Query Manifest.db file to see if it's encrypted or not.
:param backup_path: Path to the backup to decrypt
@ -54,13 +54,14 @@ class DecryptBackup:
log.critical("The backup does not seem encrypted!")
return False
def _process_file(self, relative_path, domain, item, file_id, item_folder):
def _process_file(self, relative_path: str, domain: str, item,
file_id: str, item_folder: str) -> None:
self._backup.getFileDecryptedCopy(manifestEntry=item,
targetName=file_id,
targetFolder=item_folder)
log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain, item_folder, file_id)
def process_backup(self):
def process_backup(self) -> None:
if not os.path.exists(self.dest_path):
os.makedirs(self.dest_path)
@ -111,7 +112,7 @@ class DecryptBackup:
shutil.copy(os.path.join(self.backup_path, file_name),
self.dest_path)
def decrypt_with_password(self, password):
def decrypt_with_password(self, password: str) -> None:
"""Decrypts an encrypted iOS backup.
:param password: Password to use to decrypt the original backup
@ -149,7 +150,7 @@ class DecryptBackup:
log.exception(e)
log.critical("Failed to decrypt backup. Did you provide the correct password? Did you point to the right backup path?")
def decrypt_with_key_file(self, key_file):
def decrypt_with_key_file(self, key_file: str) -> None:
"""Decrypts an encrypted iOS backup using a key file.
:param key_file: File to read the key bytes to decrypt the backup
@ -179,7 +180,7 @@ class DecryptBackup:
log.exception(e)
log.critical("Failed to decrypt backup. Did you provide the correct key file?")
def get_key(self):
def get_key(self) -> None:
"""Retrieve and prints the encryption key."""
if not self._backup:
return
@ -188,7 +189,7 @@ class DecryptBackup:
log.info("Derived decryption key for backup at path %s is: \"%s\"",
self.backup_path, self._decryption_key)
def write_key(self, key_path):
def write_key(self, key_path: str) -> None:
"""Save extracted key to file.
:param key_path: Path to the file where to write the derived decryption key.

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import plistlib
@ -15,15 +16,16 @@ from ..base import IOSExtraction
class BackupInfo(IOSExtraction):
"""This module extracts information about the device and the backup."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {}
def run(self):
def run(self) -> None:
info_path = os.path.join(self.target_path, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device information")

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import plistlib
from base64 import b64encode
@ -17,13 +18,14 @@ CONF_PROFILES_DOMAIN = "SysSharedContainerDomain-systemgroup.com.apple.configura
class ConfigurationProfiles(IOSExtraction):
"""This module extracts the full plist data from configuration profiles."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
if not record["install_date"]:
return
@ -36,7 +38,7 @@ class ConfigurationProfiles(IOSExtraction):
"data": f"{record['plist']['PayloadType']} installed: {record['plist']['PayloadUUID']} - {payload_name}: {payload_description}"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -58,7 +60,7 @@ class ConfigurationProfiles(IOSExtraction):
self.detected.append(result)
continue
def run(self):
def run(self) -> None:
for conf_file in self._get_backup_files_from_manifest(domain=CONF_PROFILES_DOMAIN):
conf_rel_path = conf_file["relative_path"]
# Filter out all configuration files that are not configuration profiles.

View File

@ -5,6 +5,7 @@
import datetime
import io
import logging
import os
import plistlib
import sqlite3
@ -18,8 +19,9 @@ from ..base import IOSExtraction
class Manifest(IOSExtraction):
"""This module extracts information from a backup Manifest.db file."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -47,7 +49,7 @@ class Manifest(IOSExtraction):
timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int)
return convert_timestamp_to_iso(timestamp)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
if "modified" not in record or "status_changed" not in record:
return
@ -67,7 +69,7 @@ class Manifest(IOSExtraction):
return records
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -92,7 +94,7 @@ class Manifest(IOSExtraction):
ioc["value"], rel_path)
self.detected.append(result)
def run(self):
def run(self) -> None:
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
if not os.path.isfile(manifest_db_path):
raise DatabaseNotFoundError("unable to find backup's Manifest.db")

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import plistlib
from mvt.common.utils import convert_timestamp_to_iso
@ -18,14 +19,14 @@ class ProfileEvents(IOSExtraction):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
@ -33,7 +34,7 @@ class ProfileEvents(IOSExtraction):
"data": f"Process {record.get('process')} started operation {record.get('operation')} of profile {record.get('profile_id')}"
}
def run(self):
def run(self) -> None:
for events_file in self._get_backup_files_from_manifest(relative_path=CONF_PROFILES_EVENTS_RELPATH):
events_file_path = self._get_backup_file_from_id(events_file["file_id"])
if not events_file_path:

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import glob
import logging
import os
import shutil
import sqlite3
@ -16,8 +17,9 @@ from mvt.common.module import (DatabaseCorruptedError, DatabaseNotFoundError,
class IOSExtraction(MVTModule):
"""This class provides a base for all iOS filesystem/backup extraction modules."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import plistlib
import sqlite3
@ -18,13 +19,14 @@ ANALYTICS_DB_PATH = [
class Analytics(IOSExtraction):
"""This module extracts information from the private/var/Keychains/Analytics/*.db files."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["timestamp"],
"module": self.__class__.__name__,
@ -32,7 +34,7 @@ class Analytics(IOSExtraction):
"data": f"{record}",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -117,7 +119,7 @@ class Analytics(IOSExtraction):
self.log.info("Extracted information on %d analytics data from %s", len(self.results), artifact)
def run(self):
def run(self) -> None:
for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH):
self.file_path = file_path
self.log.info("Found Analytics database file at path: %s", file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import sqlite3
@ -11,13 +12,14 @@ from ..base import IOSExtraction
class CacheFiles(IOSExtraction):
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
for item in self.results[record]:
records.append({
@ -29,7 +31,7 @@ class CacheFiles(IOSExtraction):
return records
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -69,7 +71,7 @@ class CacheFiles(IOSExtraction):
"isodate": row[5],
})
def run(self):
def run(self) -> None:
self.results = {}
for root, dirs, files in os.walk(self.target_path):
for file_name in files:

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import datetime
import logging
import os
from mvt.common.utils import convert_timestamp_to_iso
@ -18,13 +19,14 @@ class Filesystem(IOSExtraction):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["modified"],
"module": self.__class__.__name__,
@ -32,7 +34,7 @@ class Filesystem(IOSExtraction):
"data": record["path"],
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -57,7 +59,7 @@ class Filesystem(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
for root, dirs, files in os.walk(self.target_path):
for dir_name in dirs:
try:

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from ..net_base import NetBase
@ -20,13 +21,14 @@ class Netusage(NetBase):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
for netusage_path in self._get_fs_files_from_patterns(NETUSAGE_ROOT_PATHS):
self.file_path = netusage_path
self.log.info("Found NetUsage database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
@ -18,13 +19,14 @@ SAFARI_FAVICON_ROOT_PATHS = [
class SafariFavicon(IOSExtraction):
"""This module extracts all Safari favicon records."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -32,7 +34,7 @@ class SafariFavicon(IOSExtraction):
"data": f"Safari favicon from {record['url']} with icon URL {record['icon_url']} ({record['type']})",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -92,7 +94,7 @@ class SafariFavicon(IOSExtraction):
cur.close()
conn.close()
def run(self):
def run(self) -> None:
for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS):
self.log.info("Found Safari favicon cache database at path: %s", file_path)
self._process_favicon_db(file_path)

View File

@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from ..base import IOSExtraction
@ -15,13 +17,14 @@ SHUTDOWN_LOG_PATH = [
class ShutdownLog(IOSExtraction):
"""This module extracts processes information from the shutdown log file."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -29,7 +32,7 @@ class ShutdownLog(IOSExtraction):
"data": f"Client {record['client']} with PID {record['pid']} was running when the device was shut down",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -83,7 +86,7 @@ class ShutdownLog(IOSExtraction):
self.results = sorted(self.results, key=lambda entry: entry["isodate"])
def run(self):
def run(self) -> None:
self._find_ios_database(root_paths=SHUTDOWN_LOG_PATH)
self.log.info("Found shutdown log at path: %s", self.file_path)
with open(self.file_path, "r", encoding="utf-8") as handle:

View File

@ -5,6 +5,7 @@
import datetime
import json
import logging
from mvt.common.utils import convert_timestamp_to_iso
@ -18,13 +19,14 @@ IOS_ANALYTICS_JOURNAL_PATHS = [
class IOSVersionHistory(IOSExtraction):
"""This module extracts iOS update history from Analytics Journal log files."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -32,7 +34,7 @@ class IOSVersionHistory(IOSExtraction):
"data": f"Recorded iOS version {record['os_version']}",
}
def run(self):
def run(self) -> None:
for found_path in self._get_fs_files_from_patterns(IOS_ANALYTICS_JOURNAL_PATHS):
with open(found_path, "r", encoding="utf-8") as analytics_log:
log_line = json.loads(analytics_log.readline().strip())

View File

@ -14,7 +14,7 @@ from ..base import IOSExtraction
class WebkitBase(IOSExtraction):
"""This class is a base for other WebKit-related modules."""
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return

View File

@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from .webkit_base import WebkitBase
WEBKIT_INDEXEDDB_ROOT_PATHS = [
@ -19,13 +21,14 @@ class WebkitIndexedDB(WebkitBase):
slug = "webkit_indexeddb"
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -33,7 +36,7 @@ class WebkitIndexedDB(WebkitBase):
"data": f"IndexedDB folder {record['folder']} containing file for URL {record['url']}",
}
def run(self):
def run(self) -> None:
self._process_webkit_folder(WEBKIT_INDEXEDDB_ROOT_PATHS)
self.log.info("Extracted a total of %d WebKit IndexedDB records",
len(self.results))

View File

@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from .webkit_base import WebkitBase
WEBKIT_LOCALSTORAGE_ROOT_PATHS = [
@ -17,13 +19,14 @@ class WebkitLocalStorage(WebkitBase):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -31,7 +34,7 @@ class WebkitLocalStorage(WebkitBase):
"data": f"WebKit Local Storage folder {record['folder']} containing file for URL {record['url']}",
}
def run(self):
def run(self) -> None:
self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit Local Storages",
len(self.results))

View File

@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from .webkit_base import WebkitBase
WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS = [
@ -17,13 +19,14 @@ class WebkitSafariViewService(WebkitBase):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData",
len(self.results))

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
@ -20,13 +21,14 @@ CALLS_ROOT_PATHS = [
class Calls(IOSExtraction):
"""This module extracts phone calls details"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -34,7 +36,7 @@ class Calls(IOSExtraction):
"data": f"From {record['number']} using {record['provider']} during {record['duration']} seconds"
}
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=CALLS_BACKUP_IDS,
root_paths=CALLS_ROOT_PATHS)
self.log.info("Found Calls database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from mvt.common.utils import (convert_chrometime_to_unix,
@ -23,13 +24,14 @@ CHROME_FAVICON_ROOT_PATHS = [
class ChromeFavicon(IOSExtraction):
"""This module extracts all Chrome favicon records."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -37,7 +39,7 @@ class ChromeFavicon(IOSExtraction):
"data": f"{record['icon_url']} from {record['url']}"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -50,7 +52,7 @@ class ChromeFavicon(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS,
root_paths=CHROME_FAVICON_ROOT_PATHS)
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from mvt.common.utils import (convert_chrometime_to_unix,
@ -22,13 +23,14 @@ CHROME_HISTORY_ROOT_PATHS = [
class ChromeHistory(IOSExtraction):
"""This module extracts all Chome visits."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -36,7 +38,7 @@ class ChromeHistory(IOSExtraction):
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, redirect source: {record['redirect_source']})"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -46,7 +48,7 @@ class ChromeHistory(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS,
root_paths=CHROME_HISTORY_ROOT_PATHS)
self.log.info("Found Chrome history database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from ..base import IOSExtraction
@ -18,13 +19,14 @@ CONTACTS_ROOT_PATHS = [
class Contacts(IOSExtraction):
"""This module extracts all contact details from the phone's address book."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=CONTACTS_BACKUP_IDS, root_paths=CONTACTS_ROOT_PATHS)
self.log.info("Found Contacts database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from datetime import datetime
@ -21,13 +22,14 @@ FIREFOX_HISTORY_ROOT_PATHS = [
class FirefoxFavicon(IOSExtraction):
"""This module extracts all Firefox favicon"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -35,7 +37,7 @@ class FirefoxFavicon(IOSExtraction):
"data": f"Firefox favicon {record['url']} when visiting {record['history_url']}",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -48,7 +50,7 @@ class FirefoxFavicon(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS,
root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox favicon database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from datetime import datetime
@ -25,13 +26,14 @@ class FirefoxHistory(IOSExtraction):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -39,7 +41,7 @@ class FirefoxHistory(IOSExtraction):
"data": f"Firefox visit with ID {record['id']} to URL: {record['url']}",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -49,7 +51,7 @@ class FirefoxHistory(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox history database at path: %s", self.file_path)

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import collections
import logging
import plistlib
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
@ -22,13 +23,14 @@ IDSTATUSCACHE_ROOT_PATHS = [
class IDStatusCache(IOSExtraction):
"""Extracts Apple Authentication information from idstatuscache.plist"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -36,7 +38,7 @@ class IDStatusCache(IOSExtraction):
"data": f"Lookup of {record['user']} within {record['package']} (Status {record['idstatus']})"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -83,7 +85,7 @@ class IDStatusCache(IOSExtraction):
entry["occurrences"] = entry_counter[entry["user"]]
self.results.append(entry)
def run(self):
def run(self) -> None:
if self.is_backup:
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
@ -20,8 +21,9 @@ INTERACTIONC_ROOT_PATHS = [
class InteractionC(IOSExtraction):
"""This module extracts data from InteractionC db."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -39,7 +41,7 @@ class InteractionC(IOSExtraction):
"last_outgoing_recipient_date",
]
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
processed = []
for ts in self.timestamps:
@ -63,7 +65,7 @@ class InteractionC(IOSExtraction):
return records
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=INTERACTIONC_BACKUP_IDS, root_paths=INTERACTIONC_ROOT_PATHS)
self.log.info("Found InteractionC database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import plistlib
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
@ -21,8 +22,9 @@ LOCATIOND_ROOT_PATHS = [
class LocationdClients(IOSExtraction):
"""Extract information from apps who used geolocation."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -39,7 +41,7 @@ class LocationdClients(IOSExtraction):
"BeaconRegionTimeStopped",
]
def serialize(self, record):
def serialize(self, record: dict) -> None:
records = []
for timestamp in self.timestamps:
if timestamp in record.keys():
@ -52,7 +54,7 @@ class LocationdClients(IOSExtraction):
return records
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -108,7 +110,7 @@ class LocationdClients(IOSExtraction):
self.results.append(result)
def run(self):
def run(self) -> None:
if self.is_backup:
self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS)

View File

@ -3,6 +3,8 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from ..net_base import NetBase
DATAUSAGE_BACKUP_IDS = [
@ -20,13 +22,14 @@ class Datausage(NetBase):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=DATAUSAGE_BACKUP_IDS,
root_paths=DATAUSAGE_ROOT_PATHS)
self.log.info("Found DataUsage database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import plistlib
from mvt.common.utils import convert_timestamp_to_iso
@ -20,13 +21,14 @@ OSANALYTICS_ADDAILY_ROOT_PATHS = [
class OSAnalyticsADDaily(IOSExtraction):
"""Extract network usage information by process, from com.apple.osanalytics.addaily.plist"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
return {
@ -36,7 +38,7 @@ class OSAnalyticsADDaily(IOSExtraction):
"data": record_data,
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -46,7 +48,7 @@ class OSAnalyticsADDaily(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=OSANALYTICS_ADDAILY_BACKUP_IDS,
root_paths=OSANALYTICS_ADDAILY_ROOT_PATHS)
self.log.info("Found com.apple.osanalytics.addaily plist at path: %s", self.file_path)

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import io
import logging
import os
import plistlib
import sqlite3
@ -23,15 +24,16 @@ SAFARI_BROWSER_STATE_ROOT_PATHS = [
class SafariBrowserState(IOSExtraction):
"""This module extracts all Safari browser state records."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self._session_history_count = 0
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["last_viewed_timestamp"],
"module": self.__class__.__name__,
@ -39,7 +41,7 @@ class SafariBrowserState(IOSExtraction):
"data": f"{record['tab_title']} - {record['tab_url']}"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -118,7 +120,7 @@ class SafariBrowserState(IOSExtraction):
"safari_browser_state_db": os.path.relpath(db_path, self.target_path),
})
def run(self):
def run(self) -> None:
if self.is_backup:
for backup_file in self._get_backup_files_from_manifest(relative_path=SAFARI_BROWSER_STATE_BACKUP_RELPATH):
self.file_path = self._get_backup_file_from_id(backup_file["file_id"])

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import sqlite3
@ -25,13 +26,14 @@ class SafariHistory(IOSExtraction):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -73,7 +75,7 @@ class SafariHistory(IOSExtraction):
if elapsed_time.seconds == 0:
self.log.warning("Redirect took less than a second! (%d milliseconds)", elapsed_ms)
def check_indicators(self):
def check_indicators(self) -> None:
self._find_injections()
if not self.indicators:
@ -117,7 +119,7 @@ class SafariHistory(IOSExtraction):
cur.close()
conn.close()
def run(self):
def run(self) -> None:
if self.is_backup:
for history_file in self._get_backup_files_from_manifest(relative_path=SAFARI_HISTORY_BACKUP_RELPATH):
history_path = self._get_backup_file_from_id(history_file["file_id"])

View File

@ -5,6 +5,7 @@
import io
import itertools
import logging
import plistlib
import sqlite3
@ -24,13 +25,14 @@ SHORTCUT_ROOT_PATHS = [
class Shortcuts(IOSExtraction):
"""This module extracts all info about SMS/iMessage attachments."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
found_urls = ""
if record["action_urls"]:
found_urls = "- URLs in actions: {}".format(", ".join(record["action_urls"]))
@ -50,7 +52,7 @@ class Shortcuts(IOSExtraction):
"data": f"iOS Shortcut '{record['shortcut_name'].decode('utf-8')}': {desc} {found_urls}"
}]
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -60,7 +62,7 @@ class Shortcuts(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS,
root_paths=SHORTCUT_ROOT_PATHS)
self.log.info("Found Shortcuts database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from base64 import b64encode
@ -22,13 +23,14 @@ SMS_ROOT_PATHS = [
class SMS(IOSExtraction):
"""This module extracts all SMS messages containing links."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
text = record["text"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@ -37,7 +39,7 @@ class SMS(IOSExtraction):
"data": f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})"
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -48,7 +50,7 @@ class SMS(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,
root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from base64 import b64encode
@ -21,13 +22,14 @@ SMS_ROOT_PATHS = [
class SMSAttachments(IOSExtraction):
"""This module extracts all info about SMS/iMessage attachments."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@ -36,7 +38,7 @@ class SMSAttachments(IOSExtraction):
f"with {record['total_bytes']} bytes (is_sticker: {record['is_sticker']}, has_user_info: {record['has_user_info']})"
}
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,
root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sqlite3
from datetime import datetime
@ -47,13 +48,14 @@ AUTH_REASONS = {
class TCC(IOSExtraction):
"""This module extracts records from the TCC.db SQLite database."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
if "last_modified" in record:
if "allowed_value" in record:
msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}"
@ -66,7 +68,7 @@ class TCC(IOSExtraction):
"data": msg
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -156,7 +158,7 @@ class TCC(IOSExtraction):
cur.close()
conn.close()
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=TCC_BACKUP_IDS, root_paths=TCC_ROOT_PATHS)
self.log.info("Found TCC database at path: %s", self.file_path)
self.process_db(self.file_path)

View File

@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import datetime
import logging
import os
import sqlite3
@ -22,15 +23,16 @@ class WebkitResourceLoadStatistics(IOSExtraction):
"""This module extracts records from WebKit ResourceLoadStatistics observations.db."""
# TODO: Add serialize().
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -73,7 +75,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
if len(self.results[key]) > 0:
self.log.info("Extracted a total of %d records from %s", len(self.results[key]), db_path)
def run(self):
def run(self) -> None:
if self.is_backup:
try:
for backup_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH):

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import plistlib
@ -29,8 +30,9 @@ class WebkitSessionResourceLog(IOSExtraction):
"""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -51,7 +53,7 @@ class WebkitSessionResourceLog(IOSExtraction):
return domains
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -117,7 +119,7 @@ class WebkitSessionResourceLog(IOSExtraction):
return items
def run(self):
def run(self) -> None:
if self.is_backup:
for log_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_SESSION_RESOURCE_LOG_BACKUP_RELPATH):
log_path = self._get_backup_file_from_id(log_file["file_id"])

View File

@ -24,13 +24,14 @@ WHATSAPP_ROOT_PATHS = [
class Whatsapp(IOSExtraction):
"""This module extracts all WhatsApp messages containing links."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
def serialize(self, record: dict) -> None:
text = record.get("ZTEXT", "").replace("\n", "\\n")
links_text = ""
if record["links"]:
@ -43,7 +44,7 @@ class Whatsapp(IOSExtraction):
"data": f"\'{text}\' from {record.get('ZFROMJID', 'Unknown')}{links_text}",
}
def check_indicators(self):
def check_indicators(self) -> None:
if not self.indicators:
return
@ -53,7 +54,7 @@ class Whatsapp(IOSExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
def run(self) -> None:
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS,
root_paths=WHATSAPP_ROOT_PATHS)
self.log.info("Found WhatsApp database at path: %s", self.file_path)

View File

@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import operator
import sqlite3
from pathlib import Path
@ -15,8 +16,9 @@ from .base import IOSExtraction
class NetBase(IOSExtraction):
"""This class provides a base for DataUsage and NetUsage extraction modules."""
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
@ -78,7 +80,7 @@ class NetBase(IOSExtraction):
self.log.info("Extracted information on %d processes", len(self.results))
def serialize(self, record):
def serialize(self, record: dict) -> None:
record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})"
record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \
f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}"
@ -214,7 +216,7 @@ class NetBase(IOSExtraction):
self.results = sorted(self.results, key=operator.itemgetter("first_isodate"))
def check_indicators(self):
def check_indicators(self) -> None:
# Check for manipulated process records.
# TODO: Catching KeyError for live_isodate for retro-compatibility.
# This is not very good.

View File

@ -244,18 +244,19 @@ IPHONE_IOS_VERSIONS = [
]
def get_device_desc_from_id(identifier, devices_list=IPHONE_MODELS):
def get_device_desc_from_id(identifier: str,
devices_list: list = IPHONE_MODELS) -> str:
for model in IPHONE_MODELS:
if identifier == model["identifier"]:
return model["description"]
def find_version_by_build(build):
def find_version_by_build(build: str) -> str:
build = build.upper()
for version in IPHONE_IOS_VERSIONS:
if build == version["build"]:
return version["version"]
def latest_ios_version():
def latest_ios_version() -> str:
return IPHONE_IOS_VERSIONS[-1]