Adding indicator to matched results

This commit is contained in:
Nex 2022-01-23 15:01:49 +01:00
parent e1211991aa
commit cce9159eda
25 changed files with 149 additions and 94 deletions

View File

@ -26,7 +26,7 @@ class Indicators:
def _load_downloaded_indicators(self): def _load_downloaded_indicators(self):
if not os.path.isdir(self.data_dir): if not os.path.isdir(self.data_dir):
return False return
for f in os.listdir(self.data_dir): for f in os.listdir(self.data_dir):
if f.lower().endswith(".stix2"): if f.lower().endswith(".stix2"):
@ -37,7 +37,7 @@ class Indicators:
Checks if a variable MVT_STIX2 contains path to STIX Files. Checks if a variable MVT_STIX2 contains path to STIX Files.
""" """
if "MVT_STIX2" not in os.environ: if "MVT_STIX2" not in os.environ:
return False return
paths = os.environ["MVT_STIX2"].split(":") paths = os.environ["MVT_STIX2"].split(":")
for path in paths: for path in paths:
@ -164,10 +164,11 @@ class Indicators:
for ioc in ioc_file.get(ioc_type, []): for ioc in ioc_file.get(ioc_type, []):
yield { yield {
"value": ioc, "value": ioc,
"type": ioc_type,
"name": ioc_file["name"] "name": ioc_file["name"]
} }
def check_domain(self, url) -> bool: def check_domain(self, url):
"""Check if a given URL matches any of the provided domain indicators. """Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators :param url: URL to match against domain indicators
@ -179,7 +180,7 @@ class Indicators:
# TODO: If the IOC domain contains a subdomain, it is not currently # TODO: If the IOC domain contains a subdomain, it is not currently
# being matched. # being matched.
if not url: if not url:
return False return None
try: try:
# First we use the provided URL. # First we use the provided URL.
@ -211,10 +212,10 @@ class Indicators:
if ioc["value"].lower() in url: if ioc["value"].lower() in url:
self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"", self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"",
url, ioc["name"]) url, ioc["name"])
return True return ioc
# If nothing matched, we can quit here. # If nothing matched, we can quit here.
return False return None
# If all parsing worked, we start walking through available domain indicators. # If all parsing worked, we start walking through available domain indicators.
for ioc in self.get_iocs("domains"): for ioc in self.get_iocs("domains"):
@ -227,7 +228,7 @@ class Indicators:
self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"", self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"",
final_url.url, ioc["name"]) final_url.url, ioc["name"])
return True return ioc
# Then we just check the top level domain. # Then we just check the top level domain.
if final_url.top_level.lower() == ioc["value"]: if final_url.top_level.lower() == ioc["value"]:
@ -238,11 +239,9 @@ class Indicators:
self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"", self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"",
final_url.url, ioc["name"]) final_url.url, ioc["name"])
return True return ioc
return False def check_domains(self, urls):
def check_domains(self, urls) -> bool:
"""Check a list of URLs against the provided list of domain indicators. """Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators :param urls: List of URLs to check against domain indicators
@ -252,15 +251,14 @@ class Indicators:
""" """
if not urls: if not urls:
return False return None
for url in urls: for url in urls:
if self.check_domain(url): check = self.check_domain(url)
return True if check:
return check
return False def check_process(self, process):
def check_process(self, process) -> bool:
"""Check the provided process name against the list of process """Check the provided process name against the list of process
indicators. indicators.
@ -271,24 +269,22 @@ class Indicators:
""" """
if not process: if not process:
return False return None
proc_name = os.path.basename(process) proc_name = os.path.basename(process)
for ioc in self.get_iocs("processes"): for ioc in self.get_iocs("processes"):
if proc_name == ioc["value"]: if proc_name == ioc["value"]:
self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"", self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"]) process, ioc["name"])
return True return ioc
if len(proc_name) == 16: if len(proc_name) == 16:
if ioc["value"].startswith(proc_name): if ioc["value"].startswith(proc_name):
self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"", self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"]) process, ioc["name"])
return True return ioc
return False def check_processes(self, processes):
def check_processes(self, processes) -> bool:
"""Check the provided list of processes against the list of """Check the provided list of processes against the list of
process indicators. process indicators.
@ -299,15 +295,14 @@ class Indicators:
""" """
if not processes: if not processes:
return False return None
for process in processes: for process in processes:
if self.check_process(process): check = self.check_process(process)
return True if check:
return check
return False def check_email(self, email):
def check_email(self, email) -> bool:
"""Check the provided email against the list of email indicators. """Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators :param email: Email address to check against email indicators
@ -317,17 +312,15 @@ class Indicators:
""" """
if not email: if not email:
return False return None
for ioc in self.get_iocs("emails"): for ioc in self.get_iocs("emails"):
if email.lower() == ioc["value"].lower(): if email.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"", self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"",
email, ioc["name"]) email, ioc["name"])
return True return ioc
return False def check_file_name(self, file_name):
def check_file_name(self, file_name) -> bool:
"""Check the provided file name against the list of file indicators. """Check the provided file name against the list of file indicators.
:param file_name: File name to check against file :param file_name: File name to check against file
@ -338,17 +331,15 @@ class Indicators:
""" """
if not file_name: if not file_name:
return False return None
for ioc in self.get_iocs("file_names"): for ioc in self.get_iocs("file_names"):
if ioc["value"] == file_name: if ioc["value"] == file_name:
self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"", self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"",
file_name, ioc["name"]) file_name, ioc["name"])
return True return ioc
return False def check_file_path(self, file_path):
def check_file_path(self, file_path) -> bool:
"""Check the provided file path against the list of file indicators (both path and name). """Check the provided file path against the list of file indicators (both path and name).
:param file_path: File path or file name to check against file :param file_path: File path or file name to check against file
@ -359,21 +350,20 @@ class Indicators:
""" """
if not file_path: if not file_path:
return False return None
if self.check_file_name(os.path.basename(file_path)): ioc = self.check_file_name(os.path.basename(file_path))
return True if ioc:
return ioc
for ioc in self.get_iocs("file_paths"): for ioc in self.get_iocs("file_paths"):
# Strip any trailing slash from indicator paths to match directories. # Strip any trailing slash from indicator paths to match directories.
if file_path.startswith(ioc["value"].rstrip("/")): if file_path.startswith(ioc["value"].rstrip("/")):
self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"", self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"",
file_path, ioc["name"]) file_path, ioc["name"])
return True return ioc
return False def check_profile(self, profile_uuid):
def check_profile(self, profile_uuid) -> bool:
"""Check the provided configuration profile UUID against the list of indicators. """Check the provided configuration profile UUID against the list of indicators.
:param profile_uuid: Profile UUID to check against configuration profile indicators :param profile_uuid: Profile UUID to check against configuration profile indicators
@ -386,9 +376,7 @@ class Indicators:
if profile_uuid in ioc["value"]: if profile_uuid in ioc["value"]:
self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"", self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"",
profile_uuid, ioc["name"]) profile_uuid, ioc["name"])
return True return ioc
return False
def download_indicators_files(log): def download_indicators_files(log):

