mirror of https://github.com/mvt-project/mvt.git
Refactor some of the decryption code
This commit is contained in:
parent
a4d08f8f35
commit
b44c67e699
|
@ -64,6 +64,7 @@ def parse_ab_header(data):
|
|||
"version": int(version),
|
||||
"encryption": encryption.decode("utf-8")
|
||||
}
|
||||
|
||||
return {
|
||||
"backup": False,
|
||||
"compression": None,
|
||||
|
@ -71,6 +72,78 @@ def parse_ab_header(data):
|
|||
"encryption": None
|
||||
}
|
||||
|
||||
def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_blob, format_version, checksum_salt):
|
||||
"""Generate AES key from user password uisng PBKDF2
|
||||
|
||||
The backup master key is extracted from the master key blog after decryption.
|
||||
"""
|
||||
# Derive key from password using PBKDF2
|
||||
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds)
|
||||
key = kdf.derive(password.encode("utf-8"))
|
||||
|
||||
# Decrypt master key blob
|
||||
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
try:
|
||||
decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize()
|
||||
except TypeError:
|
||||
raise InvalidBackupPassword()
|
||||
|
||||
# Extract key and IV from decrypted blob.
|
||||
key_blob = io.BytesIO(decryted_master_key_blob)
|
||||
master_iv_length = ord(key_blob.read(1))
|
||||
master_iv = key_blob.read(master_iv_length)
|
||||
|
||||
master_key_length = ord(key_blob.read(1))
|
||||
master_key = key_blob.read(master_key_length)
|
||||
|
||||
master_key_checksum_length = ord(key_blob.read(1))
|
||||
master_key_checksum = key_blob.read(master_key_checksum_length)
|
||||
|
||||
# Handle quirky encoding of master key bytes in Android original Java crypto code
|
||||
if format_version > 1:
|
||||
hmac_mk = to_utf8_bytes(master_key)
|
||||
else:
|
||||
hmac_mk = master_key
|
||||
|
||||
# Derive checksum to confirm successful backup decryption.
|
||||
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds)
|
||||
calculated_checksum = kdf.derive(hmac_mk)
|
||||
|
||||
if master_key_checksum != calculated_checksum:
|
||||
raise InvalidBackupPassword()
|
||||
|
||||
return master_key, master_iv
|
||||
|
||||
def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version):
|
||||
"""
|
||||
Generate encryption keyffrom password and do decryption
|
||||
"""
|
||||
if encryption_algo != b"AES-256":
|
||||
raise AndroidBackupNotImplemented("Encryption Algorithm not implemented")
|
||||
|
||||
if password is None:
|
||||
raise InvalidBackupPassword()
|
||||
|
||||
[user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5)
|
||||
user_salt = bytes.fromhex(user_salt.decode("utf-8"))
|
||||
checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8"))
|
||||
pbkdf2_rounds = int(pbkdf2_rounds)
|
||||
user_iv = bytes.fromhex(user_iv.decode("utf-8"))
|
||||
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
|
||||
|
||||
# Derive decryption master key from password
|
||||
master_key, master_iv = decrypt_master_key(password=password, user_salt=user_salt, user_iv=user_iv,
|
||||
pbkdf2_rounds=pbkdf2_rounds, master_key_blob=master_key_blob,
|
||||
format_version=format_version, checksum_salt=checksum_salt)
|
||||
|
||||
# Decrypt and unpad backup data using derivied key
|
||||
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
return unpadder.update(decrypted_tar)
|
||||
|
||||
def parse_backup_file(data, password=None):
|
||||
"""
|
||||
|
@ -79,70 +152,18 @@ def parse_backup_file(data, password=None):
|
|||
if not data.startswith(b"ANDROID BACKUP"):
|
||||
raise AndroidBackupParsingError("Invalid file header")
|
||||
|
||||
[magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4)
|
||||
[magic_header, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
|
||||
version = int(version)
|
||||
is_compressed = int(is_compressed)
|
||||
|
||||
if encryption != b"none":
|
||||
if encryption != b"AES-256":
|
||||
raise AndroidBackupNotImplemented("Encryption Algorithm not implemented")
|
||||
if password is None:
|
||||
raise InvalidBackupPassword()
|
||||
[user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = tar_data.split(b"\n", 5)
|
||||
user_salt = bytes.fromhex(user_salt.decode("utf-8"))
|
||||
checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8"))
|
||||
pbkdf2_rounds = int(pbkdf2_rounds)
|
||||
user_iv = bytes.fromhex(user_iv.decode("utf-8"))
|
||||
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
|
||||
if encryption_algo != b"none":
|
||||
tar_data = decrypt_backup_data(tar_data, password, encryption_algo, format_version=version)
|
||||
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA1(),
|
||||
length=32,
|
||||
salt=user_salt,
|
||||
iterations=pbkdf2_rounds)
|
||||
key = kdf.derive(password.encode("utf-8"))
|
||||
|
||||
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
try:
|
||||
plain_text = decryptor.update(master_key_blob) + decryptor.finalize()
|
||||
|
||||
blob = io.BytesIO(plain_text)
|
||||
master_iv_length = ord(blob.read(1))
|
||||
master_iv = blob.read(master_iv_length)
|
||||
master_key_length = ord(blob.read(1))
|
||||
master_key = blob.read(master_key_length)
|
||||
master_key_checksum_length = ord(blob.read(1))
|
||||
master_key_checksum = blob.read(master_key_checksum_length)
|
||||
except TypeError:
|
||||
raise InvalidBackupPassword()
|
||||
|
||||
if version > 1:
|
||||
hmac_mk = to_utf8_bytes(master_key)
|
||||
else:
|
||||
hmac_mk = master_key
|
||||
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA1(),
|
||||
length=32,
|
||||
salt=checksum_salt,
|
||||
iterations=pbkdf2_rounds)
|
||||
calculated_checksum = kdf.derive(hmac_mk)
|
||||
|
||||
if master_key_checksum != calculated_checksum:
|
||||
raise InvalidBackupPassword()
|
||||
|
||||
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
|
||||
decryptor = cipher.decryptor()
|
||||
tar_data = decryptor.update(encrypted_data) + decryptor.finalize()
|
||||
unpadder = padding.PKCS7(128).unpadder()
|
||||
tar_data = data = unpadder.update(tar_data)
|
||||
|
||||
if is_compressed == 1:
|
||||
if is_compressed:
|
||||
try:
|
||||
tar_data = zlib.decompress(tar_data)
|
||||
except zlib.error:
|
||||
raise AndroidBackupParsingError("Impossible to decompress the file")
|
||||
raise AndroidBackupParsingError("Impossible to decompress the backup file")
|
||||
|
||||
return tar_data
|
||||
|
||||
|
|
Loading…
Reference in New Issue