Added frequency of indicators updates check

This commit is contained in:
Nex 2022-06-28 12:54:33 +02:00
parent 8ff8e599d8
commit aedef123c9
2 changed files with 51 additions and 8 deletions

View File

@ -5,7 +5,7 @@
from rich import print
from .updates import MVTUpdates, IndicatorsUpdates
from .updates import IndicatorsUpdates, MVTUpdates
from .version import MVT_VERSION
@ -29,6 +29,14 @@ def check_updates() -> None:
print("\t\t[bold]You have not yet downloaded any indicators, check the `download-iocs` command!")
return
# We only perform this check at a fixed frequency, in order to not
# overburden the user with too many lookups if the command is being run
# multiple times.
should_check, hours = ioc_updates.should_check()
if not should_check:
print(f"\t\tIndicators updates checked recently, next automatic check in {int(hours)} hours")
return
try:
ioc_to_update = ioc_updates.check()
except Exception:
@ -36,7 +44,8 @@ def check_updates() -> None:
else:
if ioc_to_update:
print("\t\t[bold]There are updates to your indicators files! Run the `download-iocs` command to update!")
return
else:
print("\t\tYour indicators files seem to be up to date.")
def logo() -> None:

View File

@ -3,12 +3,12 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import yaml
import logging
import os
from datetime import datetime
import requests
import yaml
from packaging import version
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
@ -16,6 +16,9 @@ from .version import MVT_VERSION
log = logging.getLogger(__name__)
# In hours.
INDICATORS_CHECK_FREQUENCY = 12
class MVTUpdates:
@ -42,6 +45,24 @@ class IndicatorsUpdates:
self.latest_update_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_update")
self.latest_check_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_check")
def get_latest_check(self) -> int:
if not os.path.exists(self.latest_check_path):
return 0
with open(self.latest_check_path, "r") as handle:
data = handle.read().strip()
if data:
return int(data)
return 0
def set_latest_check(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
with open(self.latest_check_path, "w") as handle:
handle.write(str(timestamp))
def get_latest_update(self) -> int:
if not os.path.exists(self.latest_update_path):
@ -50,11 +71,9 @@ class IndicatorsUpdates:
with open(self.latest_update_path, "r") as handle:
data = handle.read().strip()
if data:
latest_update = int(data)
else:
latest_update = 0
return int(data)
return latest_update
return 0
def set_latest_update(self) -> None:
timestamp = int(datetime.utcnow().timestamp())
@ -144,7 +163,22 @@ class IndicatorsUpdates:
return latest_commit_ts
def should_check(self) -> (bool, int):
now = datetime.utcnow()
latest_check_ts = self.get_latest_check()
latest_check_dt = datetime.fromtimestamp(latest_check_ts)
diff = now - latest_check_dt
diff_hours = divmod(diff.total_seconds(), 3600)[0]
if diff_hours >= INDICATORS_CHECK_FREQUENCY:
return True, 0
return False, INDICATORS_CHECK_FREQUENCY - diff_hours
def check(self) -> bool:
self.set_latest_check()
latest_update = self.get_latest_update()
latest_commit_ts = self._get_remote_file_latest_commit(self.index_owner,