mirror of
https://github.com/mvt-project/mvt.git
synced 2024-06-01 18:55:31 +00:00
Move encoder class into utils
This commit is contained in:
parent
34a04ec4c2
commit
6d0f819926
|
@ -1,27 +0,0 @@
|
|||
import json
|
||||
|
||||
|
||||
class CustomJSONEncoder(json.JSONEncoder):
|
||||
"""
|
||||
Custom JSON encoder to handle non-standard types.
|
||||
|
||||
Some modules are storing non-UTF-8 bytes in their results dictionaries.
|
||||
This causes exceptions when the results are being encoded as JSON.
|
||||
|
||||
Of course this means that when MVT is run via `check-iocs` with existing
|
||||
results, the encoded version will be loaded back into the dictionary.
|
||||
Modules should ensure they encode anything that needs to be compared
|
||||
against an indicator in a JSON-friendly type.
|
||||
"""
|
||||
|
||||
def default(self, o):
|
||||
if isinstance(o, bytes):
|
||||
# If it's utf-8, try and use that first
|
||||
try:
|
||||
return o.decode("utf-8")
|
||||
except UnicodeError:
|
||||
# Otherwise use a hex representation for any byte type
|
||||
return "0x" + o.hex()
|
||||
|
||||
# For all other types try to use the string representation.
|
||||
return str(o)
|
|
@ -10,8 +10,7 @@ import os
|
|||
import re
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from .encoding import CustomJSONEncoder
|
||||
from .utils import exec_or_profile
|
||||
from .utils import CustomJSONEncoder, exec_or_profile
|
||||
|
||||
|
||||
class DatabaseNotFoundError(Exception):
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
import cProfile
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
@ -14,6 +15,32 @@ from typing import Any, Iterator, Union
|
|||
from rich.logging import RichHandler
|
||||
|
||||
|
||||
class CustomJSONEncoder(json.JSONEncoder):
|
||||
"""
|
||||
Custom JSON encoder to handle non-standard types.
|
||||
|
||||
Some modules are storing non-UTF-8 bytes in their results dictionaries.
|
||||
This causes exceptions when the results are being encoded as JSON.
|
||||
|
||||
Of course this means that when MVT is run via `check-iocs` with existing
|
||||
results, the encoded version will be loaded back into the dictionary.
|
||||
Modules should ensure they encode anything that needs to be compared
|
||||
against an indicator in a JSON-friendly type.
|
||||
"""
|
||||
|
||||
def default(self, o):
|
||||
if isinstance(o, bytes):
|
||||
# If it's utf-8, try and use that first
|
||||
try:
|
||||
return o.decode("utf-8")
|
||||
except UnicodeError:
|
||||
# Otherwise use a hex representation for any byte type
|
||||
return "0x" + o.hex()
|
||||
|
||||
# For all other types try to use the string representation.
|
||||
return str(o)
|
||||
|
||||
|
||||
def convert_chrometime_to_datetime(timestamp: int) -> datetime.datetime:
|
||||
"""Converts Chrome timestamp to a datetime.
|
||||
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from mvt.common.encoding import CustomJSONEncoder
|
||||
|
||||
|
||||
class TestCustomJSONEncoder:
|
||||
def test__normal_input(self):
|
||||
assert json.dumps({"a": "b"}, cls=CustomJSONEncoder) == '{"a": "b"}'
|
||||
|
||||
def test__datetime_object(self):
|
||||
assert (
|
||||
json.dumps(
|
||||
{"timestamp": datetime(2023, 11, 13, 12, 21, 49, 727467)},
|
||||
cls=CustomJSONEncoder,
|
||||
)
|
||||
== '{"timestamp": "2023-11-13 12:21:49.727467"}'
|
||||
)
|
||||
|
||||
def test__bytes_non_utf_8(self):
|
||||
assert (
|
||||
json.dumps({"identifier": b"\xa8\xa9"}, cls=CustomJSONEncoder)
|
||||
== '{"identifier": "0xa8a9"}'
|
||||
)
|
||||
|
||||
def test__bytes_valid_utf_8(self):
|
||||
assert (
|
||||
json.dumps({"name": "家".encode()}, cls=CustomJSONEncoder)
|
||||
== '{"name": "\\u5bb6"}'
|
||||
)
|
|
@ -3,10 +3,13 @@
|
|||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from mvt.common.utils import (
|
||||
CustomJSONEncoder,
|
||||
convert_datetime_to_iso,
|
||||
convert_mactime_to_iso,
|
||||
convert_unix_to_iso,
|
||||
|
@ -64,3 +67,29 @@ class TestHashes:
|
|||
hashes[1]["sha256"]
|
||||
== "cfae0e04ef139b5a2ae1e2b3d400ce67eb98e67ff66f56ba2a580fe41bc120d0"
|
||||
)
|
||||
|
||||
|
||||
class TestCustomJSONEncoder:
|
||||
def test__normal_input(self):
|
||||
assert json.dumps({"a": "b"}, cls=CustomJSONEncoder) == '{"a": "b"}'
|
||||
|
||||
def test__datetime_object(self):
|
||||
assert (
|
||||
json.dumps(
|
||||
{"timestamp": datetime(2023, 11, 13, 12, 21, 49, 727467)},
|
||||
cls=CustomJSONEncoder,
|
||||
)
|
||||
== '{"timestamp": "2023-11-13 12:21:49.727467"}'
|
||||
)
|
||||
|
||||
def test__bytes_non_utf_8(self):
|
||||
assert (
|
||||
json.dumps({"identifier": b"\xa8\xa9"}, cls=CustomJSONEncoder)
|
||||
== '{"identifier": "0xa8a9"}'
|
||||
)
|
||||
|
||||
def test__bytes_valid_utf_8(self):
|
||||
assert (
|
||||
json.dumps({"name": "家".encode()}, cls=CustomJSONEncoder)
|
||||
== '{"name": "\\u5bb6"}'
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue
Block a user