First commit to refactor of command definitions

This commit is contained in:
Nex 2022-06-15 17:41:19 +02:00
parent a9be771f79
commit a12c4e6b93
42 changed files with 384 additions and 298 deletions

View File

@ -3,32 +3,25 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import getpass
import io
import logging
import os
import tarfile
from pathlib import Path
from zipfile import ZipFile
import click
from rich.logging import RichHandler
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.android.cmd_check_adb import CmdAndroidCheckADB
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
from mvt.common.indicators import Indicators, download_indicators_files
from mvt.common.logo import logo
from mvt.common.module import run_module, save_timeline
from .download_apks import DownloadAPKs
from .cmd_download_apks import DownloadAPKs
from .modules.adb import ADB_MODULES
from .modules.adb.packages import Packages
from .modules.backup import BACKUP_MODULES
from .modules.bugreport import BUGREPORT_MODULES
# Setup logging using Rich.
LOG_FORMAT = "[%(name)s] %(message)s"
@ -122,48 +115,14 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.pass_context
def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
if list_modules:
log.info("Following is the list of available check-adb modules:")
for adb_module in ADB_MODULES:
log.info(" - %s", adb_module.__name__)
cmd = CmdAndroidCheckADB(results_path=output, ioc_files=iocs,
module_name=module, serial=serial, fast_mode=fast)
if list_modules:
cmd.list_modules()
return
log.info("Checking Android through adb bridge")
if output and not os.path.exists(output):
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
indicators = Indicators(log=log)
indicators.load_indicators_files(iocs)
timeline = []
timeline_detected = []
for adb_module in ADB_MODULES:
if module and adb_module.__name__ != module:
continue
m = adb_module(output_folder=output, fast_mode=fast,
log=logging.getLogger(adb_module.__module__))
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
if serial:
m.serial = serial
run_module(m)
timeline.extend(m.timeline)
timeline_detected.extend(m.timeline_detected)
if output:
if len(timeline) > 0:
save_timeline(timeline, os.path.join(output, "timeline.csv"))
if len(timeline_detected) > 0:
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
cmd.run()
#==============================================================================
@ -178,66 +137,14 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
@click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
if list_modules:
log.info("Following is the list of available check-bugreport modules:")
for adb_module in BUGREPORT_MODULES:
log.info(" - %s", adb_module.__name__)
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path, results_path=output,
ioc_files=iocs, module_name=module)
if list_modules:
cmd.list_modules()
return
log.info("Checking an Android Bug Report located at: %s", bugreport_path)
if output and not os.path.exists(output):
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
indicators = Indicators(log=log)
indicators.load_indicators_files(iocs)
if os.path.isfile(bugreport_path):
bugreport_format = "zip"
zip_archive = ZipFile(bugreport_path)
zip_files = []
for file_name in zip_archive.namelist():
zip_files.append(file_name)
elif os.path.isdir(bugreport_path):
bugreport_format = "dir"
folder_files = []
parent_path = Path(bugreport_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(bugreport_path)):
for file_name in subfiles:
folder_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
timeline = []
timeline_detected = []
for bugreport_module in BUGREPORT_MODULES:
if module and bugreport_module.__name__ != module:
continue
m = bugreport_module(base_folder=bugreport_path, output_folder=output,
log=logging.getLogger(bugreport_module.__module__))
if bugreport_format == "zip":
m.from_zip(zip_archive, zip_files)
else:
m.from_folder(bugreport_path, folder_files)
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
run_module(m)
timeline.extend(m.timeline)
timeline_detected.extend(m.timeline_detected)
if output:
if len(timeline) > 0:
save_timeline(timeline, os.path.join(output, "timeline.csv"))
if len(timeline_detected) > 0:
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
cmd.run()
#==============================================================================
@ -248,74 +155,18 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_backup(ctx, iocs, output, backup_path, serial):
log.info("Checking ADB backup located at: %s", backup_path)
def check_backup(ctx, iocs, output, list_modules, backup_path, serial):
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs)
if os.path.isfile(backup_path):
# AB File
backup_type = "ab"
with open(backup_path, "rb") as handle:
data = handle.read()
header = parse_ab_header(data)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
ctx.exit(1)
password = None
if header["encryption"] != "none":
password = getpass.getpass(prompt="Backup Password: ", stream=None)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
ctx.exit(1)
except AndroidBackupParsingError:
log.critical("Impossible to parse this backup file, please use Android Backup Extractor instead")
ctx.exit(1)
if list_modules:
cmd.list_modules()
return
dbytes = io.BytesIO(tardata)
tar = tarfile.open(fileobj=dbytes)
files = []
for member in tar:
files.append(member.name)
elif os.path.isdir(backup_path):
backup_type = "folder"
backup_path = Path(backup_path).absolute().as_posix()
files = []
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
for fname in subfiles:
files.append(os.path.relpath(os.path.join(root, fname), backup_path))
else:
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
ctx.exit(1)
if output and not os.path.exists(output):
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
indicators = Indicators(log=log)
indicators.load_indicators_files(iocs)
for module in BACKUP_MODULES:
m = module(base_folder=backup_path, output_folder=output,
log=logging.getLogger(module.__module__))
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
if serial:
m.serial = serial
if backup_type == "folder":
m.from_folder(backup_path, files)
else:
m.from_ab(backup_path, tar, files)
run_module(m)
cmd.run()
#==============================================================================