View File

@ -6,7 +6,7 @@
import requests import requests
from packaging import version from packaging import version
MVT_VERSION = "1.4.6" MVT_VERSION = "1.4.7"
def check_for_updates(): def check_for_updates():

View File

@ -46,8 +46,10 @@ class ConfigurationProfiles(IOSExtraction):
payload_content = result["plist"]["PayloadContent"][0] payload_content = result["plist"]["PayloadContent"][0]
# Alert on any known malicious configuration profiles in the indicator list. # Alert on any known malicious configuration profiles in the indicator list.
if self.indicators.check_profile(result["plist"]["PayloadUUID"]): ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
if ioc:
self.log.warning(f"Found a known malicious configuration profile \"{result['plist']['PayloadDisplayName']}\" with UUID '{result['plist']['PayloadUUID']}'.") self.log.warning(f"Found a known malicious configuration profile \"{result['plist']['PayloadDisplayName']}\" with UUID '{result['plist']['PayloadUUID']}'.")
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue

View File

@ -41,15 +41,19 @@ class Analytics(IOSExtraction):
if not isinstance(value, str): if not isinstance(value, str):
continue continue
if self.indicators.check_process(value): ioc = self.indicators.check_process(value)
if ioc:
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s", self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"]) value, result["artifact"], result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue
if self.indicators.check_domain(value): ioc = self.indicators.check_domain(value)
if ioc:
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s", self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"]) value, result["artifact"], result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def _extract_analytics_data(self): def _extract_analytics_data(self):

