#!/usr/bin/env python3
"""
TOC-Guided PDF Parser

Uses the Table of Contents to guide intelligent chunking that respects
document structure and hierarchy.

Author: Arthur Passuello
"""

import re
from typing import Dict, List, Optional
from dataclasses import dataclass
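
# Expected input shape (inferred from the accessors used in this module,
# not a contract stated by the original author):
#   pdf_data = {
#       'pages': [{'text': '<page text>'}, ...],  # one dict per page, in order
#       'text': '<full document text>',           # used only by the no-TOC fallback
#   }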


@dataclass
class TOCEntry:
    """Represents a table of contents entry."""
    title: str
    page: int
    level: int  # 0 for chapters, 1 for sections, 2 for subsections
    parent: Optional[str] = None
    parent_title: Optional[str] = None  # Added for hybrid parser compatibility


class TOCGuidedParser:
    """Parser that uses the TOC to create structure-aware chunks."""

    def __init__(self, target_chunk_size: int = 1400, min_chunk_size: int = 800,
                 max_chunk_size: int = 2000):
        """Initialize the TOC-guided parser. All sizes are in characters."""
        self.target_chunk_size = target_chunk_size
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    def parse_toc(self, pages: List[Dict]) -> List[TOCEntry]:
        """Parse the table of contents from pages."""
        toc_entries = []

        # Find TOC pages (usually early in the document)
        toc_pages = []
        for i, page in enumerate(pages[:20]):  # Check first 20 pages
            page_text = page.get('text', '').lower()
            if 'contents' in page_text or 'table of contents' in page_text:
                toc_pages.append((i, page))

        if not toc_pages:
            print("No TOC found, using fallback structure detection")
            return self._detect_structure_without_toc(pages)

        # Parse TOC entries
        for page_idx, page in toc_pages:
            text = page.get('text', '')
            lines = text.split('\n')

            i = 0
            while i < len(lines):
                line = lines[i].strip()

                # Skip empty lines and the TOC header itself
                if not line or 'contents' in line.lower():
                    i += 1
                    continue

                # Pattern 1: "1.1 Title .... 23"
                match1 = re.match(r'^(\d+(?:\.\d+)*)\s+(.+?)\s*\.{2,}\s*(\d+)$', line)
                if match1:
                    number, title, page_num = match1.groups()
                    level = len(number.split('.')) - 1
                    toc_entries.append(TOCEntry(
                        title=title.strip(),
                        page=int(page_num),
                        level=level
                    ))
                    i += 1
                    continue

                # Pattern 2: multi-line format
                #   "1.1"
                #   "Title"
                #   ". . . . 23"
                if re.match(r'^(\d+(?:\.\d+)*)$', line):
                    number = line
                    if i + 1 < len(lines):
                        title_line = lines[i + 1].strip()
                        if i + 2 < len(lines):
                            dots_line = lines[i + 2].strip()
                            page_match = re.search(r'(\d+)\s*$', dots_line)
                            if page_match and '.' in dots_line:
                                title = title_line
                                page_num = int(page_match.group(1))
                                level = len(number.split('.')) - 1
                                toc_entries.append(TOCEntry(
                                    title=title,
                                    page=page_num,
                                    level=level
                                ))
                                i += 3
                                continue

                # Pattern 3: "Chapter 1: Title ... 23"
                match3 = re.match(
                    r'^(Chapter|Section|Part)\s+(\d+):?\s+(.+?)\s*\.{2,}\s*(\d+)$',
                    line, re.IGNORECASE
                )
                if match3:
                    prefix, number, title, page_num = match3.groups()
                    level = 0 if prefix.lower() == 'chapter' else 1
                    toc_entries.append(TOCEntry(
                        title=f"{prefix} {number}: {title}",
                        page=int(page_num),
                        level=level
                    ))
                    i += 1
                    continue

                i += 1

        # Add parent relationships
        for i, entry in enumerate(toc_entries):
            if entry.level > 0:
                # Find the parent (nearest previous entry with a lower level)
                for j in range(i - 1, -1, -1):
                    if toc_entries[j].level < entry.level:
                        entry.parent = toc_entries[j].title
                        entry.parent_title = toc_entries[j].title  # Set both for compatibility
                        break

        return toc_entries
    def _detect_structure_without_toc(self, pages: List[Dict]) -> List[TOCEntry]:
        """Fallback: detect structure from content patterns across ALL pages."""
        entries = []

        # Expanded patterns for better structure detection
        chapter_patterns = [
            re.compile(r'^(Chapter|CHAPTER)\s+(\d+|[IVX]+)(?:\s*[:\-]\s*(.+))?', re.MULTILINE),
            re.compile(r'^(\d+)\s+([A-Z][^.]*?)(?:\s*\.{2,}\s*\d+)?$', re.MULTILINE),  # "1 Introduction"
            re.compile(r'^([A-Z][A-Z\s]{10,})$', re.MULTILINE),  # ALL CAPS titles
        ]
        section_patterns = [
            re.compile(r'^(\d+\.\d+)\s+(.+?)(?:\s*\.{2,}\s*\d+)?$', re.MULTILINE),       # "1.1 Section"
            re.compile(r'^(\d+\.\d+\.\d+)\s+(.+?)(?:\s*\.{2,}\s*\d+)?$', re.MULTILINE),  # "1.1.1 Subsection"
        ]

        # Process ALL pages, not just the first 20
        for i, page in enumerate(pages):
            text = page.get('text', '')
            if not text.strip():
                continue

            # Find chapters with the various patterns. Prefer the explicit title
            # group, then the number/title pair; single-group matches (the ALL
            # CAPS pattern) use the whole match as the title.
            for pattern in chapter_patterns:
                for match in pattern.finditer(text):
                    groups = match.groups()
                    if len(groups) >= 3 and groups[2]:
                        title = groups[2].strip()
                    elif len(groups) >= 2:
                        title = groups[1].strip() if groups[1] else f"Section {groups[0]}"
                    else:
                        title = groups[0].strip()
                    # Skip very short titles or likely false positives
                    if len(title) >= 3 and not re.match(r'^\d+$', title):
                        entries.append(TOCEntry(
                            title=title,
                            page=i + 1,
                            level=0
                        ))

            # Find sections
            for pattern in section_patterns:
                for match in pattern.finditer(text):
                    section_num = match.group(1)
                    title = match.group(2).strip() if len(match.groups()) >= 2 else f"Section {section_num}"
                    # Determine the level by the number of dots
                    level = section_num.count('.')
                    # Skip very short titles or obvious artifacts
                    if len(title) >= 3 and not re.match(r'^\d+$', title):
                        entries.append(TOCEntry(
                            title=title,
                            page=i + 1,
                            level=level
                        ))

        # If still no entries were found, create page-based entries for full coverage
        if not entries:
            print("No structure patterns found, creating page-based sections for full coverage")
            # Create a section every 10 pages to ensure full document coverage
            for i in range(0, len(pages), 10):
                start_page = i + 1
                end_page = min(i + 10, len(pages))
                title = f"Pages {start_page}-{end_page}"
                entries.append(TOCEntry(
                    title=title,
                    page=start_page,
                    level=0
                ))

        return entries
    def create_chunks_from_toc(self, pdf_data: Dict, toc_entries: List[TOCEntry]) -> List[Dict]:
        """Create chunks based on the TOC structure."""
        chunks = []
        pages = pdf_data.get('pages', [])

        for i, entry in enumerate(toc_entries):
            # Determine the page range for this entry
            start_page = entry.page - 1  # Convert to 0-indexed

            # Find the end page (start of the next entry at the same or a higher level)
            end_page = len(pages)
            for j in range(i + 1, len(toc_entries)):
                if toc_entries[j].level <= entry.level:
                    end_page = toc_entries[j].page - 1
                    break

            # Extract the text for this section
            section_text = []
            for page_idx in range(max(0, start_page), min(end_page, len(pages))):
                page_text = pages[page_idx].get('text', '')
                if page_text.strip():
                    section_text.append(page_text)

            if not section_text:
                continue

            full_text = '\n\n'.join(section_text)

            # Create chunks from the section text
            if len(full_text) <= self.max_chunk_size:
                # Single chunk for small sections
                chunks.append({
                    'text': full_text.strip(),
                    'title': entry.title,
                    'parent_title': entry.parent_title or entry.parent or '',
                    'level': entry.level,
                    'page': entry.page,
                    'context': f"From {entry.title}",
                    'metadata': {
                        'parsing_method': 'toc_guided',
                        'section_title': entry.title,
                        'hierarchy_level': entry.level
                    }
                })
            else:
                # Split large sections into multiple chunks
                section_chunks = self._split_text_into_chunks(full_text)
                for j, chunk_text in enumerate(section_chunks):
                    chunks.append({
                        'text': chunk_text.strip(),
                        'title': f"{entry.title} (Part {j + 1})",
                        'parent_title': entry.parent_title or entry.parent or '',
                        'level': entry.level,
                        'page': entry.page,
                        'context': f"Part {j + 1} of {entry.title}",
                        'metadata': {
                            'parsing_method': 'toc_guided',
                            'section_title': entry.title,
                            'hierarchy_level': entry.level,
                            'part_number': j + 1,
                            'total_parts': len(section_chunks)
                        }
                    })

        return chunks
    def _split_text_into_chunks(self, text: str) -> List[str]:
        """Split text into chunks while preserving sentence boundaries.

        Splitting aims for target_chunk_size; min/max bounds are not
        enforced here, so a single very long sentence may exceed the target.
        """
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current_chunk = []
        current_size = 0

        for sentence in sentences:
            sentence_size = len(sentence)
            if current_size + sentence_size > self.target_chunk_size and current_chunk:
                # Save the current chunk and start a new one
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]
                current_size = sentence_size
            else:
                current_chunk.append(sentence)
                current_size += sentence_size + 1  # +1 for the joining space

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks


def parse_pdf_with_toc_guidance(pdf_data: Dict, **kwargs) -> List[Dict]:
    """Main entry point for TOC-guided parsing."""
    parser = TOCGuidedParser(**kwargs)

    # Parse the TOC
    pages = pdf_data.get('pages', [])
    toc_entries = parser.parse_toc(pages)
    print(f"Found {len(toc_entries)} TOC entries")

    if not toc_entries:
        print("No TOC entries found, falling back to basic chunking")
        from .chunker import chunk_technical_text
        return chunk_technical_text(pdf_data.get('text', ''))

    # Create chunks based on the TOC
    chunks = parser.create_chunks_from_toc(pdf_data, toc_entries)
    print(f"Created {len(chunks)} chunks from TOC structure")
    return chunks
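

if __name__ == "__main__":
    # Minimal usage sketch with synthetic data, assuming the input shape
    # documented near the imports; not part of the original module's API.
    # (The no-TOC fallback imports .chunker and therefore needs the package
    # context, but this example exercises the TOC path only.)
    toc_page = {
        'text': "Table of Contents\n"
                "1 Introduction .... 2\n"
                "1.1 Background .... 3\n"
    }
    body_pages = [
        {'text': "Introduction. " + "This sentence stands in for real page text. " * 30},
        {'text': "Background. " + "More synthetic detail fills this page as well. " * 30},
    ]
    pdf_data = {'pages': [toc_page] + body_pages, 'text': ''}

    for chunk in parse_pdf_with_toc_guidance(pdf_data):
        print(f"{chunk['title']!r} (level {chunk['level']}, page {chunk['page']}): "
              f"{len(chunk['text'])} chars")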