diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..fa99484 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,27 @@ +# Development + +The Mobile Verification Toolkit team welcomes contributions of new forensic modules or other contributions which help improve the software. + +## Testing + +MVT uses `pytest` for unit and integration tests. Code style consistency is maintained with `flake8`, `ruff` and `black`. All can +be run automatically with: + +```bash +make check +``` + +Run these tests before making new commits or opening pull requests. + +## Profiling + +Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be +taken to avoid inefficient code paths as we add new modules. + +MVT modules can be profiled with Python's built-in `cProfile` by setting the `MVT_PROFILE` environment variable. + +```bash +MVT_PROFILE=1 dev/mvt-ios check-backup test_backup +``` + +Open an issue or PR if you are encountering significant performance issues when analyzing a device with MVT. 
\ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index 81d2465..6c410dd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -1,7 +1,7 @@ site_name: Mobile Verification Toolkit repo_url: https://github.com/mvt-project/mvt edit_uri: edit/main/docs/ -copyright: Copyright © 2021-2022 MVT Project Developers +copyright: Copyright © 2021-2023 MVT Project Developers site_description: Mobile Verification Toolkit Documentation markdown_extensions: - attr_list @@ -46,4 +46,5 @@ nav: - Check an Android Backup (SMS messages): "android/backup.md" - Download APKs: "android/download_apks.md" - Indicators of Compromise: "iocs.md" + - Development: "development.md" - License: "license.md" diff --git a/mvt/common/cmd_check_iocs.py b/mvt/common/cmd_check_iocs.py index 6e01bd3..2277241 100644 --- a/mvt/common/cmd_check_iocs.py +++ b/mvt/common/cmd_check_iocs.py @@ -8,6 +8,7 @@ import os from typing import Optional from mvt.common.command import Command +from mvt.common.utils import exec_or_profile log = logging.getLogger(__name__) @@ -69,7 +70,7 @@ class CmdCheckIOCS(Command): m.indicators.log = m.log try: - m.check_indicators() + exec_or_profile("m.check_indicators()", globals(), locals()) except NotImplementedError: continue else: diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 63c4582..e764a86 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -7,7 +7,9 @@ import json import logging import os from typing import Any, Dict, Iterator, List, Optional, Union +from functools import lru_cache +import ahocorasick from appdirs import user_data_dir from .url import URL @@ -241,6 +243,40 @@ class Indicators: "stix2_file_name": ioc_collection["stix2_file_name"], } + @lru_cache() + def get_ioc_matcher( + self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None + ) -> ahocorasick.Automaton: + """ + Build an Aho-Corasick automaton from a list of iocs (i.e. indicators) + Returns an Aho-Corasick automaton + + This data structure and 
algorithm allows for fast matching of a large number + of match strings (i.e. IOCs) against a large body of text. This will also + match strings containing the IOC, so it is important to confirm the + match is a valid IOC before using it. + + for _, ioc in domains_automaton.iter(url.domain.lower()): + if ioc["value"] == url.domain.lower(): + print(ioc) + + We use an LRU cache to avoid rebuilding the automaton every time we call a + function such as check_domain(). + """ + automaton = ahocorasick.Automaton() + if ioc_type: + iocs = self.get_iocs(ioc_type) + elif ioc_list: + iocs = ioc_list + else: + raise ValueError("Must provide either ioc_tyxpe or ioc_list") + + for ioc in iocs: + automaton.add_word(ioc["value"], ioc) + automaton.make_automaton() + return automaton + + @lru_cache() def check_domain(self, url: str) -> Union[dict, None]: """Check if a given URL matches any of the provided domain indicators. @@ -254,6 +290,9 @@ class Indicators: if not isinstance(url, str): return None + # Create an Aho-Corasick automaton from the list of domains + domain_matcher = self.get_ioc_matcher("domains") + try: # First we use the provided URL. orig_url = URL(url) @@ -265,6 +304,7 @@ class Indicators: self.log.debug("Found a shortened URL %s -> %s", url, unshortened) if unshortened is None: + self.log.warning("Unable to unshorten URL %s", url) return None # Now we check for any nested URL shorteners. @@ -285,12 +325,13 @@ class Indicators: except Exception: # If URL parsing failed, we just try to do a simple substring # match. - for ioc in self.get_iocs("domains"): + for idx, ioc in domain_matcher.iter(url): if ioc["value"].lower() in url: self.log.warning( "Maybe found a known suspicious domain %s " - 'matching indicators from "%s"', + 'matching indicator "%s" from "%s"', url, + ioc["value"], ioc["name"], ) return ioc @@ -300,43 +341,47 @@ class Indicators: # If all parsing worked, we start walking through available domain # indicators. 
- for ioc in self.get_iocs("domains"): + for idx, ioc in domain_matcher.iter(final_url.domain.lower()): # First we check the full domain. if final_url.domain.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: self.log.warning( "Found a known suspicious domain %s " - 'shortened as %s matching indicators from "%s"', + 'shortened as %s matching indicator "%s" from "%s"', final_url.url, orig_url.url, + ioc["value"], ioc["name"], ) else: self.log.warning( "Found a known suspicious domain %s " - 'matching indicators from "%s"', + 'matching indicator "%s" from "%s"', final_url.url, + ioc["value"], ioc["name"], ) - return ioc - # Then we just check the top level domain. + # Then we just check the top level domain. + for idx, ioc in domain_matcher.iter(final_url.top_level.lower()): if final_url.top_level.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: self.log.warning( "Found a sub-domain with suspicious top " "level %s shortened as %s matching " - 'indicators from "%s"', + 'indicator "%s" from "%s"', final_url.url, orig_url.url, + ioc["value"], ioc["name"], ) else: self.log.warning( "Found a sub-domain with a suspicious top " - 'level %s matching indicators from "%s"', + 'level %s matching indicator "%s" from "%s"', final_url.url, + ioc["value"], ioc["name"], ) diff --git a/mvt/common/module.py b/mvt/common/module.py index 0dc02d1..2b7f21d 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -11,6 +11,8 @@ from typing import Any, Dict, List, Optional, Union import simplejson as json +from .utils import exec_or_profile + class DatabaseNotFoundError(Exception): pass @@ -162,7 +164,7 @@ def run_module(module: MVTModule) -> None: module.log.info("Running module %s...", module.__class__.__name__) try: - module.run() + exec_or_profile("module.run()", globals(), locals()) except NotImplementedError: module.log.exception( "The run() procedure of module %s was not implemented yet!", @@ -192,7 +194,7 
@@ def run_module(module: MVTModule) -> None: ) else: try: - module.check_indicators() + exec_or_profile("module.check_indicators()", globals(), locals()) except NotImplementedError: module.log.info( "The %s module does not support checking for indicators", diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 0e33d47..961b6f0 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -8,6 +8,7 @@ import hashlib import logging import os import re +import cProfile from typing import Any, Iterator, Union from rich.logging import RichHandler @@ -225,3 +226,11 @@ def set_verbose_logging(verbose: bool = False): handler.setLevel(logging.DEBUG) else: handler.setLevel(logging.INFO) + + +def exec_or_profile(module, globals, locals): + """Hook for profiling MVT modules""" + if int(os.environ.get("MVT_PROFILE", False)): + cProfile.runctx(module, globals, locals) + else: + exec(module, globals, locals) diff --git a/setup.cfg b/setup.cfg index 80edd89..f2a3139 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ install_requires = libusb1 >=3.0.0 cryptography >=38.0.1 pyyaml >=6.0 + pyahocorasick >= 2.0.0 [options.packages.find] where = ./