View File

@ -36,7 +36,9 @@ class CacheFiles(IOSExtraction):
self.detected = {} self.detected = {}
for key, values in self.results.items(): for key, values in self.results.items():
for value in values: for value in values:
if self.indicators.check_domain(value["url"]): ioc = self.indicators.check_domain(value["url"])
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected: if key not in self.detected:
self.detected[key] = [value, ] self.detected[key] = [value, ]
else: else:

View File

@ -40,7 +40,9 @@ class Filesystem(IOSExtraction):
if "path" not in result: if "path" not in result:
continue continue
if self.indicators.check_file_path(result["path"]): ioc = self.indicators.check_file_path(result["path"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
# If we are instructed to run fast, we skip the rest. # If we are instructed to run fast, we skip the rest.
@ -52,6 +54,7 @@ class Filesystem(IOSExtraction):
if ioc["value"] in parts: if ioc["value"] in parts:
self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"", self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"",
result["path"], ioc["name"]) result["path"], ioc["name"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -37,7 +37,12 @@ class SafariFavicon(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]): ioc = self.indicators.check_domain(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def _process_favicon_db(self, file_path): def _process_favicon_db(self, file_path):

View File

@ -34,15 +34,18 @@ class ShutdownLog(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_file_path(result["client"]): ioc = self.indicators.check_file_path(result["client"])
self.detected.append(result) if ioc:
continue result["matched_indicator"] = ioc
self.detected.append(result)
continue
for ioc in self.indicators.get_iocs("processes"): for ioc in self.indicators.get_iocs("processes"):
parts = result["client"].split("/") parts = result["client"].split("/")
if ioc in parts: if ioc in parts:
self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log", self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log",
ioc) ioc)
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue

View File

@ -18,9 +18,11 @@ class WebkitBase(IOSExtraction):
if not self.indicators: if not self.indicators:
return return
for item in self.results: for result in self.results:
if self.indicators.check_domain(item["url"]): ioc = self.indicators.check_domain(result["url"])
self.detected.append(item) if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_webkit_folder(self, root_paths): def _process_webkit_folder(self, root_paths):
for found_path in self._get_fs_files_from_patterns(root_paths): for found_path in self._get_fs_files_from_patterns(root_paths):

View File

@ -42,7 +42,12 @@ class ChromeFavicon(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]): ioc = self.indicators.check_domain(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -41,7 +41,9 @@ class ChromeHistory(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_domain(result["url"]): ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -40,8 +40,12 @@ class FirefoxFavicon(IOSExtraction):
return return
for result in self.results: for result in self.results:
if (self.indicators.check_domain(result.get("url", "")) or ioc = self.indicators.check_domain(result.get("url", ""))
self.indicators.check_domain(result.get("history_url", ""))): if not ioc:
ioc = self.indicators.check_domain(result.get("history_url", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -44,7 +44,9 @@ class FirefoxHistory(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_domain(result["url"]): ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -43,7 +43,9 @@ class IDStatusCache(IOSExtraction):
for result in self.results: for result in self.results:
if result.get("user", "").startswith("mailto:"): if result.get("user", "").startswith("mailto:"):
email = result["user"][7:].strip("'") email = result["user"][7:].strip("'")
if self.indicators.check_email(email): ioc = self.indicators.check_email(email)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue

View File

@ -60,30 +60,38 @@ class LocationdClients(IOSExtraction):
parts = result["package"].split("/") parts = result["package"].split("/")
proc_name = parts[len(parts)-1] proc_name = parts[len(parts)-1]
if self.indicators.check_process(proc_name): ioc = self.indicators.check_process(proc_name)
if ioc:
self.log.warning("Found a suspicious process name in LocationD entry %s", self.log.warning("Found a suspicious process name in LocationD entry %s",
result["package"]) result["package"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue
if "BundlePath" in result: if "BundlePath" in result:
if self.indicators.check_file_path(result["BundlePath"]): ioc = self.indicators.check_file_path(result["BundlePath"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s", self.log.warning("Found a suspicious file path in Location D: %s",
result["BundlePath"]) result["BundlePath"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue
if "Executable" in result: if "Executable" in result:
if self.indicators.check_file_path(result["Executable"]): ioc = self.indicators.check_file_path(result["Executable"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s", self.log.warning("Found a suspicious file path in Location D: %s",
result["Executable"]) result["Executable"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue
if "Registered" in result: if "Registered" in result:
if self.indicators.check_file_path(result["Registered"]): ioc = self.indicators.check_file_path(result["Registered"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s", self.log.warning("Found a suspicious file path in Location D: %s",
result["Registered"]) result["Registered"])
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
continue continue

View File

@ -41,7 +41,9 @@ class OSAnalyticsADDaily(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_process(result["package"]): ioc = self.indicators.check_process(result["package"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def run(self): def run(self):

View File

@ -44,16 +44,22 @@ class SafariBrowserState(IOSExtraction):
return return
for result in self.results: for result in self.results:
if "tab_url" in result and self.indicators.check_domain(result["tab_url"]): if "tab_url" in result:
self.detected.append(result) ioc = self.indicators.check_domain(result["tab_url"])
continue if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if "session_data" not in result: if "session_data" not in result:
continue continue
for session_entry in result["session_data"]: for session_entry in result["session_data"]:
if "entry_url" in session_entry and self.indicators.check_domain(session_entry["entry_url"]): if "entry_url" in session_entry:
self.detected.append(result) ioc = self.indicators.check_domain(session_entry["entry_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_browser_state_db(self, db_path): def _process_browser_state_db(self, db_path):
conn = sqlite3.connect(db_path) conn = sqlite3.connect(db_path)

View File

@ -80,7 +80,9 @@ class SafariHistory(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_domain(result["url"]): ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def _process_history_db(self, history_path): def _process_history_db(self, history_path):

View File

@ -54,9 +54,11 @@ class Shortcuts(IOSExtraction):
if not self.indicators: if not self.indicators:
return return
for action in self.results: for result in self.results:
if self.indicators.check_domains(action["action_urls"]): ioc = self.indicators.check_domains(result["action_urls"])
self.detected.append(action) if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self): def run(self):
self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS, self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS,

View File

@ -41,10 +41,12 @@ class SMS(IOSExtraction):
if not self.indicators: if not self.indicators:
return return
for message in self.results: for result in self.results:
message_links = check_for_links(message.get("text", "")) message_links = check_for_links(result.get("text", ""))
if self.indicators.check_domains(message_links): ioc = self.indicators.check_domains(message_links)
self.detected.append(message) if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self): def run(self):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, self._find_ios_database(backup_ids=SMS_BACKUP_IDS,

View File

@ -71,8 +71,9 @@ class TCC(IOSExtraction):
return return
for result in self.results: for result in self.results:
if self.indicators.check_process(result["client"]): ioc = self.indicators.check_process(result["client"])
self.log.warning("Found malicious process in TCC database: %s", result["client"]) if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)
def process_db(self, file_path): def process_db(self, file_path):

View File

@ -37,7 +37,9 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.detected = {} self.detected = {}
for key, items in self.results.items(): for key, items in self.results.items():
for item in items: for item in items:
if self.indicators.check_domain(item["registrable_domain"]): ioc = self.indicators.check_domain(item["registrable_domain"])
if ioc:
item["matched_indicator"] = ioc
if key not in self.detected: if key not in self.detected:
self.detected[key] = [item, ] self.detected[key] = [item, ]
else: else:

View File

@ -66,7 +66,9 @@ class WebkitSessionResourceLog(IOSExtraction):
all_origins = set([entry["origin"]] + source_domains + destination_domains) all_origins = set([entry["origin"]] + source_domains + destination_domains)
if self.indicators.check_domains(all_origins): ioc = self.indicators.check_domains(all_origins)
if ioc:
entry["matched_indicator"] = ioc
self.detected.append(entry) self.detected.append(entry)
redirect_path = "" redirect_path = ""

View File

@ -47,9 +47,11 @@ class Whatsapp(IOSExtraction):
if not self.indicators: if not self.indicators:
return return
for message in self.results: for result in self.results:
if self.indicators.check_domains(message.get("links", [])): ioc = self.indicators.check_domains(result.get("links", []))
self.detected.append(message) if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self): def run(self):
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS, self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS,

View File

@ -237,5 +237,7 @@ class NetBase(IOSExtraction):
if not result["proc_id"]: if not result["proc_id"]:
continue continue
if self.indicators.check_process(proc_name): ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result) self.detected.append(result)