Adding more type hints

This commit is contained in:
Nex 2022-06-22 16:53:29 +02:00
parent 3899dce353
commit 7024909e05
12 changed files with 51 additions and 43 deletions

View File

@ -12,6 +12,7 @@ import string
import sys
import tempfile
import time
from typing import Callable
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
from adb_shell.auth.keygen import keygen, write_public_keyfile
@ -44,7 +45,7 @@ class AndroidExtraction(MVTModule):
self.serial = None
@staticmethod
def _adb_check_keys():
def _adb_check_keys() -> None:
"""Make sure Android adb keys exist."""
if not os.path.isdir(os.path.dirname(ADB_KEY_PATH)):
os.makedirs(os.path.dirname(ADB_KEY_PATH))
@ -55,7 +56,7 @@ class AndroidExtraction(MVTModule):
if not os.path.exists(ADB_PUB_KEY_PATH):
write_public_keyfile(ADB_KEY_PATH, ADB_PUB_KEY_PATH)
def _adb_connect(self):
def _adb_connect(self) -> None:
"""Connect to the device over adb."""
self._adb_check_keys()
@ -104,17 +105,17 @@ class AndroidExtraction(MVTModule):
else:
break
def _adb_disconnect(self):
def _adb_disconnect(self) -> None:
"""Close adb connection to the device."""
self.device.close()
def _adb_reconnect(self):
def _adb_reconnect(self) -> None:
"""Reconnect to device using adb."""
log.info("Reconnecting ...")
self._adb_disconnect()
self._adb_connect()
def _adb_command(self, command):
def _adb_command(self, command: str) -> str:
"""Execute an adb shell command.
:param command: Shell command to execute
@ -123,7 +124,7 @@ class AndroidExtraction(MVTModule):
"""
return self.device.shell(command, read_timeout_s=200.0)
def _adb_check_if_root(self):
def _adb_check_if_root(self) -> bool:
"""Check if we have a `su` binary on the Android device.
@ -132,7 +133,7 @@ class AndroidExtraction(MVTModule):
"""
return bool(self._adb_command("command -v su"))
def _adb_root_or_die(self):
def _adb_root_or_die(self) -> None:
"""Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root():
raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!")
@ -146,7 +147,7 @@ class AndroidExtraction(MVTModule):
"""
return self._adb_command(f"su -c {command}")
def _adb_check_file_exists(self, file):
def _adb_check_file_exists(self, file: str) -> bool:
"""Verify that a file exists.
:param file: Path of the file
@ -163,7 +164,9 @@ class AndroidExtraction(MVTModule):
return bool(self._adb_command_as_root(f"[ ! -f {file} ] || echo 1"))
def _adb_download(self, remote_path, local_path, progress_callback=None, retry_root=True):
def _adb_download(self, remote_path: str, local_path: str,
progress_callback: Callable = None,
retry_root: bool = True) -> None:
"""Download a file form the device.
:param remote_path: Path to download from the device
@ -180,7 +183,8 @@ class AndroidExtraction(MVTModule):
else:
raise Exception(f"Unable to download file {remote_path}: {e}")
def _adb_download_root(self, remote_path, local_path, progress_callback=None):
def _adb_download_root(self, remote_path: str, local_path: str,
progress_callback: Callable = None) -> None:
try:
# Check if we have root, if not raise an Exception.
self._adb_root_or_die()
@ -208,7 +212,8 @@ class AndroidExtraction(MVTModule):
except AdbCommandFailureException as e:
raise Exception(f"Unable to download file {remote_path}: {e}")
def _adb_process_file(self, remote_path, process_routine):
def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None:
"""Download a local copy of a file which is only accessible as root.
This is a wrapper around process_routine.
@ -248,7 +253,7 @@ class AndroidExtraction(MVTModule):
# Disconnect from the device.
self._adb_disconnect()
def _generate_backup(self, package_name):
def _generate_backup(self, package_name: str) -> bytes:
self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a")
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport...

View File

@ -43,7 +43,7 @@ class ChromeHistory(AndroidExtraction):
if self.indicators.check_domain(result["url"]):
self.detected.append(result)
def _parse_db(self, db_path):
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.
:param db_path: Path to the History database to process.

View File

@ -25,7 +25,7 @@ class Files(AndroidExtraction):
log=log, results=results)
self.full_find = False
def find_files(self, folder):
def find_files(self, folder: str) -> None:
if self.full_find:
output = self._adb_command(f"find '{folder}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
@ -56,7 +56,7 @@ class Files(AndroidExtraction):
"data": record["path"],
}
def check_suspicious(self):
def check_suspicious(self) -> None:
"""Check for files with suspicious permissions"""
for result in sorted(self.results, key=lambda item: item["path"]):
if result.get("is_suid"):

View File

@ -122,7 +122,7 @@ class Packages(AndroidExtraction):
self.detected.append(result)
@staticmethod
def check_virustotal(packages):
def check_virustotal(packages: list) -> None:
hashes = []
for package in packages:
for file in package.get("files", []):
@ -175,7 +175,7 @@ class Packages(AndroidExtraction):
console.print(table)
@staticmethod
def parse_package_for_details(output):
def parse_package_for_details(output: str) -> dict:
details = {
"uid": "",
"version_name": "",
@ -214,7 +214,7 @@ class Packages(AndroidExtraction):
return details
def _get_files_for_package(self, package_name):
def _get_files_for_package(self, package_name: str) -> list:
output = self._adb_command(f"pm path {package_name}")
output = output.strip().replace("package:", "")
if not output:

View File

@ -75,7 +75,7 @@ class SMS(AndroidExtraction):
if self.indicators.check_domains(message_links):
self.detected.append(message)
def _parse_db(self, db_path):
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.
:param db_path: Path to the Android SMS database file to process
@ -109,7 +109,7 @@ class SMS(AndroidExtraction):
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
def _extract_sms_adb(self):
def _extract_sms_adb(self) -> None:
"""Use the Android backup command to extract SMS data from the native SMS app
It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression

