Storing information about analysis in info.json (closes: #274)

This commit is contained in:
Nex 2022-06-17 17:48:07 +02:00
parent 1643454190
commit b77749e6ba
1 changed files with 58 additions and 11 deletions

View File

@ -3,12 +3,17 @@
# Use of this software is governed by the MVT License 1.1 that can be found at # Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/ # https://license.mvt.re/1.1/
import hashlib
import json
import logging import logging
import os import os
import sys import sys
from datetime import datetime
from mvt.common.indicators import Indicators from mvt.common.indicators import Indicators
from mvt.common.module import run_module, save_timeline from mvt.common.module import run_module, save_timeline
from mvt.common.utils import convert_timestamp_to_iso
from mvt.common.version import MVT_VERSION
class Command(object): class Command(object):
@ -32,11 +37,6 @@ class Command(object):
self.timeline = [] self.timeline = []
self.timeline_detected = [] self.timeline_detected = []
def list_modules(self):
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info(" - %s", module.__name__)
def _create_storage(self): def _create_storage(self):
if self.results_path and not os.path.exists(self.results_path): if self.results_path and not os.path.exists(self.results_path):
try: try:
@ -46,6 +46,16 @@ class Command(object):
self.results_path, e) self.results_path, e)
sys.exit(1) sys.exit(1)
def _add_log_file_handler(self, logger):
if not self.results_path:
return
fh = logging.FileHandler(os.path.join(self.results_path, "command.log"))
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
logger.addHandler(fh)
def _store_timeline(self): def _store_timeline(self):
if not self.results_path: if not self.results_path:
return return
@ -58,15 +68,51 @@ class Command(object):
save_timeline(self.timeline_detected, save_timeline(self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv")) os.path.join(self.results_path, "timeline_detected.csv"))
def _add_log_file_handler(self, logger): def _store_info(self):
if not self.results_path: if not self.results_path:
return return
fh = logging.FileHandler(os.path.join(self.results_path, "command.log")) info = {
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") "target_path": os.path.abspath(self.target_path),
fh.setLevel(logging.DEBUG) "mvt_version": MVT_VERSION,
fh.setFormatter(formatter) "date": convert_timestamp_to_iso(datetime.now()),
logger.addHandler(fh) "ioc_files": [],
"hashes": [],
}
for coll in self.iocs.ioc_collections:
info["ioc_files"].append(coll.get("stix2_file_path", ""))
if os.path.isfile(self.target_path):
h = hashlib.sha256()
with open(self.target_path, "rb") as handle:
h.update(handle.read())
info["hashes"].append({
"file_path": self.target_path,
"sha256": h.hexdigest(),
})
elif os.path.isdir(self.target_path):
for (root, dirs, files) in os.walk(self.target_path):
for file in files:
file_path = os.path.join(root, file)
h = hashlib.sha256()
with open(file_path, "rb") as handle:
h.update(handle.read())
info["hashes"].append({
"file_path": file_path,
"sha256": h.hexdigest(),
})
with open(os.path.join(self.results_path, "info.json"), "w+") as handle:
json.dump(info, handle, indent=4)
def list_modules(self):
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info(" - %s", module.__name__)
def init(self): def init(self):
raise NotImplementedError raise NotImplementedError
@ -113,3 +159,4 @@ class Command(object):
self.timeline_detected.extend(m.timeline_detected) self.timeline_detected.extend(m.timeline_detected)
self._store_timeline() self._store_timeline()
self._store_info()