sagar008 committed
Commit 2bdc136 · verified · 1 Parent(s): 95714c1

Update document_processor.py


fixed document_processor.py for float32 errors
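The "float32 errors" in question are, judging from the diff below, JSON serialization failures: the embedding model returns numpy.float32 scalars and numpy.ndarray vectors, which Python's default JSON encoder rejects. A minimal sketch of that failure mode (the payload here is hypothetical, shaped like the processor's output):

import json
import numpy as np

# Hypothetical payload shaped like process_document()'s output:
# NumPy scalars and arrays nested inside plain dicts and lists.
payload = {
    "similarity": np.float32(0.91),
    "embedding": np.zeros(3, dtype=np.float32),
}

try:
    json.dumps(payload)
except TypeError as e:
    # e.g. "Object of type float32 is not JSON serializable"
    print(e)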

Files changed (1)
  1. document_processor.py +79 -62
document_processor.py CHANGED
@@ -1,6 +1,6 @@
-# document_processor.py - Updated with optimized processing
 import time
 import asyncio
+import numpy as np
 from concurrent.futures import ThreadPoolExecutor
 from typing import List, Dict, Any, Tuple
 from chunker import DocumentChunker
@@ -9,114 +9,129 @@ from risk_detector import RiskDetector
 from clause_tagger import ClauseTagger
 from models import *

+
+
+def clean_numpy(obj):
+    """Recursively convert NumPy types to native Python types"""
+    if isinstance(obj, np.generic):
+        return obj.item()
+    elif isinstance(obj, np.ndarray):
+        return obj.tolist()
+    elif isinstance(obj, dict):
+        return {k: clean_numpy(v) for k, v in obj.items()}
+    elif isinstance(obj, list):
+        return [clean_numpy(v) for v in obj]
+    else:
+        return obj
+
+
+
 class DocumentProcessor:
     def __init__(self):
         self.chunker = None
         self.summarizer = None
         self.risk_detector = None
         self.clause_tagger = None
-        self.cache = {}  # Simple in-memory cache
-        self.executor = ThreadPoolExecutor(max_workers=3)  # For CPU-bound parallel tasks
-
+        self.cache = {}
+        self.executor = ThreadPoolExecutor(max_workers=3)
+
     async def initialize(self):
         """Initialize all components"""
-        print("🚀 Initializing Document Processor...")
-
+        print(" Initializing Document Processor...")
+
         self.chunker = DocumentChunker()
         self.summarizer = DocumentSummarizer()
         self.risk_detector = RiskDetector()
         self.clause_tagger = ClauseTagger()
-
+
         # Initialize models in parallel for faster startup
         init_tasks = [
             self.summarizer.initialize(),
             self.clause_tagger.initialize()
         ]
-
+
         await asyncio.gather(*init_tasks)
-
-        print("✅ Document Processor initialized")
+        print(" Document Processor initialized")

     async def process_document(self, text: str, doc_id: str) -> Tuple[Dict[str, Any], List[Dict]]:
         """Process document with optimized single embedding generation"""
-
+
         # Check cache first
         if doc_id in self.cache:
-            print(f"📋 Using cached result for doc_id: {doc_id}")
+            print(f" Using cached result for doc_id: {doc_id}")
             return self.cache[doc_id]
-
-        print(f"🔄 Processing new document: {doc_id}")
+
+        print(f" Processing new document: {doc_id}")
         start_time = time.time()
-
+
         try:
-            # Step 1: Chunk the document (fast, do this first)
+            # Step 1: Chunk the document
             chunks = self.chunker.chunk_by_tokens(text, max_tokens=1600, stride=50)
-            print(f"📦 Created {len(chunks)} chunks in {time.time() - start_time:.2f}s")
-
-            # Step 2: Generate embeddings ONCE for all chunks
-            print(f"🧠 Generating embeddings for {len(chunks)} chunks...")
+            print(f" Created {len(chunks)} chunks in {time.time() - start_time:.2f}s")
+
+            # Step 2: Generate embeddings
+            print(f" Generating embeddings for {len(chunks)} chunks...")
             embedding_start = time.time()
-
-            # Generate all embeddings in one batch
+
             if self.clause_tagger.embedding_model:
                 chunk_embeddings = self.clause_tagger.embedding_model.encode(chunks)
                 embedding_time = time.time() - embedding_start
-                print(f"✅ Generated embeddings in {embedding_time:.2f}s")
-
-                # Store embeddings for reuse
+                print(f" Generated embeddings in {embedding_time:.2f}s")
+
+                # Convert embeddings to lists to avoid NumPy serialization issues
                 chunk_data = [
-                    {"text": chunk, "embedding": embedding}
+                    {"text": chunk, "embedding": embedding.tolist()}
                     for chunk, embedding in zip(chunks, chunk_embeddings)
                 ]
             else:
                 chunk_data = [{"text": chunk, "embedding": None} for chunk in chunks]
                 embedding_time = 0
-                print("⚠️ No embedding model available")
-
-            # Step 3: Run analysis tasks in parallel using pre-computed embeddings
+                print(" No embedding model available")
+
+            # Step 3: Run analysis tasks in parallel
            tasks = []
-
-            # Parallel task 1: Batch summarization (async)
+
+            # Task 1: Summarization (async)
            summary_task = asyncio.create_task(
                self.summarizer.batch_summarize(chunks)
            )
            tasks.append(('summary', summary_task))
-
-            # Parallel task 2: Risk detection (CPU-bound, run in thread pool)
+
+            # Task 2: Risk detection (CPU-bound)
            risk_task = asyncio.get_event_loop().run_in_executor(
                self.executor,
                self.risk_detector.detect_risks,
                chunks
            )
            tasks.append(('risks', risk_task))
-
-            # Parallel task 3: Clause tagging using pre-computed embeddings
+
+            # Task 3: Clause tagging (async, uses embeddings)
            if self.clause_tagger.embedding_model and chunk_data[0]["embedding"] is not None:
                clause_task = asyncio.create_task(
                    self.clause_tagger.tag_clauses_with_embeddings(chunk_data)
                )
                tasks.append(('clauses', clause_task))
-
-            print(f"🚀 Starting {len(tasks)} parallel analysis tasks...")
-
-            # Wait for all tasks to complete with progress tracking
+
+            print(f" Starting {len(tasks)} parallel analysis tasks...")
+
+            # Wait for all tasks
            results = {}
            for task_name, task in tasks:
                try:
-                    print(f"⏳ Waiting for {task_name} analysis...")
+                    print(f" Waiting for {task_name} analysis...")
                    results[task_name] = await task
-                    print(f"✅ {task_name} completed")
+                    print(f" {task_name} completed")
                except Exception as e:
-                    print(f"⚠️ {task_name} analysis failed: {e}")
-                    # Provide fallback results
+                    print(f" {task_name} analysis failed: {e}")
+                    # Fallback results
                    if task_name == 'summary':
                        results[task_name] = {"actual_summary": "Summary generation failed", "short_summary": "Summary failed"}
                    elif task_name == 'risks':
                        results[task_name] = []
                    elif task_name == 'clauses':
                        results[task_name] = []
-
-            # Combine results
+
+            # Step 4: Combine results
            processing_time = time.time() - start_time
            result = {
                "summary": results.get('summary', {"actual_summary": "Summary not available", "short_summary": "Summary not available"}),
@@ -129,19 +144,21 @@ class DocumentProcessor:
                "doc_id": doc_id,
                "parallel_tasks_completed": len([r for r in results.values() if r])
            }
-
-            # Cache the result
-            cached_data = (result, chunk_data)
-            self.cache[doc_id] = cached_data
+
+            # Step 5: Clean NumPy data before caching/returning
+            cleaned_result = clean_numpy(result)
+            cleaned_chunk_data = clean_numpy(chunk_data)
+
+            # Cache results
+            self.cache[doc_id] = (cleaned_result, cleaned_chunk_data)
            print(f"🎉 Document processing completed in {processing_time:.2f}s")
-
-            return result, chunk_data
-
+
+            return cleaned_result, cleaned_chunk_data
+
        except Exception as e:
            error_time = time.time() - start_time
            print(f"❌ Document processing failed after {error_time:.2f}s: {e}")
-
-            # Return error result
+
            error_result = {
                "error": str(e),
                "summary": {"actual_summary": "Processing failed", "short_summary": "Processing failed"},
@@ -151,9 +168,9 @@ class DocumentProcessor:
                "processing_time": f"{error_time:.2f}s",
                "doc_id": doc_id
            }
-
-            return error_result, []
-
+
+            return clean_numpy(error_result), []
+
    def chunk_text(self, data: ChunkInput) -> Dict[str, Any]:
        """Standalone chunking endpoint"""
        start = time.time()
@@ -173,7 +190,7 @@ class DocumentProcessor:
                "time_taken": f"{time.time() - start:.2f}s",
                "status": "failed"
            }
-
+
    def summarize_batch(self, data: SummarizeBatchInput) -> Dict[str, Any]:
        """Standalone batch summarization endpoint"""
        start = time.time()
@@ -181,7 +198,7 @@ class DocumentProcessor:
            result = self.summarizer.summarize_texts_sync(data.texts, data.max_length, data.min_length)
            result["time_taken"] = f"{time.time() - start:.2f}s"
            result["status"] = "success"
-            return result
+            return clean_numpy(result)
        except Exception as e:
            return {
                "error": str(e),
@@ -189,14 +206,14 @@ class DocumentProcessor:
                "time_taken": f"{time.time() - start:.2f}s",
                "status": "failed"
            }
-
+
    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics for monitoring"""
        return {
            "cached_documents": len(self.cache),
            "cache_keys": list(self.cache.keys())
        }
-
+
    def clear_cache(self) -> Dict[str, str]:
        """Clear the document cache"""
        cleared_count = len(self.cache)
@@ -205,7 +222,7 @@ class DocumentProcessor:
            "message": f"Cleared {cleared_count} cached documents",
            "status": "success"
        }
-
+
    def __del__(self):
        """Cleanup thread pool on destruction"""
        if hasattr(self, 'executor'):
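As a quick sanity check of the fix (clean_numpy copied from the new file above; the sample data is hypothetical, shaped like process_document()'s return values), the cleaned structures now pass straight through json.dumps:

import json
import numpy as np


def clean_numpy(obj):
    """Recursively convert NumPy types to native Python types"""
    if isinstance(obj, np.generic):
        return obj.item()
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, dict):
        return {k: clean_numpy(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [clean_numpy(v) for v in obj]
    else:
        return obj


# Hypothetical result/chunk_data, with the NumPy types the pipeline produces.
result = {"processing_time": np.float32(1.23), "parallel_tasks_completed": np.int64(3)}
chunk_data = [{"text": "Sample clause", "embedding": np.random.rand(4).astype(np.float32)}]

cleaned_result, cleaned_chunk_data = clean_numpy(result), clean_numpy(chunk_data)
print(type(cleaned_result["processing_time"]))     # <class 'float'>
print(json.dumps(cleaned_chunk_data)[:60], "...")  # serializes without TypeError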