Add `CustomJSONEncoder` to handle bytes types (#414)

Adds a custom JSON encoder class to fix serialisation failures that occurred when module results contained bytes values with non-UTF-8 data, which cannot be serialised to JSON directly.

---------

Co-authored-by: Rory Flynn <rory.flynn@amnesty.org>
This commit is contained in:
Rory Flynn 2023-11-15 11:40:24 +01:00 committed by GitHub
parent 815678dff7
commit 4b4cad46ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 56 additions and 5 deletions

View File

@ -4,14 +4,13 @@
# https://license.mvt.re/1.1/
import csv
import json
import logging
import os
import re
from typing import Any, Dict, List, Optional, Union
import simplejson as json
from .utils import exec_or_profile
from .utils import CustomJSONEncoder, exec_or_profile
class DatabaseNotFoundError(Exception):
@ -103,7 +102,7 @@ class MVTModule:
results_json_path = os.path.join(self.results_path, results_file_name)
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, default=str)
json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder)
except Exception as exc:
self.log.error(
"Unable to store results of module %s to file %s: %s",
@ -116,7 +115,7 @@ class MVTModule:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.results_path, detected_file_name)
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
def serialize(self, record: dict) -> Union[dict, list, None]:
raise NotImplementedError

View File

@ -6,6 +6,7 @@
import cProfile
import datetime
import hashlib
import json
import logging
import os
import re
@ -14,6 +15,28 @@ from typing import Any, Iterator, Union
from rich.logging import RichHandler
class CustomJSONEncoder(json.JSONEncoder):
    """
    Custom JSON encoder to handle non-standard types.

    Some modules store non-UTF-8 bytes in their results dictionaries, which
    causes exceptions when the results are being encoded as JSON. This
    encoder converts such values to strings instead of failing.

    Note that when MVT is run via `check-iocs` with existing results, the
    encoded version is what gets loaded back into the dictionary. Modules
    should therefore ensure they store anything that needs to be compared
    against an indicator in a JSON-friendly type.
    """

    def default(self, o: object) -> str:
        """Return a string stand-in for an object JSON cannot encode natively.

        :param o: The object the standard encoder could not serialise.
        :returns: The decoded text for bytes-like objects, ``str(o)`` for
                  everything else.
        """
        if isinstance(o, (bytes, bytearray, memoryview)):
            # Treat every bytes-like object the same way: str() on a
            # bytearray or memoryview would store its repr (or a memory
            # address) rather than the data itself.
            # Decode as UTF-8, replacing any invalid bytes with escaped hex.
            return bytes(o).decode("utf-8", errors="backslashreplace")

        # For all other types try to use the string representation.
        return str(o)
def convert_chrometime_to_datetime(timestamp: int) -> datetime.datetime:
"""Converts Chrome timestamp to a datetime.

View File

@ -3,10 +3,13 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
import logging
import os
from datetime import datetime
from mvt.common.utils import (
CustomJSONEncoder,
convert_datetime_to_iso,
convert_mactime_to_iso,
convert_unix_to_iso,
@ -64,3 +67,29 @@ class TestHashes:
hashes[1]["sha256"]
== "cfae0e04ef139b5a2ae1e2b3d400ce67eb98e67ff66f56ba2a580fe41bc120d0"
)
class TestCustomJSONEncoder:
    """Checks how CustomJSONEncoder serialises values JSON cannot encode natively."""

    def test__normal_input(self):
        """Plain JSON-compatible data is encoded exactly as the stock encoder would."""
        assert json.dumps({"a": "b"}, cls=CustomJSONEncoder) == '{"a": "b"}'

    def test__datetime_object(self):
        """Non-bytes objects fall back to their ``str()`` representation."""
        assert (
            json.dumps(
                {"timestamp": datetime(2023, 11, 13, 12, 21, 49, 727467)},
                cls=CustomJSONEncoder,
            )
            == '{"timestamp": "2023-11-13 12:21:49.727467"}'
        )

    def test__bytes_non_utf_8(self):
        """Invalid UTF-8 bytes are kept as escaped hex rather than raising."""
        # The decoded value holds literal backslashes, which JSON in turn
        # escapes, hence the doubled backslashes in the expected output.
        assert (
            json.dumps({"identifier": b"\xa8\xa9"}, cls=CustomJSONEncoder)
            == """{"identifier": "\\\\xa8\\\\xa9"}"""
        )

    def test__bytes_valid_utf_8(self):
        """Valid UTF-8 bytes are decoded to the text they represent."""
        # U+5BB6 is spelled as an escape so the input cannot be lost if this
        # file is re-encoded; json.dumps emits it as \u5bb6 because
        # ensure_ascii is enabled by default.
        assert (
            json.dumps({"name": "\u5bb6".encode()}, cls=CustomJSONEncoder)
            == '{"name": "\\u5bb6"}'
        )