releasetools: Add Payload class.
This breaks down the current WriteABOTAPackageWithBrilloScript() into
smaller and testable units, which also prepares for the work in
b/35724498.
Bug: 35724498
Test: python -m unittest test_ota_from_target_files
Test: Get identical A/B OTA packages w/ and w/o the CL.
Change-Id: I2ea45ce98e2d2baa58e94fb829b7242f6fe685a7
(cherry picked from commit 036d72181280cd6589b056c0c4adb9d2eb80228b)
diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py
index 2b64f7f..edc20f6 100755
--- a/tools/releasetools/ota_from_target_files.py
+++ b/tools/releasetools/ota_from_target_files.py
@@ -360,6 +360,122 @@
return out_file
+class Payload(object):
+ """Manages the creation and the signing of an A/B OTA Payload."""
+
+ PAYLOAD_BIN = 'payload.bin'
+ PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'
+
+ def __init__(self):
+ # The place where the output from the subprocess should go.
+ self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE
+ self.payload_file = None
+ self.payload_properties = None
+
+ def Generate(self, target_file, source_file=None, additional_args=None):
+ """Generates a payload from the given target-files zip(s).
+
+ Args:
+ target_file: The filename of the target build target-files zip.
+ source_file: The filename of the source build target-files zip; or None if
+ generating a full OTA.
+ additional_args: A list of additional args that should be passed to
+ brillo_update_payload script; or None.
+ """
+ if additional_args is None:
+ additional_args = []
+
+ payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
+ cmd = ["brillo_update_payload", "generate",
+ "--payload", payload_file,
+ "--target_image", target_file]
+ if source_file is not None:
+ cmd.extend(["--source_image", source_file])
+ cmd.extend(additional_args)
+ p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
+ stdoutdata, _ = p.communicate()
+ assert p.returncode == 0, \
+ "brillo_update_payload generate failed: {}".format(stdoutdata)
+
+ self.payload_file = payload_file
+ self.payload_properties = None
+
+ def Sign(self, payload_signer):
+ """Generates and signs the hashes of the payload and metadata.
+
+ Args:
+ payload_signer: A PayloadSigner() instance that serves the signing work.
+
+ Raises:
+ AssertionError: On any failure when calling brillo_update_payload script.
+ """
+ assert isinstance(payload_signer, PayloadSigner)
+
+ # 1. Generate hashes of the payload and metadata files.
+ payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
+ metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
+ cmd = ["brillo_update_payload", "hash",
+ "--unsigned_payload", self.payload_file,
+ "--signature_size", "256",
+ "--metadata_hash_file", metadata_sig_file,
+ "--payload_hash_file", payload_sig_file]
+ p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
+ p1.communicate()
+ assert p1.returncode == 0, "brillo_update_payload hash failed"
+
+ # 2. Sign the hashes.
+ signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
+ signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
+
+ # 3. Insert the signatures back into the payload file.
+ signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
+ suffix=".bin")
+ cmd = ["brillo_update_payload", "sign",
+ "--unsigned_payload", self.payload_file,
+ "--payload", signed_payload_file,
+ "--signature_size", "256",
+ "--metadata_signature_file", signed_metadata_sig_file,
+ "--payload_signature_file", signed_payload_sig_file]
+ p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
+ p1.communicate()
+ assert p1.returncode == 0, "brillo_update_payload sign failed"
+
+ # 4. Dump the signed payload properties.
+ properties_file = common.MakeTempFile(prefix="payload-properties-",
+ suffix=".txt")
+ cmd = ["brillo_update_payload", "properties",
+ "--payload", signed_payload_file,
+ "--properties_file", properties_file]
+ p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
+ p1.communicate()
+ assert p1.returncode == 0, "brillo_update_payload properties failed"
+
+ if OPTIONS.wipe_user_data:
+ with open(properties_file, "a") as f:
+ f.write("POWERWASH=1\n")
+
+ self.payload_file = signed_payload_file
+ self.payload_properties = properties_file
+
+ def WriteToZip(self, output_zip):
+ """Writes the payload to the given zip.
+
+ Args:
+ output_zip: The output ZipFile instance.
+ """
+ assert self.payload_file is not None
+ assert self.payload_properties is not None
+
+ # Add the signed payload file and properties into the zip. In order to
+ # support streaming, we pack them as ZIP_STORED. So these entries can be
+ # read directly with the offset and length pairs.
+ common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN,
+ compress_type=zipfile.ZIP_STORED)
+ common.ZipWrite(output_zip, self.payload_properties,
+ arcname=Payload.PAYLOAD_PROPERTIES_TXT,
+ compress_type=zipfile.ZIP_STORED)
+
+
def SignOutput(temp_zip_name, output_zip_name):
pw = OPTIONS.key_passwords[OPTIONS.package_key]
@@ -1122,12 +1238,6 @@
value += ' ' * (expected_length - len(value))
return value
- # The place where the output from the subprocess should go.
- log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE
-
- # Get the PayloadSigner to be used in step 3.
- payload_signer = PayloadSigner()
-
# Stage the output zip package for package signing.
temp_zip_file = tempfile.NamedTemporaryFile()
output_zip = zipfile.ZipFile(temp_zip_file, "w",
@@ -1143,77 +1253,23 @@
# Metadata to comply with Android OTA package format.
metadata = GetPackageMetadata(target_info, source_info)
- # 1. Generate payload.
- payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
- cmd = ["brillo_update_payload", "generate",
- "--payload", payload_file,
- "--target_image", target_file]
- if source_file is not None:
- cmd.extend(["--source_image", source_file])
+ # Generate payload.
+ payload = Payload()
+
+ # Enforce a max timestamp this payload can be applied on top of.
if OPTIONS.downgrade:
max_timestamp = source_info.GetBuildProp("ro.build.date.utc")
else:
max_timestamp = metadata["post-timestamp"]
- cmd.extend(["--max_timestamp", max_timestamp])
- p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
- p1.communicate()
- assert p1.returncode == 0, "brillo_update_payload generate failed"
+ additional_args = ["--max_timestamp", max_timestamp]
- # 2. Generate hashes of the payload and metadata files.
- payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
- metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
- cmd = ["brillo_update_payload", "hash",
- "--unsigned_payload", payload_file,
- "--signature_size", "256",
- "--metadata_hash_file", metadata_sig_file,
- "--payload_hash_file", payload_sig_file]
- p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
- p1.communicate()
- assert p1.returncode == 0, "brillo_update_payload hash failed"
+ payload.Generate(target_file, source_file, additional_args)
- # 3. Sign the hashes and insert them back into the payload file.
- # 3a. Sign the payload hash.
- signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
+ # Sign the payload.
+ payload.Sign(PayloadSigner())
- # 3b. Sign the metadata hash.
- signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
-
- # 3c. Insert the signatures back into the payload file.
- signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
- suffix=".bin")
- cmd = ["brillo_update_payload", "sign",
- "--unsigned_payload", payload_file,
- "--payload", signed_payload_file,
- "--signature_size", "256",
- "--metadata_signature_file", signed_metadata_sig_file,
- "--payload_signature_file", signed_payload_sig_file]
- p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
- p1.communicate()
- assert p1.returncode == 0, "brillo_update_payload sign failed"
-
- # 4. Dump the signed payload properties.
- properties_file = common.MakeTempFile(prefix="payload-properties-",
- suffix=".txt")
- cmd = ["brillo_update_payload", "properties",
- "--payload", signed_payload_file,
- "--properties_file", properties_file]
- p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
- p1.communicate()
- assert p1.returncode == 0, "brillo_update_payload properties failed"
-
- if OPTIONS.wipe_user_data:
- with open(properties_file, "a") as f:
- f.write("POWERWASH=1\n")
-
- # Add the signed payload file and properties into the zip. In order to
- # support streaming, we pack payload.bin, payload_properties.txt and
- # care_map.txt as ZIP_STORED. So these entries can be read directly with
- # the offset and length pairs.
- common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin",
- compress_type=zipfile.ZIP_STORED)
- common.ZipWrite(output_zip, properties_file,
- arcname="payload_properties.txt",
- compress_type=zipfile.ZIP_STORED)
+ # Write the payload into output zip.
+ payload.WriteToZip(output_zip)
# If dm-verity is supported for the device, copy contents of care_map
# into A/B OTA package.
@@ -1224,6 +1280,8 @@
namelist = target_zip.namelist()
if care_map_path in namelist:
care_map_data = target_zip.read(care_map_path)
+ # In order to support streaming, care_map.txt needs to be packed as
+ # ZIP_STORED.
common.ZipWriteStr(output_zip, "care_map.txt", care_map_data,
compress_type=zipfile.ZIP_STORED)
else:
diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py
index fa6655b..103d4b6 100644
--- a/tools/releasetools/test_ota_from_target_files.py
+++ b/tools/releasetools/test_ota_from_target_files.py
@@ -15,12 +15,14 @@
#
import copy
+import os
import os.path
import unittest
+import zipfile
import common
from ota_from_target_files import (
- _LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner,
+ _LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner,
WriteFingerprintAssertion)
@@ -564,3 +566,157 @@
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
self._assertFilesEqual(verify_file, signed_file)
+
+
+class PayloadTest(unittest.TestCase):
+
+ def setUp(self):
+ self.testdata_dir = get_testdata_dir()
+ self.assertTrue(os.path.exists(self.testdata_dir))
+
+ common.OPTIONS.wipe_user_data = False
+ common.OPTIONS.payload_signer = None
+ common.OPTIONS.payload_signer_args = None
+ common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
+ common.OPTIONS.key_passwords = {
+ common.OPTIONS.package_key : None,
+ }
+
+ def tearDown(self):
+ common.Cleanup()
+
+ @staticmethod
+ def _construct_target_files():
+ target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
+ with zipfile.ZipFile(target_files, 'w') as target_files_zip:
+ # META/update_engine_config.txt
+ target_files_zip.writestr(
+ 'META/update_engine_config.txt',
+ "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
+
+ # META/ab_partitions.txt
+ ab_partitions = ['boot', 'system', 'vendor']
+ target_files_zip.writestr(
+ 'META/ab_partitions.txt',
+ '\n'.join(ab_partitions))
+
+ # Create dummy images for each of them.
+ for partition in ab_partitions:
+ target_files_zip.writestr('IMAGES/' + partition + '.img',
+ os.urandom(len(partition)))
+
+ return target_files
+
+ def _create_payload_full(self):
+ target_file = self._construct_target_files()
+ payload = Payload()
+ payload.Generate(target_file)
+ return payload
+
+ def _create_payload_incremental(self):
+ target_file = self._construct_target_files()
+ source_file = self._construct_target_files()
+ payload = Payload()
+ payload.Generate(target_file, source_file)
+ return payload
+
+ def test_Generate_full(self):
+ payload = self._create_payload_full()
+ self.assertTrue(os.path.exists(payload.payload_file))
+
+ def test_Generate_incremental(self):
+ payload = self._create_payload_incremental()
+ self.assertTrue(os.path.exists(payload.payload_file))
+
+ def test_Generate_additionalArgs(self):
+ target_file = self._construct_target_files()
+ source_file = self._construct_target_files()
+ payload = Payload()
+ # This should work the same as calling payload.Generate(target_file,
+ # source_file).
+ payload.Generate(
+ target_file, additional_args=["--source_image", source_file])
+ self.assertTrue(os.path.exists(payload.payload_file))
+
+ def test_Generate_invalidInput(self):
+ target_file = self._construct_target_files()
+ common.ZipDelete(target_file, 'IMAGES/vendor.img')
+ payload = Payload()
+ self.assertRaises(AssertionError, payload.Generate, target_file)
+
+ def test_Sign_full(self):
+ payload = self._create_payload_full()
+ payload.Sign(PayloadSigner())
+
+ output_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(output_file, 'w') as output_zip:
+ payload.WriteToZip(output_zip)
+
+ import check_ota_package_signature
+ check_ota_package_signature.VerifyAbOtaPayload(
+ os.path.join(self.testdata_dir, 'testkey.x509.pem'),
+ output_file)
+
+ def test_Sign_incremental(self):
+ payload = self._create_payload_incremental()
+ payload.Sign(PayloadSigner())
+
+ output_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(output_file, 'w') as output_zip:
+ payload.WriteToZip(output_zip)
+
+ import check_ota_package_signature
+ check_ota_package_signature.VerifyAbOtaPayload(
+ os.path.join(self.testdata_dir, 'testkey.x509.pem'),
+ output_file)
+
+ def test_Sign_withDataWipe(self):
+ common.OPTIONS.wipe_user_data = True
+ payload = self._create_payload_full()
+ payload.Sign(PayloadSigner())
+
+ with open(payload.payload_properties) as properties_fp:
+ self.assertIn("POWERWASH=1", properties_fp.read())
+
+ def test_Sign_badSigner(self):
+ """Tests that signing failure can be captured."""
+ payload = self._create_payload_full()
+ payload_signer = PayloadSigner()
+ payload_signer.signer_args.append('bad-option')
+ self.assertRaises(AssertionError, payload.Sign, payload_signer)
+
+ def test_WriteToZip(self):
+ payload = self._create_payload_full()
+ payload.Sign(PayloadSigner())
+
+ output_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(output_file, 'w') as output_zip:
+ payload.WriteToZip(output_zip)
+
+ with zipfile.ZipFile(output_file) as verify_zip:
+ # First make sure we have the essential entries.
+ namelist = verify_zip.namelist()
+ self.assertIn(Payload.PAYLOAD_BIN, namelist)
+ self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)
+
+ # Then assert these entries are stored.
+ for entry_info in verify_zip.infolist():
+ if entry_info.filename not in (Payload.PAYLOAD_BIN,
+ Payload.PAYLOAD_PROPERTIES_TXT):
+ continue
+ self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
+
+ def test_WriteToZip_unsignedPayload(self):
+ """Unsigned payloads should not be allowed to be written to zip."""
+ payload = self._create_payload_full()
+
+ output_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(output_file, 'w') as output_zip:
+ self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
+
+ # Also test with incremental payload.
+ payload = self._create_payload_incremental()
+
+ output_file = common.MakeTempFile(suffix='.zip')
+ with zipfile.ZipFile(output_file, 'w') as output_zip:
+ self.assertRaises(AssertionError, payload.WriteToZip, output_zip)