Add initial implementation of SMS extraction using ADB

This commit is contained in:
Donncha Ó Cearbhaill 2022-02-17 18:17:38 +01:00
parent 699824d9ff
commit bdaaf15434
1 changed files with 77 additions and 17 deletions

View File

@ -6,8 +6,15 @@
import logging
import os
import sqlite3
import tarfile
import io
import zlib
import base64
import json
import datetime
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.module import InsufficientPrivileges
from .base import AndroidExtraction
@ -16,11 +23,11 @@ log = logging.getLogger(__name__)
SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db"
SMS_BUGLE_QUERY = """
SELECT
ppl.normalized_destination AS number,
ppl.normalized_destination AS address,
p.timestamp AS timestamp,
CASE WHEN m.sender_id IN
(SELECT _id FROM participants WHERE contact_id=-1)
THEN 2 ELSE 1 END incoming, p.text AS text
THEN 2 ELSE 1 END incoming, p.text AS body
FROM messages m, conversations c, parts p,
participants ppl, conversation_participants cp
WHERE (m.conversation_id = c._id)
@ -32,10 +39,10 @@ WHERE (m.conversation_id = c._id)
SMS_MMSSMS_PATH = "data/data/com.android.providers.telephony/databases/mmssms.db"
SMS_MMSMS_QUERY = """
SELECT
address AS number,
address AS address,
date_sent AS timestamp,
type as incoming,
body AS text
body AS body
FROM sms;
"""
@ -50,12 +57,12 @@ class SMS(AndroidExtraction):
log=log, results=results)
def serialize(self, record):
text = record["text"].replace("\n", "\\n")
body = record["body"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"sms_{record['direction']}",
"data": f"{record['number']}: \"{text}\""
"data": f"{record['address']}: \"{body}\""
}
def check_indicators(self):
@ -63,10 +70,10 @@ class SMS(AndroidExtraction):
return
for message in self.results:
if "text" not in message:
if "body" not in message:
continue
message_links = check_for_links(message["text"])
message_links = check_for_links(message["body"])
if self.indicators.check_domains(message_links):
self.detected.append(message)
@ -96,7 +103,7 @@ class SMS(AndroidExtraction):
# If we find links in the messages or if they are empty we add
# them to the list of results.
if check_for_links(message["text"]) or message["text"].strip() == "":
if check_for_links(message["body"]) or message["body"].strip() == "":
self.results.append(message)
cur.close()
@ -104,12 +111,65 @@ class SMS(AndroidExtraction):
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
def _extract_sms_from_backup_tar(self, tar_data):
# Extract data from generated tar file
tar_bytes = io.BytesIO(tar_data)
tar = tarfile.open(fileobj=tar_bytes, mode='r')
for member in tar.getmembers():
if not member.name.endswith("_sms_backup"):
continue
self.log.debug("Extracting SMS messages from backup file %s", member.name)
sms_part_zlib = zlib.decompress(tar.extractfile(member).read())
json_data = json.loads(sms_part_zlib)
# TODO: Copied from SMS module. Refactor to avoid duplication
for message in json_data:
utc_timestamp = datetime.datetime.utcfromtimestamp(int(message["date"]) / 1000)
message["isodate"] = convert_timestamp_to_iso(utc_timestamp)
message["direction"] = ("sent" if int(message["date_sent"]) else "received")
message_links = check_for_links(message["body"])
if message_links or message["body"].strip() == "":
self.results.append(message)
log.info("Extracted a total of %d SMS messages containing links", len(self.results))
def _extract_sms_adb(self):
"""Use the Android backup command to extract SMS data from the native SMS app
It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression
algorithim. This module only supports an unencrypted ADB backup.
"""
Run ADB command to create a backup of SMS app
self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a")
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport...
backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64")
backup_output = base64.b64decode(backup_output_b64)
if not backup_output.startswith(b"ANDROID BACKUP"):
self.log.error("Extracting SMS via Android backup failed. No valid backup data found.")
return
[magic_header, version, is_compressed, encryption, tar_data] = backup_output.split(b"\n", 4)
if encryption != b"none" or int(is_compressed):
self.log.error("The backup is encrypted or compressed and cannot be parsed. "
"[version: %s, encryption: %s, compression: %s]", version, encryption, is_compressed)
return
self._extract_sms_from_backup_tar(tar_data)
def run(self):
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
self.SMS_DB_TYPE = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db)
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))):
self.SMS_DB_TYPE = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db)
else:
self.log.error("No SMS database found")
try:
if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))):
self.SMS_DB_TYPE = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db)
elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))):
self.SMS_DB_TYPE = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db)
return
except InsufficientPrivileges:
pass
self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.")
self._extract_sms_adb()