WIP for Triangulation post-processing module

This commit is contained in:
Donncha Ó Cearbhaill 2023-06-28 21:46:18 +02:00
parent 289b7efdeb
commit 95b2f04db6
7 changed files with 179 additions and 8 deletions

View File

@ -7,6 +7,7 @@ import logging
import os
from typing import Optional
from mvt.common.module import PostAnalysisModule
from mvt.common.command import Command
log = logging.getLogger(__name__)
@ -32,6 +33,7 @@ class CmdCheckIOCS(Command):
def run(self) -> None:
assert self.target_path is not None
all_modules = []
post_modules = []
for entry in self.modules:
if entry not in all_modules:
all_modules.append(entry)
@ -43,18 +45,24 @@ class CmdCheckIOCS(Command):
name_only, _ = os.path.splitext(file_name)
file_path = os.path.join(self.target_path, file_name)
for iocs_module in all_modules:
if self.module_name and iocs_module.__name__ != self.module_name:
for module in all_modules:
if self.module_name and module.__name__ != self.module_name:
continue
if iocs_module().get_slug() != name_only:
# Handle post-analysis modules at the end
if issubclass(module, PostAnalysisModule) and module not in post_modules:
post_modules.append(module)
continue
# Skip if the current result file does not match the module name
if module().get_slug() != name_only:
continue
log.info("Loading results from \"%s\" with module %s",
file_name, iocs_module.__name__)
file_name, module.__name__)
m = iocs_module.from_json(file_path,
log=logging.getLogger(iocs_module.__module__))
m = module.from_json(file_path,
log=logging.getLogger(module.__module__))
if self.iocs.total_ioc_count > 0:
m.indicators = self.iocs
m.indicators.log = m.log
@ -66,6 +74,13 @@ class CmdCheckIOCS(Command):
else:
total_detections += len(m.detected)
# Run post-analysis modules at end
for post_module in post_modules:
m = post_module.from_results(self.target_path, log=log)
m.run()
total_detections += len(m.detected)
if total_detections > 0:
log.warning("The check of the results produced %d detections!",
total_detections)

View File

@ -33,6 +33,7 @@ class Command:
) -> None:
self.name = ""
self.modules = []
self.modules_post = []
self.target_path = target_path
self.results_path = results_path
@ -139,7 +140,7 @@ class Command:
def list_modules(self) -> None:
self.log.info("Following is the list of available %s modules:",
self.name)
for module in self.modules:
for module in (self.modules + self.modules_post):
self.log.info(" - %s", module.__name__)
def init(self) -> None:

View File

@ -7,6 +7,7 @@ import csv
import logging
import os
import re
import glob
from typing import Any, Dict, List, Optional, Union
import simplejson as json
@ -225,3 +226,51 @@ def save_timeline(timeline: list, timeline_path: str) -> None:
event.get("event"),
event.get("data"),
])
class PostAnalysisModule(MVTModule):
"""
Base module for implementing post-processing rules against the output of
multiple MVT modules
"""
@classmethod
def from_results(cls, results_path: str, log: logging.Logger):
results = cls.load_results(results_path, log=log)
return cls(results=results, log=log)
@classmethod
def load_results(cls, results_path: str, log: logging.Logger):
"""Load the results from a directory of json file."""
# TODO: Move this to run once before loading all post-processing modules
module_results = {}
for json_path in glob.glob(os.path.join(results_path, "*.json")):
module_name, _ = os.path.splitext(os.path.basename(json_path))
with open(json_path, "r", encoding="utf-8") as handle:
try:
module_results[module_name] = json.load(handle)
except Exception as exc:
log.error("Unable to load results from file %s: %s",
json_path, exc)
if not module_results:
log.error("Did not find any MVT results at %s", results_path)
return module_results
def load_timeline(self):
"""Load timeline from CSV file"""
timeline = []
timeline_path = os.path.join(self.results_path, "timeline.csv")
with open(timeline_path, "r", encoding="utf-8") as handle:
csvinput = csv.reader(handle, delimiter=",", quotechar="\"",
quoting=csv.QUOTE_ALL, escapechar='\\')
for row in csvinput:
if row[0] == "UTC Timestamp":
continue
timeline.append({
"timestamp": row[0],
"module": row[1],
"event": row[2],
"data": row[3],
})
return timeline

View File

