Merge pull request #246 from mvt-project/check-bugreport

Check bugreport
This commit is contained in:
Nex 2022-02-02 18:12:24 +01:00 committed by GitHub
commit b1b282ac20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 802 additions and 83 deletions

View File

@ -5,6 +5,8 @@
import logging
import os
from pathlib import Path
from zipfile import ZipFile
import click
from rich.logging import RichHandler
@ -21,6 +23,7 @@ from .lookups.koodous import koodous_lookup
from .lookups.virustotal import virustotal_lookup
from .modules.adb import ADB_MODULES
from .modules.backup import BACKUP_MODULES
from .modules.bugreport import BUGREPORT_MODULES
# Setup logging using Rich.
LOG_FORMAT = "[%(name)s] %(message)s"
@ -46,7 +49,7 @@ def version():
#==============================================================================
# Download APKs
# Command: download-apks
#==============================================================================
@cli.command("download-apks", help="Download all or non-safelisted installed APKs installed on the device")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@ -99,7 +102,7 @@ def download_apks(ctx, all_apks, virustotal, koodous, all_checks, output, from_f
#==============================================================================
# Checks through ADB
# Command: check-adb
#==============================================================================
@cli.command("check-adb", help="Check an Android device over adb")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@ -157,7 +160,81 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
#==============================================================================
# Check ADB backup
# Command: check-bugreport
#==============================================================================
@cli.command("check-bugreport", help="Check an Android Bug Report")
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
@click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path):
if list_modules:
log.info("Following is the list of available check-bugreport modules:")
for adb_module in BUGREPORT_MODULES:
log.info(" - %s", adb_module.__name__)
return
log.info("Checking an Android Bug Report located at: %s", bugreport_path)
if output and not os.path.exists(output):
try:
os.makedirs(output)
except Exception as e:
log.critical("Unable to create output folder %s: %s", output, e)
ctx.exit(1)
indicators = Indicators(log=log)
indicators.load_indicators_files(iocs)
if os.path.isfile(bugreport_path):
bugreport_format = "zip"
zip_archive = ZipFile(bugreport_path)
zip_files = []
for file_name in zip_archive.namelist():
zip_files.append(file_name)
elif os.path.isdir(bugreport_path):
bugreport_format = "dir"
folder_files = []
parent_path = Path(bugreport_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(bugreport_path)):
for file_name in subfiles:
folder_files.append(os.path.relpath(os.path.join(root, file_name), parent_path))
timeline = []
timeline_detected = []
for bugreport_module in BUGREPORT_MODULES:
if module and bugreport_module.__name__ != module:
continue
m = bugreport_module(base_folder=bugreport_path, output_folder=output,
log=logging.getLogger(bugreport_module.__module__))
if bugreport_format == "zip":
m.from_zip(zip_archive, zip_files)
else:
m.from_folder(bugreport_path, folder_files)
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
run_module(m)
timeline.extend(m.timeline)
timeline_detected.extend(m.timeline_detected)
if output:
if len(timeline) > 0:
save_timeline(timeline, os.path.join(output, "timeline.csv"))
if len(timeline_detected) > 0:
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
#==============================================================================
# Command: check-backup
#==============================================================================
@cli.command("check-backup", help="Check an Android Backup")
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)

View File

