mirror of
https://github.com/mvt-project/mvt.git
synced 2024-07-01 16:39:03 +00:00
Add a CustomJSONEncoder to handle binary types
This commit is contained in:
parent
815678dff7
commit
34a04ec4c2
27
mvt/common/encoding.py
Normal file
27
mvt/common/encoding.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class CustomJSONEncoder(json.JSONEncoder):
|
||||||
|
"""
|
||||||
|
Custom JSON encoder to handle non-standard types.
|
||||||
|
|
||||||
|
Some modules are storing non-UTF-8 bytes in their results dictionaries.
|
||||||
|
This causes exceptions when the results are being encoded as JSON.
|
||||||
|
|
||||||
|
Of course this means that when MVT is run via `check-iocs` with existing
|
||||||
|
results, the encoded version will be loaded back into the dictionary.
|
||||||
|
Modules should ensure they encode anything that needs to be compared
|
||||||
|
against an indicator in a JSON-friendly type.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def default(self, o):
|
||||||
|
if isinstance(o, bytes):
|
||||||
|
# If it's utf-8, try and use that first
|
||||||
|
try:
|
||||||
|
return o.decode("utf-8")
|
||||||
|
except UnicodeError:
|
||||||
|
# Otherwise use a hex representation for any byte type
|
||||||
|
return "0x" + o.hex()
|
||||||
|
|
||||||
|
# For all other types try to use the string representation.
|
||||||
|
return str(o)
|
|
@ -4,13 +4,13 @@
|
||||||
# https://license.mvt.re/1.1/
|
# https://license.mvt.re/1.1/
|
||||||
|
|
||||||
import csv
|
import csv
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from typing import Any, Dict, List, Optional, Union
|
from typing import Any, Dict, List, Optional, Union
|
||||||
|
|
||||||
import simplejson as json
|
from .encoding import CustomJSONEncoder
|
||||||
|
|
||||||
from .utils import exec_or_profile
|
from .utils import exec_or_profile
|
||||||
|
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ class MVTModule:
|
||||||
results_json_path = os.path.join(self.results_path, results_file_name)
|
results_json_path = os.path.join(self.results_path, results_file_name)
|
||||||
with open(results_json_path, "w", encoding="utf-8") as handle:
|
with open(results_json_path, "w", encoding="utf-8") as handle:
|
||||||
try:
|
try:
|
||||||
json.dump(self.results, handle, indent=4, default=str)
|
json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
self.log.error(
|
self.log.error(
|
||||||
"Unable to store results of module %s to file %s: %s",
|
"Unable to store results of module %s to file %s: %s",
|
||||||
|
@ -116,7 +116,7 @@ class MVTModule:
|
||||||
detected_file_name = f"{name}_detected.json"
|
detected_file_name = f"{name}_detected.json"
|
||||||
detected_json_path = os.path.join(self.results_path, detected_file_name)
|
detected_json_path = os.path.join(self.results_path, detected_file_name)
|
||||||
with open(detected_json_path, "w", encoding="utf-8") as handle:
|
with open(detected_json_path, "w", encoding="utf-8") as handle:
|
||||||
json.dump(self.detected, handle, indent=4, default=str)
|
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
|
||||||
|
|
||||||
def serialize(self, record: dict) -> Union[dict, list, None]:
|
def serialize(self, record: dict) -> Union[dict, list, None]:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
30
tests/common/test_encoding.py
Normal file
30
tests/common/test_encoding.py
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
import json
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from mvt.common.encoding import CustomJSONEncoder
|
||||||
|
|
||||||
|
|
||||||
|
class TestCustomJSONEncoder:
|
||||||
|
def test__normal_input(self):
|
||||||
|
assert json.dumps({"a": "b"}, cls=CustomJSONEncoder) == '{"a": "b"}'
|
||||||
|
|
||||||
|
def test__datetime_object(self):
|
||||||
|
assert (
|
||||||
|
json.dumps(
|
||||||
|
{"timestamp": datetime(2023, 11, 13, 12, 21, 49, 727467)},
|
||||||
|
cls=CustomJSONEncoder,
|
||||||
|
)
|
||||||
|
== '{"timestamp": "2023-11-13 12:21:49.727467"}'
|
||||||
|
)
|
||||||
|
|
||||||
|
def test__bytes_non_utf_8(self):
|
||||||
|
assert (
|
||||||
|
json.dumps({"identifier": b"\xa8\xa9"}, cls=CustomJSONEncoder)
|
||||||
|
== '{"identifier": "0xa8a9"}'
|
||||||
|
)
|
||||||
|
|
||||||
|
def test__bytes_valid_utf_8(self):
|
||||||
|
assert (
|
||||||
|
json.dumps({"name": "家".encode()}, cls=CustomJSONEncoder)
|
||||||
|
== '{"name": "\\u5bb6"}'
|
||||||
|
)
|
Loading…
Reference in New Issue
Block a user