Enforcing line lenght

This commit is contained in:
Nex 2022-08-13 17:50:00 +02:00
parent 2781f33fb5
commit 00d82f7f00
4 changed files with 40 additions and 34 deletions

View File

@ -59,7 +59,8 @@ class Command:
file_handler = logging.FileHandler(os.path.join(self.results_path, file_handler = logging.FileHandler(os.path.join(self.results_path,
"command.log")) "command.log"))
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") formatter = logging.Formatter("%(asctime)s - %(name)s - "
"%(levelname)s - %(message)s")
file_handler.setLevel(logging.DEBUG) file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter) file_handler.setFormatter(formatter)
logger.addHandler(file_handler) logger.addHandler(file_handler)
@ -120,12 +121,12 @@ class Command:
with open(file_path, "rb") as handle: with open(file_path, "rb") as handle:
sha256.update(handle.read()) sha256.update(handle.read())
except FileNotFoundError: except FileNotFoundError:
self.log.error("Failed to hash the file %s: might be a symlink", self.log.error("Failed to hash the file %s: might "
file_path) "be a symlink", file_path)
continue continue
except PermissionError: except PermissionError:
self.log.error("Failed to hash the file %s: permission denied", self.log.error("Failed to hash the file %s: "
file_path) "permission denied", file_path)
continue continue
info["hashes"].append({ info["hashes"].append({

View File

@ -47,8 +47,8 @@ class Indicators:
if os.path.isfile(path): if os.path.isfile(path):
self.parse_stix2(path) self.parse_stix2(path)
else: else:
self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s", self.log.error("Path specified with env MVT_STIX2 is not "
path) "a valid file: %s", path)
def _new_collection(self, cid: str = "", name: str = "", def _new_collection(self, cid: str = "", name: str = "",
description: str = "", file_name: str = "", description: str = "", file_name: str = "",
@ -130,7 +130,8 @@ class Indicators:
data = json.load(handle) data = json.load(handle)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
self.log.critical("Unable to parse STIX2 indicator file. " self.log.critical("Unable to parse STIX2 indicator file. "
"The file is corrupted or in the wrong format!") "The file is corrupted or in the wrong "
"format!")
return return
malware = {} malware = {}
@ -264,14 +265,16 @@ class Indicators:
# If nothing matched, we can quit here. # If nothing matched, we can quit here.
return None return None
# If all parsing worked, we start walking through available domain indicators. # If all parsing worked, we start walking through available domain
# indicators.
for ioc in self.get_iocs("domains"): for ioc in self.get_iocs("domains"):
# First we check the full domain. # First we check the full domain.
if final_url.domain.lower() == ioc["value"]: if final_url.domain.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url: if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a known suspicious domain %s shortened as %s matching " self.log.warning("Found a known suspicious domain %s "
"indicators from \"%s\"", "shortened as %s matching indicators "
final_url.url, orig_url.url, ioc["name"]) "from \"%s\"", final_url.url, orig_url.url,
ioc["name"])
else: else:
self.log.warning("Found a known suspicious domain %s " self.log.warning("Found a known suspicious domain %s "
"matching indicators from \"%s\"", "matching indicators from \"%s\"",
@ -282,12 +285,13 @@ class Indicators:
# Then we just check the top level domain. # Then we just check the top level domain.
if final_url.top_level.lower() == ioc["value"]: if final_url.top_level.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url: if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a sub-domain with suspicious top level %s shortened " self.log.warning("Found a sub-domain with suspicious top "
"as %s matching indicators from \"%s\"", "level %s shortened as %s matching "
final_url.url, orig_url.url, ioc["name"]) "indicators from \"%s\"", final_url.url,
orig_url.url, ioc["name"])
else: else:
self.log.warning("Found a sub-domain with a suspicious top level %s matching " self.log.warning("Found a sub-domain with a suspicious top "
"indicators from \"%s\"", "level %s matching indicators from \"%s\"",
final_url.url, ioc["name"]) final_url.url, ioc["name"])
return ioc return ioc
@ -334,9 +338,9 @@ class Indicators:
if len(proc_name) == 16: if len(proc_name) == 16:
if ioc["value"].startswith(proc_name): if ioc["value"].startswith(proc_name):
self.log.warning("Found a truncated known suspicious process name \"%s\" " self.log.warning("Found a truncated known suspicious "
"matching indicators from \"%s\"", "process name \"%s\" matching indicators "
process, ioc["name"]) "from \"%s\"", process, ioc["name"])
return ioc return ioc
return None return None
@ -464,8 +468,8 @@ class Indicators:
for ioc in self.get_iocs("files_sha256"): for ioc in self.get_iocs("files_sha256"):
if file_hash.lower() == ioc["value"].lower(): if file_hash.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious file with hash \"%s\" matching " self.log.warning("Found a known suspicious file with hash "
"indicators from \"%s\"", "\"%s\" matching indicators from \"%s\"",
file_hash, ioc["name"]) file_hash, ioc["name"])
return ioc return ioc
@ -485,9 +489,9 @@ class Indicators:
for ioc in self.get_iocs("app_ids"): for ioc in self.get_iocs("app_ids"):
if app_id.lower() == ioc["value"].lower(): if app_id.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious app with ID \"%s\" matching " self.log.warning("Found a known suspicious app with ID \"%s\" "
"indicators from \"%s\"", "matching indicators from \"%s\"", app_id,
app_id, ioc["name"]) ioc["name"])
return ioc return ioc
return None return None