View File

@ -0,0 +1,24 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.command import Command
from .modules.adb import ADB_MODULES
log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command):
name = "check-adb"
modules = ADB_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[], module_name=None,
serial=None, fast_mode=False):
super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files,
module_name=module_name, serial=serial,
fast_mode=fast_mode, log=log)

View File

@ -0,0 +1,82 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import getpass
import io
import logging
import os
import sys
import tarfile
from pathlib import Path
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.common.command import Command
from .modules.backup import BACKUP_MODULES
log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command):
name = "check-backup"
modules = BACKUP_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[], module_name=None,
serial=None, fast_mode=False):
super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files,
module_name=module_name, serial=serial,
fast_mode=fast_mode, log=log)
self.backup_type = None
self.backup_archive = None
self.backup_files = []
def init(self):
if os.path.isfile(self.target_path):
self.backup_type = "ab"
with open(self.target_path, "rb") as handle:
data = handle.read()
header = parse_ab_header(data)
if not header["backup"]:
log.critical("Invalid backup format, file should be in .ab format")
sys.exit(1)
password = None
if header["encryption"] != "none":
password = getpass.getpass(prompt="Backup Password: ", stream=None)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
log.critical("Invalid backup password")
sys.exit(1)
except AndroidBackupParsingError as e:
log.critical("Impossible to parse this backup file: %s", e)
log.critical("Please use Android Backup Extractor (ABE) instead")
sys.exit(1)
dbytes = io.BytesIO(tardata)
self.backup_archive = tarfile.open(fileobj=dbytes)
for member in self.backup_archive:
self.backup_files.append(member.name)
elif os.path.isdir(self.target_path):
self.backup_type = "folder"
self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles:
self.backup_files.append(os.path.relpath(os.path.join(root, fname), self.target_path))
else:
log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file")
sys.exit(1)
def module_init(self, module):
if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files)
else:
module.from_ab(self.target_path, self.backup_archive, self.backup_files)

View File

