Support streaming for lz4 recompress
This reduces peak memory usage of update_engine from
~700MB to ~400MB. Patched blocks are no longer cached in memory in
their entirety; instead, data is written to disk as it is produced.
Test: th
Change-Id: I7b353dbaee4ee63e984ec2014476e3d27387e0fc
diff --git a/lz4diff/lz4patch.cc b/lz4diff/lz4patch.cc
index 7766e24..9de6d58 100644
--- a/lz4diff/lz4patch.cc
+++ b/lz4diff/lz4patch.cc
@@ -30,6 +30,7 @@
#include "android-base/strings.h"
#include "lz4diff/lz4diff.h"
+#include "lz4diff/lz4diff.pb.h"
#include "lz4diff_compress.h"
#include "lz4diff_format.h"
#include "puffin/puffpatch.h"
@@ -168,45 +169,6 @@
return err == 0;
}
-bool ApplyPostfixPatch(
- std::string_view recompressed_blob,
- const google::protobuf::RepeatedPtrField<CompressedBlockInfo>&
- dst_block_info,
- Blob* output) {
- // Output size should be always identical to size of recompressed_blob
- output->clear();
- output->reserve(recompressed_blob.size());
- size_t offset = 0;
- for (const auto& block_info : dst_block_info) {
- auto block =
- recompressed_blob.substr(offset, block_info.compressed_length());
- if (!block_info.sha256_hash().empty()) {
- Blob actual_hash;
- CHECK(HashCalculator::RawHashOfBytes(
- block.data(), block.size(), &actual_hash));
- if (ToStringView(actual_hash) != block_info.sha256_hash()) {
- LOG(ERROR) << "Block " << block_info
- << " is corrupted. This usually means the patch generator "
- "used a different version of LZ4, or an incompatible LZ4 "
- "patch generator was used, or LZ4 produces different "
- "output on different platforms. Expected hash: "
- << HexEncode(block_info.sha256_hash())
- << ", actual hash: " << HexEncode(actual_hash);
- }
- }
- if (!block_info.postfix_bspatch().empty()) {
- Blob fixed_block;
- TEST_AND_RETURN_FALSE(
- bspatch(block, block_info.postfix_bspatch(), &fixed_block));
- output->insert(output->end(), fixed_block.begin(), fixed_block.end());
- } else {
- output->insert(output->end(), block.begin(), block.end());
- }
- offset += block_info.compressed_length();
- }
- return true;
-}
-
bool puffpatch(std::string_view input_data,
std::string_view patch_data,
Blob* output) {
@@ -219,6 +181,7 @@
std::vector<CompressedBlock> ToCompressedBlockVec(
const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& rpf) {
std::vector<CompressedBlock> ret;
+ ret.reserve(rpf.size());
for (const auto& block : rpf) {
auto& info = ret.emplace_back();
info.compressed_length = block.compressed_length();
@@ -237,6 +200,129 @@
return false;
}
+// Returns the sum of compressed_length() over every entry of |info|, or 0 when
+// |info| is empty.
+size_t GetCompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total = 0;
+  for (const auto& entry : info) total += entry.compressed_length();
+  return total;
+}
+
+// Returns the sum of uncompressed_length() over every entry of |info|, or 0
+// when |info| is empty.
+size_t GetDecompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total = 0;
+  for (const auto& entry : info) total += entry.uncompressed_length();
+  return total;
+}
+
+// Applies |patch|'s inner BSDIFF/PUFFDIFF patch to |decompressed_src|. The
+// source is taken by value so a moved-in buffer is released once this call completes.
+bool ApplyInnerPatch(Blob decompressed_src,
+                     const Lz4diffPatch& patch,
+                     Blob* decompressed_dst) {
+  const auto inner_type = patch.pb_header.inner_type();
+  if (inner_type == InnerPatchType::BSDIFF) {
+    TEST_AND_RETURN_FALSE(bspatch(
+        ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+  } else if (inner_type == InnerPatchType::PUFFDIFF) {
+    TEST_AND_RETURN_FALSE(puffpatch(
+        ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+  } else {
+    LOG(ERROR) << "Unsupported patch type: " << inner_type;
+    return false;
+  }
+  return true;
+}
+
+// TODO(zhangkelvin) Rewrite this in C++ 20 coroutine once that's available.
+// Hand coding CPS is not fun.
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              const SinkFunc& sink) {
+  auto decompressed_src = TryDecompressBlob(
+      src_data,
+      ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
+      patch.pb_header.src_info().zero_padding_enabled());
+  TEST_AND_RETURN_FALSE(!decompressed_src.empty());
+  Blob decompressed_dst;
+  decompressed_dst.reserve(
+      GetDecompressedSize(patch.pb_header.dst_info().block_info()));
+  // Must be checked: recompressing a failed inner patch would emit garbage.
+  TEST_AND_RETURN_FALSE(
+      ApplyInnerPatch(std::move(decompressed_src), patch, &decompressed_dst));
+
+  if (!HasPosfixPatches(patch)) {
+    return TryCompressBlob(
+        ToStringView(decompressed_dst),
+        ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
+        patch.pb_header.dst_info().zero_padding_enabled(),
+        patch.pb_header.dst_info().algo(),
+        sink);
+  }
+  auto postfix_patcher =
+      [&sink,
+       block_idx = 0,
+       &dst_block_info = patch.pb_header.dst_info().block_info()](
+          const uint8_t* data, size_t size) mutable -> size_t {
+    if (block_idx >= dst_block_info.size()) {
+      return sink(data, size);
+    }
+    const auto& block_info = dst_block_info[block_idx];
+    TEST_EQ(size, block_info.compressed_length());
+    DEFER { block_idx++; };
+    if (block_info.postfix_bspatch().empty()) {
+      return sink(data, size);
+    }
+    if (!block_info.sha256_hash().empty()) {
+      Blob actual_hash;
+      TEST_AND_RETURN_FALSE(
+          HashCalculator::RawHashOfBytes(data, size, &actual_hash));
+      if (ToStringView(actual_hash) != block_info.sha256_hash()) {
+        LOG(ERROR) << "Block " << block_info
+                   << " is corrupted. This usually means the patch generator "
+                      "used a different version of LZ4, or an incompatible LZ4 "
+                      "patch generator was used, or LZ4 produces different "
+                      "output on different platforms. Expected hash: "
+                   << HexEncode(block_info.sha256_hash())
+                   << ", actual hash: " << HexEncode(actual_hash);
+        return 0;
+      }
+    }
+    Blob fixed_block;
+    TEST_AND_RETURN_FALSE(
+        bspatch(std::string_view(reinterpret_cast<const char*>(data), size),
+                block_info.postfix_bspatch(),
+                &fixed_block));
+    return sink(fixed_block.data(), fixed_block.size());
+  };
+  // Recompress, routing each emitted dst block through |postfix_patcher|.
+  return TryCompressBlob(
+      ToStringView(decompressed_dst),
+      ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
+      patch.pb_header.dst_info().zero_padding_enabled(),
+      patch.pb_header.dst_info().algo(),
+      postfix_patcher);
+}
+
+// Collects the streaming Lz4Patch() output into a buffer reserved up front
+// from the dst blocks' compressed lengths; |output| is set only on success.
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              Blob* output) {
+  Blob blob;
+  blob.reserve(GetCompressedSize(patch.pb_header.dst_info().block_info()));
+  TEST_AND_RETURN_FALSE(Lz4Patch(
+      src_data, patch, [&blob](const uint8_t* data, size_t size) -> size_t {
+        blob.insert(blob.end(), data, data + size);
+        return size;
+      }));
+  *output = std::move(blob);
+  return true;
+}
+
} // namespace
bool Lz4Patch(std::string_view src_data,
@@ -244,57 +330,15 @@
Blob* output) {
Lz4diffPatch patch;
TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+ return Lz4Patch(src_data, patch, output);
+}
- Blob decompressed_dst;
- // This scope is here just so that |decompressed_src| can be freed earlier
- // than function scope.
- // This whole patching algorithm has non-trivial memory usage, as it needs to
- // load source data in to memory and decompress that. Now both src and
- // decompressed src data are in memory.
- // TODO(b/206729162) Make lz4diff more memory efficient and more streaming
- // friendly.
- {
- const auto decompressed_src = TryDecompressBlob(
- src_data,
- ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
- patch.pb_header.src_info().zero_padding_enabled());
- switch (patch.pb_header.inner_type()) {
- case InnerPatchType::BSDIFF:
- TEST_AND_RETURN_FALSE(bspatch(ToStringView(decompressed_src),
- patch.inner_patch,
- &decompressed_dst));
- break;
- case InnerPatchType::PUFFDIFF:
- TEST_AND_RETURN_FALSE(puffpatch(ToStringView(decompressed_src),
- patch.inner_patch,
- &decompressed_dst));
- break;
- default:
- LOG(ERROR) << "Unsupported patch type: "
- << patch.pb_header.inner_type();
- return false;
- }
- }
-
- auto recompressed_dst = TryCompressBlob(
- ToStringView(decompressed_dst),
- ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
- patch.pb_header.dst_info().zero_padding_enabled(),
- patch.pb_header.dst_info().algo());
- TEST_AND_RETURN_FALSE(recompressed_dst.size() > 0);
- // free memory used by |decompressed_dst|.
- decompressed_dst = {};
-
- if (HasPosfixPatches(patch)) {
- TEST_AND_RETURN_FALSE(
- ApplyPostfixPatch(ToStringView(recompressed_dst),
- patch.pb_header.dst_info().block_info(),
- output));
- } else {
- *output = std::move(recompressed_dst);
- }
-
- return true;
+bool Lz4Patch(std::string_view src_data,
+ std::string_view patch_data,
+ const SinkFunc& sink) {
+ Lz4diffPatch patch;
+ TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+ return Lz4Patch(src_data, patch, sink);
}
bool Lz4Patch(const Blob& src_data, const Blob& patch_data, Blob* output) {