mirror of
https://github.com/mvt-project/mvt.git
synced 2024-06-26 14:28:54 +00:00
Merge pull request #359 from mvt-project/optimise-domain-checking
Optimise domain checking performance
This commit is contained in:
commit
27bd5f03a8
27
docs/development.md
Normal file
27
docs/development.md
Normal file
|
@ -0,0 +1,27 @@
|
|||
# Development
|
||||
|
||||
The Mobile Verification Toolkit team welcomes contributions of new forensic modules or other contributions which help improve the software.
|
||||
|
||||
## Testing
|
||||
|
||||
MVT uses `pytest` for unit and integration tests. Code style consistency is maintained with `flake8`, `ruff` and `black`. All can
|
||||
be run automatically with:
|
||||
|
||||
```bash
|
||||
make check
|
||||
```
|
||||
|
||||
Run these tests before making new commits or opening pull requests.
|
||||
|
||||
## Profiling
|
||||
|
||||
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
|
||||
taken to avoid inefficient code paths as we add new modules.
|
||||
|
||||
MVT modules can be profiled with Python's built-in `cProfile` by setting the `MVT_PROFILE` environment variable.
|
||||
|
||||
```bash
|
||||
MVT_PROFILE=1 dev/mvt-ios check-backup test_backup
|
||||
```
|
||||
|
||||
Open an issue or PR if you are encountering significant performance issues when analyzing a device with MVT.
|
|
@ -1,7 +1,7 @@
|
|||
site_name: Mobile Verification Toolkit
|
||||
repo_url: https://github.com/mvt-project/mvt
|
||||
edit_uri: edit/main/docs/
|
||||
copyright: Copyright © 2021-2022 MVT Project Developers
|
||||
copyright: Copyright © 2021-2023 MVT Project Developers
|
||||
site_description: Mobile Verification Toolkit Documentation
|
||||
markdown_extensions:
|
||||
- attr_list
|
||||
|
@ -46,4 +46,5 @@ nav:
|
|||
- Check an Android Backup (SMS messages): "android/backup.md"
|
||||
- Download APKs: "android/download_apks.md"
|
||||
- Indicators of Compromise: "iocs.md"
|
||||
- Development: "development.md"
|
||||
- License: "license.md"
|
||||
|
|
|
@ -8,6 +8,7 @@ import os
|
|||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.utils import exec_or_profile
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -69,7 +70,7 @@ class CmdCheckIOCS(Command):
|
|||
m.indicators.log = m.log
|
||||
|
||||
try:
|
||||
m.check_indicators()
|
||||
exec_or_profile("m.check_indicators()", globals(), locals())
|
||||
except NotImplementedError:
|
||||
continue
|
||||
else:
|
||||
|
|
|
@ -7,7 +7,9 @@ import json
|
|||
import logging
|
||||
import os
|
||||
from typing import Any, Dict, Iterator, List, Optional, Union
|
||||
from functools import lru_cache
|
||||
|
||||
import ahocorasick
|
||||
from appdirs import user_data_dir
|
||||
|
||||
from .url import URL
|
||||
|
@ -241,6 +243,40 @@ class Indicators:
|
|||
"stix2_file_name": ioc_collection["stix2_file_name"],
|
||||
}
|
||||
|
||||
def get_ioc_matcher(
    self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None
) -> "ahocorasick.Automaton":
    """Build an Aho-Corasick automaton from a collection of IOCs.

    The automaton allows fast matching of a large number of match strings
    (i.e. IOCs) against a large body of text. It also reports hits where the
    IOC is merely *contained* in the searched text, so callers must confirm
    that a hit is a valid IOC match before using it, e.g.:

        for _, ioc in domains_automaton.iter(url.domain.lower()):
            if ioc["value"] == url.domain.lower():
                print(ioc)

    Automatons built from ``ioc_type`` are cached on this instance, so
    repeated calls from functions such as check_domain() do not rebuild
    them. The cache is never invalidated: indicators loaded after the first
    call for a given type are not reflected in the cached automaton.
    Automatons built from ``ioc_list`` are not cached, because lists are
    unhashable and cannot serve as cache keys.

    :param ioc_type: Indicator type passed to ``self.get_iocs()``
        (e.g. "domains"). Takes precedence over ``ioc_list``.
    :param ioc_list: Explicit, non-empty list of indicator dicts, each with
        at least a "value" key. Only used when ``ioc_type`` is not given.
    :returns: A finalized automaton whose payload for each word is the
        indicator dict itself.
    :raises ValueError: If neither a truthy ``ioc_type`` nor a non-empty
        ``ioc_list`` is provided.
    """
    # Validate first so a bad call fails before any work is done.
    if not ioc_type and not ioc_list:
        raise ValueError("Must provide either ioc_type or ioc_list")

    cache = None
    if ioc_type:
        # Per-instance cache instead of functools.lru_cache: lru_cache on a
        # method hashes every argument (a list raises TypeError) and keeps
        # the instance alive for the lifetime of the process.
        cache = getattr(self, "_ioc_matcher_cache", None)
        if cache is None:
            cache = {}
            self._ioc_matcher_cache = cache
        if ioc_type in cache:
            return cache[ioc_type]
        iocs = self.get_iocs(ioc_type)
    else:
        iocs = ioc_list

    automaton = ahocorasick.Automaton()
    for ioc in iocs:
        # The whole indicator dict is stored as the payload so that callers
        # get the name/collection metadata back on a match.
        automaton.add_word(ioc["value"], ioc)
    automaton.make_automaton()

    if cache is not None:
        cache[ioc_type] = automaton
    return automaton