@ -0,0 +1,50 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from pathlib import Path
from zipfile import ZipFile
from mvt.common.command import Command
from .modules.bugreport import BUGREPORT_MODULES
log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command):
name = "check-bugreport"
modules = BUGREPORT_MODULES
def __init__(self, target_path=None, results_path=None, ioc_files=[], module_name=None,
serial=None, fast_mode=False):
super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files,
module_name=module_name, serial=serial,
fast_mode=fast_mode, log=log)
self.bugreport_format = None
self.bugreport_archive = None
self.bugreport_files = []
def init(self):
if os.path.isfile(self.target_path):
self.bugreport_format = "zip"
self.bugreport_archive = ZipFile(self.target_path)
for file_name in self.bugreport_archive.namelist():
self.bugreport_files.append(file_name)
elif os.path.isdir(self.target_path):
self.bugreport_format = "dir"
parent_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
def module_init(self, module):
if self.bugreport_format == "zip":
module.from_zip(self.bugreport_archive, self.bugreport_files)
else:
module.from_folder(self.target_path, self.bugreport_files)

View File

@ -33,10 +33,10 @@ ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
class AndroidExtraction(MVTModule):
"""This class provides a base for all Android extraction modules."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.device = None

View File

@ -20,10 +20,10 @@ CHROME_HISTORY_PATH = "data/data/com.android.chrome/app_chrome/Default/History"
class ChromeHistory(AndroidExtraction):
"""This module extracts records from Android's Chrome browsing history."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class DumpsysAccessibility(AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class DumpsysActivities(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import re
from mvt.android.parsers.dumpsys import parse_dumpsys_appops
@ -18,10 +17,10 @@ class DumpsysAppOps(AndroidExtraction):
slug = "dumpsys_appops"
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class DumpsysBatteryDaily(AndroidExtraction):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class DumpsysBatteryHistory(AndroidExtraction):
"""This module extracts records from battery history events."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import re
from mvt.android.parsers import parse_dumpsys_dbinfo
@ -18,10 +17,10 @@ class DumpsysDBInfo(AndroidExtraction):
slug = "dumpsys_dbinfo"
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -14,18 +14,18 @@ log = logging.getLogger(__name__)
class DumpsysFull(AndroidExtraction):
"""This module extracts stats on battery consumption by processes."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
output = self._adb_command("dumpsys")
if self.output_folder:
output_path = os.path.join(self.output_folder, "dumpsys.txt")
if self.results_path:
output_path = os.path.join(self.results_path, "dumpsys.txt")
with open(output_path, "w", encoding="utf-8") as handle:
handle.write(output)

View File

@ -21,10 +21,10 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class DumpsysReceivers(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}

View File

@ -17,10 +17,10 @@ log = logging.getLogger(__name__)
class Files(AndroidExtraction):
"""This module extracts the list of files on the device."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.full_find = False

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import re
from datetime import datetime, timedelta
from mvt.android.parsers import parse_getprop
@ -17,10 +16,10 @@ log = logging.getLogger(__name__)
class Getprop(AndroidExtraction):
"""This module extracts device properties from getprop command."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results

View File

@ -14,10 +14,10 @@ log = logging.getLogger(__name__)
class Logcat(AndroidExtraction):
"""This module extracts details on installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):
@ -28,8 +28,8 @@ class Logcat(AndroidExtraction):
# Get the locat prior to last reboot.
last_output = self._adb_command("logcat -L")
if self.output_folder:
logcat_path = os.path.join(self.output_folder,
if self.results_path:
logcat_path = os.path.join(self.results_path,
"logcat.txt")
with open(logcat_path, "w", encoding="utf-8") as handle:
handle.write(output)
@ -37,7 +37,7 @@ class Logcat(AndroidExtraction):
log.info("Current logcat logs stored at %s",
logcat_path)
logcat_last_path = os.path.join(self.output_folder,
logcat_last_path = os.path.join(self.results_path,
"logcat_last.txt")
with open(logcat_last_path, "w", encoding="utf-8") as handle:
handle.write(last_output)

View File

@ -4,9 +4,7 @@
# https://license.mvt.re/1.1/
import logging
import os
import pkg_resources
from rich.console import Console
from rich.progress import track
from rich.table import Table
@ -74,10 +72,10 @@ ROOT_PACKAGES = [
class Packages(AndroidExtraction):
"""This module extracts the list of installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -13,10 +13,10 @@ log = logging.getLogger(__name__)
class Processes(AndroidExtraction):
"""This module extracts details on running processes."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -13,10 +13,10 @@ log = logging.getLogger(__name__)
class RootBinaries(AndroidExtraction):
"""This module extracts the list of installed packages."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self):