View File

@ -158,8 +158,8 @@ def run_module(module: Callable) -> None:
try: try:
module.run() module.run()
except NotImplementedError: except NotImplementedError:
module.log.exception("The run() procedure of module %s was not implemented yet!", module.log.exception("The run() procedure of module %s was not "
module.__class__.__name__) "implemented yet!", module.__class__.__name__)
except InsufficientPrivileges as exc: except InsufficientPrivileges as exc:
module.log.info("Insufficient privileges for module %s: %s", module.log.info("Insufficient privileges for module %s: %s",
module.__class__.__name__, exc) module.__class__.__name__, exc)
@ -176,8 +176,8 @@ def run_module(module: Callable) -> None:
try: try:
module.check_indicators() module.check_indicators()
except NotImplementedError: except NotImplementedError:
module.log.info("The %s module does not support checking for indicators", module.log.info("The %s module does not support checking for "
module.__class__.__name__) "indicators", module.__class__.__name__)
else: else:
if module.indicators and not module.detected: if module.indicators and not module.detected:
module.log.info("The %s module produced no detections!", module.log.info("The %s module produced no detections!",

View File

@ -88,8 +88,8 @@ class IndicatorsUpdates:
self.index_branch, self.index_path) self.index_branch, self.index_path)
res = requests.get(url) res = requests.get(url)
if res.status_code != 200: if res.status_code != 200:
log.error("Failed to retrieve indicators index located at %s (error %d)", log.error("Failed to retrieve indicators index located at %s "
url, res.status_code) "(error %d)", url, res.status_code)
return None return None
return yaml.safe_load(res.content) return yaml.safe_load(res.content)
@ -131,8 +131,8 @@ class IndicatorsUpdates:
ioc_url = ioc.get("download_url", "") ioc_url = ioc.get("download_url", "")
if not ioc_url: if not ioc_url:
log.error("Could not find a way to download indicator file for %s", log.error("Could not find a way to download indicator file "
ioc.get("name")) "for %s", ioc.get("name"))
continue continue
ioc_local_path = self.download_remote_ioc(ioc_url) ioc_local_path = self.download_remote_ioc(ioc_url)
@ -162,7 +162,8 @@ class IndicatorsUpdates:
latest_commit = details[0] latest_commit = details[0]
latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None) latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None)
if not latest_commit_date: if not latest_commit_date:
log.error("Failed to retrieve date of latest update to indicators index file") log.error("Failed to retrieve date of latest update to indicators "
"index file")
return -1 return -1
latest_commit_dt = datetime.strptime(latest_commit_date, latest_commit_dt = datetime.strptime(latest_commit_date,