2021-07-16 06:05:01 +00:00
|
|
|
# Mobile Verification Toolkit (MVT)
|
2021-08-01 19:11:08 +00:00
|
|
|
# Copyright (c) 2021 The MVT Project Authors.
|
|
|
|
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
|
|
# https://license.mvt.re/1.1/
|
2021-07-16 06:05:01 +00:00
|
|
|
|
|
|
|
import csv
|
2021-07-30 09:40:09 +00:00
|
|
|
import io
|
|
|
|
import os
|
|
|
|
import re
|
|
|
|
|
2021-07-16 06:05:01 +00:00
|
|
|
import simplejson as json
|
|
|
|
|
|
|
|
from .indicators import Indicators
|
|
|
|
|
2021-07-30 09:40:09 +00:00
|
|
|
|
2021-07-25 09:47:05 +00:00
|
|
|
class DatabaseNotFoundError(Exception):
    """Signal that a module could not find its database.

    run_module() treats this as non-fatal and reports at info level that
    there might be no data to extract.
    """
|
|
|
|
|
|
|
|
class DatabaseCorruptedError(Exception):
    """Signal that a module's database appears corrupted.

    run_module() logs this at error level, stating that the database seems
    to be corrupted and recovery failed.
    """
|
|
|
|
|
2021-08-05 06:49:34 +00:00
|
|
|
class InsufficientPrivileges(Exception):
    """Signal that a module lacks the privileges it needs to run.

    run_module() treats this as non-fatal and reports it at info level.
    """
