Refactoring support for VirusTotal lookups, and removed Koodous lookups (ref: #273)

This commit is contained in:
Nex 2022-06-14 15:46:01 +02:00
parent 3a6e4a7001
commit a7d35dba4a
8 changed files with 136 additions and 209 deletions

View File

@ -13,22 +13,14 @@ It might take several minutes to complete.
!!! info
MVT will likely warn you it was unable to download certain installed packages. There is no reason to be alarmed: this is typically expected behavior when MVT attempts to download a system package it has no privileges to access.
Optionally, you can decide to enable lookups of the SHA256 hash of all the extracted APKs on [VirusTotal](https://www.virustotal.com) and/or [Koodous](https://koodous.com). While these lookups do not provide any conclusive assessment on all of the extracted APKs, they might highlight any known malicious ones:
Optionally, you can decide to enable lookups of the SHA256 hash of all the extracted APKs on [VirusTotal](https://www.virustotal.com). While these lookups do not provide any conclusive assessment on all of the extracted APKs, they might highlight any known malicious ones:
```bash
mvt-android download-apks --output /path/to/folder --virustotal
mvt-android download-apks --output /path/to/folder --koodous
```
Or, to launch all available lookups:
In case you have a previous extraction of APKs you want to later check against VirusTotal, you can do so with the following arguments:
```bash
mvt-android download-apks --output /path/to/folder --all-checks
mvt-android download-apks --from-file /path/to/folder/apks.json --virustotal
```
In case you have a previous extraction of APKs you want to later check against VirusTotal and Koodous, you can do so with the following arguments:
```bash
mvt-android download-apks --from-file /path/to/folder/apks.json --all-checks
```

View File

@ -25,9 +25,8 @@ from mvt.common.logo import logo
from mvt.common.module import run_module, save_timeline
from .download_apks import DownloadAPKs
from .lookups.koodous import koodous_lookup
from .lookups.virustotal import virustotal_lookup
from .modules.adb import ADB_MODULES
from .modules.adb.packages import Packages
from .modules.backup import BACKUP_MODULES
from .modules.bugreport import BUGREPORT_MODULES
@ -62,14 +61,12 @@ def version():
@click.option("--all-apks", "-a", is_flag=True,
help="Extract all packages installed on the phone, including system packages")
@click.option("--virustotal", "-v", is_flag=True, help="Check packages on VirusTotal")
@click.option("--koodous", "-k", is_flag=True, help="Check packages on Koodous")
@click.option("--all-checks", "-A", is_flag=True, help="Run all available checks")
@click.option("--output", "-o", type=click.Path(exists=False),
help="Specify a path to a folder where you want to store the APKs")
@click.option("--from-file", "-f", type=click.Path(exists=True),
help="Instead of acquiring from phone, load an existing packages.json file for lookups (mainly for debug purposes)")
@click.pass_context
def download_apks(ctx, all_apks, virustotal, koodous, all_checks, output, from_file, serial):
def download_apks(ctx, all_apks, virustotal, output, from_file, serial):
try:
if from_file:
download = DownloadAPKs.from_json(from_file)
@ -92,16 +89,20 @@ def download_apks(ctx, all_apks, virustotal, koodous, all_checks, output, from_f
download.serial = serial
download.run()
packages = download.packages
packages_to_lookup = []
if all_apks:
packages_to_lookup = download.packages
else:
for package in download.packages:
if not package.get("system", False):
packages_to_lookup.append(package)
if len(packages) == 0:
return
if len(packages_to_lookup) == 0:
return
if virustotal or all_checks:
virustotal_lookup(packages)
if koodous or all_checks:
koodous_lookup(packages)
if virustotal:
m = Packages()
m.check_virustotal(packages_to_lookup)
except KeyboardInterrupt:
print("")
ctx.exit(1)

View File

@ -1,4 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

View File

@ -1,62 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import requests
from rich.console import Console
from rich.progress import track
from rich.table import Table
from rich.text import Text
log = logging.getLogger(__name__)
def koodous_lookup(packages):
# NOTE: This is temporary, until we resolved the issue.
log.error("Unfortunately Koodous lookup is disabled until further notice, due to unresolved issues with the API service.")
return
log.info("Looking up all extracted files on Koodous (www.koodous.com)")
log.info("This might take a while...")
table = Table(title="Koodous Packages Detections")
table.add_column("Package name")
table.add_column("File name")
table.add_column("Trusted")
table.add_column("Detected")
table.add_column("Rating")
total_packages = len(packages)
for i in track(range(total_packages), description=f"Looking up {total_packages} packages..."):
package = packages[i]
for file in package.get("files", []):
url = f"https://api.koodous.com/apks/{file['sha256']}"
res = requests.get(url)
report = res.json()
row = [package["package_name"], file["path"]]
if "package_name" in report:
trusted = "no"
if report["trusted"]:
trusted = Text("yes", "green bold")
detected = "no"
if report["detected"]:
detected = Text("yes", "red bold")
rating = "0"
if int(report["rating"]) < 0:
rating = Text(str(report["rating"]), "red bold")
row.extend([trusted, detected, rating])
else:
row.extend(["n/a", "n/a", "n/a"])
table.add_row(*row)
console = Console()
console.print(table)

View File

@ -1,100 +0,0 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import requests
from rich.console import Console
from rich.progress import track
from rich.table import Table
from rich.text import Text
log = logging.getLogger(__name__)
def get_virustotal_report(hashes):
apikey = "233f22e200ca5822bd91103043ccac138b910db79f29af5616a9afe8b6f215ad"
url = f"https://www.virustotal.com/partners/sysinternals/file-reports?apikey={apikey}"
items = []
for sha256 in hashes:
items.append({
"autostart_location": "",
"autostart_entry": "",
"hash": sha256,
"local_name": "",
"creation_datetime": "",
})
headers = {"User-Agent": "VirusTotal", "Content-Type": "application/json"}
res = requests.post(url, headers=headers, json=items)
if res.status_code == 200:
report = res.json()
return report["data"]
else:
log.error("Unexpected response from VirusTotal: %s", res.status_code)
return None
def virustotal_lookup(packages):
# NOTE: This is temporary, until we resolved the issue.
log.error("Unfortunately VirusTotal lookup is disabled until further notice, due to unresolved issues with the API service.")
return
log.info("Looking up all extracted files on VirusTotal (www.virustotal.com)")
unique_hashes = []
for package in packages:
for file in package.get("files", []):
if file["sha256"] not in unique_hashes:
unique_hashes.append(file["sha256"])
total_unique_hashes = len(unique_hashes)
detections = {}
def virustotal_query(batch):
report = get_virustotal_report(batch)
if not report:
return
for entry in report:
if entry["hash"] not in detections and entry["found"] is True:
detections[entry["hash"]] = entry["detection_ratio"]
batch = []
for i in track(range(total_unique_hashes), description=f"Looking up {total_unique_hashes} files..."):
file_hash = unique_hashes[i]
batch.append(file_hash)
if len(batch) == 25:
virustotal_query(batch)
batch = []
if batch:
virustotal_query(batch)
table = Table(title="VirusTotal Packages Detections")
table.add_column("Package name")
table.add_column("File path")
table.add_column("Detections")
for package in packages:
for file in package.get("files", []):
row = [package["package_name"], file["path"]]
if file["sha256"] in detections:
detection = detections[file["sha256"]]
positives = detection.split("/")[0]
if int(positives) > 0:
row.append(Text(detection, "red bold"))
else:
row.append(detection)
else:
row.append("not found")
table.add_row(*row)
console = Console()
console.print(table)

View File

@ -7,9 +7,12 @@ import logging
import os
import pkg_resources
from rich.console import Console
from rich.progress import track
from rich.table import Table
from rich.text import Text
from mvt.android.lookups.koodous import koodous_lookup
from mvt.android.lookups.virustotal import virustotal_lookup
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
from .base import AndroidExtraction
@ -119,6 +122,59 @@ class Packages(AndroidExtraction):
result["matched_indicator"] = ioc
self.detected.append(result)
@staticmethod
def check_virustotal(packages):
hashes = []
for package in packages:
for file in package.get("files", []):
if file["sha256"] not in hashes:
hashes.append(file["sha256"])
total_hashes = len(hashes)
detections = {}
for i in track(range(total_hashes), description=f"Looking up {total_hashes} files..."):
try:
results = virustotal_lookup(hashes[i])
except VTNoKey as e:
log.info(e)
return
except VTQuotaExceeded as e:
log.error("Unable to continue: %s", e)
break
if not results:
continue
positives = results["attributes"]["last_analysis_stats"]["malicious"]
total = len(results["attributes"]["last_analysis_results"])
detections[hashes[i]] = f"{positives}/{total}"
table = Table(title="VirusTotal Packages Detections")
table.add_column("Package name")
table.add_column("File path")
table.add_column("Detections")
for package in packages:
for file in package.get("files", []):
row = [package["package_name"], file["path"]]
if file["sha256"] in detections:
detection = detections[file["sha256"]]
positives = detection.split("/")[0]
if int(positives) > 0:
row.append(Text(detection, "red bold"))
else:
row.append(detection)
else:
row.append("not found")
table.add_row(*row)
console = Console()
console.print(table)
@staticmethod
def parse_package_for_details(output):
details = {
@ -262,8 +318,7 @@ class Packages(AndroidExtraction):
result["package_name"], result["installer"], result["timestamp"])
if not self.fast_mode:
virustotal_lookup(packages_to_lookup)
koodous_lookup(packages_to_lookup)
self.check_virustotal(packages_to_lookup)
self.log.info("Extracted at total of %d installed package names",
len(self.results))

45
mvt/common/virustotal.py Normal file
View File

@ -0,0 +1,45 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import requests
log = logging.getLogger(__name__)
MVT_VT_API_KEY = "MVT_VT_API_KEY"
class VTNoKey(Exception):
pass
class VTQuotaExceeded(Exception):
pass
def virustotal_lookup(file_hash):
if MVT_VT_API_KEY not in os.environ:
raise VTNoKey("No VirusTotal API key provided: to use VirusTotal lookups please provide your API key with `export MVT_VT_API_KEY=<key>`")
headers = {
"User-Agent": "VirusTotal",
"Content-Type": "application/json",
"x-apikey": os.environ[MVT_VT_API_KEY],
}
res = requests.get(f"https://www.virustotal.com/api/v3/files/{file_hash}", headers=headers)
if res.status_code == 200:
report = res.json()
return report["data"]
elif res.status_code == 404:
log.info("Could not find results for file with hash %s", file_hash)
elif res.status_code == 429:
raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key")
else:
raise Exception("Unexpected response from VirusTotal: %s", res.status_code)
return None

View File

@ -30,7 +30,7 @@ logging.basicConfig(level="INFO", format=LOG_FORMAT, handlers=[
log = logging.getLogger(__name__)
# Set this environment variable to a password if needed.
PASSWD_ENV = "MVT_IOS_BACKUP_PASSWORD"
MVT_IOS_BACKUP_PASSWORD = "MVT_IOS_BACKUP_PASSWORD"
#==============================================================================
@ -56,7 +56,7 @@ def version():
@click.option("--destination", "-d", required=True,
help="Path to the folder where to store the decrypted backup")
@click.option("--password", "-p", cls=MutuallyExclusiveOption,
help=f"Password to use to decrypt the backup (or, set {PASSWD_ENV} environment variable)",
help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)",
mutually_exclusive=["key_file"])
@click.option("--key-file", "-k", cls=MutuallyExclusiveOption,
type=click.Path(exists=True),
@ -68,22 +68,22 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
backup = DecryptBackup(backup_path, destination)
if key_file:
if PASSWD_ENV in os.environ:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring environment variable, using --key-file '%s' instead",
PASSWD_ENV, key_file)
MVT_IOS_BACKUP_PASSWORD, key_file)
backup.decrypt_with_key_file(key_file)
elif password:
log.info("Your password may be visible in the process table because it was supplied on the command line!")
if PASSWD_ENV in os.environ:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password argument instead",
PASSWD_ENV)
MVT_IOS_BACKUP_PASSWORD)
backup.decrypt_with_password(password)
elif PASSWD_ENV in os.environ:
log.info("Using password from %s environment variable", PASSWD_ENV)
backup.decrypt_with_password(os.environ[PASSWD_ENV])
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
backup.decrypt_with_password(os.environ[MVT_IOS_BACKUP_PASSWORD])
else:
sekrit = Prompt.ask("Enter backup password", password=True)
backup.decrypt_with_password(sekrit)
@ -99,7 +99,7 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
#==============================================================================
@cli.command("extract-key", help="Extract decryption key from an iTunes backup")
@click.option("--password", "-p",
help=f"Password to use to decrypt the backup (or, set {PASSWD_ENV} environment variable)")
help=f"Password to use to decrypt the backup (or, set {MVT_IOS_BACKUP_PASSWORD} environment variable)")
@click.option("--key-file", "-k",
help="Key file to be written (if unset, will print to STDOUT)",
required=False,
@ -111,12 +111,12 @@ def extract_key(password, backup_path, key_file):
if password:
log.info("Your password may be visible in the process table because it was supplied on the command line!")
if PASSWD_ENV in os.environ:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password argument instead",
PASSWD_ENV)
elif PASSWD_ENV in os.environ:
log.info("Using password from %s environment variable", PASSWD_ENV)
password = os.environ[PASSWD_ENV]
MVT_IOS_BACKUP_PASSWORD)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
password = os.environ[MVT_IOS_BACKUP_PASSWORD]
else:
password = Prompt.ask("Enter backup password", password=True)