From 6a63256b5cba4e7d85f16919a12aed7f47a26776 Mon Sep 17 00:00:00 2001 From: Nex Date: Wed, 18 Aug 2021 13:08:32 +0200 Subject: [PATCH] Added ability to import multiple STIX2 indicators files --- mvt/common/indicators.py | 30 ++++++++++++++-------------- mvt/ios/cli.py | 42 +++++++++++++++++++++------------------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 17905f6..cbd98fe 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -17,32 +17,30 @@ class Indicators: functions to compare extracted artifacts to the indicators. """ - def __init__(self, file_path, log=None): - self.file_path = file_path - with open(self.file_path, "r") as handle: - try: - self.data = json.load(handle) - except json.decoder.JSONDecodeError: - raise IndicatorsFileBadFormat("Unable to parse STIX2 indicators file, the file seems malformed or in the wrong format") - + def __init__(self, log=None): self.log = log self.ioc_domains = [] self.ioc_processes = [] self.ioc_emails = [] self.ioc_files = [] - self._parse_stix_file() - def _parse_stix_file(self): - """Extract IOCs of given type from STIX2 definitions. + def parse_stix2(self, file_path): + """Extract indicators from a STIX2 file. """ - for entry in self.data["objects"]: + self.log.info("Parsing STIX2 indicators file at path %s", + file_path) + + with open(file_path, "r") as handle: try: - if entry["type"] != "indicator": - continue - except KeyError: + data = json.load(handle) + except json.decoder.JSONDecodeError: + raise IndicatorsFileBadFormat("Unable to parse STIX2 indicators file, the file seems malformed or in the wrong format") + + for entry in data.get("objects", []): + if entry.get("type", "") != "indicator": continue - key, value = entry["pattern"].strip("[]").split("=") + key, value = entry.get("pattern", "").strip("[]").split("=") value = value.strip("'") if key == "domain-name:value": diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index daa30d9..eeea9d9 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -121,7 +121,8 @@ def extract_key(password, backup_path, key_file): # Command: check-backup #============================================================================== @cli.command("check-backup", help="Extract artifacts from an iTunes backup") -@click.option("--iocs", "-i", type=click.Path(exists=True), help="Path to indicators file") +@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, + default=[], help="Path to indicators file") @click.option("--output", "-o", type=click.Path(exists=False), help=OUTPUT_HELP_MESSAGE) @click.option("--fast", "-f", is_flag=True, help="Avoid running time/resource consuming features") @click.option("--list-modules", "-l", is_flag=True, help="Print list of available modules and exit") @@ -145,11 +146,11 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module): log.critical("Unable to create output folder %s: %s", output, e) ctx.exit(1) - if iocs: - # Pre-load indicators for performance reasons. - log.info("Loading indicators from provided file at: %s", iocs) + for ioc_path in iocs: try: - indicators = Indicators(iocs) + indicators = Indicators() + indicators.log = log + indicators.parse_stix2(ioc_path) except IndicatorsFileBadFormat as e: log.critical(e) ctx.exit(1) @@ -183,7 +184,8 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module): # Command: check-fs #============================================================================== @cli.command("check-fs", help="Extract artifacts from a full filesystem dump") -@click.option("--iocs", "-i", type=click.Path(exists=True), help="Path to indicators file") +@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, + default=[], help="Path to indicators file") @click.option("--output", "-o", type=click.Path(exists=False), help=OUTPUT_HELP_MESSAGE) @click.option("--fast", "-f", is_flag=True, help="Avoid running time/resource consuming features") @click.option("--list-modules", "-l", is_flag=True, help="Print list of available modules and exit") @@ -207,11 +209,11 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module): log.critical("Unable to create output folder %s: %s", output, e) ctx.exit(1) - if iocs: - # Pre-load indicators for performance reasons. - log.info("Loading indicators from provided file at: %s", iocs) + for ioc_path in iocs: try: - indicators = Indicators(iocs) + indicators = Indicators() + indicators.log = log + indicators.parse_stix2(ioc_path) except IndicatorsFileBadFormat as e: log.critical(e) ctx.exit(1) @@ -246,8 +248,8 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module): # Command: check-iocs #============================================================================== @cli.command("check-iocs", help="Compare stored JSON results to provided indicators") -@click.option("--iocs", "-i", required=True, type=click.Path(exists=True), - help="Path to indicators file") +@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, + default=[], required=True, help="Path to indicators file") @click.option("--list-modules", "-l", is_flag=True, help="Print list of available modules and exit") @click.option("--module", "-m", help="Name of a single module you would like to run instead of all") @click.argument("FOLDER", type=click.Path(exists=True)) @@ -267,14 +269,14 @@ def check_iocs(ctx, iocs, list_modules, module, folder): log.info("Checking stored results against provided indicators...") - # Pre-load indicators for performance reasons. - log.info("Loading indicators from provided file at: %s", iocs) - - try: - indicators = Indicators(iocs) - except IndicatorsFileBadFormat as e: - log.critical(e) - ctx.exit(1) + for ioc_path in iocs: + try: + indicators = Indicators() + indicators.log = log + indicators.parse_stix2(ioc_path) + except IndicatorsFileBadFormat as e: + log.critical(e) + ctx.exit(1) for file_name in os.listdir(folder): name_only, ext = os.path.splitext(file_name)