|
|
|
|
|
2021-07-16 06:05:01 +00:00
|
|
|
class MVTModule(object):
    """This class provides a base for all extraction modules.

    Subclasses are expected to override run() and, optionally,
    check_indicators() and serialize(); the base implementations of those
    three methods raise NotImplementedError.
    """

    # Class-level flag; it is not read anywhere inside this class.
    enabled = True
    # Optional explicit slug. When left unset, get_slug() derives one from
    # the class name.
    slug = None

    def __init__(self, file_path=None, base_folder=None, output_folder=None,
                 fast_mode=False, log=None, results=None):
        """Initialize module.

        :param file_path: Path to the module's database file, if there is any.
        :param base_folder: Path to the base folder (backup or filesystem dump)
        :param output_folder: Folder where results will be stored
        :param fast_mode: Flag to enable or disable slow modules
        :param log: Handle to logger
        :param results: Provided list of results entries. When omitted, a
                        fresh empty list is created for this instance.
        """
        self.file_path = file_path
        self.base_folder = base_folder
        self.output_folder = output_folder
        self.fast_mode = fast_mode
        self.log = log
        self.indicators = None
        # A literal [] default would be created once and shared by every
        # instance, so results appended by one module would show up in all
        # the others. A caller-provided list is still stored as-is.
        self.results = [] if results is None else results
        self.detected = []
        self.timeline = []
        self.timeline_detected = []

    @classmethod
    def from_json(cls, json_path, log=None):
        """Build a module instance from results previously stored as JSON.

        :param json_path: Path to the JSON file to load results from.
        :param log: Optional handle to logger.
        :returns: A new instance of the class with the loaded results.
        """
        # save_to_json() writes UTF-8, so read it back with the same encoding
        # rather than whatever the platform default happens to be.
        with io.open(json_path, "r", encoding="utf-8") as handle:
            results = json.load(handle)
            if log:
                log.info("Loaded %d results from \"%s\"",
                         len(results), json_path)
            return cls(results=results, log=log)

    def get_slug(self):
        """Return the module slug.

        The explicit ``slug`` attribute wins when set; otherwise the class
        name is converted to snake_case (e.g. ``CallLog`` -> ``call_log``).
        """
        if self.slug:
            return self.slug

        sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__)
        return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()

    def load_indicators(self, file_path):
        """Load indicators from the given file into ``self.indicators``.

        :param file_path: Path passed on to the Indicators constructor.
        """
        self.indicators = Indicators(file_path, self.log)

    def check_indicators(self):
        """Check the results of this module against a provided list of
        indicators.

        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def save_to_json(self):
        """Save the collected results to a json file.

        Nothing is written when no output folder is configured. Non-empty
        results are stored as ``<slug>.json`` and non-empty detections as
        ``<slug>_detected.json`` inside the output folder.
        """
        if not self.output_folder:
            return

        name = self.get_slug()

        if self.results:
            results_file_name = f"{name}.json"
            results_json_path = os.path.join(self.output_folder, results_file_name)
            with io.open(results_json_path, "w", encoding="utf-8") as handle:
                # Best effort: a serialization failure is logged rather than
                # propagated to the caller.
                try:
                    json.dump(self.results, handle, indent=4, default=str)
                except Exception as e:
                    self.log.error("Unable to store results of module %s to file %s: %s",
                                   self.__class__.__name__, results_file_name, e)

        if self.detected:
            detected_file_name = f"{name}_detected.json"
            detected_json_path = os.path.join(self.output_folder, detected_file_name)
            with io.open(detected_json_path, "w", encoding="utf-8") as handle:
                json.dump(self.detected, handle, indent=4, default=str)

    def serialize(self, record):
        """Convert one result entry into timeline record(s).

        to_timeline() accepts a single record, a list of records, or a falsy
        value (the entry is then skipped) as return value.

        :param record: A single entry from the results or detections.
        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def _serialize_into(self, entries, timeline):
        """Serialize each entry and add the resulting record(s) to timeline.

        :param entries: Iterable of result entries to serialize.
        :param timeline: List that receives the records, modified in place.
        """
        for entry in entries:
            record = self.serialize(entry)
            if not record:
                continue

            # serialize() may return several records for a single entry.
            if isinstance(record, list):
                timeline.extend(record)
            else:
                timeline.append(record)

    def to_timeline(self):
        """Convert results into a timeline.

        Fills ``self.timeline`` from the results and
        ``self.timeline_detected`` from the detections, then removes
        duplicate records from both.
        """
        self._serialize_into(self.results, self.timeline)
        self._serialize_into(self.detected, self.timeline_detected)

        # De-duplicate timeline entries.
        self.timeline = self.timeline_deduplicate(self.timeline)
        self.timeline_detected = self.timeline_deduplicate(self.timeline_detected)

    def timeline_deduplicate(self, timeline):
        """Serialize entry as JSON to deduplicate repeated entries.

        :param timeline: List of JSON-serializable timeline records.
        :returns: New list of unique records, in order of first appearance.
                  Records are rebuilt from their JSON form.
        """
        # dict keys keep insertion order, so the output is deterministic,
        # unlike iterating a set of strings.
        unique = dict.fromkeys(json.dumps(record, sort_keys=True)
                               for record in timeline)
        return [json.loads(record) for record in unique]

    def run(self):
        """Run the main module procedure.

        :raises NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
|
def run_module(module):
    """Run a module and, if it succeeds, post-process its results.

    The module's run() procedure is executed first. Any exception it raises
    is logged through the module's logger and ends the processing of this
    module. On success the results are checked against indicators (when the
    module supports it), converted to a timeline (when the module supports
    it) and saved to JSON.

    :param module: Module instance to execute.
    """
    module_name = module.__class__.__name__
    module.log.info("Running module %s...", module_name)

    # Step 1: extraction. Every failure is reported and stops here.
    try:
        module.run()
    except NotImplementedError:
        module.log.exception("The run() procedure of module %s was not implemented yet!",
                             module_name)
        return
    except InsufficientPrivileges as exc:
        module.log.info("Insufficient privileges for module %s: %s", module_name, exc)
        return
    except DatabaseNotFoundError as exc:
        module.log.info("There might be no data to extract by module %s: %s",
                        module_name, exc)
        return
    except DatabaseCorruptedError as exc:
        module.log.error("The %s module database seems to be corrupted and recovery failed: %s",
                         module_name, exc)
        return
    except Exception as exc:
        module.log.exception("Error in running extraction from module %s: %s",
                             module_name, exc)
        return

    # Step 2: indicator matching, which not every module implements.
    try:
        module.check_indicators()
    except NotImplementedError:
        module.log.info("The %s module does not support checking for indicators",
                        module_name)
    else:
        if module.indicators and not module.detected:
            module.log.info("The %s module produced no detections!",
                            module_name)

    # Step 3: timeline conversion; modules without serialize() are skipped.
    try:
        module.to_timeline()
    except NotImplementedError:
        pass

    # Step 4: persist whatever was collected.
    module.save_to_json()
|
|
|
|
|
|
|
|
|
|
|
|
def save_timeline(timeline, timeline_path):
    """Save the timeline in a csv file.

    Events are written sorted by their "timestamp" value; events whose
    timestamp is missing or None sort first. The file is opened in append
    mode, so rows are added after any existing content and the header row
    is only written when the file is still empty.

    :param timeline: List of records to order and store.
    :param timeline_path: Path to the csv file to store the timeline to.
    """
    def timestamp_key(event):
        # Events are rendered with .get() below, so tolerate a missing
        # "timestamp" key here as well instead of raising KeyError.
        timestamp = event.get("timestamp")
        return "" if timestamp is None else timestamp

    # newline="" is required by the csv module: the writer emits its own
    # line terminators and they must not be translated a second time.
    with io.open(timeline_path, "a+", encoding="utf-8", newline="") as handle:
        csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")

        # Appending to a file that already has content must not insert a
        # second header row in the middle of the data.
        handle.seek(0, io.SEEK_END)
        if handle.tell() == 0:
            csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])

        for event in sorted(timeline, key=timestamp_key):
            csvoutput.writerow([
                event.get("timestamp"),
                event.get("module"),
                event.get("event"),
                event.get("data"),
            ])
|