| #!/usr/bin/python2 |
| # |
| # Copyright (C) 2015 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit testing payload_info.py.""" |
| |
| from __future__ import print_function |
| |
| import StringIO |
| import collections |
| import mock |
| import sys |
| import unittest |
| |
| import payload_info |
| import update_payload |
| |
| from contextlib import contextmanager |
| |
| from update_payload import update_metadata_pb2 |
| |
| class FakePayloadError(Exception): |
| """A generic error when using the FakePayload.""" |
| |
| class FakeOption(object): |
| """Fake options object for testing.""" |
| |
| def __init__(self, **kwargs): |
| self.list_ops = False |
| self.stats = False |
| self.signatures = False |
| for key, val in kwargs.iteritems(): |
| setattr(self, key, val) |
| if not hasattr(self, 'payload_file'): |
| self.payload_file = None |
| |
| class FakeOp(object): |
| """Fake manifest operation for testing.""" |
| |
| def __init__(self, src_extents, dst_extents, op_type, **kwargs): |
| self.src_extents = src_extents |
| self.dst_extents = dst_extents |
| self.type = op_type |
| for key, val in kwargs.iteritems(): |
| setattr(self, key, val) |
| |
| def HasField(self, field): |
| return hasattr(self, field) |
| |
| class FakePartition(object): |
| """Fake PartitionUpdate field for testing.""" |
| |
| def __init__(self, partition_name, operations): |
| self.partition_name = partition_name |
| self.operations = operations |
| |
| class FakeManifest(object): |
| """Fake manifest for testing.""" |
| |
| def __init__(self, major_version): |
| FakeExtent = collections.namedtuple('FakeExtent', |
| ['start_block', 'num_blocks']) |
| self.install_operations = [FakeOp([], |
| [FakeExtent(1, 1), FakeExtent(2, 2)], |
| update_payload.common.OpType.REPLACE_BZ, |
| dst_length=3*4096, |
| data_offset=1, |
| data_length=1)] |
| self.kernel_install_operations = [FakeOp( |
| [FakeExtent(1, 1)], |
| [FakeExtent(x, x) for x in xrange(20)], |
| update_payload.common.OpType.SOURCE_COPY, |
| src_length=4096)] |
| if major_version == payload_info.MAJOR_PAYLOAD_VERSION_BRILLO: |
| self.partitions = [FakePartition('root', self.install_operations), |
| FakePartition('kernel', |
| self.kernel_install_operations)] |
| self.install_operations = self.kernel_install_operations = [] |
| self.block_size = 4096 |
| self.minor_version = 4 |
| FakePartInfo = collections.namedtuple('FakePartInfo', ['size']) |
| self.old_rootfs_info = FakePartInfo(1 * 4096) |
| self.old_kernel_info = FakePartInfo(2 * 4096) |
| self.new_rootfs_info = FakePartInfo(3 * 4096) |
| self.new_kernel_info = FakePartInfo(4 * 4096) |
| self.signatures_offset = None |
| self.signatures_size = None |
| |
| def HasField(self, field_name): |
| """Fake HasField method based on the python members.""" |
| return hasattr(self, field_name) and getattr(self, field_name) is not None |
| |
| class FakeHeader(object): |
| """Fake payload header for testing.""" |
| |
| def __init__(self, version, manifest_len, metadata_signature_len): |
| self.version = version |
| self.manifest_len = manifest_len |
| self.metadata_signature_len = metadata_signature_len |
| |
| @property |
| def size(self): |
| return (20 if self.version == payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS |
| else 24) |
| |
| class FakePayload(object): |
| """Fake payload for testing.""" |
| |
| def __init__(self, major_version): |
| self._header = FakeHeader(major_version, 222, 0) |
| self.header = None |
| self._manifest = FakeManifest(major_version) |
| self.manifest = None |
| |
| self._blobs = {} |
| self._payload_signatures = update_metadata_pb2.Signatures() |
| self._metadata_signatures = update_metadata_pb2.Signatures() |
| |
| def Init(self): |
| """Fake Init that sets header and manifest. |
| |
| Failing to call Init() will not make header and manifest available to the |
| test. |
| """ |
| self.header = self._header |
| self.manifest = self._manifest |
| |
| def ReadDataBlob(self, offset, length): |
| """Return the blob that should be present at the offset location""" |
| if not offset in self._blobs: |
| raise FakePayloadError('Requested blob at unknown offset %d' % offset) |
| blob = self._blobs[offset] |
| if len(blob) != length: |
| raise FakePayloadError('Read blob with the wrong length (expect: %d, ' |
| 'actual: %d)' % (len(blob), length)) |
| return blob |
| |
| @staticmethod |
| def _AddSignatureToProto(proto, **kwargs): |
| """Add a new Signature element to the passed proto.""" |
| new_signature = proto.signatures.add() |
| for key, val in kwargs.iteritems(): |
| setattr(new_signature, key, val) |
| |
| def AddPayloadSignature(self, **kwargs): |
| self._AddSignatureToProto(self._payload_signatures, **kwargs) |
| blob = self._payload_signatures.SerializeToString() |
| self._manifest.signatures_offset = 1234 |
| self._manifest.signatures_size = len(blob) |
| self._blobs[self._manifest.signatures_offset] = blob |
| |
| def AddMetadataSignature(self, **kwargs): |
| self._AddSignatureToProto(self._metadata_signatures, **kwargs) |
| if self._header.metadata_signature_len: |
| del self._blobs[-self._header.metadata_signature_len] |
| blob = self._metadata_signatures.SerializeToString() |
| self._header.metadata_signature_len = len(blob) |
| self._blobs[-len(blob)] = blob |
| |
| class PayloadCommandTest(unittest.TestCase): |
| """Test class for our PayloadCommand class.""" |
| |
| @contextmanager |
| def OutputCapturer(self): |
| """A tool for capturing the sys.stdout""" |
| stdout = sys.stdout |
| try: |
| sys.stdout = StringIO.StringIO() |
| yield sys.stdout |
| finally: |
| sys.stdout = stdout |
| |
| def TestCommand(self, payload_cmd, payload, expected_out): |
| """A tool for testing a payload command. |
| |
| It tests that a payload command which runs with a given payload produces a |
| correct output. |
| """ |
| with mock.patch.object(update_payload, 'Payload', return_value=payload), \ |
| self.OutputCapturer() as output: |
| payload_cmd.Run() |
| self.assertEquals(output.getvalue(), expected_out) |
| |
| def testDisplayValue(self): |
| """Verify that DisplayValue prints what we expect.""" |
| with self.OutputCapturer() as output: |
| payload_info.DisplayValue('key', 'value') |
| self.assertEquals(output.getvalue(), 'key: value\n') |
| |
| def testRun(self): |
| """Verify that Run parses and displays the payload like we expect.""" |
| payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) |
| expected_out = """Payload version: 1 |
| Manifest length: 222 |
| Number of operations: 1 |
| Number of kernel ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testListOpsOnVersion1(self): |
| """Verify that the --list_ops option gives the correct output.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(list_ops=True, action='show')) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) |
| expected_out = """Payload version: 1 |
| Manifest length: 222 |
| Number of operations: 1 |
| Number of kernel ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| |
| Install operations: |
| 0: REPLACE_BZ |
| Data offset: 1 |
| Data length: 1 |
| Destination: 2 extents (3 blocks) |
| (1,1) (2,2) |
| Kernel install operations: |
| 0: SOURCE_COPY |
| Source: 1 extent (1 block) |
| (1,1) |
| Destination: 20 extents (190 blocks) |
| (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) |
| (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testListOpsOnVersion2(self): |
| """Verify that the --list_ops option gives the correct output.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(list_ops=True, action='show')) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| |
| root install operations: |
| 0: REPLACE_BZ |
| Data offset: 1 |
| Data length: 1 |
| Destination: 2 extents (3 blocks) |
| (1,1) (2,2) |
| kernel install operations: |
| 0: SOURCE_COPY |
| Source: 1 extent (1 block) |
| (1,1) |
| Destination: 20 extents (190 blocks) |
| (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) |
| (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testStatsOnVersion1(self): |
| """Verify that the --stats option works correctly.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(stats=True, action='show')) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) |
| expected_out = """Payload version: 1 |
| Manifest length: 222 |
| Number of operations: 1 |
| Number of kernel ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| Blocks read: 11 |
| Blocks written: 193 |
| Seeks when writing: 18 |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testStatsOnVersion2(self): |
| """Verify that the --stats option works correctly on version 2.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(stats=True, action='show')) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| Blocks read: 11 |
| Blocks written: 193 |
| Seeks when writing: 18 |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testEmptySignatures(self): |
| """Verify that the --signatures option works with unsigned payloads.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(action='show', signatures=True)) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_CHROMEOS) |
| expected_out = """Payload version: 1 |
| Manifest length: 222 |
| Number of operations: 1 |
| Number of kernel ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| No metadata signatures stored in the payload |
| No payload signatures stored in the payload |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testSignatures(self): |
| """Verify that the --signatures option shows the present signatures.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(action='show', signatures=True)) |
| payload = FakePayload(payload_info.MAJOR_PAYLOAD_VERSION_BRILLO) |
| payload.AddPayloadSignature(version=1, |
| data='12345678abcdefgh\x00\x01\x02\x03') |
| payload.AddPayloadSignature(data='I am a signature so access is yes.') |
| payload.AddMetadataSignature(data='\x00\x0a\x0c') |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| Metadata signatures blob: file_offset=246 (7 bytes) |
| Metadata signatures: (1 entries) |
| version=None, hex_data: (3 bytes) |
| 00 0a 0c | ... |
| Payload signatures blob: blob_offset=1234 (64 bytes) |
| Payload signatures: (2 entries) |
| version=1, hex_data: (20 bytes) |
| 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh |
| 00 01 02 03 | .... |
| version=None, hex_data: (34 bytes) |
| 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature |
| 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye |
| 73 2e | s. |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| if __name__ == '__main__': |
| unittest.main() |