Merge branch 'main' of github.com:mvt-project/mvt

Commit e0c9a44b10
@@ -15,6 +15,8 @@ class IndicatorsFileBadFormat(Exception):
 class Indicators:
     """This class is used to parse indicators from a STIX2 file and provide
     functions to compare extracted artifacts to the indicators.
+
+
     """

     def __init__(self, log=None):
@@ -37,6 +39,7 @@ class Indicators:

         :param file_path: Path to the STIX2 file to parse
         :type file_path: str
+
         """
         self.log.info("Parsing STIX2 indicators file at path %s",
                       file_path)
@@ -82,6 +85,7 @@ class Indicators:

         :type url: str
         :returns: True if the URL matched an indicator, otherwise False
         :rtype: bool
+
         """
         # TODO: If the IOC domain contains a subdomain, it is not currently
         # being matched.
@@ -153,6 +157,7 @@ class Indicators:

         :type urls: list
         :returns: True if any URL matched an indicator, otherwise False
         :rtype: bool
+
         """
         if not urls:
             return False
@@ -171,6 +176,7 @@ class Indicators:

         :type process: str
         :returns: True if process matched an indicator, otherwise False
         :rtype: bool
+
         """
         if not process:
             return False
@@ -196,6 +202,7 @@ class Indicators:

         :type processes: list
         :returns: True if process matched an indicator, otherwise False
         :rtype: bool
+
         """
         if not processes:
             return False
@@ -213,6 +220,7 @@ class Indicators:

         :type email: str
         :returns: True if email address matched an indicator, otherwise False
         :rtype: bool
+
         """
         if not email:
             return False
@@ -231,6 +239,7 @@ class Indicators:

         :type file_path: str
         :returns: True if the file path matched an indicator, otherwise False
         :rtype: bool
+
         """
         if not file_path:
             return False
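Taken together, these check_* docstrings describe one pattern: normalize an extracted artifact and test it against the loaded IOCs, returning a boolean. A minimal standalone sketch of the domain case, assuming a plain list of IOC domains (url_matches_iocs and ioc_domains are illustrative names, not MVT's API):

from urllib.parse import urlparse

def url_matches_iocs(url, ioc_domains):
    # Match the URL's host either exactly or as a subdomain of an IOC.
    # Note the TODO above: the inverse case, an IOC that itself carries
    # a subdomain, needs its own handling.
    host = urlparse(url).netloc.lower()
    return any(host == ioc or host.endswith("." + ioc) for ioc in ioc_domains)

# url_matches_iocs("https://cdn.bad.example/x", ["bad.example"]) -> True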
@@ -23,8 +23,7 @@ class InsufficientPrivileges(Exception):
     pass

 class MVTModule(object):
-    """This class provides a base for all extraction modules.
-    """
+    """This class provides a base for all extraction modules."""

     enabled = True
     slug = None
@@ -66,8 +65,7 @@ class MVTModule(object):
         return cls(results=results, log=log)

     def get_slug(self):
-        """Use the module's class name to retrieve a slug
-        """
+        """Use the module's class name to retrieve a slug"""
         if self.slug:
             return self.slug
@@ -77,12 +75,13 @@ class MVTModule(object):

     def check_indicators(self):
         """Check the results of this module against a provided list of
         indicators.
+
+
         """
         raise NotImplementedError

     def save_to_json(self):
-        """Save the collected results to a json file.
-        """
+        """Save the collected results to a json file."""
         if not self.output_folder:
             return
@@ -112,6 +111,7 @@ class MVTModule(object):
         """Serialize entry as JSON to deduplicate repeated entries

         :param timeline: List of entries from timeline to deduplicate
+
         """
         timeline_set = set()
         for record in timeline:
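The hunk shows only the start of the deduplication. A sketch of the serialize-to-a-set approach the docstring names (the standalone function below is illustrative; the real method is the module's _deduplicate_timeline):

import json

def deduplicate_timeline(timeline):
    timeline_set = set()
    deduped = []
    for record in timeline:
        # A canonical JSON string makes equal dicts hash-equal.
        serialized = json.dumps(record, sort_keys=True)
        if serialized not in timeline_set:
            timeline_set.add(serialized)
            deduped.append(record)
    return deduped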
@@ -141,8 +141,7 @@ class MVTModule(object):
         self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)

     def run(self):
-        """Run the main module procedure.
-        """
+        """Run the main module procedure."""
         raise NotImplementedError
@@ -190,6 +189,7 @@ def save_timeline(timeline, timeline_path):

     :param timeline: List of records to order and store
     :param timeline_path: Path to the csv file to store the timeline to
+
     """
     with io.open(timeline_path, "a+", encoding="utf-8") as handle:
         csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
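Only the file handling is in the hunk. A sketch of a full writer, under the assumption that records carry the timestamp/module/event/data keys produced by the serialize() methods in this commit (the column order is a guess):

import csv
import io

def save_timeline(timeline, timeline_path):
    # "a+" appends, so successive modules extend one shared CSV file.
    with io.open(timeline_path, "a+", encoding="utf-8") as handle:
        csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
        for event in sorted(timeline, key=lambda e: e.get("timestamp") or ""):
            csvoutput.writerow([event.get("timestamp"), event.get("module"),
                                event.get("event"), event.get("data")])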
@@ -268,6 +268,7 @@ class URL:

         :type url: str
         :returns: Domain name extracted from URL
         :rtype: str
+
         """
         # TODO: Properly handle exception.
         try:
@@ -282,6 +283,7 @@ class URL:

         :type url: str
         :returns: Top-level domain name extracted from URL
         :rtype: str
+
         """
         # TODO: Properly handle exception.
         try:
@@ -292,8 +294,11 @@ class URL:
     def check_if_shortened(self) -> bool:
         """Check if the URL is among list of shortener services.

+
         :returns: True if the URL is shortened, otherwise False
+
         :rtype: bool
+
         """
         if self.domain.lower() in SHORTENER_DOMAINS:
             self.is_shortened = True
@@ -301,8 +306,7 @@ class URL:
         return self.is_shortened

     def unshorten(self):
-        """Unshorten the URL by requesting an HTTP HEAD response.
-        """
+        """Unshorten the URL by requesting an HTTP HEAD response."""
         res = requests.head(self.url)
         if str(res.status_code).startswith("30"):
             return res.headers["Location"]
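The HEAD trick works because requests.head() does not follow redirects by default, so a shortener answers with the 30x hop itself. A standalone equivalent (falling back to the input URL rather than None, unlike the method above):

import requests

def unshorten(url):
    # HEAD avoids fetching the body; the expanded target is in the
    # Location header of the 30x response.
    res = requests.head(url)
    if str(res.status_code).startswith("30"):
        return res.headers["Location"]
    return url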
@@ -16,6 +16,7 @@ def convert_mactime_to_unix(timestamp, from_2001=True):

-    :param from_2001: bool: Whether to (Default value = True)
+    :param from_2001: Default value = True)
     :returns: Unix epoch timestamp.
+
     """
     if not timestamp:
         return None
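Most of the function body sits outside the hunk. For reference, Mac absolute time counts seconds from 2001-01-01 00:00:00 UTC, which is 978307200 seconds after the Unix epoch; a sketch consistent with how the new networking_analytics module calls this helper with from_2001=False (the function name below is illustrative):

import datetime

MAC_EPOCH_OFFSET = 978307200  # seconds from 1970-01-01 to 2001-01-01 UTC

def mactime_to_datetime(timestamp, from_2001=True):
    if not timestamp:
        return None
    if from_2001:
        timestamp += MAC_EPOCH_OFFSET
    return datetime.datetime.utcfromtimestamp(timestamp)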
@@ -42,6 +43,7 @@ def convert_chrometime_to_unix(timestamp):
     :param timestamp: Chrome timestamp as int.
     :type timestamp: int
     :returns: Unix epoch timestamp.
+
     """
     epoch_start = datetime.datetime(1601, 1, 1)
     delta = datetime.timedelta(microseconds=timestamp)
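A quick check of the epoch arithmetic: Chrome/WebKit timestamps are microseconds since 1601-01-01 UTC, so the conversion is the epoch start plus a microsecond delta.

import datetime

epoch_start = datetime.datetime(1601, 1, 1)
print(epoch_start + datetime.timedelta(microseconds=13276137600000000))
# 2021-09-15 00:00:00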
@@ -55,6 +57,7 @@ def convert_timestamp_to_iso(timestamp):
     :type timestamp: int
     :returns: ISO timestamp string in YYYY-mm-dd HH:MM:SS.ms format.
     :rtype: str
+
     """
     try:
         return timestamp.strftime("%Y-%m-%d %H:%M:%S.%f")
@@ -67,6 +70,7 @@ def check_for_links(text):

     :param text: Any provided text.
     :type text: str
     :returns: Search results.
+
     """
     return re.findall("(?P<url>https?://[^\s]+)", text, re.IGNORECASE)
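For example (the pattern runs to the next whitespace, so trailing punctuation attached to a URL is included in the match):

>>> check_for_links("Details at https://example.com/report and http://example.org/page")
['https://example.com/report', 'http://example.org/page']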
@@ -92,6 +96,7 @@ def keys_bytes_to_string(obj):
     :param obj: Object to convert from bytes to string.
     :returns: Object converted to string.
     :rtype: str
+
     """
     new_obj = {}
     if not isinstance(obj, dict):
@@ -6,7 +6,7 @@
 import requests
 from packaging import version

-MVT_VERSION = "1.2.10"
+MVT_VERSION = "1.2.12"

 def check_for_updates():
     res = requests.get("https://pypi.org/pypi/mvt/json")
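The hunk stops at the request. A plausible completion of the comparison, assuming the standard PyPI JSON layout in which the latest release sits at info.version (a sketch, not necessarily MVT's exact code):

import requests
from packaging import version

MVT_VERSION = "1.2.12"

def check_for_updates():
    res = requests.get("https://pypi.org/pypi/mvt/json")
    latest = res.json()["info"]["version"]
    if version.parse(latest) > version.parse(MVT_VERSION):
        return latest
    return None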
@@ -6,6 +6,7 @@
 from .cache_files import CacheFiles
 from .filesystem import Filesystem
 from .net_netusage import Netusage
+from .networking_analytics import NetworkingAnalytics
 from .safari_favicon import SafariFavicon
 from .shutdownlog import ShutdownLog
 from .version_history import IOSVersionHistory
@@ -13,6 +14,6 @@ from .webkit_indexeddb import WebkitIndexedDB
 from .webkit_localstorage import WebkitLocalStorage
 from .webkit_safariviewservice import WebkitSafariViewService

-FS_MODULES = [CacheFiles, Filesystem, Netusage, SafariFavicon, ShutdownLog,
+FS_MODULES = [CacheFiles, Filesystem, Netusage, NetworkingAnalytics, SafariFavicon, ShutdownLog,
               IOSVersionHistory, WebkitIndexedDB, WebkitLocalStorage,
               WebkitSafariViewService,]
@@ -28,8 +28,8 @@ class Filesystem(IOSExtraction):
         return {
             "timestamp": record["modified"],
             "module": self.__class__.__name__,
-            "event": "file_modified",
-            "data": record["file_path"],
+            "event": "entry_modified",
+            "data": record["path"],
         }

     def check_indicators(self):
@@ -37,16 +37,39 @@ class Filesystem(IOSExtraction):
             return

         for result in self.results:
-            if self.indicators.check_file(result["file_path"]):
+            if self.indicators.check_file(result["path"]):
+                self.log.warning("Found a known malicious file at path: %s", result["path"])
                 self.detected.append(result)

+            # If we are instructed to run fast, we skip this.
+            if self.fast_mode:
+                self.log.info("Flag --fast was enabled: skipping extended search for suspicious files/processes")
+            else:
+                for ioc in self.indicators.ioc_processes:
+                    parts = result["path"].split("/")
+                    if ioc in parts:
+                        self.log.warning("Found a known malicious file/process at path: %s", result["path"])
+                        self.detected.append(result)
+
     def run(self):
         for root, dirs, files in os.walk(self.base_folder):
+            for dir_name in dirs:
+                try:
+                    dir_path = os.path.join(root, dir_name)
+                    result = {
+                        "path": os.path.relpath(dir_path, self.base_folder),
+                        "modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(dir_path).st_mtime)),
+                    }
+                except:
+                    continue
+                else:
+                    self.results.append(result)
+
             for file_name in files:
                 try:
                     file_path = os.path.join(root, file_name)
                     result = {
-                        "file_path": os.path.relpath(file_path, self.base_folder),
+                        "path": os.path.relpath(file_path, self.base_folder),
                         "modified": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(file_path).st_mtime)),
                     }
                 except:
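The new process-IOC pass splits each path on "/" and tests whole segments, which avoids substring false positives. A small demonstration (the process name is made up):

path = "private/var/tmp/badprocd/cache"
parts = path.split("/")
print("badprocd" in parts)  # True: matches a full path segment
print("badproc" in parts)   # False: substrings never match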
mvt/ios/modules/fs/networking_analytics.py | 91 (new file)

@@ -0,0 +1,91 @@
+# Mobile Verification Toolkit (MVT)
+# Copyright (c) 2021 The MVT Project Authors.
+# Use of this software is governed by the MVT License 1.1 that can be found at
+# https://license.mvt.re/1.1/
+
+import plistlib
+import sqlite3
+
+from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
+
+from ..base import IOSExtraction
+
+NETWORKING_ANALYTICS_DB_PATH = [
+    "private/var/Keychains/Analytics/networking_analytics.db",
+]
+
+class NetworkingAnalytics(IOSExtraction):
+    """This module extracts information from the networking_analytics.db file."""
+
+    def __init__(self, file_path=None, base_folder=None, output_folder=None,
+                 fast_mode=False, log=None, results=[]):
+        super().__init__(file_path=file_path, base_folder=base_folder,
+                         output_folder=output_folder, fast_mode=fast_mode,
+                         log=log, results=results)
+
+    def serialize(self, record):
+        return {
+            "timestamp": record["timestamp"],
+            "module": self.__class__.__name__,
+            "event": "network_crash",
+            "data": f"{record}",
+        }
+
+    def check_indicators(self):
+        if not self.indicators:
+            return
+
+        for result in self.results:
+            for ioc in self.indicators.ioc_processes:
+                for key in result.keys():
+                    if ioc == result[key]:
+                        self.log.warning("Found mention of a known malicious process \"%s\" in networking_analytics.db at %s",
+                                         ioc, result["timestamp"])
+                        self.detected.append(result)
+                        break
+
+    def _extract_networking_analytics_data(self):
+        conn = sqlite3.connect(self.file_path)
+        cur = conn.cursor()
+        cur.execute("""
+            SELECT
+                timestamp,
+                data
+            FROM hard_failures
+            UNION
+            SELECT
+                timestamp,
+                data
+            FROM soft_failures;
+        """)
+
+        for row in cur:
+            if row[0] and row[1]:
+                timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False))
+                data = plistlib.loads(row[1])
+                data["timestamp"] = timestamp
+            elif row[0]:
+                timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(row[0], False))
+                data = {}
+                data["timestamp"] = timestamp
+            elif row[1]:
+                timestamp = ""
+                data = plistlib.loads(row[1])
+                data["timestamp"] = timestamp
+
+            self.results.append(data)
+
+        self.results = sorted(self.results, key=lambda entry: entry["timestamp"])
+
+        cur.close()
+        conn.close()
+
+        self.log.info("Extracted information on %d network crashes", len(self.results))
+
+    def run(self):
+        self._find_ios_database(root_paths=NETWORKING_ANALYTICS_DB_PATH)
+        if self.file_path:
+            self.log.info("Found networking_analytics.db log at path: %s", self.file_path)
+            self._extract_networking_analytics_data()
+        else:
+            self.log.info("networking_analytics.db not found")
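A self-contained way to exercise the module's query and plist decoding against a throwaway database (the column types are assumptions inferred from the reads above):

import plistlib
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hard_failures (timestamp REAL, data BLOB)")
conn.execute("CREATE TABLE soft_failures (timestamp REAL, data BLOB)")
payload = plistlib.dumps({"process": "com.example.agent"})
conn.execute("INSERT INTO soft_failures VALUES (662688000.0, ?)", (payload,))

rows = conn.execute("SELECT timestamp, data FROM hard_failures "
                    "UNION SELECT timestamp, data FROM soft_failures;")
for timestamp, data in rows:
    print(timestamp, plistlib.loads(data))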
setup.py | 14
@@ -16,18 +16,18 @@ with open(readme_path, encoding="utf-8") as handle:

 requires = (
     # Base dependencies:
-    "click>=8.0.1",
-    "rich>=10.6.0",
+    "click>=8.0.3",
+    "rich>=10.12.0",
     "tld>=0.12.6",
-    "tqdm>=4.61.2",
+    "tqdm>=4.62.3",
     "requests>=2.26.0",
-    "simplejson>=3.17.3",
+    "simplejson>=3.17.5",
     "packaging>=21.0",
     # iOS dependencies:
-    "iOSbackup>=0.9.912",
+    "iOSbackup>=0.9.921",
     # Android dependencies:
-    "adb-shell>=0.4.0",
-    "libusb1>=1.9.3",
+    "adb-shell>=0.4.1",
+    "libusb1>=2.0.1",
 )

 def get_package_data(package):