"""Debug a single line of compressed .jsonl data to find the root cause of the
window/token length mismatch.

Given a parameter-encoded input folder and a 1-based line number, the script
locates that line across the folder's .jsonl files, decodes its window table
and compressed stream, and compares the two counts.
"""

import argparse
import base64
import binascii
import json
import os
import re


def vread(buf: bytes, i: int):
    """Read one LEB128-style varint from buf starting at index i.

    Each byte contributes its low 7 bits, least-significant group first;
    a byte below 0x80 terminates the varint. Returns (value, next_index).
    """
    shift = val = 0
    while True:
        if i >= len(buf):
            raise IndexError("Buffer exhausted during vread")
        b = buf[i]
        i += 1
        val |= (b & 0x7F) << shift
        if b < 0x80:
            return val, i
        shift += 7
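
# Worked example (illustrative): the byte pair 0x96 0x01 is the LEB128
# encoding of 150, since (0x96 & 0x7F) = 22 and (0x01 << 7) = 128:
#
#     >>> vread(bytes([0x96, 0x01]), 0)
#     (150, 2)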


def decompress_windows_starts_lens(b64_stream: str) -> tuple[list[int], list[int]]:
    """Decode a base64 stream of (gap, size) varint pairs into window starts and lengths."""
    try:
        buf = base64.b64decode(b64_stream)
        i = 0
        cursor = 0
        starts, lens = [], []
        while i < len(buf):
            gap, i = vread(buf, i)   # distance from the previous window's end
            size, i = vread(buf, i)  # window length
            start = cursor + gap
            starts.append(start)
            lens.append(size)
            cursor = start + size
        return starts, lens
    except (binascii.Error, IndexError) as e:
        print(f"  [Error while decoding windows: {e}]")
        return [], []
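
# Worked example (illustrative): the bytes 03 05 02 02 (base64 "AwUCAg==")
# encode two (gap, size) pairs. The first window starts at 0 + 3 = 3 with
# length 5, moving the cursor to 8; the second starts at 8 + 2 = 10 with
# length 2:
#
#     >>> decompress_windows_starts_lens("AwUCAg==")
#     ([3, 10], [5, 2])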


def packed_bytes_to_pseudo(b: bytes) -> list[int]:
    """Unpack a little-endian bitstream into 9-bit values (0..511).

    Leftover bits that cannot fill a full 9-bit group are dropped.
    """
    out, acc, bits = [], 0, 0
    for byte in b:
        acc |= byte << bits
        bits += 8
        while bits >= 9:
            out.append(acc & 0x1FF)
            acc >>= 9
            bits -= 9
    return out
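
# Worked example (illustrative): b"\xff\x01" accumulates to 0x01FF over 16
# bits; the low 9 bits give the single value 511, and the 7 leftover bits
# are dropped as padding:
#
#     >>> packed_bytes_to_pseudo(b"\xff\x01")
#     [511]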


def parse_parameters_from_path(path_name: str) -> dict:
    """Extract key/value parameters from the final component of a path.

    Underscore-separated parts are parsed either as "key-value" pairs or as a
    letter prefix followed by digits (e.g. "ow30"); anything else is skipped.
    """
    params = {}
    base_name = os.path.basename(os.path.normpath(path_name))
    parts = base_name.split('_')
    for part in parts:
        if '-' in part:
            key, value = part.split('-', 1)
            params[key.lower()] = value.lower()
        else:
            match = re.match(r'([a-zA-Z]+)(\d+)', part)
            if match:
                key, value = match.groups()
                params[key.lower()] = value
    return params
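
# Worked example (illustrative, hypothetical folder name):
#
#     >>> parse_parameters_from_path("run_ow30_escapefb-true_iterative-false")
#     {'ow': '30', 'escapefb': 'true', 'iterative': 'false'}
#
# The bare "run" part matches neither pattern and is skipped.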


def construct_compression_key(params: dict) -> str:
    """Build the JSON field name for the compressed stream from parsed parameters."""
    ow = params.get('ow', '20')
    escape_fb = 'True' if params.get('escapefb', 'false') == 'true' else 'False'
    iterative = 'True' if params.get('iterative', 'false') == 'true' else 'False'
    force_padding = 'True' if params.get('forcepadding', 'false') == 'true' else 'False'
    return f"m1_ac_ow{ow}_escapefb-{escape_fb}_iterative-{iterative}_forcepadding-{force_padding}"
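
# Worked example (illustrative): missing flags default to False and ow to "20":
#
#     >>> construct_compression_key({'ow': '30', 'escapefb': 'true'})
#     'm1_ac_ow30_escapefb-True_iterative-False_forcepadding-False'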


def debug_line(args):
    print(f"--- Starting debug: folder '{args.input_dir}', line {args.line_number} ---")

    target_line = args.line_number
    current_line_count = 0
    line_content = None

    # Collect every .jsonl file under the input directory, in sorted order,
    # and treat them as one concatenated stream for line counting.
    jsonl_files = sorted(
        os.path.join(r, f)
        for r, _, fs in os.walk(args.input_dir)
        for f in fs if f.endswith('.jsonl')
    )
    if not jsonl_files:
        print("❌ Error: no .jsonl files in the folder.")
        return

    for file_path in jsonl_files:
        with open(file_path, 'r', errors='ignore') as f:
            for line in f:
                current_line_count += 1
                if current_line_count == target_line:
                    line_content = line
                    print(f"✅ Found line {target_line} in file: {file_path}")
                    break
        if line_content:
            break

    if not line_content:
        print(f"❌ Error: could not find line {target_line} (scanned {current_line_count} lines in total).")
        return

    params = parse_parameters_from_path(args.input_dir)
    compression_key = construct_compression_key(params)
    print(f"\n[Step A] Constructed compression key: '{compression_key}'")

    try:
        data = json.loads(line_content)
        print("  -> JSON loaded successfully.")
        if compression_key not in data:
            print("  -> ❌ Error: the constructed key is not present in the JSON object!")
            print(f"     Available keys in the JSON: {list(data.keys())}")
            return
        print("  -> ✅ Key matched!")
    except json.JSONDecodeError as e:
        print(f"❌ Error: JSON decoding failed: {e}")
        return

    print("\n[Step B] Decoding 'windows_starts_lens_b64'")
    b64_windows = data.get('windows_starts_lens_b64', '')
    print(f"  -> Input Base64 (first 64 chars): '{b64_windows[:64]}...'")
    starts, lens = decompress_windows_starts_lens(b64_windows)
    print(f"  -> Decoded: {len(starts)} windows in total.")
    if starts:
        print(f"  -> First 5 windows (start, length): {list(zip(starts, lens))[:5]}")

    print(f"\n[Step C] Decoding compressed data field '{compression_key}'")
    b64_compressed = data.get(compression_key, '')
    print(f"  -> Input Base64 (first 64 chars): '{b64_compressed[:64]}...'")
    try:
        decoded_bytes = base64.b64decode(b64_compressed)
        print(f"  -> Byte length after Base64 decoding: {len(decoded_bytes)}")

        mixed_pseudo_bytes = packed_bytes_to_pseudo(decoded_bytes)
        print(f"  -> Total elements output by `packed_bytes_to_pseudo`: {len(mixed_pseudo_bytes)}")
        print(f"  -> First 20 elements: {mixed_pseudo_bytes[:20]}")

        # Keep only values >= 256, which the stream treats as compression
        # tokens (values below 256 are presumably literal bytes).
        pseudo_tokens = [t for t in mixed_pseudo_bytes if t >= 256]
        print(f"  -> Compression tokens after filtering (>= 256): {len(pseudo_tokens)}")
        print(f"  -> First 20 compression tokens: {pseudo_tokens[:20]}")
    except Exception as e:
        print(f"  -> ❌ Error during this step: {e}")
        return

    print("\n" + "=" * 20 + " Final diagnosis " + "=" * 20)
    print(f"Number of windows (from windows_starts_lens_b64): {len(starts)}")
    print(f"Number of compression tokens (from {compression_key}): {len(pseudo_tokens)}")
    if len(starts) == len(pseudo_tokens):
        print("\n🟢 Conclusion: the lengths match! The earlier script probably has some other problem.")
    else:
        print("\n🔴 Conclusion: the lengths do NOT match! This is the root cause of the 100% failure rate.")
        print("   It points to a fundamental inconsistency between the data-generation and parsing logic.")
        print("   Possible causes:")
        print("   1. `windows_starts_lens_b64` may not describe every compressed block (e.g. small blocks are skipped).")
        print("   2. The final compressed stream may contain special symbols that do not correspond to any window.")
        print("   3. `packed_bytes_to_pseudo` may behave in a more complex way than assumed.")
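
# For reference, each .jsonl record is expected to provide at least the window
# table and one compressed stream keyed by the constructed compression key
# (field values below are hypothetical):
#
#     {"windows_starts_lens_b64": "AwUCAg==",
#      "m1_ac_ow20_escapefb-False_iterative-False_forcepadding-False": "<b64>"}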


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Debug a single line of compressed data to find the root cause of the length mismatch.",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument("input_dir", type=str, help="Path to the input folder containing .jsonl data files.")
    parser.add_argument("--line_number", type=int, required=True, help="The specific line number to inspect (1-based).")

    args = parser.parse_args()
    debug_line(args)
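
# Example invocation (hypothetical script name and data path):
#
#     python debug_line.py data/run_ow20_escapefb-false --line_number 42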