Catch and recover malformed SMS database

This commit is contained in:
tek 2021-12-03 17:46:41 +01:00
parent 356bddc3af
commit 7dd5fe7831
2 changed files with 43 additions and 23 deletions

View File

@ -26,27 +26,28 @@ class IOSExtraction(MVTModule):
self.is_fs_dump = False
self.is_sysdiagnose = False
def _recover_sqlite_db_if_needed(self, file_path):
def _recover_sqlite_db_if_needed(self, file_path, forced=False):
"""Tries to recover a malformed database by running a .clone command.
:param file_path: Path to the malformed database file.
"""
# TODO: Find a better solution.
conn = sqlite3.connect(file_path)
cur = conn.cursor()
if not forced:
conn = sqlite3.connect(file_path)
cur = conn.cursor()
try:
recover = False
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
except sqlite3.DatabaseError as e:
if "database disk image is malformed" in str(e):
recover = True
finally:
conn.close()
try:
recover = False
cur.execute("SELECT name FROM sqlite_master WHERE type='table';")
except sqlite3.DatabaseError as e:
if "database disk image is malformed" in str(e):
recover = True
finally:
conn.close()
if not recover:
return
if not recover:
return
self.log.info("Database at path %s is malformed. Trying to recover...", file_path)

View File

@ -51,18 +51,37 @@ class SMS(IOSExtraction):
root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute("""
SELECT
message.*,
handle.id as "phone_number"
FROM message, handle
WHERE handle.rowid = message.handle_id;
""")
try:
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute("""
SELECT
message.*,
handle.id as "phone_number"
FROM message, handle
WHERE handle.rowid = message.handle_id;
""")
# Force the query early to catch database issues
items = list(cur)
except sqlite3.DatabaseError as e:
conn.close()
if "database disk image is malformed" in str(e):
self._recover_sqlite_db_if_needed(self.file_path, forced=True)
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute("""
SELECT
message.*,
handle.id as "phone_number"
FROM message, handle
WHERE handle.rowid = message.handle_id;
""")
items = list(cur)
else:
raise e
names = [description[0] for description in cur.description]
for item in cur:
for item in items:
message = {}
for index, value in enumerate(item):
# We base64 escape some of the attributes that could contain