import json import base64 import argparse import os import re import sys # --- 从你提供的源代码中复制的关键函数 --- def vread(buf: bytes, i: int): shift = val = 0 while True: if i >= len(buf): raise IndexError("Buffer exhausted during vread") b = buf[i] i += 1 val |= (b & 0x7F) << shift if b < 0x80: return val, i shift += 7 def decompress_windows_starts_lens(b64_stream: str) -> tuple[list[int], list[int]]: try: buf = base64.b64decode(b64_stream) i = 0 cursor= 0 starts, lens = [], [] while i < len(buf): gap, i = vread(buf, i) size, i = vread(buf, i) start = cursor + gap length = size starts.append(start) lens.append(length) cursor = start + length return starts, lens except (base64.binascii.Error, IndexError) as e: print(f" [解码窗口时出错: {e}]") return [], [] def packed_bytes_to_pseudo(b: bytes) -> list[int]: # 来自你的源代码的 9-bit 解包器 out, acc, bits = [], 0, 0 for byte in b: acc |= byte << bits bits += 8 while bits >= 9: out.append(acc & 0x1FF) acc >>= 9 bits -= 9 # 注意:你的原始代码没有处理尾部不足9bit的情况,这里我们保持一致 return out # --- 参数解析和 Key 构建 --- def parse_parameters_from_path(path_name: str) -> dict: params = {} base_name = os.path.basename(os.path.normpath(path_name)) parts = base_name.split('_') for part in parts: if '-' in part: key, value = part.split('-', 1) params[key.lower()] = value.lower() else: match = re.match(r'([a-zA-Z]+)(\d+)', part) if match: key, value = match.groups() params[key.lower()] = value return params def construct_compression_key(params: dict) -> str: ow = params.get('ow', '20') escape_fb = 'True' if params.get('escapefb', 'false') == 'true' else 'False' iterative = 'True' if params.get('iterative', 'false') == 'true' else 'False' force_padding = 'True' if params.get('forcepadding', 'false') == 'true' else 'False' key = f"m1_ac_ow{ow}_escapefb-{escape_fb}_iterative-{iterative}_forcepadding-{force_padding}" return key # --- 主调试函数 --- def debug_line(args): print(f"--- 开始调试: 文件夹 '{args.input_dir}', 行号 {args.line_number} ---") # 1. 找到对应的文件和行 target_line = args.line_number current_line_count = 0 line_content = None jsonl_files = sorted([os.path.join(r, f) for r, _, fs in os.walk(args.input_dir) for f in fs if f.endswith('.jsonl')]) if not jsonl_files: print(f"❌ 错误: 文件夹中没有 .jsonl 文件。") return for file_path in jsonl_files: with open(file_path, 'r', errors='ignore') as f: for line in f: current_line_count += 1 if current_line_count == target_line: line_content = line print(f"✅ 找到了第 {target_line} 行,位于文件: {file_path}") break if line_content: break if not line_content: print(f"❌ 错误: 未能找到第 {target_line} 行 (总共扫描了 {current_line_count} 行)。") return # 2. 解析和 Key 构建 params = parse_parameters_from_path(args.input_dir) compression_key = construct_compression_key(params) print(f"\n[步骤 A] 构建的压缩 Key: '{compression_key}'") try: data = json.loads(line_content) print(" -> JSON 加载成功。") if compression_key not in data: print(f" -> ❌ 错误: 构建的 Key 不在 JSON 对象中!") print(f" JSON 中的可用 Keys: {list(data.keys())}") return print(" -> ✅ Key 匹配成功!") except json.JSONDecodeError as e: print(f"❌ 错误: JSON 解码失败: {e}") return # 3. 解码 windows_starts_lens_b64 print("\n[步骤 B] 解码 'windows_starts_lens_b64'") b64_windows = data.get('windows_starts_lens_b64', '') print(f" -> 输入的 Base64 (前64字节): '{b64_windows[:64]}...'") starts, lens = decompress_windows_starts_lens(b64_windows) print(f" -> 解码结果: 共有 {len(starts)} 个窗口。") if starts: print(f" -> 前 5 个窗口 (start, length): {list(zip(starts, lens))[:5]}") # 4. 解码压缩数据 print(f"\n[步骤 C] 解码压缩数据字段 '{compression_key}'") b64_compressed = data.get(compression_key, '') print(f" -> 输入的 Base64 (前64字节): '{b64_compressed[:64]}...'") try: decoded_bytes = base64.b64decode(b64_compressed) print(f" -> Base64 解码后的字节长度: {len(decoded_bytes)}") mixed_pseudo_bytes = packed_bytes_to_pseudo(decoded_bytes) print(f" -> `packed_bytes_to_pseudo` 输出的总元素数量: {len(mixed_pseudo_bytes)}") print(f" -> 前 20 个元素: {mixed_pseudo_bytes[:20]}") pseudo_tokens = [t for t in mixed_pseudo_bytes if t >= 256] print(f" -> 过滤后 (>= 256) 的压缩 Token 数量: {len(pseudo_tokens)}") print(f" -> 前 20 个压缩 Token: {pseudo_tokens[:20]}") except Exception as e: print(f" -> ❌ 在此步骤中发生错误: {e}") return # 5. 最终诊断 print("\n" + "="*20 + " 最终诊断 " + "="*20) print(f"窗口数量 (来自 windows_starts_lens_b64): {len(starts)}") print(f"压缩 Token 数量 (来自 {compression_key}): {len(pseudo_tokens)}") if len(starts) == len(pseudo_tokens): print("\n🟢 结论: 长度匹配!之前的脚本可能存在其他问题。") else: print("\n🔴 结论: 长度不匹配!这是导致100%失败的根本原因。") print(" 这表明数据生成逻辑与我们的解析逻辑之间存在根本性的不一致。") print(" 可能的原因:") print(" 1. `windows_starts_lens_b64` 可能不包含所有压缩块的信息(例如,跳过了某些小块)。") print(" 2. 最终的压缩流中可能包含了一些不对应于 `windows` 的特殊符号。") print(" 3. `packed_bytes_to_pseudo` 的行为可能比我们想象的更复杂。") if __name__ == "__main__": parser = argparse.ArgumentParser( description="调试单行压缩数据,以找出长度不匹配的根本原因。", formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument("input_dir", type=str, help="包含 .jsonl 数据文件的输入文件夹路径。") parser.add_argument("--line_number", type=int, required=True, help="要检查的具体行号 (从1开始)。") args = parser.parse_args() debug_line(args)