@ -26,6 +26,7 @@ from .decrypt import DecryptBackup
from .modules.backup import BACKUP_MODULES
from .modules.fs import FS_MODULES
from .modules.mixed import MIXED_MODULES
from .modules.post_analysis import POST_ANALYSIS_MODULES
init_logging()
log = logging.getLogger("mvt")
@ -234,7 +235,7 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES + POST_ANALYSIS_MODULES
if list_modules:
cmd.list_modules()

View File

@ -158,6 +158,7 @@ class Manifest(IOSExtraction):
"mode": oct(self._get_key(file_metadata, "Mode")),
"owner": self._get_key(file_metadata, "UserID"),
"size": self._get_key(file_metadata, "Size"),
"type": "file" if file_data["flags"] == 1 else "directory",
})
except Exception:
self.log.exception("Error reading manifest file metadata for file with ID %s "

View File

@ -0,0 +1,3 @@
from .attachment_deletion import PostAttachmentDeletion
POST_ANALYSIS_MODULES = [PostAttachmentDeletion]

View File

@ -0,0 +1,101 @@
import logging
import datetime
from typing import Optional
from mvt.common.module import PostAnalysisModule
class PostAttachmentDeletion(PostAnalysisModule):
"""
Heuristic detection for attachment deletion in a cert time period.
This module implements a hueuristic detection for a multiple iOS SMS attachmemt being deleted
in a short period of time. This is a similar concept to the following script used
by Kaspersky Labs to detect infections with the Triangulation iOS malware:
https://github.com/KasperskyLab/triangle_check/blob/main/triangle_check/__init__.py
"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.required_modules = ["manifest"]
def load_locationd_events(self):
locationd_clients = self.results["locationd_clients"]
locations_stopped_event = [event for event in locationd_clients if "LocationTimeStopped" in event]
return locations_stopped_event
def run(self) -> None:
"""
Run the post-processing module.
The logical is to look for all SMS attachment directories which were recently created
shortly before their last modified time, but which have no contained files.
"""
for module in self.required_modules:
if module not in self.results:
raise Exception(f"Required module {module} was not found in results. Did you run the required modules?")
locationd_events = []
locationd_client_iocs = [
"com.apple.locationd.bundle-/System/Library/LocationBundles/IonosphereHarvest.bundle",
"com.apple.locationd.bundle-/System/Library/LocationBundles/WRMLinkSelection.bundle"
]
for event in self.load_locationd_events():
for ioc in locationd_client_iocs:
if ioc in event["Registered"]:
locationd_events.append(event)
print(event)
# Filter the relevant events from the manifest:
events_by_time = {}
sms_files = [event for event in self.results["manifest"] if event["relative_path"].startswith("Library/SMS/Attachments/")]
attachment_folders = {}
for record in sorted(sms_files, key=lambda x: x["relative_path"]):
num_path_segments = record["relative_path"].count('/')
# Skip entries with a full-path
# if not (num_path_segments == 3 or num_path_segments == 4):
# continue
attachment_root = "/".join(record["relative_path"].split('/', 5)[:5])
attachment_folder = attachment_folders.get(attachment_root, [])
attachment_folder.append(record)
attachment_folders[attachment_root] = attachment_folder
# Look for directories containing no files, which had a short lifespan
for key, items in attachment_folders.items():
has_files = any([item["flags"] == 1 for item in items])
if has_files:
continue
for item in sorted(items, key=lambda x: x["created"]):
# item_created = datetime.datetime.strptime(item["created"], "%Y-%m-%d %H:%M:%S.%f")
item_modified = datetime.datetime.strptime(item["modified"], "%Y-%m-%d %H:%M:%S.%f") # M
status_changed = datetime.datetime.strptime(item["status_changed"], "%Y-%m-%d %H:%M:%S.%f") # C
# self.append_timeline(fs_stat['LastModified'], ('M', relativePath))
# self.append_timeline(fs_stat['LastStatusChange'], ('C', relativePath))
# self.append_timeline(fs_stat['Birth'], ('B', relativePath))
# Skip items which were created and modified at the same time, likely never had files.
# print(item["relative_path"], status_changed, item_modified)
if item_modified == status_changed:
print("changed == modified", item["relative_path"], status_changed, item_modified)
continue
if (item_modified - status_changed): # < datetime.timedelta(minutes=10):
self.log.info(f"Possible attachment deletion. Attachment folder '{key}' with no files, created and modified within 10 minutes. '{item['relative_path']}' created {item_created}, modified {item_modified})")