| #!/usr/bin/env python |
| # |
| # Copyright (C) 2015 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Unit testing payload_info.py.""" |
| |
| # Disable check for function names to avoid errors based on old code |
| # pylint: disable-msg=invalid-name |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| |
| import sys |
| import unittest |
| |
| from contextlib import contextmanager |
| |
| from six.moves import StringIO |
| |
| import mock # pylint: disable=import-error |
| |
| import payload_info |
| import update_payload |
| |
| from update_payload import update_metadata_pb2 |
| |
| |
| class FakePayloadError(Exception): |
| """A generic error when using the FakePayload.""" |
| |
| |
| class FakeOption(object): |
| """Fake options object for testing.""" |
| |
| def __init__(self, **kwargs): |
| self.list_ops = False |
| self.stats = False |
| self.signatures = False |
| for key, val in kwargs.items(): |
| setattr(self, key, val) |
| if not hasattr(self, 'payload_file'): |
| self.payload_file = None |
| |
| |
| class FakeOp(object): |
| """Fake manifest operation for testing.""" |
| |
| def __init__(self, src_extents, dst_extents, op_type, **kwargs): |
| self.src_extents = src_extents |
| self.dst_extents = dst_extents |
| self.type = op_type |
| for key, val in kwargs.items(): |
| setattr(self, key, val) |
| |
| def HasField(self, field): |
| return hasattr(self, field) |
| |
| |
| class FakeExtent(object): |
| """Fake Extent for testing.""" |
| def __init__(self, start_block, num_blocks): |
| self.start_block = start_block |
| self.num_blocks = num_blocks |
| |
| |
| class FakePartitionInfo(object): |
| """Fake PartitionInfo for testing.""" |
| def __init__(self, size): |
| self.size = size |
| |
| |
| class FakePartition(object): |
| """Fake PartitionUpdate field for testing.""" |
| |
| def __init__(self, partition_name, operations, old_size, new_size): |
| self.partition_name = partition_name |
| self.operations = operations |
| self.old_partition_info = FakePartitionInfo(old_size) |
| self.new_partition_info = FakePartitionInfo(new_size) |
| |
| |
| class FakeManifest(object): |
| """Fake manifest for testing.""" |
| |
| def __init__(self): |
| self.partitions = [ |
| FakePartition(update_payload.common.ROOTFS, |
| [FakeOp([], [FakeExtent(1, 1), FakeExtent(2, 2)], |
| update_payload.common.OpType.REPLACE_BZ, |
| dst_length=3*4096, |
| data_offset=1, |
| data_length=1) |
| ], 1 * 4096, 3 * 4096), |
| FakePartition(update_payload.common.KERNEL, |
| [FakeOp([FakeExtent(1, 1)], |
| [FakeExtent(x, x) for x in range(20)], |
| update_payload.common.OpType.SOURCE_COPY, |
| src_length=4096) |
| ], 2 * 4096, 4 * 4096), |
| ] |
| self.block_size = 4096 |
| self.minor_version = 4 |
| self.signatures_offset = None |
| self.signatures_size = None |
| |
| def HasField(self, field_name): |
| """Fake HasField method based on the python members.""" |
| return hasattr(self, field_name) and getattr(self, field_name) is not None |
| |
| |
| class FakeHeader(object): |
| """Fake payload header for testing.""" |
| |
| def __init__(self, manifest_len, metadata_signature_len): |
| self.version = payload_info.MAJOR_PAYLOAD_VERSION_BRILLO |
| self.manifest_len = manifest_len |
| self.metadata_signature_len = metadata_signature_len |
| |
| @property |
| def size(self): |
| return 24 |
| |
| |
| class FakePayload(object): |
| """Fake payload for testing.""" |
| |
| def __init__(self): |
| self._header = FakeHeader(222, 0) |
| self.header = None |
| self._manifest = FakeManifest() |
| self.manifest = None |
| |
| self._blobs = {} |
| self._payload_signatures = update_metadata_pb2.Signatures() |
| self._metadata_signatures = update_metadata_pb2.Signatures() |
| |
| def Init(self): |
| """Fake Init that sets header and manifest. |
| |
| Failing to call Init() will not make header and manifest available to the |
| test. |
| """ |
| self.header = self._header |
| self.manifest = self._manifest |
| |
| def ReadDataBlob(self, offset, length): |
| """Return the blob that should be present at the offset location""" |
| if not offset in self._blobs: |
| raise FakePayloadError('Requested blob at unknown offset %d' % offset) |
| blob = self._blobs[offset] |
| if len(blob) != length: |
| raise FakePayloadError('Read blob with the wrong length (expect: %d, ' |
| 'actual: %d)' % (len(blob), length)) |
| return blob |
| |
| @staticmethod |
| def _AddSignatureToProto(proto, **kwargs): |
| """Add a new Signature element to the passed proto.""" |
| new_signature = proto.signatures.add() |
| for key, val in kwargs.items(): |
| setattr(new_signature, key, val) |
| |
| def AddPayloadSignature(self, **kwargs): |
| self._AddSignatureToProto(self._payload_signatures, **kwargs) |
| blob = self._payload_signatures.SerializeToString() |
| self._manifest.signatures_offset = 1234 |
| self._manifest.signatures_size = len(blob) |
| self._blobs[self._manifest.signatures_offset] = blob |
| |
| def AddMetadataSignature(self, **kwargs): |
| self._AddSignatureToProto(self._metadata_signatures, **kwargs) |
| if self._header.metadata_signature_len: |
| del self._blobs[-self._header.metadata_signature_len] |
| blob = self._metadata_signatures.SerializeToString() |
| self._header.metadata_signature_len = len(blob) |
| self._blobs[-len(blob)] = blob |
| |
| |
| class PayloadCommandTest(unittest.TestCase): |
| """Test class for our PayloadCommand class.""" |
| |
| @contextmanager |
| def OutputCapturer(self): |
| """A tool for capturing the sys.stdout""" |
| stdout = sys.stdout |
| try: |
| sys.stdout = StringIO() |
| yield sys.stdout |
| finally: |
| sys.stdout = stdout |
| |
| def TestCommand(self, payload_cmd, payload, expected_out): |
| """A tool for testing a payload command. |
| |
| It tests that a payload command which runs with a given payload produces a |
| correct output. |
| """ |
| with mock.patch.object(update_payload, 'Payload', return_value=payload), \ |
| self.OutputCapturer() as output: |
| payload_cmd.Run() |
| self.assertEqual(output.getvalue(), expected_out) |
| |
| def testDisplayValue(self): |
| """Verify that DisplayValue prints what we expect.""" |
| with self.OutputCapturer() as output: |
| payload_info.DisplayValue('key', 'value') |
| self.assertEqual(output.getvalue(), 'key: value\n') |
| |
| def testRun(self): |
| """Verify that Run parses and displays the payload like we expect.""" |
| payload_cmd = payload_info.PayloadCommand(FakeOption(action='show')) |
| payload = FakePayload() |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testListOpsOnVersion2(self): |
| """Verify that the --list_ops option gives the correct output.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(list_ops=True, action='show')) |
| payload = FakePayload() |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| |
| root install operations: |
| 0: REPLACE_BZ |
| Data offset: 1 |
| Data length: 1 |
| Destination: 2 extents (3 blocks) |
| (1,1) (2,2) |
| kernel install operations: |
| 0: SOURCE_COPY |
| Source: 1 extent (1 block) |
| (1,1) |
| Destination: 20 extents (190 blocks) |
| (0,0) (1,1) (2,2) (3,3) (4,4) (5,5) (6,6) (7,7) (8,8) (9,9) (10,10) |
| (11,11) (12,12) (13,13) (14,14) (15,15) (16,16) (17,17) (18,18) (19,19) |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testStatsOnVersion2(self): |
| """Verify that the --stats option works correctly on version 2.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(stats=True, action='show')) |
| payload = FakePayload() |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| Blocks read: 11 |
| Blocks written: 193 |
| Seeks when writing: 18 |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testEmptySignatures(self): |
| """Verify that the --signatures option works with unsigned payloads.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(action='show', signatures=True)) |
| payload = FakePayload() |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| No metadata signatures stored in the payload |
| No payload signatures stored in the payload |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| def testSignatures(self): |
| """Verify that the --signatures option shows the present signatures.""" |
| payload_cmd = payload_info.PayloadCommand( |
| FakeOption(action='show', signatures=True)) |
| payload = FakePayload() |
| payload.AddPayloadSignature(version=1, |
| data=b'12345678abcdefgh\x00\x01\x02\x03') |
| payload.AddPayloadSignature(data=b'I am a signature so access is yes.') |
| payload.AddMetadataSignature(data=b'\x00\x0a\x0c') |
| expected_out = """Payload version: 2 |
| Manifest length: 222 |
| Number of partitions: 2 |
| Number of "root" ops: 1 |
| Number of "kernel" ops: 1 |
| Block size: 4096 |
| Minor version: 4 |
| Metadata signatures blob: file_offset=246 (7 bytes) |
| Metadata signatures: (1 entries) |
| version=None, hex_data: (3 bytes) |
| 00 0a 0c | ... |
| Payload signatures blob: blob_offset=1234 (64 bytes) |
| Payload signatures: (2 entries) |
| version=1, hex_data: (20 bytes) |
| 31 32 33 34 35 36 37 38 61 62 63 64 65 66 67 68 | 12345678abcdefgh |
| 00 01 02 03 | .... |
| version=None, hex_data: (34 bytes) |
| 49 20 61 6d 20 61 20 73 69 67 6e 61 74 75 72 65 | I am a signature |
| 20 73 6f 20 61 63 63 65 73 73 20 69 73 20 79 65 | so access is ye |
| 73 2e | s. |
| """ |
| self.TestCommand(payload_cmd, payload, expected_out) |
| |
| |
| if __name__ == '__main__': |
| unittest.main() |