@ -47,7 +47,6 @@ class DumpsysAccessibility(AndroidExtraction):
break
service = line.split(":")[1].strip()
log.info("Found installed accessibility service \"%s\"", service)
results.append({
"package_name": service.split("/")[0],
@ -62,6 +61,9 @@ class DumpsysAccessibility(AndroidExtraction):
output = self._adb_command("dumpsys accessibility")
self.results = self.parse_accessibility(output)
for result in self.results:
log.info("Found installed accessibility service \"%s\"", result.get("service"))
self.log.info("Identified a total of %d accessibility services", len(self.results))
self._adb_disconnect()

View File

@ -39,23 +39,22 @@ class DumpsysBatteryDaily(AndroidExtraction):
continue
@staticmethod
def parse_battery_history(output):
def parse_battery_daily(output):
results = []
daily = None
daily_updates = []
for line in output.split("\n")[1:]:
if line.startswith(" Daily from "):
if len(daily_updates) > 0:
results.extend(daily_updates)
daily_updates = []
timeframe = line[13:].strip()
date_from, date_to = timeframe.strip(":").split(" to ", 1)
daily = {"from": date_from[0:10], "to": date_to[0:10]}
if not daily:
continue
if line.strip() == "":
results.extend(daily_updates)
daily = None
daily_updates = []
if not daily:
continue
if not line.strip().startswith("Update "):
@ -86,7 +85,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
self._adb_connect()
output = self._adb_command("dumpsys batterystats --daily")
self.results = self.parse_battery_history(output)
self.results = self.parse_battery_daily(output)
self.log.info("Extracted %d records from battery daily stats", len(self.results))

View File

@ -39,6 +39,7 @@ DANGEROUS_PERMISSIONS = [
"com.android.browser.permission.READ_HISTORY_BOOKMARKS",
]
class Packages(AndroidExtraction):
"""This module extracts the list of installed packages."""

View File

@ -0,0 +1,15 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .accessibility import Accessibility
from .activities import Activities
from .battery_daily import BatteryDaily
from .battery_history import BatteryHistory
from .dbinfo import DBInfo
from .packages import Packages
from .receivers import Receivers
BUGREPORT_MODULES = [Accessibility, Activities, BatteryDaily, BatteryHistory,
DBInfo, Packages, Receivers]

View File

@ -0,0 +1,63 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_accessibility import DumpsysAccessibility
from .base import BugReportModule
log = logging.getLogger(__name__)
class Accessibility(BugReportModule):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
lines = []
in_accessibility = False
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE accessibility:":
in_accessibility = True
continue
if not in_accessibility:
continue
if line.strip() == "------------------------------------------------------------------------------":
break
lines.append(line)
self.results = DumpsysAccessibility.parse_accessibility("\n".join(lines))
for result in self.results:
log.info("Found installed accessibility service \"%s\"", result.get("service"))
self.log.info("Identified a total of %d accessibility services", len(self.results))

View File

@ -0,0 +1,62 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_activities import DumpsysActivities
from .base import BugReportModule
log = logging.getLogger(__name__)
class Activities(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
if not self.indicators:
return
for intent, activities in self.results.items():
for activity in activities:
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append({intent: activity})
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
lines = []
in_package = False
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE package:":
in_package = True
continue
if not in_package:
continue
if line.strip() == "------------------------------------------------------------------------------":
break
lines.append(line)
self.results = DumpsysActivities.parse_activity_resolver_table("\n".join(lines))

View File

@ -0,0 +1,47 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# See the file 'LICENSE' for usage and copying permissions, or find a copy at
# https://github.com/mvt-project/mvt/blob/main/LICENSE
import fnmatch
import logging
import os
from mvt.common.module import MVTModule
log = logging.getLogger(__name__)
class BugReportModule(MVTModule):
"""This class provides a base for all Android Bug Report modules."""
zip_archive = None
def from_folder(self, extract_path, extract_files):
self.extract_path = extract_path
self.extract_files = extract_files
def from_zip(self, zip_archive, zip_files):
self.zip_archive = zip_archive
self.zip_files = zip_files
def _get_files_by_pattern(self, pattern):
file_names = []
if self.zip_archive:
for zip_file in self.zip_files:
file_names.append(zip_file)
else:
file_names = self.extract_files
return fnmatch.filter(file_names, pattern)
def _get_file_content(self, file_path):
if self.zip_archive:
handle = self.zip_archive.open(file_path)
else:
handle = open(os.path.join(self.extract_path, file_path), "rb")
data = handle.read()
handle.close()
return data

View File

@ -0,0 +1,76 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_battery_daily import DumpsysBatteryDaily
from .base import BugReportModule
log = logging.getLogger(__name__)
class BatteryDaily(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
"event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} with vers {record['vers']}"
}
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
lines = []
in_batterystats = False
in_daily = False
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE batterystats:":
in_batterystats = True
continue
if not in_batterystats:
continue
if line.strip() == "Daily stats:":
lines.append(line)
in_daily = True
continue
if not in_daily:
continue
if line.strip() == "":
break
lines.append(line)
self.results = DumpsysBatteryDaily.parse_battery_daily("\n".join(lines))

View File

@ -0,0 +1,69 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_battery_history import \
DumpsysBatteryHistory
from .base import BugReportModule
log = logging.getLogger(__name__)
class BatteryHistory(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
lines = []
in_batterystats = False
in_history = False
for line in content.decode().split("\n"):
if line.strip() == "********** Print latest newbatterystats **********":
in_batterystats = True
continue
if not in_batterystats:
continue
if line.strip().startswith("Battery History "):
lines.append(line)
in_history = True
continue
if not in_history:
continue
if line.strip() == "":
break
lines.append(line)
self.results = DumpsysBatteryHistory.parse_battery_history("\n".join(lines))

View File

@ -0,0 +1,63 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_dbinfo import DumpsysDBInfo
from .base import BugReportModule
log = logging.getLogger(__name__)
class DBInfo(BugReportModule):
"""This module extracts records from battery daily updates."""
slug = "dbinfo"
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
path = result.get("path", "")
for part in path.split("/"):
ioc = self.indicators.check_app_id(part)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
in_dbinfo = False
lines = []
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE dbinfo:":
in_dbinfo = True
continue
if not in_dbinfo:
continue
if line.strip() == "------------------------------------------------------------------------------":
break
lines.append(line)
self.results = DumpsysDBInfo.parse_dbinfo("\n".join(lines))

View File

@ -0,0 +1,118 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import re
from mvt.android.modules.adb.packages import Packages as PCK
from .base import BugReportModule
log = logging.getLogger(__name__)
class Packages(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
records = []
timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]},
{"event": "package_first_install", "timestamp": record["first_install_time"]},
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for ts in timestamps:
records.append({
"timestamp": ts["timestamp"],
"module": self.__class__.__name__,
"event": ts["event"],
"data": f"Install or update of package {record['package_name']}",
})
return records
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@staticmethod
def parse_packages_list(output):
pkg_rxp = re.compile(r" Package \[(.+?)\].*")
results = []
package_name = None
package = {}
lines = []
for line in output.split("\n"):
if line.startswith(" Package ["):
if len(lines) > 0:
details = PCK.parse_package_for_details("\n".join(lines))
package.update(details)
results.append(package)
package = {}
matches = pkg_rxp.findall(line)
if not matches:
continue
package_name = matches[0]
package["package_name"] = package_name
continue
if not package_name:
continue
lines.append(line)
return results
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
in_package = False
in_packages_list = False
lines = []
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE package:":
in_package = True
continue
if not in_package:
continue
if line.strip() == "Packages:":
in_packages_list = True
continue
if not in_packages_list:
continue
if line.strip() == "":
break
lines.append(line)
self.results = self.parse_packages_list("\n".join(lines))

View File

@ -0,0 +1,84 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.android.modules.adb.dumpsys_receivers import DumpsysReceivers
from .base import BugReportModule
log = logging.getLogger(__name__)
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
INTENT_PHONE_STATE = "android.intent.action.PHONE_STATE"
INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class Receivers(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self):
if not self.indicators:
return
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"",
receiver["receiver"])
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def run(self):
dumpstate_files = self._get_files_by_pattern("dumpstate-*")
if not dumpstate_files:
return
content = self._get_file_content(dumpstate_files[0])
if not content:
return
in_receivers = False
lines = []
for line in content.decode().split("\n"):
if line.strip() == "DUMP OF SERVICE package:":
in_receivers = True
continue
if not in_receivers:
continue
if line.strip() == "------------------------------------------------------------------------------":
break
lines.append(line)
self.results = DumpsysReceivers.parse_receiver_resolver_table("\n".join(lines))

View File

@ -20,7 +20,7 @@ class Indicators:
def __init__(self, log=None):
self.data_dir = user_data_dir("mvt")
self.log = log
self.ioc_files = []
self.ioc_collections = []
self.total_ioc_count = 0
def _load_downloaded_indicators(self):
@ -33,7 +33,7 @@ class Indicators:
def _check_stix2_env_variable(self):
"""
Checks if a variable MVT_STIX2 contains path to STIX Files.
Checks if a variable MVT_STIX2 contains path to a STIX files.
"""
if "MVT_STIX2" not in os.environ:
return
@ -43,14 +43,17 @@ class Indicators:
if os.path.isfile(path):
self.parse_stix2(path)
else:
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s",
path)
def _generate_indicators_file(self):
def _new_collection(self, cid="", name="", description="", file_name="",
file_path=""):
return {
"name": "",
"description": "",
"file_name": "",
"file_path": "",
"id": cid,
"name": name,
"description": description,
"stix2_file_name": file_name,
"stix2_file_path": file_path,
"domains": [],
"processes": [],
"emails": [],
@ -62,10 +65,11 @@ class Indicators:
"count": 0,
}
def _add_indicator(self, ioc, ioc_file, iocs_list):
if ioc not in iocs_list:
iocs_list.append(ioc)
ioc_file["count"] += 1
def _add_indicator(self, ioc, ioc_coll, ioc_coll_list):
ioc = ioc.strip("'")
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
ioc_coll["count"] += 1
self.total_ioc_count += 1
def parse_stix2(self, file_path):
@ -77,69 +81,106 @@ class Indicators:
"""
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
ioc_file = self._generate_indicators_file()
ioc_file["file_path"] = file_path
ioc_file["file_name"] = os.path.basename(file_path)
with open(file_path, "r", encoding="utf-8") as handle:
try:
data = json.load(handle)
except json.decoder.JSONDecodeError:
self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.")
self.log.critical("Unable to parse STIX2 indicator file. " \
"The file is corrupted or in the wrong format!")
return
malware = {}
indicators = []
relationships = []
for entry in data.get("objects", []):
entry_type = entry.get("type", "")
if entry_type == "malware":
ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"]
ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"]
continue
malware[entry["id"]] = {
"name": entry["name"],
"description": entry["description"],
}
elif entry_type == "indicator":
indicators.append(entry)
elif entry_type == "relationship":
relationships.append(entry)
if entry_type != "indicator":
continue
collections = []
for mal_id, mal_values in malware.items():
collection = self._new_collection(mal_id, mal_values.get("name"),
mal_values.get("description"),
os.path.basename(file_path),
file_path)
collections.append(collection)
key, value = entry.get("pattern", "").strip("[]").split("=")
value = value.strip("'")
# We loop through all indicators.
for indicator in indicators:
malware_id = None
malware_name = None
malware_description = None
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_file=ioc_file,
iocs_list=ioc_file["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_file=ioc_file,
iocs_list=ioc_file["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["ios_profile_ids"])
# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue
self.log.info("Loaded %d indicators from \"%s\" indicators file",
ioc_file["count"], ioc_file["name"])
# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
malware_name = malware[relationship["target_ref"]].get("name", "")
malware_description = malware[relationship["target_ref"]].get("description", "")
break
self.ioc_files.append(ioc_file)
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] != malware_id:
continue
key, value = indicator.get("pattern", "").strip("[]").split("=")
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"])
break
for coll in collections:
self.log.info("Extracted %d indicators for collection with name \"%s\"",
coll["count"], coll["name"])
self.ioc_collections.extend(collections)
def load_indicators_files(self, files, load_default=True):
"""
@ -149,7 +190,8 @@ class Indicators:
if os.path.isfile(file_path):
self.parse_stix2(file_path)
else:
self.log.warning("This indicators file %s does not exist", file_path)
self.log.warning("No indicators file exists at path %s",
file_path)
# Load downloaded indicators and any indicators from env variable.
if load_default:
@ -159,12 +201,13 @@ class Indicators:
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type):
for ioc_file in self.ioc_files:
for ioc in ioc_file.get(ioc_type, []):
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
"value": ioc,
"type": ioc_type,
"name": ioc_file["name"]
"name": ioc_collection["name"],
"stix2_file_name": ioc_collection["stix2_file_name"],
}
def check_domain(self, url):
@ -424,18 +467,18 @@ def download_indicators_files(log):
for ioc_entry in res.json():
ioc_url = ioc_entry["stix2_url"]
log.info("Downloading indicator file '%s' from '%s'", ioc_entry["name"], ioc_url)
log.info("Downloading indicator file %s from %s", ioc_entry["name"], ioc_url)
res = requests.get(ioc_url)
if res.status_code != 200:
log.warning("Could not find indicator file '%s'", ioc_url)
log.warning("Could not find indicator file %s", ioc_url)
continue
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
ioc_path = os.path.join(data_dir, clean_file_name)
# Write file to disk. This will overwrite any older version of the STIX2 file.
with open(ioc_path, "w", encoding="utf-8") as f:
f.write(res.text)
with open(ioc_path, "w", encoding="utf-8") as handle:
handle.write(res.text)
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))
log.info("Saved indicator file to %s", os.path.basename(ioc_path))

View File

@ -73,7 +73,7 @@ def check_for_links(text):
:returns: Search results.
"""
return re.findall("(?P<url>https?://[^\s]+)", text, re.IGNORECASE)
return re.findall(r"(?P<url>https?://[^\s]+)", text, re.IGNORECASE)
def get_sha256_from_file_path(file_path):