Merge branch 'Te-k-feature/ios-check-usb'

This commit is contained in:
Nex 2022-06-29 00:57:32 +02:00
commit ae6fec5ac5
24 changed files with 592 additions and 60 deletions

View File

@ -28,7 +28,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest safety stix2
python -m pip install flake8 pytest safety stix2 pytest-mock
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
python -m pip install .
- name: Lint with flake8

View File

@ -13,8 +13,8 @@ from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
from mvt.common.indicators import download_indicators_files
from mvt.common.logo import logo
from mvt.common.updates import IndicatorsUpdates
from .cmd_check_adb import CmdAndroidCheckADB
from .cmd_check_backup import CmdAndroidCheckBackup
@ -215,4 +215,5 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators")
def download_indicators():
download_indicators_files(log)
ioc_updates = IndicatorsUpdates()
ioc_updates.update()

View File

@ -7,11 +7,13 @@ import json
import logging
import os
import requests
from appdirs import user_data_dir
from .url import URL
MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
@ -19,18 +21,17 @@ class Indicators:
"""
def __init__(self, log=logging.Logger) -> None:
self.data_dir = user_data_dir("mvt")
self.log = log
self.ioc_collections = []
self.total_ioc_count = 0
def _load_downloaded_indicators(self) -> None:
if not os.path.isdir(self.data_dir):
if not os.path.isdir(MVT_INDICATORS_FOLDER):
return
for f in os.listdir(self.data_dir):
if f.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(self.data_dir, f))
for ioc_file_name in os.listdir(MVT_INDICATORS_FOLDER):
if ioc_file_name.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, ioc_file_name))
def _check_stix2_env_variable(self) -> None:
"""
@ -446,36 +447,3 @@ class Indicators:
self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"",
app_id, ioc["name"])
return ioc
def download_indicators_files(log: logging.Logger) -> None:
"""
Download indicators from repo into MVT app data directory.
"""
data_dir = user_data_dir("mvt")
if not os.path.isdir(data_dir):
os.makedirs(data_dir, exist_ok=True)
# Download latest list of indicators from the MVT repo.
res = requests.get("https://raw.githubusercontent.com/mvt-project/mvt-indicators/main/indicators.json")
if res.status_code != 200:
log.warning("Unable to find retrieve list of indicators from the MVT repository.")
return
for ioc_entry in res.json():
ioc_url = ioc_entry["stix2_url"]
log.info("Downloading indicator file %s from %s", ioc_entry["name"], ioc_url)
res = requests.get(ioc_url)
if res.status_code != 200:
log.warning("Could not find indicator file %s", ioc_url)
continue
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
ioc_path = os.path.join(data_dir, clean_file_name)
# Write file to disk. This will overwrite any older version of the STIX2 file.
with open(ioc_path, "w", encoding="utf-8") as handle:
handle.write(res.text)
log.info("Saved indicator file to %s", os.path.basename(ioc_path))

View File

@ -5,22 +5,55 @@
from rich import print
from .updates import check_for_updates
from .updates import IndicatorsUpdates, MVTUpdates
from .version import MVT_VERSION
def check_updates() -> None:
# First we check for MVT version udpates.
mvt_updates = MVTUpdates()
try:
latest_version = mvt_updates.check()
except Exception:
pass
else:
if latest_version:
print(f"\t\t[bold]Version {latest_version} is available! Upgrade mvt![/bold]")
# Then we check for indicators files updates.
ioc_updates = IndicatorsUpdates()
# Before proceeding, we check if we have downloaded an indicators index.
# If not, there's no point in proceeding with the updates check.
if ioc_updates.get_latest_update() == 0:
print("\t\t[bold]You have not yet downloaded any indicators, check the `download-iocs` command![/bold]")
return
# We only perform this check at a fixed frequency, in order to not
# overburden the user with too many lookups if the command is being run
# multiple times.
should_check, hours = ioc_updates.should_check()
if not should_check:
print(f"\t\tIndicators updates checked recently, next automatic check in {int(hours)} hours")
return
try:
ioc_to_update = ioc_updates.check()
except Exception:
pass
else:
if ioc_to_update:
print("\t\t[bold]There are updates to your indicators files! Run the `download-iocs` command to update![/bold]")
else:
print("\t\tYour indicators files seem to be up to date.")
def logo() -> None:
print("\n")
print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
print("\t\thttps://mvt.re")
print(f"\t\tVersion: {MVT_VERSION}")
try:
latest_version = check_for_updates()
except Exception:
pass
else:
if latest_version:
print(f"\t\t[bold]Version {latest_version} is available! Upgrade mvt![/bold]")
check_updates()
print("\n")

