Small code clean-ups
This commit is contained in:
parent
67eea3edec
commit
c8185fdbd8
@ -3,7 +3,6 @@
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import io
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
@ -42,9 +41,8 @@ class DumpsysAccessibility(AndroidExtraction):
|
||||||
log.info("Found installed accessibility service \"%s\"", service)
|
log.info("Found installed accessibility service \"%s\"", service)
|
||||||
|
|
||||||
if self.output_folder:
|
if self.output_folder:
|
||||||
acc_path = os.path.join(self.output_folder,
|
acc_path = os.path.join(self.output_folder, "dumpsys_accessibility.txt")
|
||||||
"dumpsys_accessibility.txt")
|
with open(acc_path, "w", encoding="utf-8") as handle:
|
||||||
with io.open(acc_path, "w", encoding="utf-8") as handle:
|
|
||||||
handle.write(stats)
|
handle.write(stats)
|
||||||
|
|
||||||
log.info("Records from dumpsys accessibility stored at %s",
|
log.info("Records from dumpsys accessibility stored at %s",
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from .base import AndroidExtraction
|
from .base import AndroidExtraction
|
||||||
|
|
|
@ -4,8 +4,6 @@
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import re
|
|
||||||
|
|
||||||
from .base import AndroidExtraction
|
from .base import AndroidExtraction
|
||||||
|
|
||||||
|
@ -60,6 +58,7 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class Settings(AndroidExtraction):
|
class Settings(AndroidExtraction):
|
||||||
"""This module extracts Android system settings."""
|
"""This module extracts Android system settings."""
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import io
|
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
@ -436,7 +435,7 @@ def download_indicators_files(log):
|
||||||
ioc_path = os.path.join(data_dir, clean_file_name)
|
ioc_path = os.path.join(data_dir, clean_file_name)
|
||||||
|
|
||||||
# Write file to disk. This will overwrite any older version of the STIX2 file.
|
# Write file to disk. This will overwrite any older version of the STIX2 file.
|
||||||
with io.open(ioc_path, "w") as f:
|
with open(ioc_path, "w", encoding="utf-8") as f:
|
||||||
f.write(res.text)
|
f.write(res.text)
|
||||||
|
|
||||||
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))
|
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))
|
||||||
|
|
|
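Review bot: The rewritten `with open(ioc_path, "w", encoding="utf-8") as f:` re-encodes `res.text`, which has already been decoded from the raw body using whatever charset the response declared or the HTTP library guessed, so a wrong guess is baked into the stored indicator file. If the downloaded STIX2 files are consumed as bytes/JSON, writing the body unchanged avoids the decode/encode round trip: `with open(ioc_path, "wb") as f:` and `f.write(res.content)`. If the intent is instead to guarantee that the stored file is valid UTF-8 regardless of what the server sent, the current form is the right one and a short comment to that effect next to the `open` call would stop the next reader from "fixing" it. `res` is assigned outside the hunk, so I am assuming it is a requests-style response; download_indicators_files starts outside the hunk as well.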
@ -4,7 +4,6 @@
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import csv
|
import csv
|
||||||
import io
|
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
@ -91,7 +90,7 @@ class MVTModule(object):
|
||||||
if self.results:
|
if self.results:
|
||||||
results_file_name = f"{name}.json"
|
results_file_name = f"{name}.json"
|
||||||
results_json_path = os.path.join(self.output_folder, results_file_name)
|
results_json_path = os.path.join(self.output_folder, results_file_name)
|
||||||
with io.open(results_json_path, "w", encoding="utf-8") as handle:
|
with open(results_json_path, "w", encoding="utf-8") as handle:
|
||||||
try:
|
try:
|
||||||
json.dump(self.results, handle, indent=4, default=str)
|
json.dump(self.results, handle, indent=4, default=str)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -101,7 +100,7 @@ class MVTModule(object):
|
||||||
if self.detected:
|
if self.detected:
|
||||||
detected_file_name = f"{name}_detected.json"
|
detected_file_name = f"{name}_detected.json"
|
||||||
detected_json_path = os.path.join(self.output_folder, detected_file_name)
|
detected_json_path = os.path.join(self.output_folder, detected_file_name)
|
||||||
with io.open(detected_json_path, "w", encoding="utf-8") as handle:
|
with open(detected_json_path, "w", encoding="utf-8") as handle:
|
||||||
json.dump(self.detected, handle, indent=4, default=str)
|
json.dump(self.detected, handle, indent=4, default=str)
|
||||||
|
|
||||||
def serialize(self, record):
|
def serialize(self, record):
|
||||||
|
@ -192,7 +191,7 @@ def save_timeline(timeline, timeline_path):
|
||||||
:param timeline_path: Path to the csv file to store the timeline to
|
:param timeline_path: Path to the csv file to store the timeline to
|
||||||
|
|
||||||
"""
|
"""
|
||||||
with io.open(timeline_path, "a+", encoding="utf-8") as handle:
|
with open(timeline_path, "a+", encoding="utf-8") as handle:
|
||||||
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
|
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"")
|
||||||
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
|
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
|
||||||
for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""):
|
for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""):
|
||||||
|
|
|
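Review bot: In save_timeline the header row `csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])` is written unconditionally while the file is opened with `"a+"`, so every call against an existing timeline file appends a second header in the middle of the records. If appending to an existing file is intended, guard the header with `if handle.tell() == 0:` (append mode positions at end-of-file on open, so a non-zero offset means the file already has content); if each run is meant to produce a fresh file, `"w"` is the mode that matches the docstring. The `+` in `"a+"` requests read access that nothing in the hunk uses, so `"a"` would be sufficient either way. The same `io.open` → `open` change in the two MVTModule hunks above is fine as-is; save_timeline's body continues outside the hunk.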
@ -184,7 +184,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
|
||||||
|
|
||||||
if len(timeline_detected) > 0:
|
if len(timeline_detected) > 0:
|
||||||
log.warning("The analysis of the backup produced %d detections!",
|
log.warning("The analysis of the backup produced %d detections!",
|
||||||
len(timeline_detected))
|
len(timeline_detected))
|
||||||
|
|
||||||
|
|
||||||
#==============================================================================
|
#==============================================================================
|
||||||
|
@ -247,6 +247,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
|
||||||
log.warning("The analysis of the filesystem produced %d detections!",
|
log.warning("The analysis of the filesystem produced %d detections!",
|
||||||
len(timeline_detected))
|
len(timeline_detected))
|
||||||
|
|
||||||
|
|
||||||
#==============================================================================
|
#==============================================================================
|
||||||
# Command: check-iocs
|
# Command: check-iocs
|
||||||
#==============================================================================
|
#==============================================================================
|
||||||
|
|
|
@ -97,7 +97,6 @@ class TCC(IOSExtraction):
|
||||||
FROM access;""")
|
FROM access;""")
|
||||||
db_version = "v1"
|
db_version = "v1"
|
||||||
|
|
||||||
|
|
||||||
for row in cur:
|
for row in cur:
|
||||||
service = row[0]
|
service = row[0]
|
||||||
client = row[1]
|
client = row[1]
|
||||||
|
@ -113,7 +112,7 @@ class TCC(IOSExtraction):
|
||||||
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
||||||
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
||||||
self.log.info("Found client \"%s\" with access %s to %s on %s by %s",
|
self.log.info("Found client \"%s\" with access %s to %s on %s by %s",
|
||||||
client, auth_value_desc, device, last_modified, auth_reason_desc)
|
client, auth_value_desc, device, last_modified, auth_reason_desc)
|
||||||
|
|
||||||
self.results.append({
|
self.results.append({
|
||||||
"service": service,
|
"service": service,
|
||||||
|
@ -132,7 +131,7 @@ class TCC(IOSExtraction):
|
||||||
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
||||||
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
||||||
self.log.info("Found client \"%s\" with access %s to %s at %s",
|
self.log.info("Found client \"%s\" with access %s to %s at %s",
|
||||||
client, allowed_desc, device, last_modified)
|
client, allowed_desc, device, last_modified)
|
||||||
self.results.append({
|
self.results.append({
|
||||||
"service": service,
|
"service": service,
|
||||||
"client": client,
|
"client": client,
|
||||||
|
@ -145,7 +144,7 @@ class TCC(IOSExtraction):
|
||||||
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
if service in ["kTCCServiceMicrophone", "kTCCServiceCamera"]:
|
||||||
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
device = "microphone" if service == "kTCCServiceMicrophone" else "camera"
|
||||||
self.log.info("Found client \"%s\" with access %s to %s",
|
self.log.info("Found client \"%s\" with access %s to %s",
|
||||||
client, allowed_desc, device)
|
client, allowed_desc, device)
|
||||||
self.results.append({
|
self.results.append({
|
||||||
"service": service,
|
"service": service,
|
||||||
"client": client,
|
"client": client,
|
||||||
|
|
|
@ -17,7 +17,7 @@ class TestSafariBrowserStateModule:
|
||||||
m = SafariBrowserState(base_folder=get_backup_folder(), log=logging, results=[])
|
m = SafariBrowserState(base_folder=get_backup_folder(), log=logging, results=[])
|
||||||
m.is_backup = True
|
m.is_backup = True
|
||||||
run_module(m)
|
run_module(m)
|
||||||
assert m.file_path != None
|
assert m.file_path is not None
|
||||||
assert len(m.results) == 1
|
assert len(m.results) == 1
|
||||||
assert len(m.timeline) == 1
|
assert len(m.timeline) == 1
|
||||||
assert len(m.detected) == 0
|
assert len(m.detected) == 0
|
||||||
|
|