diff --git a/mvt/common/logo.py b/mvt/common/logo.py index eb91c1f..a3ea034 100644 --- a/mvt/common/logo.py +++ b/mvt/common/logo.py @@ -5,7 +5,7 @@ from rich import print -from .updates import MVTUpdates, IndicatorsUpdates +from .updates import IndicatorsUpdates, MVTUpdates from .version import MVT_VERSION @@ -29,6 +29,14 @@ def check_updates() -> None: print("\t\t[bold]You have not yet downloaded any indicators, check the `download-iocs` command!") return + # We only perform this check at a fixed frequency, in order to not + # overburden the user with too many lookups if the command is being run + # multiple times. + should_check, hours = ioc_updates.should_check() + if not should_check: + print(f"\t\tIndicators updates checked recently, next automatic check in {int(hours)} hours") + return + try: ioc_to_update = ioc_updates.check() except Exception: @@ -36,7 +44,8 @@ def check_updates() -> None: else: if ioc_to_update: print("\t\t[bold]There are updates to your indicators files! Run the `download-iocs` command to update!") - return + else: + print("\t\tYour indicators files seem to be up to date.") def logo() -> None: diff --git a/mvt/common/updates.py b/mvt/common/updates.py index dba5d90..1e66798 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -3,12 +3,12 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import os -import yaml import logging +import os from datetime import datetime import requests +import yaml from packaging import version from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER @@ -16,6 +16,9 @@ from .version import MVT_VERSION log = logging.getLogger(__name__) +# In hours. +INDICATORS_CHECK_FREQUENCY = 12 + class MVTUpdates: @@ -42,6 +45,24 @@ class IndicatorsUpdates: self.latest_update_path = os.path.join(MVT_DATA_FOLDER, "latest_indicators_update") + self.latest_check_path = os.path.join(MVT_DATA_FOLDER, + "latest_indicators_check") + + def get_latest_check(self) -> int: + if not os.path.exists(self.latest_check_path): + return 0 + + with open(self.latest_check_path, "r") as handle: + data = handle.read().strip() + if data: + return int(data) + + return 0 + + def set_latest_check(self) -> None: + timestamp = int(datetime.utcnow().timestamp()) + with open(self.latest_check_path, "w") as handle: + handle.write(str(timestamp)) def get_latest_update(self) -> int: if not os.path.exists(self.latest_update_path): @@ -50,11 +71,9 @@ class IndicatorsUpdates: with open(self.latest_update_path, "r") as handle: data = handle.read().strip() if data: - latest_update = int(data) - else: - latest_update = 0 + return int(data) - return latest_update + return 0 def set_latest_update(self) -> None: timestamp = int(datetime.utcnow().timestamp()) @@ -144,7 +163,22 @@ class IndicatorsUpdates: return latest_commit_ts + def should_check(self) -> (bool, int): + now = datetime.utcnow() + latest_check_ts = self.get_latest_check() + latest_check_dt = datetime.fromtimestamp(latest_check_ts) + + diff = now - latest_check_dt + diff_hours = divmod(diff.total_seconds(), 3600)[0] + + if diff_hours >= INDICATORS_CHECK_FREQUENCY: + return True, 0 + + return False, INDICATORS_CHECK_FREQUENCY - diff_hours + def check(self) -> bool: + self.set_latest_check() + latest_update = self.get_latest_update() latest_commit_ts = self._get_remote_file_latest_commit(self.index_owner,