From b44c67e699dd96bce838f07bf7c6457ea59a0b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Fri, 4 Mar 2022 17:04:32 +0100 Subject: [PATCH] Refactor some of the decryption code --- mvt/android/parsers/backup.py | 135 ++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 57 deletions(-) diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index ee396d6..c9c2a1d 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -64,6 +64,7 @@ def parse_ab_header(data): "version": int(version), "encryption": encryption.decode("utf-8") } + return { "backup": False, "compression": None, @@ -71,6 +72,78 @@ def parse_ab_header(data): "encryption": None } +def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_blob, format_version, checksum_salt): + """Generate AES key from user password uisng PBKDF2 + + The backup master key is extracted from the master key blog after decryption. + """ + # Derive key from password using PBKDF2 + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds) + key = kdf.derive(password.encode("utf-8")) + + # Decrypt master key blob + cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv)) + decryptor = cipher.decryptor() + try: + decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize() + except TypeError: + raise InvalidBackupPassword() + + # Extract key and IV from decrypted blob. + key_blob = io.BytesIO(decryted_master_key_blob) + master_iv_length = ord(key_blob.read(1)) + master_iv = key_blob.read(master_iv_length) + + master_key_length = ord(key_blob.read(1)) + master_key = key_blob.read(master_key_length) + + master_key_checksum_length = ord(key_blob.read(1)) + master_key_checksum = key_blob.read(master_key_checksum_length) + + # Handle quirky encoding of master key bytes in Android original Java crypto code + if format_version > 1: + hmac_mk = to_utf8_bytes(master_key) + else: + hmac_mk = master_key + + # Derive checksum to confirm successful backup decryption. + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds) + calculated_checksum = kdf.derive(hmac_mk) + + if master_key_checksum != calculated_checksum: + raise InvalidBackupPassword() + + return master_key, master_iv + +def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version): + """ + Generate encryption keyffrom password and do decryption + """ + if encryption_algo != b"AES-256": + raise AndroidBackupNotImplemented("Encryption Algorithm not implemented") + + if password is None: + raise InvalidBackupPassword() + + [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5) + user_salt = bytes.fromhex(user_salt.decode("utf-8")) + checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) + pbkdf2_rounds = int(pbkdf2_rounds) + user_iv = bytes.fromhex(user_iv.decode("utf-8")) + master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) + + # Derive decryption master key from password + master_key, master_iv = decrypt_master_key(password=password, user_salt=user_salt, user_iv=user_iv, + pbkdf2_rounds=pbkdf2_rounds, master_key_blob=master_key_blob, + format_version=format_version, checksum_salt=checksum_salt) + + # Decrypt and unpad backup data using derivied key + cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) + decryptor = cipher.decryptor() + decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize() + + unpadder = padding.PKCS7(128).unpadder() + return unpadder.update(decrypted_tar) def parse_backup_file(data, password=None): """ @@ -79,70 +152,18 @@ def parse_backup_file(data, password=None): if not data.startswith(b"ANDROID BACKUP"): raise AndroidBackupParsingError("Invalid file header") - [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) + [magic_header, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4) version = int(version) is_compressed = int(is_compressed) - if encryption != b"none": - if encryption != b"AES-256": - raise AndroidBackupNotImplemented("Encryption Algorithm not implemented") - if password is None: - raise InvalidBackupPassword() - [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = tar_data.split(b"\n", 5) - user_salt = bytes.fromhex(user_salt.decode("utf-8")) - checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) - pbkdf2_rounds = int(pbkdf2_rounds) - user_iv = bytes.fromhex(user_iv.decode("utf-8")) - master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) + if encryption_algo != b"none": + tar_data = decrypt_backup_data(tar_data, password, encryption_algo, format_version=version) - kdf = PBKDF2HMAC( - algorithm=hashes.SHA1(), - length=32, - salt=user_salt, - iterations=pbkdf2_rounds) - key = kdf.derive(password.encode("utf-8")) - - cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv)) - decryptor = cipher.decryptor() - try: - plain_text = decryptor.update(master_key_blob) + decryptor.finalize() - - blob = io.BytesIO(plain_text) - master_iv_length = ord(blob.read(1)) - master_iv = blob.read(master_iv_length) - master_key_length = ord(blob.read(1)) - master_key = blob.read(master_key_length) - master_key_checksum_length = ord(blob.read(1)) - master_key_checksum = blob.read(master_key_checksum_length) - except TypeError: - raise InvalidBackupPassword() - - if version > 1: - hmac_mk = to_utf8_bytes(master_key) - else: - hmac_mk = master_key - - kdf = PBKDF2HMAC( - algorithm=hashes.SHA1(), - length=32, - salt=checksum_salt, - iterations=pbkdf2_rounds) - calculated_checksum = kdf.derive(hmac_mk) - - if master_key_checksum != calculated_checksum: - raise InvalidBackupPassword() - - cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) - decryptor = cipher.decryptor() - tar_data = decryptor.update(encrypted_data) + decryptor.finalize() - unpadder = padding.PKCS7(128).unpadder() - tar_data = data = unpadder.update(tar_data) - - if is_compressed == 1: + if is_compressed: try: tar_data = zlib.decompress(tar_data) except zlib.error: - raise AndroidBackupParsingError("Impossible to decompress the file") + raise AndroidBackupParsingError("Impossible to decompress the backup file") return tar_data