# Byte-lingua-code / debug_single_line.py
import json
import base64
import binascii
import argparse
import os
import re
import sys
# --- Key functions copied from the provided source code ---
def vread(buf: bytes, i: int):
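    """Read one unsigned varint (LEB128-style) from buf starting at index i.

    Each byte contributes its low 7 bits, least-significant group first; a byte
    with the high bit (0x80) clear terminates the value. Returns the decoded
    integer and the index just past the last byte consumed, e.g.
    vread(bytes([0x96, 0x01]), 0) == (150, 2).
    """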
shift = val = 0
while True:
if i >= len(buf):
raise IndexError("Buffer exhausted during vread")
b = buf[i]
i += 1
val |= (b & 0x7F) << shift
if b < 0x80:
return val, i
shift += 7
def decompress_windows_starts_lens(b64_stream: str) -> tuple[list[int], list[int]]:
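    """Decode the base64 varint stream into parallel (starts, lens) lists.

    The stream is a sequence of (gap, size) varint pairs: each window starts
    `gap` positions after the end of the previous window and is `size`
    positions long. Returns ([], []) if the stream cannot be decoded.
    """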
try:
buf = base64.b64decode(b64_stream)
i = 0
        cursor = 0
starts, lens = [], []
while i < len(buf):
gap, i = vread(buf, i)
size, i = vread(buf, i)
start = cursor + gap
length = size
starts.append(start)
lens.append(length)
cursor = start + length
return starts, lens
    except (binascii.Error, IndexError) as e:
        print(f" [Error while decoding windows: {e}]")
return [], []
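# Hand-worked example of the window encoding above (illustrative values, not real data):
# base64 "BQMCBA==" decodes to bytes 05 03 02 04, i.e. the varint pairs
# (gap=5, size=3) and (gap=2, size=4), so
# decompress_windows_starts_lens("BQMCBA==") == ([5, 10], [3, 4]).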
def packed_bytes_to_pseudo(b: bytes) -> list[int]:
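    """Unpack a little-endian bitstream into 9-bit values in the range 0..511.

    debug_line below keeps only values of 256 and above as pseudo (compressed)
    tokens; smaller values are filtered out. Any trailing remainder of fewer
    than 9 bits is dropped (see the note before the return).
    """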
    # 9-bit unpacker taken from the provided source code
out, acc, bits = [], 0, 0
for byte in b:
acc |= byte << bits
bits += 8
while bits >= 9:
out.append(acc & 0x1FF)
acc >>= 9
bits -= 9
    # Note: the original code does not handle a trailing remainder of fewer than 9 bits, so we keep that behavior here
return out
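# Hand-worked example: packed_bytes_to_pseudo(b"\xff\x01") == [511], since the 16
# input bits yield one 9-bit value (0x1FF) and the 7 leftover bits are dropped.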
# --- Parameter parsing and key construction ---
def parse_parameters_from_path(path_name: str) -> dict:
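    """Extract key/value parameters from the last path component.

    The basename is split on '_'; a part like 'escapefb-true' becomes
    {'escapefb': 'true'} and a part like 'ow20' becomes {'ow': '20'}.
    Illustrative (hypothetical) folder name: 'ow20_escapefb-true_iterative-false'
    -> {'ow': '20', 'escapefb': 'true', 'iterative': 'false'}.
    """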
params = {}
base_name = os.path.basename(os.path.normpath(path_name))
parts = base_name.split('_')
for part in parts:
if '-' in part:
key, value = part.split('-', 1)
params[key.lower()] = value.lower()
else:
match = re.match(r'([a-zA-Z]+)(\d+)', part)
if match:
key, value = match.groups()
params[key.lower()] = value
return params
def construct_compression_key(params: dict) -> str:
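    """Assemble the compression key used to index into each JSON line.

    The key always has the shape
    'm1_ac_ow{ow}_escapefb-{True|False}_iterative-{True|False}_forcepadding-{True|False}',
    with ow defaulting to '20' and every boolean flag defaulting to False.
    """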
ow = params.get('ow', '20')
escape_fb = 'True' if params.get('escapefb', 'false') == 'true' else 'False'
iterative = 'True' if params.get('iterative', 'false') == 'true' else 'False'
force_padding = 'True' if params.get('forcepadding', 'false') == 'true' else 'False'
key = f"m1_ac_ow{ow}_escapefb-{escape_fb}_iterative-{iterative}_forcepadding-{force_padding}"
return key
# --- Main debug function ---
def debug_line(args):
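    """Walk one JSONL line end to end and report where the decode pipeline breaks.

    Steps: locate the requested line, build the compression key from the folder
    name, decode the window list, decode the compressed token stream, and
    compare the two counts.
    """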
print(f"--- 开始调试: 文件夹 '{args.input_dir}', 行号 {args.line_number} ---")
# 1. 找到对应的文件和行
target_line = args.line_number
current_line_count = 0
line_content = None
jsonl_files = sorted([os.path.join(r, f) for r, _, fs in os.walk(args.input_dir) for f in fs if f.endswith('.jsonl')])
if not jsonl_files:
print(f"❌ 错误: 文件夹中没有 .jsonl 文件。")
return
for file_path in jsonl_files:
with open(file_path, 'r', errors='ignore') as f:
for line in f:
current_line_count += 1
if current_line_count == target_line:
line_content = line
print(f"✅ 找到了第 {target_line} 行,位于文件: {file_path}")
break
if line_content:
break
if not line_content:
print(f"❌ 错误: 未能找到第 {target_line} 行 (总共扫描了 {current_line_count} 行)。")
return
    # 2. Parse parameters and build the compression key
params = parse_parameters_from_path(args.input_dir)
compression_key = construct_compression_key(params)
print(f"\n[步骤 A] 构建的压缩 Key: '{compression_key}'")
try:
data = json.loads(line_content)
print(" -> JSON 加载成功。")
if compression_key not in data:
print(f" -> ❌ 错误: 构建的 Key 不在 JSON 对象中!")
print(f" JSON 中的可用 Keys: {list(data.keys())}")
return
print(" -> ✅ Key 匹配成功!")
except json.JSONDecodeError as e:
print(f"❌ 错误: JSON 解码失败: {e}")
return
    # 3. Decode 'windows_starts_lens_b64'
    print("\n[Step B] Decoding 'windows_starts_lens_b64'")
b64_windows = data.get('windows_starts_lens_b64', '')
print(f" -> 输入的 Base64 (前64字节): '{b64_windows[:64]}...'")
starts, lens = decompress_windows_starts_lens(b64_windows)
print(f" -> 解码结果: 共有 {len(starts)} 个窗口。")
if starts:
print(f" -> 前 5 个窗口 (start, length): {list(zip(starts, lens))[:5]}")
    # 4. Decode the compressed data
    print(f"\n[Step C] Decoding compressed data field '{compression_key}'")
b64_compressed = data.get(compression_key, '')
print(f" -> 输入的 Base64 (前64字节): '{b64_compressed[:64]}...'")
try:
decoded_bytes = base64.b64decode(b64_compressed)
print(f" -> Base64 解码后的字节长度: {len(decoded_bytes)}")
mixed_pseudo_bytes = packed_bytes_to_pseudo(decoded_bytes)
print(f" -> `packed_bytes_to_pseudo` 输出的总元素数量: {len(mixed_pseudo_bytes)}")
print(f" -> 前 20 个元素: {mixed_pseudo_bytes[:20]}")
pseudo_tokens = [t for t in mixed_pseudo_bytes if t >= 256]
print(f" -> 过滤后 (>= 256) 的压缩 Token 数量: {len(pseudo_tokens)}")
print(f" -> 前 20 个压缩 Token: {pseudo_tokens[:20]}")
except Exception as e:
print(f" -> ❌ 在此步骤中发生错误: {e}")
return
    # 5. Final diagnosis
    print("\n" + "="*20 + " Final diagnosis " + "="*20)
    print(f"Number of windows (from windows_starts_lens_b64): {len(starts)}")
    print(f"Number of compressed tokens (from {compression_key}): {len(pseudo_tokens)}")
if len(starts) == len(pseudo_tokens):
print("\n🟢 结论: 长度匹配!之前的脚本可能存在其他问题。")
else:
print("\n🔴 结论: 长度不匹配!这是导致100%失败的根本原因。")
print(" 这表明数据生成逻辑与我们的解析逻辑之间存在根本性的不一致。")
print(" 可能的原因:")
print(" 1. `windows_starts_lens_b64` 可能不包含所有压缩块的信息(例如,跳过了某些小块)。")
print(" 2. 最终的压缩流中可能包含了一些不对应于 `windows` 的特殊符号。")
print(" 3. `packed_bytes_to_pseudo` 的行为可能比我们想象的更复杂。")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="调试单行压缩数据,以找出长度不匹配的根本原因。",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument("input_dir", type=str, help="包含 .jsonl 数据文件的输入文件夹路径。")
parser.add_argument("--line_number", type=int, required=True, help="要检查的具体行号 (从1开始)。")
args = parser.parse_args()
debug_line(args)
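# Example invocation (the folder name is hypothetical; it only illustrates how the
# compression parameters are encoded in the path):
#   python debug_single_line.py ./data_ow20_escapefb-true_iterative-false --line_number 42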