| # import mmap |
| |
| import struct |
| |
# Magic number that opens a (non-legacy) LZ4 frame: 0x184D2204, little-endian.
LZ4_FRAME_MAGIC = b"\x04\x22\x4D\x18"
| |
| |
def scan_legacy_lz4_frames(data):
    """Print the offset of every legacy LZ4 frame in *data* and of its blocks.

    A legacy frame is the 4-byte legacy magic followed by blocks, each made
    of a little-endian 32-bit compressed size and that many bytes of
    compressed data.  Walking a frame stops at the next (legacy or regular)
    LZ4 magic number, at a zero or overrunning block size, or when fewer than
    four bytes remain; scanning then resumes looking for the next legacy
    magic.

    Args:
        data: Bytes-like object supporting ``len()``, slicing and ``find()``
            (for example ``bytes`` or ``bytearray``).

    Returns:
        None.  Findings are written to standard output.
    """
    LZ4_LEGACY_FRAME_MAGIC = b"\x02\x21\x4C\x18"
    size = len(data)
    index = 0
    while index < size:
        index = data.find(LZ4_LEGACY_FRAME_MAGIC, index)
        if index < 0:
            break
        print("Legacy Lz4 frame at {}".format(index))
        index += 4
        # Require a complete 4-byte field so a truncated tail cannot make
        # struct.unpack raise.
        while index + 4 <= size:
            magic = data[index:index + 4]
            if magic in (LZ4_LEGACY_FRAME_MAGIC, LZ4_FRAME_MAGIC):
                # Start of another frame: the current one is finished.
                break
            (csize,) = struct.unpack("<L", magic)
            # A block that ends exactly at the end of the data is valid, so
            # only a strict overrun (or an empty block) aborts the frame.
            if csize == 0 or index + 4 + csize > size:
                break
            print("Legacy lz4 block at {}, compressed data size {}".format(index, csize))
            # Skip the size field as well as the compressed payload.
            index += 4 + csize
| |
| |
def scan_lz4_frames(data):
    """Print the offset of every LZ4 frame in *data* and the size of its blocks.

    For each occurrence of ``LZ4_FRAME_MAGIC`` the frame descriptor is
    decoded (FLG byte, BD byte, optional 8-byte content size, optional 4-byte
    dictionary ID, 1-byte header checksum) and the data blocks that follow
    are walked until the EndMark (a zero block size) is reached.  A header or
    block that does not fit in *data* ends the walk of that frame without
    raising; the search then resumes after the point reached.

    Args:
        data: Bytes-like object supporting ``len()``, integer indexing,
            ``find()`` and the buffer protocol (for example ``bytes`` or
            ``bytearray``).

    Returns:
        None.  Findings are written to standard output.
    """
    size = len(data)
    index = 0
    while index < size:
        frame_offset = data.find(LZ4_FRAME_MAGIC, index)
        if frame_offset < 0:
            break
        index = frame_offset + 4
        # The smallest descriptor is FLG + BD + header checksum.  With fewer
        # than 3 bytes left no further 4-byte magic can follow either.
        if index + 3 > size:
            break

        flag = data[index]
        block_checksum_present = flag & 0x10 != 0
        content_size_present = flag & 0x08 != 0
        content_checksum_present = flag & 0x04 != 0
        dictionary_id_present = flag & 0x01 != 0

        header_end = index + 2  # past FLG and BD
        content_size_offset = header_end
        if content_size_present:
            header_end += 8
        if dictionary_id_present:
            header_end += 4
        header_end += 1  # header checksum byte
        if header_end > size:
            # Truncated (or false-positive) header: keep searching right
            # after the magic number.
            continue

        content_size = None
        if content_size_present:
            (content_size,) = struct.unpack_from("<Q", data, content_size_offset)
        index = header_end
        print("Lz4 frame at {}, content size: {}".format(
            frame_offset, content_size))

        while index + 4 <= size:
            (block_size,) = struct.unpack_from("<L", data, index)
            # The top bit marks a block stored uncompressed.
            uncompressed = block_size & 0x80000000 != 0
            block_size &= 0x7FFFFFFF
            if block_size == 0:
                # EndMark; an optional 4-byte content checksum follows it.
                index += 4
                if content_checksum_present:
                    index += 4
                break
            block_end = index + 4 + block_size
            if block_checksum_present:
                # Each block carries a trailing 4-byte checksum.
                block_end += 4
            if block_end > size:
                # Block does not fit: stop here instead of jumping past the
                # end of the data, so later frames can still be found.
                break
            print("Block uncompressed: {}, size: {}".format(uncompressed, block_size))
            index = block_end
| |
| |
def main(argv):
    """Scan the file named on the command line for LZ4 frames.

    Args:
        argv: Command-line argument list; ``argv[0]`` is the program name and
            ``argv[1]`` the path of the file to scan.

    Returns:
        1 after printing a usage message when the argument count is wrong,
        0 once both scans have run.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # Imported locally: at module level ``sys`` is only imported under the
    # ``__main__`` guard, so it is not available when this module is imported.
    import sys

    if len(argv) != 2:
        # An empty argv must not turn the usage message into an IndexError.
        prog = argv[0] if argv else "<script>"
        print("Usage:", prog, "<path to a file>", file=sys.stderr)
        return 1

    with open(argv[1], "rb") as fp:
        data = fp.read()
    # The whole file is in memory, so the handle is closed before scanning.
    scan_legacy_lz4_frames(data)
    scan_lz4_frames(data)
    return 0
| |
| |
# Script entry point: run main() on the process arguments and use its return
# value as the exit status.
if __name__ == "__main__":
    import sys

    exit_status = main(sys.argv)
    sys.exit(exit_status)