View File

@ -4,9 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import os
import pkg_resources
from .base import AndroidExtraction
@ -18,10 +15,10 @@ class SELinuxStatus(AndroidExtraction):
slug = "selinux_status"
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results

View File

@ -57,10 +57,10 @@ ANDROID_DANGEROUS_SETTINGS = [
class Settings(AndroidExtraction):
"""This module extracts Android system settings."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results

View File

@ -3,8 +3,6 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import base64
import getpass
import logging
import os
import sqlite3
@ -48,10 +46,10 @@ FROM sms;
class SMS(AndroidExtraction):
"""This module extracts all SMS messages containing links."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -20,10 +20,10 @@ WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
class Whatsapp(AndroidExtraction):
"""This module extracts all WhatsApp messages containing links."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -5,14 +5,13 @@
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.parsers.backup import parse_sms_file
from mvt.common.utils import check_for_links
class SMS(BackupExtraction):
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = []

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class Accessibility(BugReportModule):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class Activities(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class Appops(BugReportModule):
"""This module extracts information on package from App-Ops Manager."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class BatteryDaily(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -15,10 +15,10 @@ log = logging.getLogger(__name__)
class BatteryHistory(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -17,10 +17,10 @@ class DBInfo(BugReportModule):
slug = "dbinfo"
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import logging
import re
from datetime import datetime, timedelta
from mvt.android.parsers import parse_getprop
@ -17,10 +16,10 @@ log = logging.getLogger(__name__)
class Getprop(BugReportModule):
"""This module extracts device properties from getprop command."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results

View File

@ -14,10 +14,10 @@ log = logging.getLogger(__name__)
class Packages(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):

View File

@ -21,10 +21,10 @@ INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class Receivers(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}

View File

@ -70,11 +70,11 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b
The backup master key is extracted from the master key blog after decryption.
"""
# Derive key from password using PBKDF2
# Derive key from password using PBKDF2.
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds)
key = kdf.derive(password.encode("utf-8"))
# Decrypt master key blob
# Decrypt master key blob.
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
decryptor = cipher.decryptor()
try:
@ -93,7 +93,7 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b
except TypeError:
raise InvalidBackupPassword()
# Handle quirky encoding of master key bytes in Android original Java crypto code
# Handle quirky encoding of master key bytes in Android original Java crypto code.
if format_version > 1:
hmac_mk = to_utf8_bytes(master_key)
else:
@ -112,6 +112,7 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b
def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version):
"""
Generate encryption keyffrom password and do decryption
"""
if encryption_algo != b"AES-256":
raise AndroidBackupNotImplemented("Encryption Algorithm not implemented")
@ -126,12 +127,12 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
user_iv = bytes.fromhex(user_iv.decode("utf-8"))
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
# Derive decryption master key from password
# Derive decryption master key from password.
master_key, master_iv = decrypt_master_key(password=password, user_salt=user_salt, user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds, master_key_blob=master_key_blob,
format_version=format_version, checksum_salt=checksum_salt)
# Decrypt and unpad backup data using derivied key
# Decrypt and unpad backup data using derivied key.
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
decryptor = cipher.decryptor()
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
@ -143,6 +144,7 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
def parse_backup_file(data, password=None):
"""
Parse an ab file, returns a tar file
"""
if not data.startswith(b"ANDROID BACKUP"):
raise AndroidBackupParsingError("Invalid file header")

94
mvt/common/command.py Normal file
View File

