From bdad23feee98372cdec44885b6ed301547c62261 Mon Sep 17 00:00:00 2001 From: Nex Date: Wed, 2 Feb 2022 14:53:26 +0100 Subject: [PATCH] Refactored indicators to support multiple malware/collections per stix2 file --- mvt/common/indicators.py | 180 ++++++++++++++++++++++++--------------- 1 file changed, 110 insertions(+), 70 deletions(-) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 3bda7e2..541ecfe 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -20,7 +20,7 @@ class Indicators: def __init__(self, log=None): self.data_dir = user_data_dir("mvt") self.log = log - self.ioc_files = [] + self.ioc_collections = [] self.total_ioc_count = 0 def _load_downloaded_indicators(self): @@ -33,7 +33,7 @@ class Indicators: def _check_stix2_env_variable(self): """ - Checks if a variable MVT_STIX2 contains path to STIX Files. + Checks if a variable MVT_STIX2 contains path to a STIX files. """ if "MVT_STIX2" not in os.environ: return @@ -43,14 +43,17 @@ class Indicators: if os.path.isfile(path): self.parse_stix2(path) else: - self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path) + self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s", + path) - def _generate_indicators_file(self): + def _new_collection(self, cid="", name="", description="", file_name="", + file_path=""): return { - "name": "", - "description": "", - "file_name": "", - "file_path": "", + "id": cid, + "name": name, + "description": description, + "stix2_file_name": file_name, + "stix2_file_path": file_path, "domains": [], "processes": [], "emails": [], @@ -62,10 +65,11 @@ class Indicators: "count": 0, } - def _add_indicator(self, ioc, ioc_file, iocs_list): - if ioc not in iocs_list: - iocs_list.append(ioc) - ioc_file["count"] += 1 + def _add_indicator(self, ioc, ioc_coll, ioc_coll_list): + ioc = ioc.strip("'") + if ioc not in ioc_coll_list: + ioc_coll_list.append(ioc) + ioc_coll["count"] += 1 self.total_ioc_count += 1 def parse_stix2(self, file_path): @@ -77,69 +81,104 @@ class Indicators: """ self.log.info("Parsing STIX2 indicators file at path %s", file_path) - ioc_file = self._generate_indicators_file() - ioc_file["file_path"] = file_path - ioc_file["file_name"] = os.path.basename(file_path) - with open(file_path, "r", encoding="utf-8") as handle: try: data = json.load(handle) except json.decoder.JSONDecodeError: - self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.") + self.log.critical("Unable to parse STIX2 indicator file. " \ + "The file is corrupted or in the wrong format!") return + malware = {} + indicators = [] + relationships = [] for entry in data.get("objects", []): entry_type = entry.get("type", "") if entry_type == "malware": - ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"] - ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"] - continue + malware[entry["id"]] = { + "name": entry["name"], + "description": entry["description"], + } + elif entry_type == "indicator": + indicators.append(entry) + elif entry_type == "relationship": + relationships.append(entry) - if entry_type != "indicator": - continue + collections = [] + for mal_id, mal_values in malware.items(): + collection = self._new_collection(mal_id, mal_values.get("name"), + mal_values.get("description"), + os.path.basename(file_path), + file_path) + collections.append(collection) - key, value = entry.get("pattern", "").strip("[]").split("=") - value = value.strip("'") + # We loop through all indicators. + for indicator in indicators: + malware_id = None + malware_name = None + malware_description = None - if key == "domain-name:value": - # We force domain names to lower case. - self._add_indicator(ioc=value.lower(), - ioc_file=ioc_file, - iocs_list=ioc_file["domains"]) - elif key == "process:name": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["processes"]) - elif key == "email-addr:value": - # We force email addresses to lower case. - self._add_indicator(ioc=value.lower(), - ioc_file=ioc_file, - iocs_list=ioc_file["emails"]) - elif key == "file:name": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["file_names"]) - elif key == "file:path": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["file_paths"]) - elif key == "file:hashes.sha256": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["files_sha256"]) - elif key == "app:id": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["app_ids"]) - elif key == "configuration-profile:id": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["ios_profile_ids"]) + # We loop through all relationships and find the one pertinent to + # the current indicator. + for relationship in relationships: + if relationship["source_ref"] != indicator["id"]: + continue - self.log.info("Loaded %d indicators from \"%s\" indicators file", - ioc_file["count"], ioc_file["name"]) + # Look for a malware definition with the correct identifier. + if relationship["target_ref"] in malware.keys(): + malware_id = relationship["target_ref"] + malware_name = malware[relationship["target_ref"]].get("name", "") + malware_description = malware[relationship["target_ref"]].get("description", "") + break - self.ioc_files.append(ioc_file) + # Now we look for the correct collection matching the malware ID we + # got from the relationship. + for collection in collections: + if collection["id"] != malware_id: + continue + + key, value = indicator.get("pattern", "").strip("[]").split("=") + + if key == "domain-name:value": + # We force domain names to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["domains"]) + elif key == "process:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["processes"]) + elif key == "email-addr:value": + # We force email addresses to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["emails"]) + elif key == "file:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_names"]) + elif key == "file:path": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_paths"]) + elif key == "file:hashes.sha256": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["files_sha256"]) + elif key == "app:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["app_ids"]) + elif key == "configuration-profile:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["ios_profile_ids"]) + + for coll in collections: + self.log.info("Extracted %d indicators for collection with name \"%s\"", + coll["count"], coll["name"]) + + self.ioc_collections.extend(collections) def load_indicators_files(self, files, load_default=True): """ @@ -149,7 +188,8 @@ class Indicators: if os.path.isfile(file_path): self.parse_stix2(file_path) else: - self.log.warning("This indicators file %s does not exist", file_path) + self.log.warning("No indicators file exists at path %s", + file_path) # Load downloaded indicators and any indicators from env variable. if load_default: @@ -159,12 +199,12 @@ class Indicators: self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) def get_iocs(self, ioc_type): - for ioc_file in self.ioc_files: - for ioc in ioc_file.get(ioc_type, []): + for ioc_collection in self.ioc_collections: + for ioc in ioc_collection.get(ioc_type, []): yield { "value": ioc, "type": ioc_type, - "name": ioc_file["name"] + "name": ioc_collection["name"], } def check_domain(self, url): @@ -424,18 +464,18 @@ def download_indicators_files(log): for ioc_entry in res.json(): ioc_url = ioc_entry["stix2_url"] - log.info("Downloading indicator file '%s' from '%s'", ioc_entry["name"], ioc_url) + log.info("Downloading indicator file %s from %s", ioc_entry["name"], ioc_url) res = requests.get(ioc_url) if res.status_code != 200: - log.warning("Could not find indicator file '%s'", ioc_url) + log.warning("Could not find indicator file %s", ioc_url) continue clean_file_name = ioc_url.lstrip("https://").replace("/", "_") ioc_path = os.path.join(data_dir, clean_file_name) # Write file to disk. This will overwrite any older version of the STIX2 file. - with open(ioc_path, "w", encoding="utf-8") as f: - f.write(res.text) + with open(ioc_path, "w", encoding="utf-8") as handle: + handle.write(res.text) - log.info("Saved indicator file to '%s'", os.path.basename(ioc_path)) + log.info("Saved indicator file to %s", os.path.basename(ioc_path))