From bdaaf1543458f9a1cef2ed136042f08a65b2c093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Thu, 17 Feb 2022 18:17:38 +0100 Subject: [PATCH] Add initial implementation of SMS extraction using ADB --- mvt/android/modules/adb/sms.py | 94 ++++++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 17 deletions(-) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 7ba0233..3614144 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -6,8 +6,15 @@ import logging import os import sqlite3 +import tarfile +import io +import zlib +import base64 +import json +import datetime from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.common.module import InsufficientPrivileges from .base import AndroidExtraction @@ -16,11 +23,11 @@ log = logging.getLogger(__name__) SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db" SMS_BUGLE_QUERY = """ SELECT - ppl.normalized_destination AS number, + ppl.normalized_destination AS address, p.timestamp AS timestamp, CASE WHEN m.sender_id IN (SELECT _id FROM participants WHERE contact_id=-1) -THEN 2 ELSE 1 END incoming, p.text AS text +THEN 2 ELSE 1 END incoming, p.text AS body FROM messages m, conversations c, parts p, participants ppl, conversation_participants cp WHERE (m.conversation_id = c._id) @@ -32,10 +39,10 @@ WHERE (m.conversation_id = c._id) SMS_MMSSMS_PATH = "data/data/com.android.providers.telephony/databases/mmssms.db" SMS_MMSMS_QUERY = """ SELECT - address AS number, + address AS address, date_sent AS timestamp, type as incoming, - body AS text + body AS body FROM sms; """ @@ -50,12 +57,12 @@ class SMS(AndroidExtraction): log=log, results=results) def serialize(self, record): - text = record["text"].replace("\n", "\\n") + body = record["body"].replace("\n", "\\n") return { "timestamp": record["isodate"], "module": self.__class__.__name__, "event": f"sms_{record['direction']}", - "data": f"{record['number']}: \"{text}\"" + "data": f"{record['address']}: \"{body}\"" } def check_indicators(self): @@ -63,10 +70,10 @@ class SMS(AndroidExtraction): return for message in self.results: - if "text" not in message: + if "body" not in message: continue - message_links = check_for_links(message["text"]) + message_links = check_for_links(message["body"]) if self.indicators.check_domains(message_links): self.detected.append(message) @@ -96,7 +103,7 @@ class SMS(AndroidExtraction): # If we find links in the messages or if they are empty we add # them to the list of results. - if check_for_links(message["text"]) or message["text"].strip() == "": + if check_for_links(message["body"]) or message["body"].strip() == "": self.results.append(message) cur.close() @@ -104,12 +111,65 @@ class SMS(AndroidExtraction): log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + def _extract_sms_from_backup_tar(self, tar_data): + # Extract data from generated tar file + tar_bytes = io.BytesIO(tar_data) + tar = tarfile.open(fileobj=tar_bytes, mode='r') + for member in tar.getmembers(): + if not member.name.endswith("_sms_backup"): + continue + + self.log.debug("Extracting SMS messages from backup file %s", member.name) + sms_part_zlib = zlib.decompress(tar.extractfile(member).read()) + json_data = json.loads(sms_part_zlib) + + # TODO: Copied from SMS module. Refactor to avoid duplication + for message in json_data: + utc_timestamp = datetime.datetime.utcfromtimestamp(int(message["date"]) / 1000) + message["isodate"] = convert_timestamp_to_iso(utc_timestamp) + message["direction"] = ("sent" if int(message["date_sent"]) else "received") + + message_links = check_for_links(message["body"]) + if message_links or message["body"].strip() == "": + self.results.append(message) + + log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + + def _extract_sms_adb(self): + """Use the Android backup command to extract SMS data from the native SMS app + + It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression + algorithim. This module only supports an unencrypted ADB backup. + """ + Run ADB command to create a backup of SMS app + self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a") + + # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... + backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64") + backup_output = base64.b64decode(backup_output_b64) + if not backup_output.startswith(b"ANDROID BACKUP"): + self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") + return + + [magic_header, version, is_compressed, encryption, tar_data] = backup_output.split(b"\n", 4) + if encryption != b"none" or int(is_compressed): + self.log.error("The backup is encrypted or compressed and cannot be parsed. " + "[version: %s, encryption: %s, compression: %s]", version, encryption, is_compressed) + return + + self._extract_sms_from_backup_tar(tar_data) + def run(self): - if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): - self.SMS_DB_TYPE = 1 - self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) - elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): - self.SMS_DB_TYPE = 2 - self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) - else: - self.log.error("No SMS database found") + try: + if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): + self.SMS_DB_TYPE = 1 + self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) + elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): + self.SMS_DB_TYPE = 2 + self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) + return + except InsufficientPrivileges: + pass + + self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.") + self._extract_sms_adb()