diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 0795d84..74deeed 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -13,8 +13,8 @@ from mvt.common.cmd_check_iocs import CmdCheckIOCS from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC, HELP_MSG_LIST_MODULES, HELP_MSG_MODULE, HELP_MSG_OUTPUT, HELP_MSG_SERIAL) -from mvt.common.indicators import download_indicators_files from mvt.common.logo import logo +from mvt.common.updates import IndicatorsUpdates from .cmd_check_adb import CmdAndroidCheckADB from .cmd_check_backup import CmdAndroidCheckBackup @@ -215,4 +215,5 @@ def check_iocs(ctx, iocs, list_modules, module, folder): #============================================================================== @cli.command("download-iocs", help="Download public STIX2 indicators") def download_indicators(): - download_indicators_files(log) + ioc_updates = IndicatorsUpdates() + ioc_updates.update() diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 2c4041e..3ddb29d 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -7,11 +7,13 @@ import json import logging import os -import requests from appdirs import user_data_dir from .url import URL +MVT_DATA_FOLDER = user_data_dir("mvt") +MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") + class Indicators: """This class is used to parse indicators from a STIX2 file and provide @@ -19,18 +21,17 @@ class Indicators: """ def __init__(self, log=logging.Logger) -> None: - self.data_dir = user_data_dir("mvt") self.log = log self.ioc_collections = [] self.total_ioc_count = 0 def _load_downloaded_indicators(self) -> None: - if not os.path.isdir(self.data_dir): + if not os.path.isdir(MVT_INDICATORS_FOLDER): return - for f in os.listdir(self.data_dir): - if f.lower().endswith(".stix2"): - self.parse_stix2(os.path.join(self.data_dir, f)) + for ioc_file_name in os.listdir(MVT_INDICATORS_FOLDER): + if ioc_file_name.lower().endswith(".stix2"): + self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, ioc_file_name)) def _check_stix2_env_variable(self) -> None: """ @@ -446,36 +447,3 @@ class Indicators: self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"", app_id, ioc["name"]) return ioc - - -def download_indicators_files(log: logging.Logger) -> None: - """ - Download indicators from repo into MVT app data directory. - """ - data_dir = user_data_dir("mvt") - if not os.path.isdir(data_dir): - os.makedirs(data_dir, exist_ok=True) - - # Download latest list of indicators from the MVT repo. - res = requests.get("https://raw.githubusercontent.com/mvt-project/mvt-indicators/main/indicators.json") - if res.status_code != 200: - log.warning("Unable to find retrieve list of indicators from the MVT repository.") - return - - for ioc_entry in res.json(): - ioc_url = ioc_entry["stix2_url"] - log.info("Downloading indicator file %s from %s", ioc_entry["name"], ioc_url) - - res = requests.get(ioc_url) - if res.status_code != 200: - log.warning("Could not find indicator file %s", ioc_url) - continue - - clean_file_name = ioc_url.lstrip("https://").replace("/", "_") - ioc_path = os.path.join(data_dir, clean_file_name) - - # Write file to disk. This will overwrite any older version of the STIX2 file. - with open(ioc_path, "w", encoding="utf-8") as handle: - handle.write(res.text) - - log.info("Saved indicator file to %s", os.path.basename(ioc_path)) diff --git a/mvt/common/logo.py b/mvt/common/logo.py index 908ca3b..9505ecc 100644 --- a/mvt/common/logo.py +++ b/mvt/common/logo.py @@ -5,22 +5,55 @@ from rich import print -from .updates import check_for_updates +from .updates import IndicatorsUpdates, MVTUpdates from .version import MVT_VERSION +def check_updates() -> None: + # First we check for MVT version udpates. + mvt_updates = MVTUpdates() + try: + latest_version = mvt_updates.check() + except Exception: + pass + else: + if latest_version: + print(f"\t\t[bold]Version {latest_version} is available! Upgrade mvt![/bold]") + + # Then we check for indicators files updates. + ioc_updates = IndicatorsUpdates() + + # Before proceeding, we check if we have downloaded an indicators index. + # If not, there's no point in proceeding with the updates check. + if ioc_updates.get_latest_update() == 0: + print("\t\t[bold]You have not yet downloaded any indicators, check the `download-iocs` command![/bold]") + return + + # We only perform this check at a fixed frequency, in order to not + # overburden the user with too many lookups if the command is being run + # multiple times. + should_check, hours = ioc_updates.should_check() + if not should_check: + print(f"\t\tIndicators updates checked recently, next automatic check in {int(hours)} hours") + return + + try: + ioc_to_update = ioc_updates.check() + except Exception: + pass + else: + if ioc_to_update: + print("\t\t[bold]There are updates to your indicators files! Run the `download-iocs` command to update![/bold]") + else: + print("\t\tYour indicators files seem to be up to date.") + + def logo() -> None: print("\n") print("\t[bold]MVT[/bold] - Mobile Verification Toolkit") print("\t\thttps://mvt.re") print(f"\t\tVersion: {MVT_VERSION}") - try: - latest_version = check_for_updates() - except Exception: - pass - else: - if latest_version: - print(f"\t\t[bold]Version {latest_version} is available! Upgrade mvt![/bold]") + check_updates() print("\n") diff --git a/mvt/common/updates.py b/mvt/common/updates.py index 4057bef..f686d75 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -3,18 +3,209 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging +import os +from datetime import datetime + import requests +import yaml from packaging import version +from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER from .version import MVT_VERSION +log = logging.getLogger(__name__) -def check_for_updates() -> str: - res = requests.get("https://pypi.org/pypi/mvt/json") - data = res.json() - latest_version = data.get("info", {}).get("version", "") +# In hours. +INDICATORS_CHECK_FREQUENCY = 12 - if version.parse(latest_version) > version.parse(MVT_VERSION): - return latest_version - return "" +class MVTUpdates: + + def check(self) -> str: + res = requests.get("https://pypi.org/pypi/mvt/json") + data = res.json() + latest_version = data.get("info", {}).get("version", "") + + if version.parse(latest_version) > version.parse(MVT_VERSION): + return latest_version + + return "" + + +class IndicatorsUpdates: + + def __init__(self) -> None: + self.github_raw_url = "https://raw.githubusercontent.com/{}/{}/{}/{}" + + self.index_owner = "mvt-project" + self.index_repo = "mvt-indicators" + self.index_branch = "main" + self.index_path = "indicators.yaml" + + self.latest_update_path = os.path.join(MVT_DATA_FOLDER, + "latest_indicators_update") + self.latest_check_path = os.path.join(MVT_DATA_FOLDER, + "latest_indicators_check") + + def get_latest_check(self) -> int: + if not os.path.exists(self.latest_check_path): + return 0 + + with open(self.latest_check_path, "r") as handle: + data = handle.read().strip() + if data: + return int(data) + + return 0 + + def set_latest_check(self) -> None: + timestamp = int(datetime.utcnow().timestamp()) + with open(self.latest_check_path, "w") as handle: + handle.write(str(timestamp)) + + def get_latest_update(self) -> int: + if not os.path.exists(self.latest_update_path): + return 0 + + with open(self.latest_update_path, "r") as handle: + data = handle.read().strip() + if data: + return int(data) + + return 0 + + def set_latest_update(self) -> None: + timestamp = int(datetime.utcnow().timestamp()) + with open(self.latest_update_path, "w") as handle: + handle.write(str(timestamp)) + + def get_remote_index(self) -> dict: + url = self.github_raw_url.format(self.index_owner, self.index_repo, + self.index_branch, self.index_path) + res = requests.get(url) + if res.status_code != 200: + log.error("Failed to retrieve indicators index located at %s (error %d)", + url, res.status_code) + return None + + return yaml.safe_load(res.content) + + def download_remote_ioc(self, ioc_url: str) -> str: + res = requests.get(ioc_url) + if res.status_code != 200: + log.error("Failed to download indicators file from %s (error %d)", + ioc_url, res.status_code) + return None + + clean_file_name = ioc_url.lstrip("https://").replace("/", "_") + ioc_path = os.path.join(MVT_INDICATORS_FOLDER, clean_file_name) + + with open(ioc_path, "w", encoding="utf-8") as handle: + handle.write(res.text) + + return ioc_path + + def update(self) -> None: + self.set_latest_check() + + if not os.path.exists(MVT_INDICATORS_FOLDER): + os.makedirs(MVT_INDICATORS_FOLDER) + + index = self.get_remote_index() + for ioc in index.get("indicators", []): + ioc_type = ioc.get("type", "") + + if ioc_type == "github": + github = ioc.get("github", {}) + owner = github.get("owner", "") + repo = github.get("repo", "") + branch = github.get("branch", "main") + path = github.get("path", "") + + ioc_url = self.github_raw_url.format(owner, repo, branch, path) + else: + ioc_url = ioc.get("download_url", "") + + if not ioc_url: + log.error("Could not find a way to download indicator file for %s", + ioc.get("name")) + continue + + ioc_local_path = self.download_remote_ioc(ioc_url) + if not ioc_local_path: + continue + + log.info("Downloaded indicators \"%s\" to %s", + ioc.get("name"), ioc_local_path) + + self.set_latest_update() + + def _get_remote_file_latest_commit(self, owner: str, repo: str, + branch: str, path: str) -> bool: + file_commit_url = f"https://api.github.com/repos/{self.index_owner}/{self.index_repo}/commits?path={self.index_path}" + res = requests.get(file_commit_url) + if res.status_code != 200: + log.error("Failed to get details about file %s (error %d)", + file_commit_url, res.status_code) + return False + + details = res.json() + if len(details) == 0: + return False + + latest_commit = details[0] + latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None) + if not latest_commit_date: + log.error("Failed to retrieve date of latest update to indicators index file") + return False + + latest_commit_dt = datetime.strptime(latest_commit_date, '%Y-%m-%dT%H:%M:%SZ') + latest_commit_ts = int(latest_commit_dt.timestamp()) + + return latest_commit_ts + + def should_check(self) -> (bool, int): + now = datetime.utcnow() + latest_check_ts = self.get_latest_check() + latest_check_dt = datetime.fromtimestamp(latest_check_ts) + + diff = now - latest_check_dt + diff_hours = divmod(diff.total_seconds(), 3600)[0] + + if diff_hours >= INDICATORS_CHECK_FREQUENCY: + return True, 0 + + return False, INDICATORS_CHECK_FREQUENCY - diff_hours + + def check(self) -> bool: + self.set_latest_check() + + latest_update = self.get_latest_update() + latest_commit_ts = self._get_remote_file_latest_commit(self.index_owner, + self.index_repo, + self.index_branch, + self.index_path) + + if latest_update < latest_commit_ts: + return True + + index = self.get_remote_index() + for ioc in index.get("indicators", []): + if ioc.get("type", "") != "github": + continue + + github = ioc.get("github", {}) + owner = github.get("owner", "") + repo = github.get("repo", "") + branch = github.get("branch", "main") + path = github.get("path", "") + + file_latest_commit_ts = self._get_remote_file_latest_commit(owner, + repo, + branch, + path) + if latest_update < file_latest_commit_ts: + return True + + return False diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index 8979265..65eba46 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -14,9 +14,9 @@ from mvt.common.cmd_check_iocs import CmdCheckIOCS from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC, HELP_MSG_LIST_MODULES, HELP_MSG_MODULE, HELP_MSG_OUTPUT, HELP_MSG_SERIAL) -from mvt.common.indicators import download_indicators_files from mvt.common.logo import logo from mvt.common.options import MutuallyExclusiveOption +from mvt.common.updates import IndicatorsUpdates from .cmd_check_backup import CmdIOSCheckBackup from .cmd_check_fs import CmdIOSCheckFS @@ -214,7 +214,8 @@ def check_iocs(ctx, iocs, list_modules, module, folder): #============================================================================== @cli.command("download-iocs", help="Download public STIX2 indicators") def download_iocs(): - download_indicators_files(log) + ioc_updates = IndicatorsUpdates() + ioc_updates.update() #============================================================================== diff --git a/setup.cfg b/setup.cfg index 65048ee..4f347cc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,6 +34,7 @@ install_requires = libusb1 >=2.0.1 cryptography >=36.0.1 pymobiledevice3 >= 1.23.9 + pyyaml >=6.0 [options.packages.find] where = ./