View File

@ -48,7 +48,7 @@ class Whatsapp(AndroidExtraction):
if self.indicators.check_domains(message_links):
self.detected.append(message)
def _parse_db(self, db_path):
def _parse_db(self, db_path: str) -> None:
"""Parse an Android msgstore.db WhatsApp database file.
:param db_path: Path to the Android WhatsApp database file to process

View File

@ -5,6 +5,7 @@
import fnmatch
import os
from tarfile import TarFile
from mvt.common.module import MVTModule
@ -13,14 +14,14 @@ class BackupExtraction(MVTModule):
"""This class provides a base for all backup extractios modules"""
ab = None
def from_folder(self, backup_path, files):
def from_folder(self, backup_path: str, files: list) -> None:
"""
Get all the files and list them
"""
self.backup_path = backup_path
self.files = files
def from_ab(self, file_path, tar, files):
def from_ab(self, file_path: str, tar: TarFile, files: list) -> None:
"""
Extract the files
"""
@ -28,10 +29,10 @@ class BackupExtraction(MVTModule):
self.tar = tar
self.files = files
def _get_files_by_pattern(self, pattern):
def _get_files_by_pattern(self, pattern: str) -> list:
return fnmatch.filter(self.files, pattern)
def _get_file_content(self, file_path):
def _get_file_content(self, file_path: str) -> bytes:
if self.ab:
try:
member = self.tar.getmember(file_path)
@ -43,4 +44,5 @@ class BackupExtraction(MVTModule):
data = handle.read()
handle.close()
return data

View File

@ -6,6 +6,7 @@
import fnmatch
import logging
import os
from zipfile import ZipFile
from mvt.common.module import MVTModule
@ -17,15 +18,15 @@ class BugReportModule(MVTModule):
zip_archive = None
def from_folder(self, extract_path, extract_files):
def from_folder(self, extract_path: str, extract_files: str) -> None:
self.extract_path = extract_path
self.extract_files = extract_files
def from_zip(self, zip_archive, zip_files):
def from_zip(self, zip_archive: ZipFile, zip_files: list) -> None:
self.zip_archive = zip_archive
self.zip_files = zip_files
def _get_files_by_pattern(self, pattern):
def _get_files_by_pattern(self, pattern: str) -> list:
file_names = []
if self.zip_archive:
for zip_file in self.zip_files:
@ -35,13 +36,13 @@ class BugReportModule(MVTModule):
return fnmatch.filter(file_names, pattern)
def _get_files_by_patterns(self, patterns):
def _get_files_by_patterns(self, patterns: list) -> list:
for pattern in patterns:
matches = self._get_files_by_pattern(pattern)
if matches:
return matches
def _get_file_content(self, file_path):
def _get_file_content(self, file_path: str) -> bytes:
if self.zip_archive:
handle = self.zip_archive.open(file_path)
else:
@ -52,7 +53,7 @@ class BugReportModule(MVTModule):
return data
def _get_dumpstate_file(self):
def _get_dumpstate_file(self) -> bytes:
main = self._get_files_by_pattern("main_entry.txt")
if main:
main_content = self._get_file_content(main[0])

View File

@ -52,7 +52,7 @@ class Packages(BugReportModule):
continue
@staticmethod
def parse_package_for_details(output):
def parse_package_for_details(output: str) -> dict:
details = {
"uid": "",
"version_name": "",
@ -103,7 +103,7 @@ class Packages(BugReportModule):
return details
def parse_packages_list(self, output):
def parse_packages_list(self, output: str) -> list:
pkg_rxp = re.compile(r" Package \[(.+?)\].*")
results = []

View File

@ -67,7 +67,7 @@ class MVTModule(object):
len(results), json_path)
return cls(results=results, log=log)
def get_slug(self):
def get_slug(self) -> str:
"""Use the module's class name to retrieve a slug"""
if self.slug:
return self.slug
@ -148,7 +148,7 @@ class MVTModule(object):
raise NotImplementedError
def run_module(module: Callable):
def run_module(module: Callable) -> None:
module.log.info("Running module %s...", module.__class__.__name__)
try:
@ -187,7 +187,7 @@ def run_module(module: Callable):
module.save_to_json()
def save_timeline(timeline: list, timeline_path: str):
def save_timeline(timeline: list, timeline_path: str) -> None:
"""Save the timeline in a csv file.
:param timeline: List of records to order and store

View File

@ -9,7 +9,7 @@ from packaging import version
from .version import MVT_VERSION
def check_for_updates() -> None:
def check_for_updates() -> str:
res = requests.get("https://pypi.org/pypi/mvt/json")
data = res.json()
latest_version = data.get("info", {}).get("version", "")
@ -17,4 +17,4 @@ def check_for_updates() -> None:
if version.parse(latest_version) > version.parse(MVT_VERSION):
return latest_version
return None
return ""

View File

@ -37,7 +37,7 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True):
return None
def convert_chrometime_to_unix(timestamp):
def convert_chrometime_to_unix(timestamp: int) -> int:
"""Converts Chrome timestamp to a Unix timestamp.
:param timestamp: Chrome timestamp as int.
@ -50,7 +50,7 @@ def convert_chrometime_to_unix(timestamp):
return epoch_start + delta
def convert_timestamp_to_iso(timestamp):
def convert_timestamp_to_iso(timestamp: str) -> str:
"""Converts Unix timestamp to ISO string.
:param timestamp: Unix timestamp.
@ -65,7 +65,7 @@ def convert_timestamp_to_iso(timestamp):
return None
def check_for_links(text):
def check_for_links(text: str) -> list:
"""Checks if a given text contains HTTP links.
:param text: Any provided text.
@ -76,7 +76,7 @@ def check_for_links(text):
return re.findall(r"(?P<url>https?://[^\s]+)", text, re.IGNORECASE)
def get_sha256_from_file_path(file_path):
def get_sha256_from_file_path(file_path: str) -> str:
"""Calculate the SHA256 hash of a file from a file path.
:param file_path: Path to the file to hash
@ -93,7 +93,7 @@ def get_sha256_from_file_path(file_path):
# Note: taken from here:
# https://stackoverflow.com/questions/57014259/json-dumps-on-dictionary-with-bytes-for-keys
def keys_bytes_to_string(obj):
def keys_bytes_to_string(obj) -> str:
"""Convert object keys from bytes to string.
:param obj: Object to convert from bytes to string.