View File

@ -3,18 +3,209 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from datetime import datetime
import requests
import yaml
from packaging import version
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
from .version import MVT_VERSION
log = logging.getLogger(__name__)
def check_for_updates() -> str:
res = requests.get("https://pypi.org/pypi/mvt/json")
data = res.json()
latest_version = data.get("info", {}).get("version", "")
# In hours.
INDICATORS_CHECK_FREQUENCY = 12
if version.parse(latest_version) > version.parse(MVT_VERSION):
return latest_version
return ""
class MVTUpdates:
def check(self) -> str:
res = requests.get("https://pypi.org/pypi/mvt/json")
data = res.json()
latest_version = data.get("info", {}).get("version", "")
if version.parse(latest_version) > version.parse(MVT_VERSION):
return latest_version
return ""
class IndicatorsUpdates:
def __init__(self) -> None:
self.github_raw_url = "https://raw.githubusercontent.com/{}/{}/{}/{}"
self.index_owner = "mvt-project"
self.index_repo = "mvt-indicators"
self.index_branch = "main"
self.index_path = "indicators.yaml"
self.latest_update_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_update")
self.latest_check_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_check")
def get_latest_check(self) -> int:
if not os.path.exists(self.latest_check_path):
return 0
with open(self.latest_check_path, "r") as handle:
data = handle.read().strip()
if data:
return int(data)
return 0
def set_latest_check(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
with open(self.latest_check_path, "w") as handle:
handle.write(str(timestamp))
def get_latest_update(self) -> int:
if not os.path.exists(self.latest_update_path):
return 0
with open(self.latest_update_path, "r") as handle:
data = handle.read().strip()
if data:
return int(data)
return 0
def set_latest_update(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
with open(self.latest_update_path, "w") as handle:
handle.write(str(timestamp))
def get_remote_index(self) -> dict:
url = self.github_raw_url.format(self.index_owner, self.index_repo,
self.index_branch, self.index_path)
res = requests.get(url)
if res.status_code != 200:
log.error("Failed to retrieve indicators index located at %s (error %d)",
url, res.status_code)
return None
return yaml.safe_load(res.content)
def download_remote_ioc(self, ioc_url: str) -> str:
res = requests.get(ioc_url)
if res.status_code != 200:
log.error("Failed to download indicators file from %s (error %d)",
ioc_url, res.status_code)
return None
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
ioc_path = os.path.join(MVT_INDICATORS_FOLDER, clean_file_name)
with open(ioc_path, "w", encoding="utf-8") as handle:
handle.write(res.text)
return ioc_path
def update(self) -> None:
self.set_latest_check()
if not os.path.exists(MVT_INDICATORS_FOLDER):
os.makedirs(MVT_INDICATORS_FOLDER)
index = self.get_remote_index()
for ioc in index.get("indicators", []):
ioc_type = ioc.get("type", "")
if ioc_type == "github":
github = ioc.get("github", {})
owner = github.get("owner", "")
repo = github.get("repo", "")
branch = github.get("branch", "main")
path = github.get("path", "")
ioc_url = self.github_raw_url.format(owner, repo, branch, path)
else:
ioc_url = ioc.get("download_url", "")
if not ioc_url:
log.error("Could not find a way to download indicator file for %s",
ioc.get("name"))
continue
ioc_local_path = self.download_remote_ioc(ioc_url)
if not ioc_local_path:
continue
log.info("Downloaded indicators \"%s\" to %s",
ioc.get("name"), ioc_local_path)
self.set_latest_update()
def _get_remote_file_latest_commit(self, owner: str, repo: str,
branch: str, path: str) -> bool:
file_commit_url = f"https://api.github.com/repos/{self.index_owner}/{self.index_repo}/commits?path={self.index_path}"
res = requests.get(file_commit_url)
if res.status_code != 200:
log.error("Failed to get details about file %s (error %d)",
file_commit_url, res.status_code)
return False
details = res.json()
if len(details) == 0:
return False
latest_commit = details[0]
latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None)
if not latest_commit_date:
log.error("Failed to retrieve date of latest update to indicators index file")
return False
latest_commit_dt = datetime.strptime(latest_commit_date, '%Y-%m-%dT%H:%M:%SZ')
latest_commit_ts = int(latest_commit_dt.timestamp())
return latest_commit_ts
def should_check(self) -> (bool, int):
now = datetime.utcnow()
latest_check_ts = self.get_latest_check()
latest_check_dt = datetime.fromtimestamp(latest_check_ts)
diff = now - latest_check_dt
diff_hours = divmod(diff.total_seconds(), 3600)[0]
if diff_hours >= INDICATORS_CHECK_FREQUENCY:
return True, 0
return False, INDICATORS_CHECK_FREQUENCY - diff_hours
def check(self) -> bool:
self.set_latest_check()
latest_update = self.get_latest_update()
latest_commit_ts = self._get_remote_file_latest_commit(self.index_owner,
self.index_repo,
self.index_branch,
self.index_path)
if latest_update < latest_commit_ts:
return True
index = self.get_remote_index()
for ioc in index.get("indicators", []):
if ioc.get("type", "") != "github":
continue
github = ioc.get("github", {})
owner = github.get("owner", "")
repo = github.get("repo", "")
branch = github.get("branch", "main")
path = github.get("path", "")
file_latest_commit_ts = self._get_remote_file_latest_commit(owner,
repo,
branch,
path)
if latest_update < file_latest_commit_ts:
return True
return False

View File

@ -13,13 +13,14 @@ from rich.prompt import Prompt
from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT)
from mvt.common.indicators import download_indicators_files
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
from mvt.common.logo import logo
from mvt.common.options import MutuallyExclusiveOption
from mvt.common.updates import IndicatorsUpdates
from .cmd_check_backup import CmdIOSCheckBackup
from .cmd_check_fs import CmdIOSCheckFS
from .cmd_check_usb import CmdIOSCheckUSB
from .decrypt import DecryptBackup
from .modules.backup import BACKUP_MODULES
from .modules.fs import FS_MODULES
@ -213,4 +214,36 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators")
def download_iocs():
download_indicators_files(log)
ioc_updates = IndicatorsUpdates()
ioc_updates.update()
#==============================================================================
# Command: check-usb
#==============================================================================
@cli.command("check-usb", help="Extract artifacts from a live iPhone through USB / lockdown")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
# TODO: serial
# @click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_usb(ctx, serial, iocs, output, fast, list_modules, module):
cmd = CmdIOSCheckUSB(results_path=output, ioc_files=iocs,
module_name=module, fast_mode=fast,
serial=serial)
if list_modules:
cmd.list_modules()
return
log.info("Checking iPhone through USB, this may take a while")
cmd.run()
if len(cmd.timeline_detected) > 0:
log.warning("The analysis of the data produced %d detections!",
len(cmd.timeline_detected))

