mirror of https://github.com/mvt-project/mvt.git
Update WhatsApp module to search for links in attachments
This commit is contained in:
parent
3f3261511a
commit
e10f1767e6
|
@ -32,11 +32,14 @@ class Whatsapp(IOSExtraction):
|
|||
|
||||
def serialize(self, record):
|
||||
text = record.get("ZTEXT", "").replace("\n", "\\n")
|
||||
links_text = ""
|
||||
if record["links"]:
|
||||
links_text = " - Embedded links: " + ", ".join(record["links"])
|
||||
return {
|
||||
"timestamp": record.get("isodate"),
|
||||
"module": self.__class__.__name__,
|
||||
"event": "message",
|
||||
"data": f"{text} from {record.get('ZFROMJID', 'Unknown')}",
|
||||
"data": f"\'{text}\' from {record.get('ZFROMJID', 'Unknown')}{links_text}",
|
||||
}
|
||||
|
||||
def check_indicators(self):
|
||||
|
@ -44,8 +47,7 @@ class Whatsapp(IOSExtraction):
|
|||
return
|
||||
|
||||
for message in self.results:
|
||||
message_links = check_for_links(message.get("ZTEXT", ""))
|
||||
if self.indicators.check_domains(message_links):
|
||||
if self.indicators.check_domains(message["links"]):
|
||||
self.detected.append(message)
|
||||
|
||||
def run(self):
|
||||
|
@ -55,26 +57,49 @@ class Whatsapp(IOSExtraction):
|
|||
|
||||
conn = sqlite3.connect(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute("SELECT * FROM ZWAMESSAGE;")
|
||||
|
||||
# Query all messages and join tables which can contain media attachments and links
|
||||
cur.execute("""
|
||||
SELECT
|
||||
ZWAMESSAGE.*,
|
||||
ZWAMEDIAITEM.ZAUTHORNAME,
|
||||
ZWAMEDIAITEM.ZMEDIAURL,
|
||||
ZWAMESSAGEDATAITEM.ZCONTENT1,
|
||||
ZWAMESSAGEDATAITEM.ZCONTENT2,
|
||||
ZWAMESSAGEDATAITEM.ZMATCHEDTEXT,
|
||||
ZWAMESSAGEDATAITEM.ZSUMMARY,
|
||||
ZWAMESSAGEDATAITEM.ZTITLE
|
||||
FROM ZWAMESSAGE
|
||||
LEFT JOIN ZWAMEDIAITEM ON ZWAMEDIAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK
|
||||
LEFT JOIN ZWAMESSAGEDATAITEM ON ZWAMESSAGEDATAITEM.ZMESSAGE = ZWAMESSAGE.Z_PK;
|
||||
""")
|
||||
names = [description[0] for description in cur.description]
|
||||
|
||||
for message in cur:
|
||||
new_message = {}
|
||||
for index, value in enumerate(message):
|
||||
new_message[names[index]] = value
|
||||
for message_row in cur:
|
||||
message = {}
|
||||
for index, value in enumerate(message_row):
|
||||
message[names[index]] = value
|
||||
|
||||
if not new_message.get("ZTEXT", None):
|
||||
continue
|
||||
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE")))
|
||||
message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else ""
|
||||
|
||||
# We convert Mac's silly timestamp again.
|
||||
new_message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(new_message.get("ZMESSAGEDATE")))
|
||||
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. Check each of them!
|
||||
message_links = []
|
||||
fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"]
|
||||
for field in fields_with_links:
|
||||
if message.get(field):
|
||||
message_links.extend(check_for_links(message.get(field, "")))
|
||||
|
||||
# Extract links from the WhatsApp message.
|
||||
message_links = check_for_links(new_message["ZTEXT"])
|
||||
# Remove WhatsApp internal media URLs
|
||||
filtered_links = []
|
||||
for link in message_links:
|
||||
if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")):
|
||||
filtered_links.append(link)
|
||||
|
||||
# If we find messages, or if there's an empty message we add it to the list.
|
||||
if new_message["ZTEXT"] and (message_links or new_message["ZTEXT"].strip() == ""):
|
||||
self.results.append(new_message)
|
||||
# If we find messages with links, or if there's an empty message we add it to the results list.
|
||||
if filtered_links or (message.get("ZTEXT") or "").strip() == "":
|
||||
message["links"] = list(set(filtered_links))
|
||||
self.results.append(message)
|
||||
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
|
Loading…
Reference in New Issue