@ -0,0 +1,94 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import sys
from mvt.common.indicators import Indicators
from mvt.common.module import run_module, save_timeline
class Command(object):
def __init__(self, target_path=None, results_path=None, ioc_files=[], module_name=None,
serial=None, fast_mode=False, log=logging.getLogger(__name__)):
self.target_path = target_path
self.results_path = results_path
self.ioc_files = ioc_files
self.module_name = module_name
self.serial = serial
self.fast_mode = fast_mode
self.log = log
self.iocs = Indicators(log=log)
self.iocs.load_indicators_files(ioc_files)
self.timeline = []
self.timeline_detected = []
def list_modules(self):
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info(" - %s", module.__name__)
def _create_storage(self):
if self.results_path and not os.path.exists(self.results_path):
try:
os.makedirs(self.results_path)
except Exception as e:
self.log.critical("Unable to create output folder %s: %s",
self.results_path, e)
sys.exit(1)
def _store_timeline(self):
if self.results_path:
if len(self.timeline) > 0:
save_timeline(self.timeline,
os.path.join(self.results_path, "timeline.csv"))
if len(self.timeline_detected) > 0:
save_timeline(self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv"))
def init(self):
raise NotImplementedError
def module_init(self, module):
raise NotImplementedError
def run(self):
self._create_storage()
try:
self.init()
except NotImplementedError:
pass
for module in self.modules:
if self.module_name and module.__name__ != self.module_name:
continue
m = module(target_path=self.target_path, results_path=self.results_path, fast_mode=self.fast_mode,
log=logging.getLogger(module.__module__))
if self.iocs.total_ioc_count:
m.indicators = self.iocs
m.indicators.log = m.log
if self.serial:
m.serial = self.serial
try:
self.module_init(m)
except NotImplementedError:
pass
run_module(m)
self.timeline.extend(m.timeline)
self.timeline_detected.extend(m.timeline_detected)
self._store_timeline()

View File

@ -28,16 +28,16 @@ class MVTModule(object):
enabled = True
slug = None
def __init__(self, file_path=None, base_folder=None, output_folder=None,
def __init__(self, file_path=None, target_path=None, results_path=None,
fast_mode=False, log=None, results=None):
"""Initialize module.
:param file_path: Path to the module's database file, if there is any
:type file_path: str
:param base_folder: Path to the base folder (backup or filesystem dump)
:param target_path: Path to the target folder (backup or filesystem dump)
:type file_path: str
:param output_folder: Folder where results will be stored
:type output_folder: str
:param results_path: Folder where results will be stored
:type results_path: str
:param fast_mode: Flag to enable or disable slow modules
:type fast_mode: bool
:param log: Handle to logger
@ -45,8 +45,8 @@ class MVTModule(object):
:type results: list
"""
self.file_path = file_path
self.base_folder = base_folder
self.output_folder = output_folder
self.target_path = target_path
self.results_path = results_path
self.fast_mode = fast_mode
self.log = log
self.indicators = None
@ -82,14 +82,14 @@ class MVTModule(object):
def save_to_json(self):
"""Save the collected results to a json file."""
if not self.output_folder:
if not self.results_path:
return
name = self.get_slug()
if self.results:
results_file_name = f"{name}.json"
results_json_path = os.path.join(self.output_folder, results_file_name)
results_json_path = os.path.join(self.results_path, results_file_name)
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, default=str)
@ -99,7 +99,7 @@ class MVTModule(object):
if self.detected:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.output_folder, detected_file_name)
detected_json_path = os.path.join(self.results_path, detected_file_name)
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)

View File

@ -48,7 +48,7 @@ max-line-length = 1000
ignore =
C901,
E265,
E127,
F401,
E127,
W503,
E226

View File

@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import hashlib
import logging
from mvt.android.parsers.backup import parse_backup_file, parse_tar_for_sms

View File

@ -8,7 +8,6 @@ import os
from pathlib import Path
from mvt.android.modules.bugreport.appops import Appops
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from ..utils import get_artifact_folder

View File

@ -3,9 +3,6 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import hashlib
import logging
from mvt.android.parsers.dumpsys import parse_dumpsys_appops
from ..utils import get_artifact