46
mvt/ios/cmd_check_usb.py Normal file
View File

@ -0,0 +1,46 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import sys
from pymobiledevice3.exceptions import ConnectionFailedError
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.command import Command
from .modules.usb import USB_MODULES
log = logging.getLogger(__name__)
class CmdIOSCheckUSB(Command):
name = "check-usb"
modules = USB_MODULES
def __init__(self, target_path: str = None, results_path: str = None,
ioc_files: list = [], module_name: str = None, serial: str = None,
fast_mode: bool = False):
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.lockdown = None
def init(self):
try:
if self.serial:
self.lockdown = LockdownClient(udid=self.serial)
else:
self.lockdown = LockdownClient()
except ConnectionRefusedError:
log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.")
sys.exit(-1)
except ConnectionFailedError:
log.error("Unable to connect to the device %s", self.serial)
sys.exit(-1)
def module_init(self, module):
module.lockdown = self.lockdown

View File

@ -0,0 +1,10 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .applications import Applications
from .device_info import DeviceInfo
from .processes import Processes
USB_MODULES = [Applications, DeviceInfo, Processes]

View File

@ -0,0 +1,45 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.installation_proxy import \
InstallationProxyService
from .base import IOSUSBExtraction
class Applications(IOSUSBExtraction):
"""This class extracts all applications installed on the phone"""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["CFBundleIdentifier"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
user_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("User")
for user_app in user_apps:
user_app["type"] = "user"
system_apps = InstallationProxyService(lockdown=self.lockdown).get_apps("System")
for system_app in system_apps:
system_app["type"] = "system"
self.results = user_apps + system_apps
self.log.info("%d applications identified on the phone",
len(self.results))

View File

@ -0,0 +1,25 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.module import MVTModule
log = logging.getLogger(__name__)
class IOSUSBExtraction(MVTModule):
"""This class provides a base for all iOS USB extraction modules."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.device = None
self.serial = None
self.lockdown = None

View File

@ -0,0 +1,38 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import base64
import logging
from mvt.ios.versions import latest_ios_version
from .base import IOSUSBExtraction
class DeviceInfo(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
self.results = self.lockdown.all_values
for entry in self.results:
if isinstance(self.results[entry], bytes):
self.results[entry] = base64.b64encode(self.results[entry])
elif isinstance(self.results[entry], dict):
for second_entry in self.results[entry]:
if isinstance(self.results[entry][second_entry], bytes):
self.results[entry][second_entry] = base64.b64encode(self.results[entry][second_entry])
if "ProductVersion" in self.results:
latest = latest_ios_version()
if self.results["ProductVersion"] != latest["version"]:
self.log.warning("This phone is running an outdated iOS version: %s (latest is %s)",
self.results["ProductVersion"], latest['version'])

View File

@ -0,0 +1,42 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.services.os_trace import OsTraceService
from .base import IOSUSBExtraction
class Processes(IOSUSBExtraction):
"""This class extracts all processes running on the phone."""
def __init__(self, file_path: str = None, target_path: str = None,
results_path: str = None, fast_mode: bool = False,
log: logging.Logger = None, results: list = []) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result["name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
processes = OsTraceService(lockdown=self.lockdown).get_pid_list().get('Payload')
for pid in processes:
self.results.append({
"pid": pid,
"name": processes[pid]["ProcessName"]
})
self.log.info("%d running processes identified on the phone",
len(self.results))

View File

@ -33,6 +33,8 @@ install_requires =
adb-shell >=0.4.2
libusb1 >=2.0.1
cryptography >=36.0.1
pymobiledevice3 >= 1.23.9
pyyaml >=6.0
[options.packages.find]
where = ./

View File

View File

@ -0,0 +1,35 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.applications import Applications
class TestUSBApplication:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.lockdown.LockdownClient.start_service")
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.services.installation_proxy.InstallationProxyService.get_apps",
return_value=[{"CFBundleIdentifier": "com.bad.app"}]
)
lockdown = LockdownClient()
m = Applications(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@ -0,0 +1,34 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from pymobiledevice3.lockdown import LockdownClient
from mvt.common.module import run_module
from mvt.ios.modules.usb.device_info import DeviceInfo
class TestUSBDeviceInfo:
def test_run(self, mocker):
mocker.patch("pymobiledevice3.usbmux.select_device")
mocker.patch("pymobiledevice3.service_connection.ServiceConnection.create")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.query_type",
return_value="com.apple.mobile.lockdown")
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.validate_pairing",
return_value=True)
mocker.patch(
"pymobiledevice3.lockdown.LockdownClient.get_value",
return_value={'DeviceClass': 'iPhone', 'ProductVersion': '14.3'}
)
lockdown = LockdownClient()
m = DeviceInfo(log=logging)
m.lockdown = lockdown
run_module(m)
assert len(m.results) == 2

View File

@ -0,0 +1,29 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.usb.processes import Processes
class TestUSBProcesses:
def test_run(self, mocker, indicator_file):
mocker.patch("pymobiledevice3.services.base_service.BaseService.__init__")
mocker.patch(
"pymobiledevice3.services.os_trace.OsTraceService.get_pid_list",
return_value={"Payload": {"1": {"ProcessName": "storebookkeeperd"}, "1854": {"ProcessName": "cfprefssd"}}}
)
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["processes"].append("cfprefssd")
m = Processes(log=logging)
m.indicators = ind
run_module(m)
assert len(m.results) == 2
assert len(m.detected) == 1