Refactor detection in artifacts

This commit is contained in:
tek 2023-07-23 00:35:35 +02:00
parent 35dfe85d39
commit 3e38180dd9
21 changed files with 290 additions and 197 deletions

View File

View File

@ -0,0 +1,22 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from datetime import datetime, timedelta
from mvt.common.artifact import Artifact
class AndroidArtifact(Artifact):
def warn_patch_level(self, patch_level: str, log) -> bool:
"""Alert if Android patch level out-of-date"""
patch_date = datetime.strptime(patch_level, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6 * 31):
log.warning(
"This phone has not received security updates "
"for more than six months (last update: %s)",
patch_level,
)
return True
return False

View File

@ -0,0 +1,57 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import re
from typing import Dict, List
from .artifact import AndroidArtifact
INTERESTING_PROPERTIES = [
"gsm.sim.operator.alpha",
"gsm.sim.operator.iso-country",
"persist.sys.timezone",
"ro.boot.serialno",
"ro.build.version.sdk",
"ro.build.version.security_patch",
"ro.product.cpu.abi",
"ro.product.locale",
"ro.product.vendor.manufacturer",
"ro.product.vendor.model",
"ro.product.vendor.name",
]
class GetProp(AndroidArtifact):
def parse(self, entry: str) -> None:
self.results: List[Dict[str, str]] = []
rxp = re.compile(r"\[(.+?)\]: \[(.+?)\]")
for line in entry.splitlines():
line = line.strip()
if line == "":
continue
matches = re.findall(rxp, line)
if not matches or len(matches[0]) != 2:
continue
entry = {"name": matches[0][0], "value": matches[0][1]}
self.results.append(entry)
def check_indicators(self) -> None:
for entry in self.results:
if entry["name"] in INTERESTING_PROPERTIES:
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
self.warn_patch_level(entry["value"], self.log)
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_android_property_name(result.get("name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@ -0,0 +1,69 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .artifact import AndroidArtifact
class Processes(AndroidArtifact):
def parse(self, entry: str) -> None:
for line in entry.split("\n")[1:]:
proc = line.split()
# Skip empty lines
if len(proc) == 0:
continue
# Sometimes WCHAN is empty.
if len(proc) == 8:
proc = proc[:5] + [""] + proc[5:]
# Sometimes there is the security label.
if proc[0].startswith("u:r"):
label = proc[0]
proc = proc[1:]
else:
label = ""
# Sometimes there is no WCHAN.
if len(proc) < 9:
proc = proc[:5] + [""] + proc[5:]
self.results.append(
{
"user": proc[0],
"pid": int(proc[1]),
"ppid": int(proc[2]),
"virtual_memory_size": int(proc[3]),
"resident_set_size": int(proc[4]),
"wchan": proc[5],
"aprocress": proc[6],
"stat": proc[7],
"proc_name": proc[8].strip("[]"),
"label": label,
}
)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
proc_name = result.get("proc_name", "")
if not proc_name:
continue
# Skipping this process because of false positives.
if result["proc_name"] == "gatekeeperd":
continue
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@ -2,45 +2,7 @@
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from mvt.android.utils import warn_android_patch_level
INTERESTING_PROPERTIES = [
"gsm.sim.operator.alpha",
"gsm.sim.operator.iso-country",
"persist.sys.timezone",
"ro.boot.serialno",
"ro.build.version.sdk",
"ro.build.version.security_patch",
"ro.product.cpu.abi",
"ro.product.locale",
"ro.product.vendor.manufacturer",
"ro.product.vendor.model",
"ro.product.vendor.name",
]
class GetPropDetectionMixin(object):
"""Mixin to have cosistent detection logic across various extraction modules."""
def check_indicators(self) -> None:
for entry in self.results:
if entry["name"] in INTERESTING_PROPERTIES:
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
warn_android_patch_level(entry["value"], self.log)
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_android_property_name(result.get("name", ""))
print(result.get("name", ""), ioc)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
from .artifact import AndroidArtifact
ANDROID_DANGEROUS_SETTINGS = [
{
@ -91,7 +53,7 @@ ANDROID_DANGEROUS_SETTINGS = [
]
class SettingsDetectionMixin(object):
class Settings(AndroidArtifact):
def check_indicators(self) -> None:
for namespace, settings in self.results.items():
for key, value in settings.items():
@ -107,29 +69,3 @@ class SettingsDetectionMixin(object):
danger["description"],
)
break
class ProcessDetectionMixin(object):
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
proc_name = result.get("proc_name", "")
if not proc_name:
continue
# Skipping this process because of false positives.
if result["proc_name"] == "gatekeeperd":
continue
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@ -6,13 +6,12 @@
import logging
from typing import Optional
from mvt.android.modules.detection_mixins import GetPropDetectionMixin
from mvt.android.parsers import parse_getprop
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from .base import AndroidExtraction
class Getprop(GetPropDetectionMixin, AndroidExtraction):
class Getprop(GetPropArtifact, AndroidExtraction):
"""This module extracts device properties from getprop command."""
def __init__(
@ -40,5 +39,5 @@ class Getprop(GetPropDetectionMixin, AndroidExtraction):
output = self._adb_command("getprop")
self._adb_disconnect()
self.results = parse_getprop(output)
self.parse(output)
self.log.info("Extracted %d Android system properties", len(self.results))

View File

@ -6,11 +6,12 @@
import logging
from typing import Optional
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
from .base import AndroidExtraction
from mvt.android.modules.detection_mixins import ProcessDetectionMixin
class Processes(ProcessDetectionMixin, AndroidExtraction):
class Processes(ProcessesArtifact, AndroidExtraction):
"""This module extracts details on running processes."""
def __init__(
@ -35,33 +36,7 @@ class Processes(ProcessDetectionMixin, AndroidExtraction):
self._adb_connect()
output = self._adb_command("ps -A")
for line in output.splitlines()[1:]:
line = line.strip()
if line == "":
continue
fields = line.split()
proc = {
"user": fields[0],
"pid": fields[1],
"parent_pid": fields[2],
"vsize": fields[3],
"rss": fields[4],
}
# Sometimes WCHAN is empty, so we need to re-align output fields.
if len(fields) == 8:
proc["wchan"] = ""
proc["pc"] = fields[5]
proc["name"] = fields[7]
elif len(fields) == 9:
proc["wchan"] = fields[5]
proc["pc"] = fields[6]
proc["name"] = fields[8]
self.results.append(proc)
self.parse(output)
self._adb_disconnect()
self.log.info("Extracted records on a total of %d processes", len(self.results))

View File

@ -6,11 +6,12 @@
import logging
from typing import Optional
from mvt.android.artifacts.settings import Settings as SettingsArtifact
from .base import AndroidExtraction
from mvt.android.modules.detection_mixins import SettingsDetectionMixin
class Settings(SettingsDetectionMixin, AndroidExtraction):
class Settings(SettingsArtifact, AndroidExtraction):
"""This module extracts Android system settings."""
def __init__(

View File

@ -6,13 +6,12 @@
import logging
from typing import Optional
from mvt.android.modules.detection_mixins import GetPropDetectionMixin
from mvt.android.parsers.getprop import parse_getprop
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from .base import AndroidQFModule
class Getprop(GetPropDetectionMixin, AndroidQFModule):
class Getprop(GetPropArtifact, AndroidQFModule):
"""This module extracts data from get properties."""
def __init__(
@ -43,5 +42,5 @@ class Getprop(GetPropDetectionMixin, AndroidQFModule):
with open(getprop_files[0]) as f:
data = f.read()
self.results = parse_getprop(data)
self.parse(data)
self.log.info("Extracted a total of %d properties", len(self.results))

View File

@ -6,11 +6,12 @@
import logging
from typing import Optional
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
from .base import AndroidQFModule
from mvt.android.modules.detection_mixins import ProcessDetectionMixin
class Processes(ProcessDetectionMixin, AndroidQFModule):
class Processes(ProcessesArtifact, AndroidQFModule):
"""This module analyse running processes"""
def __init__(
@ -31,46 +32,12 @@ class Processes(ProcessDetectionMixin, AndroidQFModule):
results=results,
)
def _parse_ps(self, data):
for line in data.split("\n")[1:]:
proc = line.split()
# Sometimes WCHAN is empty.
if len(proc) == 8:
proc = proc[:5] + [""] + proc[5:]
# Sometimes there is the security label.
if proc[0].startswith("u:r"):
label = proc[0]
proc = proc[1:]
else:
label = ""
# Sometimes there is no WCHAN.
if len(proc) < 9:
proc = proc[:5] + [""] + proc[5:]
self.results.append(
{
"user": proc[0],
"pid": int(proc[1]),
"ppid": int(proc[2]),
"virtual_memory_size": int(proc[3]),
"resident_set_size": int(proc[4]),
"wchan": proc[5],
"aprocress": proc[6],
"stat": proc[7],
"proc_name": proc[8].strip("[]"),
"label": label,
}
)
def run(self) -> None:
ps_files = self._get_files_by_pattern("*/ps.txt")
if not ps_files:
return
with open(ps_files[0]) as handle:
self._parse_ps(handle.read())
self.parse(handle.read())
self.log.info("Identified %d running processes", len(self.results))

View File

@ -6,13 +6,12 @@
import logging
from typing import Optional
from mvt.android.modules.detection_mixins import SettingsDetectionMixin
from mvt.android.artifacts.settings import Settings as SettingsArtifact
from .base import AndroidQFModule
class Settings(SettingsDetectionMixin, AndroidQFModule):
class Settings(SettingsArtifact, AndroidQFModule):
"""This module analyse setting files"""
def __init__(

View File

@ -6,13 +6,12 @@
import logging
from typing import Optional
from mvt.android.modules.detection_mixins import GetPropDetectionMixin
from mvt.android.parsers import parse_getprop
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from .base import BugReportModule
class Getprop(GetPropDetectionMixin, BugReportModule):
class Getprop(GetPropArtifact, BugReportModule):
"""This module extracts device properties from getprop command."""
def __init__(
@ -33,7 +32,7 @@ class Getprop(GetPropDetectionMixin, BugReportModule):
results=results,
)
self.results = {} if not results else results
self.results = [] if not results else results
def run(self) -> None:
content = self._get_dumpstate_file()
@ -60,5 +59,5 @@ class Getprop(GetPropDetectionMixin, BugReportModule):
lines.append(line)
self.results = parse_getprop("\n".join(lines))
self.parse("\n".join(lines))
self.log.info("Extracted %d Android system properties", len(self.results))

View File

@ -12,4 +12,3 @@ from .dumpsys import (
parse_dumpsys_dbinfo,
parse_dumpsys_receiver_resolver_table,
)
from .getprop import parse_getprop

View File

@ -1,26 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import re
from typing import Dict, List
def parse_getprop(output: str) -> List[Dict[str, str]]:
results = []
rxp = re.compile(r"\[(.+?)\]: \[(.+?)\]")
for line in output.splitlines():
line = line.strip()
if line == "":
continue
matches = re.findall(rxp, line)
if not matches or len(matches[0]) != 2:
continue
entry = {"name": matches[0][0], "value": matches[0][1]}
results.append(entry)
return results

View File

@ -1,19 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from datetime import datetime, timedelta
def warn_android_patch_level(patch_level: str, log) -> bool:
"""Alert if Android patch level out-of-date"""
patch_date = datetime.strptime(patch_level, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(months=6):
log.warning(
"This phone has not received security updates "
"for more than six months (last update: %s)",
patch_level,
)
return True
return False

5
mvt/common/artifact.py Normal file
View File

@ -0,0 +1,5 @@
class Artifact:
def __init__(self, *args, **kwargs):
self.results = []
self.detected = []
super().__init__(*args, **kwargs)

View File

@ -5,7 +5,7 @@
import logging
import sqlite3
from typing import Union, Optional
from typing import Optional, Union
from mvt.common.utils import convert_mactime_to_iso

View File

@ -0,0 +1,41 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.artifacts.getprop import GetProp
from mvt.common.indicators import Indicators
from ..utils import get_artifact
class TestGetPropArtifact:
def test_parsing(self):
gp = GetProp()
file = get_artifact("android_data/getprop.txt")
with open(file) as f:
data = f.read()
assert len(gp.results) == 0
gp.parse(data)
assert len(gp.results) == 13
assert gp.results[0]["name"] == "af.fast_track_multiplier"
assert gp.results[0]["value"] == "1"
def test_ioc_check(self, indicator_file):
gp = GetProp()
file = get_artifact("android_data/getprop.txt")
with open(file) as f:
data = f.read()
gp.parse(data)
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["android_property_names"].append(
"dalvik.vm.appimageformat"
)
gp.indicators = ind
assert len(gp.detected) == 0
gp.check_indicators()
assert len(gp.detected) == 1

View File

@ -0,0 +1,38 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.artifacts.processes import Processes
from mvt.common.indicators import Indicators
from ..utils import get_artifact
class TestProcessesArtifact:
def test_parsing(self):
p = Processes()
file = get_artifact("android_data/ps.txt")
with open(file) as f:
data = f.read()
assert len(p.results) == 0
p.parse(data)
assert len(p.results) == 17
assert p.results[0]["proc_name"] == "init"
def test_ioc_check(self, indicator_file):
p = Processes()
file = get_artifact("android_data/ps.txt")
with open(file) as f:
data = f.read()
p.parse(data)
ind = Indicators(log=logging.getLogger())
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["processes"].append("lru-add-drain")
p.indicators = ind
assert len(p.detected) == 0
p.check_indicators()
assert len(p.detected) == 1

View File

@ -0,0 +1,14 @@
[af.fast_track_multiplier]: [1]
[audio.deep_buffer.media]: [true]
[audio.offload.min.duration.secs]: [30]
[audio.offload.video]: [true]
[av.debug.disable.pers.cache]: [1]
[dalvik.vm.appimageformat]: [lz4]
[dalvik.vm.dex2oat-Xms]: [64m]
[dalvik.vm.dex2oat-Xmx]: [512m]
[dalvik.vm.dex2oat-max-image-block-size]: [524288]
[dalvik.vm.dex2oat-minidebuginfo]: [true]
[dalvik.vm.dex2oat-resolve-startup-strings]: [true]
[dalvik.vm.dexopt.secondary]: [true]
[dalvik.vm.heapgrowthlimit]: [192m]

View File

@ -0,0 +1,18 @@
USER PID PPID VSZ RSS WCHAN ADDR S NAME
root 1 0 68696 2864 0 0 S init
root 2 0 0 0 0 0 S [kthreadd]
root 4 2 0 0 0 0 S [kworker/0:0H]
root 5 2 0 0 0 0 S [kworker/u16:0]
root 6 2 0 0 0 0 S [ksoftirqd/0]
root 7 2 0 0 0 0 S [rcu_preempt]
root 8 2 0 0 0 0 S [rcu_sched]
root 9 2 0 0 0 0 S [rcu_bh]
root 10 2 0 0 0 0 S [rcuop/0]
root 11 2 0 0 0 0 S [rcuos/0]
root 12 2 0 0 0 0 S [rcuob/0]
root 13 2 0 0 0 0 S [migration/0]
root 14 2 0 0 0 0 S [lru-add-drain]
root 15 2 0 0 0 0 S [cpuhp/0]
root 16 2 0 0 0 0 S [cpuhp/1]
root 17 2 0 0 0 0 S [migration/1]
root 18 2 0 0 0 0 S [ksoftirqd/1]