diff --git a/README.md b/README.md index 1e5a5d4..9644eb3 100644 --- a/README.md +++ b/README.md @@ -1 +1,4 @@ -# bitloss \ No newline at end of file +# bitloss +These are prototype projects. + +I tried to do this using AI. Because I couldn't get the AI ​​to accept the algorithms, the examples in the branches are malfunctioning. Main is fine. diff --git a/bitloss.py b/bitloss.py index 488aca4..059fb01 100644 --- a/bitloss.py +++ b/bitloss.py @@ -1,35 +1,98 @@ --------------------------------------------------------------- +#-------------------------------------------------------------- # # MIT LICENSE # MESUT ERTURHAN # https://github.com/piyxu/bitloss --------------------------------------------------------------- +# +#-------------------------------------------------------------- +""" +Bitloss encoder/decoder with multi-CPU support. + +Encoding flow per 256-bit block: + 1. Map raw block to R in [0, 16*C(256,128)) and run encode256 (metadata + balanced bits). + 2. Drop the last balanced bit, rank the remaining 255 bits with fixed k=128. + 3. Store 4-bit metadata + 251-bit rank for a 255-bit payload. + +Uses multiprocessing to process blocks in parallel (up to 50% CPU). +""" -import random +from __future__ import annotations + +import argparse +import sys +import os from math import comb +from pathlib import Path +from typing import Iterable, List, Tuple +from multiprocessing import Pool, cpu_count +C256 = comb(256, 128) +MASK_256 = (1 << 256) - 1 +MAX_R = 16 * C256 -def random_256_k128(): - """Generate a random 256-bit sequence with exactly 128 ones.""" - positions = random.sample(range(256), 128) - bits = ["0"] * 256 - for p in positions: - bits[p] = "1" - return "".join(bits) +def unrank_nk(n: int, k: int, r: int) -> List[int]: + """Lexicographic combinational unrank with fixed k.""" + bits = [] + rem = k + for i in range(n): + z = comb(n - i - 1, rem) if rem <= (n - i - 1) else 0 + if r < z: + bits.append(0) + else: + bits.append(1) + r -= z + rem -= 1 + return bits -def true_rank_fixed_k128(bits255): - """ - Compute the true combinational rank (C(n,k) based) - using FIXED k = 128 even if k255 becomes 127. - """ + +def rank_nk(bits: Iterable[int], k_fixed: int) -> int: + """Lexicographic rank.""" + bits = list(bits) + n = len(bits) + rem = k_fixed + r = 0 + for i, b in enumerate(bits): + if b == 1: + z = comb(n - i - 1, rem) + r += z + rem -= 1 + return r + + +def unrank_256_128(r: int) -> List[int]: + return unrank_nk(256, 128, r) + + +def rank_256_128(bits: Iterable[int]) -> int: + return rank_nk(bits, 128) + + +def encode256(r_value: int) -> Tuple[List[int], int]: + if not (0 <= r_value < MAX_R): + raise ValueError("R must be in [0, 16*C(256,128))") + meta = r_value // C256 + idx = r_value % C256 + bits256 = unrank_256_128(idx) + return bits256, meta + + +def decode256(bits256: Iterable[int], meta: int) -> int: + bits256 = list(bits256) + idx = rank_256_128(bits256) + return meta * C256 + idx + + +def true_rank_fixed_k128(bits255: Iterable[int]) -> int: + """Rank implementation (fixed k=128 on 255 bits).""" + bits = list(bits255) n = 255 k = 128 r = 0 rem = k - for i, b in enumerate(bits255): - if b == "1": + for i, b in enumerate(bits): + if b == 1: r += comb(n - i - 1, rem - 1) rem -= 1 if rem == 0: @@ -37,64 +100,582 @@ def true_rank_fixed_k128(bits255): return r -def pad_rank_to_251(rank_int): - """Convert rank to binary and left-pad with zeros to make exactly 251 bits.""" - b = bin(rank_int)[2:] # remove '0b' - if len(b) < 251: - b = '0' * (251 - len(b)) + b - return b +def unrank_true_fixed_k128(rank_value: int) -> List[int]: + """Inverse of true_rank_fixed_k128.""" + n = 255 + rem = 128 + bits: List[int] = [] + + for i in range(n): + if rem == 0: + bits.append(0) + continue + z = comb(n - i - 1, rem - 1) + if rank_value < z: + bits.append(0) + else: + bits.append(1) + rank_value -= z + rem -= 1 + return bits + + +def decode_missing_bit(bits255: List[int]) -> int: + """Recover the (lost) 256th bit.""" + k255 = sum(bits255) + if k255 == 128: + return 0 + if k255 == 127: + return 1 + return 0 + + +def int_to_bits(value: int, width: int) -> List[int]: + return [(value >> (width - 1 - i)) & 1 for i in range(width)] + + +def bytes_to_bits(data: bytes) -> List[int]: + bits: List[int] = [] + for byte in data: + for shift in range(7, -1, -1): + bits.append((byte >> shift) & 1) + return bits + + +def bits_to_bytes(bits: List[int]) -> bytes: + if len(bits) % 8 != 0: + raise ValueError("Bit length must be a multiple of 8") + out = bytearray() + for i in range(0, len(bits), 8): + value = 0 + for bit in bits[i : i + 8]: + value = (value << 1) | bit + out.append(value) + return bytes(out) + + +def bits_to_bitstring(bits: List[int]) -> str: + """Convert list of bits to string representation.""" + return "".join(str(bit) for bit in bits) + + +def bitstring_to_bits(bitstring: str) -> List[int]: + """Convert string representation to list of bits.""" + return [int(c) for c in bitstring if c in '01'] + + +class BitWriter: + def __init__(self) -> None: + self.bits: List[int] = [] + self.total_bits = 0 + + def write_bits(self, bits: Iterable[int]) -> None: + for bit in bits: + self.bits.append(bit & 1) + self.total_bits += 1 + + def get_bitstring(self) -> str: + """Return all bits as a string.""" + return bits_to_bitstring(self.bits) + + def get_bits(self) -> List[int]: + """Return all bits as a list.""" + return self.bits.copy() + +class BitReader: + def __init__(self, bitstring: str) -> None: + self.bits = bitstring_to_bits(bitstring) + self.pos = 0 + self.useful_bits = len(self.bits) -def decode_one(bits255, k255): + def _read_bit(self) -> int: + if self.pos >= self.useful_bits: + raise ValueError("Attempting to read past the end of bit stream") + bit = self.bits[self.pos] + self.pos += 1 + return bit + + def read_bits(self, count: int) -> List[int]: + return [self._read_bit() for _ in range(count)] + + def read_int(self, count: int) -> int: + value = 0 + for _ in range(count): + value = (value << 1) | self._read_bit() + return value + + def remaining(self) -> int: + return self.useful_bits - self.pos + + +def encode_block_worker(args: Tuple[int, bytes]) -> Tuple[int, List[int]]: + """Worker function for parallel block encoding.""" + block_idx, chunk = args + block_value = int.from_bytes(chunk, "big") + overflow = 1 if block_value >= MAX_R else 0 + if overflow: + block_value -= MAX_R + + bits256, meta = encode256(block_value) + bits255 = bits256[:-1] + rank_value = true_rank_fixed_k128(bits255) + + meta_bits = int_to_bits(meta, 4) + rank_bits = int_to_bits(rank_value, 251) + + result = meta_bits + rank_bits + return block_idx, result + + +def decode_block_worker(args: Tuple[int, int, int]) -> Tuple[int, bytes]: + """Worker function for parallel block decoding.""" + block_idx, meta, rank_value = args + + bits255 = unrank_true_fixed_k128(rank_value) + missing_bit = decode_missing_bit(bits255) + bits256 = bits255 + [missing_bit] + r_value = decode256(bits256, meta) + block_value = r_value + block_bytes = (block_value & MASK_256).to_bytes(32, "big") + + return block_idx, block_bytes + + +def encode_once(src: Path, dest: Path, stage: int = 1, in_memory_data: str = None) -> str: """ - Recover the last bit using only k255: - k255 == 128 β†’ missing bit = '0' - k255 == 127 β†’ missing bit = '1' + Encode once. Returns bitstring for in-memory processing. + If in_memory_data is provided, use it instead of reading from file. """ - if k255 == 128: - missing = "0" - elif k255 == 127: - missing = "1" + payload_writer = BitWriter() + + if in_memory_data is not None: + # Use in-memory data from previous stage + reader = BitReader(in_memory_data) + + # Skip headers (256 bits hash + 8 bits header map + 8 bits padding flag) + reader.read_bits(256) + reader.read_int(8) + + padding_flag = reader.read_int(8) + has_padding = (padding_flag >> 7) & 1 + padding_bits = padding_flag & 0x7F + + # Read remaining bits as source data + source_bits = reader.read_bits(reader.remaining()) + + # Remove padding bits if they exist + if has_padding and padding_bits > 0: + source_bits = source_bits[:-padding_bits] + + while len(source_bits) % 8 != 0: + source_bits.append(0) + source_bytes = bits_to_bytes(source_bits) + elif src is not None: + # Read from file + source_bytes = src.read_bytes() + else: + raise ValueError("Either src or in_memory_data must be provided") + + # Process in 32-byte chunks with multiprocessing + full_blocks = len(source_bytes) // 32 + remainder_bytes = len(source_bytes) % 32 + + if full_blocks > 0: + # Prepare blocks for parallel processing + block_data = [(i, source_bytes[i*32:(i+1)*32]) for i in range(full_blocks)] + + # Use 50% of CPU cores + num_workers = max(1, cpu_count() // 2) + + print(f"Encoding {full_blocks} blocks with {num_workers} workers...", end='', flush=True) + + with Pool(processes=num_workers) as pool: + results = pool.map(encode_block_worker, block_data) + + print(f" [DONE]") + + # Sort by block index and write in order + results.sort(key=lambda x: x[0]) + + # Debug: verify order + for i, (block_idx, block_bits) in enumerate(results): + if block_idx != i: + raise ValueError(f"Block order mismatch: expected {i}, got {block_idx}") + payload_writer.write_bits(block_bits) + + # Handle remainder bytes + if remainder_bytes > 0: + remainder = source_bytes[full_blocks * 32:] + payload_writer.write_bits(bytes_to_bits(remainder)) + + # Calculate padding needed + payload_bits = payload_writer.total_bits + padding_needed = (8 - (payload_bits % 8)) % 8 + + if padding_needed > 0: + payload_writer.write_bits([0] * padding_needed) + + # Create flags + has_padding = 1 if padding_needed > 0 else 0 + padding_flag = (has_padding << 7) | padding_needed + header_map = 0 + + # Final output with dummy hash (256 zeros) + final_writer = BitWriter() + final_writer.write_bits([0] * 256) # Dummy hash + final_writer.write_bits(int_to_bits(header_map, 8)) + final_writer.write_bits(int_to_bits(padding_flag, 8)) + final_writer.write_bits(payload_writer.get_bits()) + + # Get bitstring + bit_payload = final_writer.get_bitstring() + + # Write to file only if dest is provided + if dest is not None: + with open(dest, "w") as fout: + fout.write(bit_payload) + + return bit_payload + + +def decode_once(src: Path, dest: Path, is_final: bool = False, in_memory_data: str = None) -> str: + """ + Decode once. Returns bitstring for in-memory processing. + If in_memory_data is provided, use it instead of reading from file. + """ + # Read bitstring + if in_memory_data is not None: + bitstring = in_memory_data + else: + bitstring = src.read_text().strip() + + reader = BitReader(bitstring) + + # Skip hash (256 bits) + reader.read_bits(256) + + # Read header map (8 bits) + reader.read_int(8) + + # Read padding flag + padding_flag = reader.read_int(8) + has_padding = (padding_flag >> 7) & 1 + padding_bits = padding_flag & 0x7F + + payload_bits = reader.remaining() + + # Calculate blocks and remainder + full_blocks = payload_bits // 255 + remainder_bits = payload_bits % 255 + + if full_blocks > 0: + # Prepare blocks for parallel processing + block_data = [] + for block_index in range(full_blocks): + meta = reader.read_int(4) + rank_value = reader.read_int(251) + block_data.append((block_index, meta, rank_value)) + + # Use 50% of CPU cores + num_workers = max(1, cpu_count() // 2) + + print(f"Decoding {full_blocks} blocks with {num_workers} workers...", end='', flush=True) + + with Pool(processes=num_workers) as pool: + results = pool.map(decode_block_worker, block_data) + + print(f" [DONE]") + + # Sort by block index and collect bytes + results.sort(key=lambda x: x[0]) + + # Debug: verify order + out_bits = [] + for i, (block_idx, block_bytes) in enumerate(results): + if block_idx != i: + raise ValueError(f"Block order mismatch: expected {i}, got {block_idx}") + out_bits.extend(bytes_to_bits(block_bytes)) + else: + out_bits = [] + + # Process remainder bits if any + if remainder_bits > 0: + tail_bits = reader.read_bits(remainder_bits) + out_bits.extend(tail_bits) + + # Remove padding bits if they exist + if has_padding and padding_bits > 0: + out_bits = out_bits[:-padding_bits] + + # Write output + if is_final: + # Final decode: write as binary + while len(out_bits) % 8 != 0: + out_bits.append(0) + out_bytes = bits_to_bytes(out_bits) + if dest is not None: + with open(dest, "wb") as fout: + fout.write(out_bytes) + return None else: - missing = "0" - return bits255 + missing + # Intermediate stage: return bitstring + bit_output = bits_to_bitstring(out_bits) + if dest is not None: + with open(dest, "w") as fout: + fout.write(bit_output) + return bit_output + + +def hyphenated_name(src: Path) -> str: + name = src.name + if "." in name: + stem, ext = name.rsplit(".", 1) + return f"{stem}-{ext}" + return name + + +def encode_stage_name(base: str, iteration: int) -> str: + if iteration < 1: + raise ValueError("Iteration must be >= 1") + return f"{base}.btl" if iteration == 1 else f"{base}.{iteration}.btl" + + +def encoded_filename(src: Path, repeat: int) -> Path: + base = hyphenated_name(src) + return src.with_name(encode_stage_name(base, repeat)) + +def run_encode(path_str: str, repeat: int) -> Path: + src = Path(path_str) + if repeat < 1: + raise ValueError("Repeat count must be >= 1") + base = hyphenated_name(src) + final_path = encoded_filename(src, repeat) -def run_experiment(num_tests=10, seed=0): - random.seed(seed) - results = [] + # In-memory processing + in_memory_data = None + + for iteration in range(1, repeat + 1): + print(f"\nStage: {iteration}/{repeat}") + + if iteration == 1: + # First stage: read from file + in_memory_data = encode_once(src, None, iteration, None) + else: + # Subsequent stages: use in-memory data + in_memory_data = encode_once(None, None, iteration, in_memory_data) + + # Write final result to file + dest_name = encode_stage_name(base, repeat) + dest = src.with_name(dest_name) + with open(dest, "w") as fout: + fout.write(in_memory_data) + + print(f"\nFinal encoded file written to disk") - for i in range(num_tests): - bits256 = random_256_k128() - bits255 = bits256[:-1] - k255 = bits255.count("1") + return final_path - # compute true rank (with fixed k = 128) - r = true_rank_fixed_k128(bits255) - rank_bits = r.bit_length() - # pad rank to 251 bits - rank_padded = pad_rank_to_251(r) +def parse_encoded_name(path: Path) -> Tuple[str, int]: + name = path.name + if not name.endswith(".btl"): + raise ValueError("Encoded file must end with .btl") + base = name[:-4] + repeat = 1 + if "." in base: + candidate, suffix = base.rsplit(".", 1) + if suffix.isdigit(): + base = candidate + repeat = int(suffix) + if repeat < 1: + raise ValueError("Repeat count inferred from file name must be >= 1") + return base, repeat + + +def restore_original_name(base: str) -> str: + if "-" not in base: + return base + stem, ext = base.rsplit("-", 1) + return f"{stem}.{ext}" + + +def run_decode(path_str: str) -> Path: + src = Path(path_str) + base, repeat = parse_encoded_name(src) + final_name = restore_original_name(base) + + # Read initial file + in_memory_data = src.read_text().strip() + + # In-memory processing + for stage_counter in range(1, repeat + 1): + is_final = (stage_counter == repeat) + + print(f"\nStage: {stage_counter}/{repeat}") + + if is_final: + # Final stage: write to file + dest = src.with_name(final_name) + decode_once(None, dest, is_final=True, in_memory_data=in_memory_data) + print(f"\nFinal decoded file written to disk") + else: + # Intermediate stages: keep in memory + in_memory_data = decode_once(None, None, is_final=False, in_memory_data=in_memory_data) + + return src.with_name(final_name) + + +def run_test(path_str: str, repeat: int) -> bool: + """Test mode: encode then decode and compare with original.""" + src = Path(path_str) + if not src.exists(): + raise FileNotFoundError(f"File not found: {src}") + + print(f"\n{'='*60}") + print(f"TEST MODE: {src.name} (repeat={repeat})") + print(f"{'='*60}\n") + + # Read original file + original_data = src.read_bytes() + original_size = len(original_data) + + print(f"[1/4] Original file:") + print(f" Size: {original_size} bytes\n") + + # Encode + print(f"[2/4] Encoding (repeat={repeat})...") + encoded_path = run_encode(path_str, repeat) + encoded_size = encoded_path.stat().st_size + print(f" Encoded -> {encoded_path.name}") + print(f" Size: {encoded_size} bytes\n") + + # Decode + print(f"[3/4] Decoding...") + decoded_path = run_decode(str(encoded_path)) + decoded_size = decoded_path.stat().st_size + print(f" Decoded -> {decoded_path.name}") + print(f" Size: {decoded_size} bytes\n") + + # Compare + print(f"[4/4] Verification:") + decoded_data = decoded_path.read_bytes() + + # Size check + size_match = (len(decoded_data) == original_size) + print(f" Size match: {'[OK]' if size_match else '[FAILED]'}") + if not size_match: + print(f" Original: {original_size} bytes") + print(f" Decoded: {len(decoded_data)} bytes") + + # Byte-by-byte check + byte_match = (decoded_data == original_data) + print(f" Byte-by-byte: {'[OK]' if byte_match else '[FAILED]'}") + + if not byte_match and size_match: + for i, (a, b) in enumerate(zip(original_data, decoded_data)): + if a != b: + print(f" First byte diff at position {i}: {a:02x} != {b:02x}") + break + + # Bit-by-bit check + original_bits = bytes_to_bits(original_data) + decoded_bits = bytes_to_bits(decoded_data) + + bit_match = (original_bits == decoded_bits) + print(f" Bit-by-bit: {'[OK]' if bit_match else '[FAILED]'}") + + if not bit_match: + min_len = min(len(original_bits), len(decoded_bits)) + bit_diffs = sum(1 for i in range(min_len) if original_bits[i] != decoded_bits[i]) + + print(f" Original bits: {len(original_bits)}") + print(f" Decoded bits: {len(decoded_bits)}") + print(f" Bit differences: {bit_diffs}") + + for i in range(min_len): + if original_bits[i] != decoded_bits[i]: + byte_pos = i // 8 + bit_pos = i % 8 + print(f" First bit diff at bit {i} (byte {byte_pos}, bit {bit_pos}): {original_bits[i]} != {decoded_bits[i]}") + break + + # Compression ratio + ratio = (encoded_size / original_size) * 100 if original_size > 0 else 0 + print(f"\n Compression ratio: {ratio:.2f}%") + + # Save comparison files + test_dir = src.parent / "test_output" + test_dir.mkdir(exist_ok=True) + + original_copy = test_dir / f"original_{src.name}" + decoded_copy = test_dir / f"decoded_{src.name}" + + import shutil + shutil.copy2(src, original_copy) + shutil.copy2(decoded_path, decoded_copy) + + print(f"\n Test files saved to: {test_dir}") + print(f" - {original_copy.name}") + print(f" - {decoded_copy.name}") + + # Overall result + success = size_match and byte_match and bit_match + print(f"\n{'='*60}") + if success: + print(f"TEST RESULT: [SUCCESS] All checks passed!") + else: + print(f"TEST RESULT: [FAILED] Some checks failed!") + print(f"{'='*60}\n") + + return success - # decode using k255 rule - decoded = decode_one(bits255, k255) - match = (decoded == bits256) - print(f"--- Test {i+1} ---") - print(f"Original last bit : {bits256[-1]}") - print(f"255-bit weight (k255) : {k255}") - print(f"Actual rank bit length : {rank_bits}") - print(f"Padded rank length : {len(rank_padded)} (ALWAYS 251)") - print(f"Recovered last bit : {decoded[-1]}") - print(f"Match : {match}") - print() +def parse_args(argv: List[str]) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Bitloss encoder / decoder with multi-CPU support") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("-e", "--encode", nargs="+", metavar=("FILE", "REPEAT"), help="Encode file; optional repeat count") + group.add_argument("-d", "--decode", metavar="FILE", help="Decode file (repeat inferred from name)") + group.add_argument("-t", "--test", nargs="+", metavar=("FILE", "REPEAT"), help="Test mode: encode then decode and compare; optional repeat count") + return parser.parse_args(argv) - results.append(rank_bits) - print("--------- SUMMARY ----------") - print("Actual rank bit lengths:", results) - print("----------------------------") +def main(argv: List[str]) -> int: + args = parse_args(argv) + try: + if args.encode is not None: + if not (1 <= len(args.encode) <= 2): + raise ValueError("Encode requires FILE and optional REPEAT") + src = args.encode[0] + repeat = int(args.encode[1]) if len(args.encode) == 2 else 1 + + if repeat < 1: + repeat = 1 + + output = run_encode(src, repeat) + print(f"\nEncoded -> {output}") + elif args.test is not None: + if not (1 <= len(args.test) <= 2): + raise ValueError("Test requires FILE and optional REPEAT") + src = args.test[0] + repeat = int(args.test[1]) if len(args.test) == 2 else 1 + + if repeat < 1: + repeat = 1 + + success = run_test(src, repeat) + return 0 if success else 1 + else: + src = args.decode + output = run_decode(src) + print(f"\nDecoded -> {output}") + except Exception as exc: + print(f"Error: {exc}", file=sys.stderr) + import traceback + traceback.print_exc() + return 1 + return 0 if __name__ == "__main__": - run_experiment() + raise SystemExit(main(sys.argv[1:]))