mirror of https://github.com/mvt-project/mvt.git
Add command to download latest public indicators
Squashed commit of the following: commit c0d9e8d5d188c13e7e5ec0612e99bfb7e25f47d4 Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org> Date: Fri Jan 7 16:05:12 2022 +0100 Update name of indicators JSON file commit f719e49c5f942cef64931ecf422b6a6e7b8c9f17 Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org> Date: Fri Jan 7 15:38:03 2022 +0100 Do not set indicators option on module if no indicators were loaded commit a289eb8de936f7d74c6c787cbb8daf5c5bec015c Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org> Date: Fri Jan 7 14:43:00 2022 +0100 Simplify code for loading IoCs commit 0804563415ee80d76c13d3b38ffe639fa14caa14 Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org> Date: Fri Jan 7 13:43:47 2022 +0100 Add metadata to IoC entries commit 97d0e893c1a0736c4931363ff40f09a030b90cf6 Author: tek <tek@randhome.io> Date: Fri Dec 17 16:43:09 2021 +0100 Implements automated loading of indicators commit c381e14df92ae4d7d846a1c97bcf6639cc526082 Author: tek <tek@randhome.io> Date: Fri Dec 17 12:41:15 2021 +0100 Improves download-indicators commit b938e02ddfd0b916fd883f510b467491a4a84e5f Author: tek <tek@randhome.io> Date: Fri Dec 17 01:44:26 2021 +0100 Adds download-indicators for mvt-ios and mvt-android
This commit is contained in:
parent
dc8eeb618e
commit
28d57e7178
|
@ -41,4 +41,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
|
|||
- [Predator from Cytrox](https://citizenlab.ca/2021/12/pegasus-vs-predator-dissidents-doubly-infected-iphone-reveals-cytrox-mercenary-spyware/) ([STIX2](https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-12-16_cytrox/cytrox.stix2))
|
||||
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/stalkerware.stix2).
|
||||
|
||||
You can automaticallly download the latest public indicator files with the command `mvt-ios download-indicators` or `mvt-android download-indicators`.
|
||||
|
||||
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.
|
||||
|
|
|
@ -12,7 +12,8 @@ from rich.logging import RichHandler
|
|||
from mvt.common.help import HELP_MSG_MODULE, HELP_MSG_IOC
|
||||
from mvt.common.help import HELP_MSG_FAST, HELP_MSG_OUTPUT, HELP_MSG_LIST_MODULES
|
||||
from mvt.common.help import HELP_MSG_SERIAL
|
||||
from mvt.common.indicators import Indicators, IndicatorsFileBadFormat
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.indicators import download_indicators_files
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.module import run_module, save_timeline
|
||||
|
||||
|
@ -129,13 +130,7 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
|
|||
ctx.exit(1)
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
for ioc_path in iocs:
|
||||
try:
|
||||
indicators.parse_stix2(ioc_path)
|
||||
except IndicatorsFileBadFormat as e:
|
||||
log.critical(e)
|
||||
ctx.exit(1)
|
||||
log.info("Loaded a total of %d indicators", indicators.ioc_count)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
timeline = []
|
||||
timeline_detected = []
|
||||
|
@ -145,13 +140,12 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
|
|||
|
||||
m = adb_module(output_folder=output, fast_mode=fast,
|
||||
log=logging.getLogger(adb_module.__module__))
|
||||
if indicators.ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
if serial:
|
||||
m.serial = serial
|
||||
|
||||
if indicators.ioc_count > 0:
|
||||
indicators.log = m.log
|
||||
m.indicators = indicators
|
||||
|
||||
run_module(m)
|
||||
timeline.extend(m.timeline)
|
||||
timeline_detected.extend(m.timeline_detected)
|
||||
|
@ -184,13 +178,7 @@ def check_backup(ctx, iocs, output, backup_path, serial):
|
|||
ctx.exit(1)
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
for ioc_path in iocs:
|
||||
try:
|
||||
indicators.parse_stix2(ioc_path)
|
||||
except IndicatorsFileBadFormat as e:
|
||||
log.critical(e)
|
||||
ctx.exit(1)
|
||||
log.info("Loaded a total of %d indicators", indicators.ioc_count)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
if os.path.isfile(backup_path):
|
||||
log.critical("The path you specified is a not a folder!")
|
||||
|
@ -203,12 +191,17 @@ def check_backup(ctx, iocs, output, backup_path, serial):
|
|||
for module in BACKUP_MODULES:
|
||||
m = module(base_folder=backup_path, output_folder=output,
|
||||
log=logging.getLogger(module.__module__))
|
||||
|
||||
if indicators.ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
if serial:
|
||||
m.serial = serial
|
||||
|
||||
if indicators.ioc_count > 0:
|
||||
indicators.log = m.log
|
||||
m.indicators = indicators
|
||||
|
||||
run_module(m)
|
||||
|
||||
#==============================================================================
|
||||
# Command: download-indicators
|
||||
#==============================================================================
|
||||
@cli.command("download-indicators", help="Download public STIX2 indicators")
|
||||
def download_indicators():
|
||||
download_indicators_files(log)
|
||||
|
|
|
@ -3,24 +3,22 @@
|
|||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
from appdirs import user_data_dir
|
||||
|
||||
from .url import URL
|
||||
|
||||
|
||||
class IndicatorsFileBadFormat(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Indicators:
|
||||
"""This class is used to parse indicators from a STIX2 file and provide
|
||||
functions to compare extracted artifacts to the indicators.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, log=None):
|
||||
self.data_dir = user_data_dir("mvt")
|
||||
self.log = log
|
||||
self.ioc_domains = []
|
||||
self.ioc_processes = []
|
||||
|
@ -30,24 +28,48 @@ class Indicators:
|
|||
self.ioc_app_ids = []
|
||||
self.ios_profile_ids = []
|
||||
self.ioc_count = 0
|
||||
self._check_env_variable()
|
||||
|
||||
def _add_indicator(self, ioc, iocs_list):
|
||||
if ioc not in iocs_list:
|
||||
iocs_list.append(ioc)
|
||||
self.ioc_count += 1
|
||||
|
||||
def _check_env_variable(self):
|
||||
def _load_downloaded_indicators(self):
|
||||
if not os.path.isdir(self.data_dir):
|
||||
return False
|
||||
|
||||
for f in os.listdir(self.data_dir):
|
||||
if f.lower().endswith(".stix2"):
|
||||
self.parse_stix2(os.path.join(self.data_dir, f))
|
||||
|
||||
def _check_stix2_env_variable(self):
|
||||
"""
|
||||
Checks if a variable MVT_STIX2 contains path to STIX Files
|
||||
"""
|
||||
if "MVT_STIX2" in os.environ:
|
||||
paths = os.environ["MVT_STIX2"].split(":")
|
||||
for path in paths:
|
||||
if os.path.isfile(path):
|
||||
self.parse_stix2(path)
|
||||
else:
|
||||
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
|
||||
if "MVT_STIX2" not in os.environ:
|
||||
return False
|
||||
|
||||
paths = os.environ["MVT_STIX2"].split(":")
|
||||
for path in paths:
|
||||
if os.path.isfile(path):
|
||||
self.parse_stix2(path)
|
||||
else:
|
||||
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
|
||||
|
||||
def load_indicators_files(self, files):
|
||||
"""
|
||||
Load a list of indicators files
|
||||
"""
|
||||
for file_path in files:
|
||||
if os.path.isfile(file_path):
|
||||
self.parse_stix2(file_path)
|
||||
else:
|
||||
self.log.warning("This indicators file %s does not exist", file_path)
|
||||
|
||||
# Load downloaded indicators and any indicators from env variable
|
||||
self._load_downloaded_indicators()
|
||||
self._check_stix2_env_variable()
|
||||
self.log.info("Loaded a total of %d unique indicators", self.ioc_count)
|
||||
|
||||
def parse_stix2(self, file_path):
|
||||
"""Extract indicators from a STIX2 file.
|
||||
|
@ -56,14 +78,13 @@ class Indicators:
|
|||
:type file_path: str
|
||||
|
||||
"""
|
||||
self.log.info("Parsing STIX2 indicators file at path %s",
|
||||
file_path)
|
||||
|
||||
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
|
||||
with open(file_path, "r") as handle:
|
||||
try:
|
||||
data = json.load(handle)
|
||||
except json.decoder.JSONDecodeError:
|
||||
raise IndicatorsFileBadFormat("Unable to parse STIX2 indicators file, the file seems malformed or in the wrong format")
|
||||
self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.")
|
||||
return
|
||||
|
||||
for entry in data.get("objects", []):
|
||||
if entry.get("type", "") != "indicator":
|
||||
|
@ -281,7 +302,7 @@ class Indicators:
|
|||
if not file_path:
|
||||
return False
|
||||
|
||||
for ioc_file in self.ioc_files:
|
||||
for ioc_file in self.ioc_files:
|
||||
# Strip any trailing slash from indicator paths to match directories.
|
||||
if file_path.startswith(ioc_file.rstrip("/")):
|
||||
return True
|
||||
|
@ -299,4 +320,36 @@ class Indicators:
|
|||
if profile_uuid in self.ios_profile_ids:
|
||||
return True
|
||||
|
||||
return False
|
||||
return False
|
||||
|
||||
|
||||
def download_indicators_files(log):
|
||||
"""
|
||||
Download indicators from repo into MVT app data directory
|
||||
"""
|
||||
data_dir = user_data_dir("mvt")
|
||||
if not os.path.isdir(data_dir):
|
||||
os.makedirs(data_dir, exist_ok=True)
|
||||
|
||||
# Download latest list of indicators from the MVT repo.
|
||||
res = requests.get("https://github.com/mvt-project/mvt/raw/main/public_indicators.json")
|
||||
if res.status_code != 200:
|
||||
log.warning("Unable to find retrieve list of indicators from the MVT repository.")
|
||||
return
|
||||
|
||||
for ioc_entry in res.json():
|
||||
ioc_url = ioc_entry["stix2_url"]
|
||||
log.info("Downloading indicator file '%s' from '%s'", ioc_entry["name"], ioc_url)
|
||||
|
||||
res = requests.get(ioc_url)
|
||||
if res.status_code != 200:
|
||||
log.warning("Could not find indicator file '%s'", ioc_url)
|
||||
continue
|
||||
|
||||
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
|
||||
ioc_path = os.path.join(data_dir, clean_file_name)
|
||||
|
||||
# Write file to disk. This will overwrite any older version of the STIX2 file.
|
||||
with io.open(ioc_path, "w") as f:
|
||||
f.write(res.text)
|
||||
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
import logging
|
||||
import os
|
||||
import io
|
||||
|
||||
import click
|
||||
from rich.logging import RichHandler
|
||||
|
@ -13,7 +14,8 @@ from rich.prompt import Prompt
|
|||
from mvt.common.help import HELP_MSG_MODULE, HELP_MSG_IOC
|
||||
from mvt.common.help import HELP_MSG_FAST, HELP_MSG_OUTPUT
|
||||
from mvt.common.help import HELP_MSG_LIST_MODULES
|
||||
from mvt.common.indicators import Indicators, IndicatorsFileBadFormat
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.indicators import download_indicators_files
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.module import run_module, save_timeline
|
||||
from mvt.common.options import MutuallyExclusiveOption
|
||||
|
@ -157,13 +159,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
|
|||
ctx.exit(1)
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
for ioc_path in iocs:
|
||||
try:
|
||||
indicators.parse_stix2(ioc_path)
|
||||
except IndicatorsFileBadFormat as e:
|
||||
log.critical(e)
|
||||
ctx.exit(1)
|
||||
log.info("Loaded a total of %d indicators", indicators.ioc_count)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
timeline = []
|
||||
timeline_detected = []
|
||||
|
@ -174,8 +170,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
|
|||
m = backup_module(base_folder=backup_path, output_folder=output, fast_mode=fast,
|
||||
log=logging.getLogger(backup_module.__module__))
|
||||
m.is_backup = True
|
||||
|
||||
if indicators.ioc_count > 0:
|
||||
if indicators.ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
|
@ -220,13 +215,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
|
|||
ctx.exit(1)
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
for ioc_path in iocs:
|
||||
try:
|
||||
indicators.parse_stix2(ioc_path)
|
||||
except IndicatorsFileBadFormat as e:
|
||||
log.critical(e)
|
||||
ctx.exit(1)
|
||||
log.info("Loaded a total of %d indicators", indicators.ioc_count)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
timeline = []
|
||||
timeline_detected = []
|
||||
|
@ -238,8 +227,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
|
|||
log=logging.getLogger(fs_module.__module__))
|
||||
|
||||
m.is_fs_dump = True
|
||||
|
||||
if iocs:
|
||||
if indicators.ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
|
@ -280,13 +268,7 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
|||
log.info("Checking stored results against provided indicators...")
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
for ioc_path in iocs:
|
||||
try:
|
||||
indicators.parse_stix2(ioc_path)
|
||||
except IndicatorsFileBadFormat as e:
|
||||
log.critical(e)
|
||||
ctx.exit(1)
|
||||
log.info("Loaded a total of %d indicators", indicators.ioc_count)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
for file_name in os.listdir(folder):
|
||||
name_only, ext = os.path.splitext(file_name)
|
||||
|
@ -304,11 +286,18 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
|||
|
||||
m = iocs_module.from_json(file_path,
|
||||
log=logging.getLogger(iocs_module.__module__))
|
||||
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
if indicators.ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
try:
|
||||
m.check_indicators()
|
||||
except NotImplementedError:
|
||||
continue
|
||||
|
||||
#==============================================================================
|
||||
# Command: download-indicators
|
||||
#==============================================================================
|
||||
@cli.command("download-indicators", help="Download public STIX2 indicators")
|
||||
def download_indicators():
|
||||
download_indicators_files(log)
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
[
|
||||
{
|
||||
"name": "NSO Group Pegasus Indicators of Compromise",
|
||||
"source": "Amnesty International",
|
||||
"reference": "https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/",
|
||||
"stix2_url": "https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-07-18_nso/pegasus.stix2"
|
||||
},
|
||||
{
|
||||
"name": "Cytrox Predator Spyware Indicators of Compromise",
|
||||
"source": "Meta, Amnesty International, Citizen Lab",
|
||||
"reference": "https://citizenlab.ca/2021/12/pegasus-vs-predator-dissidents-doubly-infected-iphone-reveals-cytrox-mercenary-spyware/",
|
||||
"stix2_url": "https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-12-16_cytrox/cytrox.stix2"
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue