Support streaming for lz4 recompress

This reduces peak memory usage of update_engine from
~700MB to ~400MB. Patched blocks are now written to disk as they are
produced, so the entire patched output no longer has to be cached in memory.

Test: th
Change-Id: I7b353dbaee4ee63e984ec2014476e3d27387e0fc
diff --git a/lz4diff/lz4patch.cc b/lz4diff/lz4patch.cc
index 7766e24..9de6d58 100644
--- a/lz4diff/lz4patch.cc
+++ b/lz4diff/lz4patch.cc
@@ -30,6 +30,7 @@
 
 #include "android-base/strings.h"
 #include "lz4diff/lz4diff.h"
+#include "lz4diff/lz4diff.pb.h"
 #include "lz4diff_compress.h"
 #include "lz4diff_format.h"
 #include "puffin/puffpatch.h"
@@ -168,45 +169,6 @@
   return err == 0;
 }
 
-bool ApplyPostfixPatch(
-    std::string_view recompressed_blob,
-    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>&
-        dst_block_info,
-    Blob* output) {
-  // Output size should be always identical to size of recompressed_blob
-  output->clear();
-  output->reserve(recompressed_blob.size());
-  size_t offset = 0;
-  for (const auto& block_info : dst_block_info) {
-    auto block =
-        recompressed_blob.substr(offset, block_info.compressed_length());
-    if (!block_info.sha256_hash().empty()) {
-      Blob actual_hash;
-      CHECK(HashCalculator::RawHashOfBytes(
-          block.data(), block.size(), &actual_hash));
-      if (ToStringView(actual_hash) != block_info.sha256_hash()) {
-        LOG(ERROR) << "Block " << block_info
-                   << " is corrupted. This usually means the patch generator "
-                      "used a different version of LZ4, or an incompatible LZ4 "
-                      "patch generator was used, or LZ4 produces different "
-                      "output on different platforms. Expected hash: "
-                   << HexEncode(block_info.sha256_hash())
-                   << ", actual hash: " << HexEncode(actual_hash);
-      }
-    }
-    if (!block_info.postfix_bspatch().empty()) {
-      Blob fixed_block;
-      TEST_AND_RETURN_FALSE(
-          bspatch(block, block_info.postfix_bspatch(), &fixed_block));
-      output->insert(output->end(), fixed_block.begin(), fixed_block.end());
-    } else {
-      output->insert(output->end(), block.begin(), block.end());
-    }
-    offset += block_info.compressed_length();
-  }
-  return true;
-}
-
 bool puffpatch(std::string_view input_data,
                std::string_view patch_data,
                Blob* output) {
@@ -219,6 +181,7 @@
 std::vector<CompressedBlock> ToCompressedBlockVec(
     const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& rpf) {
   std::vector<CompressedBlock> ret;
+  ret.reserve(rpf.size());
   for (const auto& block : rpf) {
     auto& info = ret.emplace_back();
     info.compressed_length = block.compressed_length();
@@ -237,6 +200,129 @@
   return false;
 }
 
+// Returns the total compressed size described by |info|: the sum of
+// compressed_length() over all of its blocks.
+size_t GetCompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total = 0;
+  for (const auto& entry : info) total += entry.compressed_length();
+  return total;
+}
+
+// Returns the total decompressed size described by |info|: the sum of
+// uncompressed_length() over all of its blocks.
+size_t GetDecompressedSize(
+    const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
+  size_t total = 0;
+  for (const auto& entry : info) total += entry.uncompressed_length();
+  return total;
+}
+
+// Runs the inner bspatch/puffpatch selected by patch.pb_header.inner_type().
+bool ApplyInnerPatch(Blob decompressed_src,
+                     const Lz4diffPatch& patch,
+                     Blob* decompressed_dst) {
+  const auto inner_type = patch.pb_header.inner_type();
+  if (inner_type == InnerPatchType::BSDIFF) {
+    TEST_AND_RETURN_FALSE(bspatch(
+        ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+    return true;
+  }
+  if (inner_type == InnerPatchType::PUFFDIFF) {
+    TEST_AND_RETURN_FALSE(puffpatch(
+        ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
+    return true;
+  }
+  LOG(ERROR) << "Unsupported patch type: " << inner_type;
+  return false;
+}
+
+// Decompresses |src_data|, applies the inner patch and recompresses the result
+// into |sink|, postfix-patching blocks that carry one. Returns false on error.
+// TODO(zhangkelvin) Rewrite this in C++ 20 coroutine once that's available.
+// Hand coding CPS is not fun.
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              const SinkFunc& sink) {
+  auto decompressed_src = TryDecompressBlob(
+      src_data,
+      ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
+      patch.pb_header.src_info().zero_padding_enabled());
+  TEST_AND_RETURN_FALSE(!decompressed_src.empty());
+  const auto& dst_info = patch.pb_header.dst_info();
+  Blob decompressed_dst;
+  decompressed_dst.reserve(GetDecompressedSize(dst_info.block_info()));
+
+  // Bail out on failure instead of recompressing a bogus |decompressed_dst|.
+  TEST_AND_RETURN_FALSE(ApplyInnerPatch(
+      std::move(decompressed_src), patch, &decompressed_dst));
+
+  if (!HasPosfixPatches(patch)) {
+    return TryCompressBlob(ToStringView(decompressed_dst),
+                           ToCompressedBlockVec(dst_info.block_info()),
+                           dst_info.zero_padding_enabled(),
+                           dst_info.algo(),
+                           sink);
+  }
+  // Wraps |sink|: applies each block's postfix bspatch before forwarding it.
+  auto postfix_patcher =
+      [&sink, block_idx = 0, &dst_block_info = dst_info.block_info()](
+          const uint8_t* data, size_t size) mutable -> size_t {
+    if (block_idx >= dst_block_info.size()) {
+      return sink(data, size);
+    }
+    const auto& block_info = dst_block_info[block_idx];
+    TEST_EQ(size, block_info.compressed_length());
+    DEFER { block_idx++; };
+    if (block_info.postfix_bspatch().empty()) {
+      return sink(data, size);
+    }
+    if (!block_info.sha256_hash().empty()) {
+      Blob actual_hash;
+      TEST_AND_RETURN_FALSE(
+          HashCalculator::RawHashOfBytes(data, size, &actual_hash));
+      if (ToStringView(actual_hash) != block_info.sha256_hash()) {
+        LOG(ERROR) << "Block " << block_info
+                   << " is corrupted. This usually means the patch generator "
+                      "used a different version of LZ4, or an incompatible LZ4 "
+                      "patch generator was used, or LZ4 produces different "
+                      "output on different platforms. Expected hash: "
+                   << HexEncode(block_info.sha256_hash())
+                   << ", actual hash: " << HexEncode(actual_hash);
+        return 0;
+      }
+    }
+    Blob fixed_block;
+    TEST_AND_RETURN_FALSE(
+        bspatch(std::string_view(reinterpret_cast<const char*>(data), size),
+                block_info.postfix_bspatch(),
+                &fixed_block));
+    return sink(fixed_block.data(), fixed_block.size());
+  };
+
+  return TryCompressBlob(ToStringView(decompressed_dst),
+                         ToCompressedBlockVec(dst_info.block_info()),
+                         dst_info.zero_padding_enabled(),
+                         dst_info.algo(),
+                         postfix_patcher);
+}
+
+// Buffering overload: collects everything the sink-based Lz4Patch emits and
+// stores it in |output| only once that call has succeeded.
+bool Lz4Patch(std::string_view src_data,
+              const Lz4diffPatch& patch,
+              Blob* output) {
+  Blob blob;
+  blob.reserve(GetCompressedSize(patch.pb_header.dst_info().block_info()));
+  TEST_AND_RETURN_FALSE(Lz4Patch(
+      src_data, patch, [&blob](const uint8_t* data, size_t size) -> size_t {
+        blob.insert(blob.end(), data, data + size);
+        return size;
+      }));
+  *output = std::move(blob);
+  return true;
+}
+
 }  // namespace
 
 bool Lz4Patch(std::string_view src_data,
@@ -244,57 +330,15 @@
               Blob* output) {
   Lz4diffPatch patch;
   TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+  return Lz4Patch(src_data, patch, output);
+}
 
-  Blob decompressed_dst;
-  // This scope is here just so that |decompressed_src| can be freed earlier
-  // than function scope.
-  // This whole patching algorithm has non-trivial memory usage, as it needs to
-  // load source data in to memory and decompress that. Now both src and
-  // decompressed src data are in memory.
-  // TODO(b/206729162) Make lz4diff more memory efficient and more streaming
-  // friendly.
-  {
-    const auto decompressed_src = TryDecompressBlob(
-        src_data,
-        ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
-        patch.pb_header.src_info().zero_padding_enabled());
-    switch (patch.pb_header.inner_type()) {
-      case InnerPatchType::BSDIFF:
-        TEST_AND_RETURN_FALSE(bspatch(ToStringView(decompressed_src),
-                                      patch.inner_patch,
-                                      &decompressed_dst));
-        break;
-      case InnerPatchType::PUFFDIFF:
-        TEST_AND_RETURN_FALSE(puffpatch(ToStringView(decompressed_src),
-                                        patch.inner_patch,
-                                        &decompressed_dst));
-        break;
-      default:
-        LOG(ERROR) << "Unsupported patch type: "
-                   << patch.pb_header.inner_type();
-        return false;
-    }
-  }
-
-  auto recompressed_dst = TryCompressBlob(
-      ToStringView(decompressed_dst),
-      ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
-      patch.pb_header.dst_info().zero_padding_enabled(),
-      patch.pb_header.dst_info().algo());
-  TEST_AND_RETURN_FALSE(recompressed_dst.size() > 0);
-  // free memory used by |decompressed_dst|.
-  decompressed_dst = {};
-
-  if (HasPosfixPatches(patch)) {
-    TEST_AND_RETURN_FALSE(
-        ApplyPostfixPatch(ToStringView(recompressed_dst),
-                          patch.pb_header.dst_info().block_info(),
-                          output));
-  } else {
-    *output = std::move(recompressed_dst);
-  }
-
-  return true;
+bool Lz4Patch(std::string_view src_data,
+              std::string_view patch_data,
+              const SinkFunc& sink) {
+  Lz4diffPatch patch;  // Filled in by ParseLz4DifffPatch() below.
+  TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
+  return Lz4Patch(src_data, patch, sink);  // Overload taking a parsed patch.
 }
 
 bool Lz4Patch(const Blob& src_data, const Blob& patch_data, Blob* output) {