|
||||
|
||||
@lru_cache()
|
||||
def check_domain(self, url: str) -> Union[dict, None]:
|
||||
"""Check if a given URL matches any of the provided domain indicators.
|
||||
|
||||
|
@ -254,6 +290,9 @@ class Indicators:
|
|||
if not isinstance(url, str):
|
||||
return None
|
||||
|
||||
# Create an Aho-Corasick automaton from the list of domains
|
||||
domain_matcher = self.get_ioc_matcher("domains")
|
||||
|
||||
try:
|
||||
# First we use the provided URL.
|
||||
orig_url = URL(url)
|
||||
|
@ -265,6 +304,7 @@ class Indicators:
|
|||
|
||||
self.log.debug("Found a shortened URL %s -> %s", url, unshortened)
|
||||
if unshortened is None:
|
||||
self.log.warning("Unable to unshorten URL %s", url)
|
||||
return None
|
||||
|
||||
# Now we check for any nested URL shorteners.
|
||||
|
@ -285,12 +325,13 @@ class Indicators:
|
|||
except Exception:
|
||||
# If URL parsing failed, we just try to do a simple substring
|
||||
# match.
|
||||
for ioc in self.get_iocs("domains"):
|
||||
for idx, ioc in domain_matcher.iter(url):
|
||||
if ioc["value"].lower() in url:
|
||||
self.log.warning(
|
||||
"Maybe found a known suspicious domain %s "
|
||||
'matching indicators from "%s"',
|
||||
'matching indicator "%s" from "%s"',
|
||||
url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
@ -300,43 +341,47 @@ class Indicators:
|
|||
|
||||
# If all parsing worked, we start walking through available domain
|
||||
# indicators.
|
||||
for ioc in self.get_iocs("domains"):
|
||||
for idx, ioc in domain_matcher.iter(final_url.domain.lower()):
|
||||
# First we check the full domain.
|
||||
if final_url.domain.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
self.log.warning(
|
||||
"Found a known suspicious domain %s "
|
||||
'shortened as %s matching indicators from "%s"',
|
||||
'shortened as %s matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
orig_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
else:
|
||||
self.log.warning(
|
||||
"Found a known suspicious domain %s "
|
||||
'matching indicators from "%s"',
|
||||
'matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
|
||||
return ioc
|
||||
|
||||
# Then we just check the top level domain.
|
||||
# Then we just check the top level domain.
|
||||
for idx, ioc in domain_matcher.iter(final_url.top_level.lower()):
|
||||
if final_url.top_level.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
self.log.warning(
|
||||
"Found a sub-domain with suspicious top "
|
||||
"level %s shortened as %s matching "
|
||||
'indicators from "%s"',
|
||||
'indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
orig_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
else:
|
||||
self.log.warning(
|
||||
"Found a sub-domain with a suspicious top "
|
||||
'level %s matching indicators from "%s"',
|
||||
'level %s matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ from typing import Any, Dict, List, Optional, Union
|
|||
|
||||
import simplejson as json
|
||||
|
||||
from .utils import exec_or_profile
|
||||
|
||||
|
||||
class DatabaseNotFoundError(Exception):
|
||||
pass
|
||||
|
@ -162,7 +164,7 @@ def run_module(module: MVTModule) -> None:
|
|||
module.log.info("Running module %s...", module.__class__.__name__)
|
||||
|
||||
try:
|
||||
module.run()
|
||||
exec_or_profile("module.run()", globals(), locals())
|
||||
except NotImplementedError:
|
||||
module.log.exception(
|
||||
"The run() procedure of module %s was not implemented yet!",
|
||||
|
@ -192,7 +194,7 @@ def run_module(module: MVTModule) -> None:
|
|||
)
|
||||
else:
|
||||
try:
|
||||
module.check_indicators()
|
||||
exec_or_profile("module.check_indicators()", globals(), locals())
|
||||
except NotImplementedError:
|
||||
module.log.info(
|
||||
"The %s module does not support checking for indicators",
|
||||
|
|
|
@ -8,6 +8,7 @@ import hashlib
|
|||
import logging
|
||||
import os
|
||||
import re
|
||||
import cProfile
|
||||
from typing import Any, Iterator, Union
|
||||
|
||||
from rich.logging import RichHandler
|
||||
|
@ -225,3 +226,11 @@ def set_verbose_logging(verbose: bool = False):
|
|||
handler.setLevel(logging.DEBUG)
|
||||
else:
|
||||
handler.setLevel(logging.INFO)
|
||||
|
||||
|
||||
def exec_or_profile(module, globals, locals):
    """Hook for profiling MVT modules.

    Executes the Python statement given in ``module`` within the supplied
    namespaces. When the ``MVT_PROFILE`` environment variable is enabled the
    statement is run through ``cProfile.runctx``; otherwise it is run with
    plain ``exec``.

    ``MVT_PROFILE`` is treated as disabled when it is unset, empty, ``0``,
    ``false``, ``no`` or ``off`` (case-insensitive). Any other value,
    including any non-zero integer, enables profiling.

    :param module: Python source to execute, e.g. ``"module.run()"``.
    :param globals: Global namespace the statement is executed in.
    :param locals: Local namespace the statement is executed in.
    """
    raw_flag = os.environ.get("MVT_PROFILE", "").strip().lower()
    try:
        profiling_enabled = bool(int(raw_flag))
    except ValueError:
        # Non-numeric values (e.g. "true" or an empty string) must not crash
        # every module run; only explicit "off" spellings disable profiling.
        profiling_enabled = raw_flag not in ("", "false", "no", "off")

    if profiling_enabled:
        cProfile.runctx(module, globals, locals)
    else:
        # NOTE: exec() runs arbitrary code. ``module`` must always be a
        # trusted, hard-coded statement and never externally supplied data.
        exec(module, globals, locals)
|
||||
|
|
Loading…
Reference in New Issue
Block a user