Archime committed on
Commit
935d736
·
1 Parent(s): 8417fa3

impl ACTIVE_STREAM_FLAG

Browse files
app.py CHANGED
@@ -28,6 +28,7 @@ from app.session_utils import (
28
  register_session_hash_code,
29
  reset_all_active_session_hash_code,
30
  get_active_task_flag_file,
 
31
 
32
  )
33
 
@@ -79,8 +80,7 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
79
  gr.Timer(3.0).tick(fn=get_active_session_hash_code, outputs=sessions_table)
80
 
81
  demo.load(fn=on_load, inputs=None, outputs=[session_hash_code, session_hash_code_box])
82
- demo.unload(on_unload)
83
- stop_streaming_flags = gr.State(value={"stop": False})
84
  active_filepath = gr.State(value=next(iter(EXAMPLE_CONFIGS)))
85
 
86
  with gr.Walkthrough(selected=0) as walkthrough:
@@ -144,7 +144,7 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
144
 
145
  webrtc_stream.stream(
146
  fn=read_and_stream_audio,
147
- inputs=[active_filepath, session_hash_code, stop_streaming_flags,gr.State(READ_SIZE)],
148
  outputs=[webrtc_stream],
149
  trigger=start_stream_button.click,
150
  concurrency_id="audio_stream",
@@ -267,7 +267,6 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
267
  with gr.Row():
268
  gr.Markdown("##### Transcription / Translation Result")
269
  with gr.Row():
270
-
271
  task_output = gr.Textbox(
272
  label="Transcription / Translation Result",
273
  show_label=False,
@@ -286,14 +285,11 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
286
 
287
  stop_stream_button.click(
288
  fn=stop_streaming,
289
- inputs=[session_hash_code, stop_streaming_flags],
290
- outputs=[stop_streaming_flags],
291
  )
292
 
293
  def stop_task_fn(session_hash_code):
294
- transcribe_active = get_active_task_flag_file(session_hash_code)
295
- if os.path.exists(transcribe_active):
296
- os.remove(transcribe_active)
297
  yield "Task stopped by user."
298
 
299
 
@@ -308,7 +304,7 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
308
  streaming_policy, alignatt_thr, waitk_lagging,
309
  exclude_sink_frames, xatt_scores_layer, hallucinations_detector]
310
  def start_transcription(
311
- session_hash_code, stop_streaming_flags,
312
  task_type, lang_source, lang_target,
313
  chunk_secs, left_context_secs, right_context_secs,
314
  streaming_policy, alignatt_thr, waitk_lagging,
@@ -338,7 +334,7 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
338
  """Stream transcription or translation results in real time."""
339
  accumulated = ""
340
  # Boucle sur le générateur de `task2()`
341
- for result, status, current_chunk in task_fake(
342
  session_hash_code,
343
  task_type, lang_source, lang_target,
344
  chunk_secs, left_context_secs, right_context_secs,
@@ -348,7 +344,9 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
348
  if status == "success":
349
  yield accumulated + result, gr.update(visible=True,value=current_chunk , elem_classes=["info"]), gr.update(visible=False), gr.update(visible=True)
350
  accumulated += result
351
- elif status in ["error", "warning", "info", "done"]:
 
 
352
  yield accumulated, gr.update(visible=True,value=result , elem_classes=[status]), gr.update(visible=True), gr.update(visible=False)
353
 
354
 
@@ -368,7 +366,7 @@ with gr.Blocks(theme=theme, css=css_style) as demo:
368
  # start_task_button.click(
369
  # fn=start_task,
370
  # inputs=[
371
- # session_hash_code, stop_streaming_flags,
372
  # task_type, lang_source, lang_target,
373
  # chunk_secs, left_context_secs, right_context_secs,
374
  # streaming_policy, alignatt_thr, waitk_lagging,
 
28
  register_session_hash_code,
29
  reset_all_active_session_hash_code,
30
  get_active_task_flag_file,
31
+ remove_active_task_flag_file
32
 
33
  )
34
 
 
80
  gr.Timer(3.0).tick(fn=get_active_session_hash_code, outputs=sessions_table)
81
 
82
  demo.load(fn=on_load, inputs=None, outputs=[session_hash_code, session_hash_code_box])
83
+ demo.unload(fn=on_unload)
 
84
  active_filepath = gr.State(value=next(iter(EXAMPLE_CONFIGS)))
85
 
86
  with gr.Walkthrough(selected=0) as walkthrough:
 
144
 
145
  webrtc_stream.stream(
146
  fn=read_and_stream_audio,
147
+ inputs=[active_filepath, session_hash_code,gr.State(READ_SIZE)],
148
  outputs=[webrtc_stream],
149
  trigger=start_stream_button.click,
150
  concurrency_id="audio_stream",
 
267
  with gr.Row():
268
  gr.Markdown("##### Transcription / Translation Result")
269
  with gr.Row():
 
270
  task_output = gr.Textbox(
271
  label="Transcription / Translation Result",
272
  show_label=False,
 
285
 
286
  stop_stream_button.click(
287
  fn=stop_streaming,
288
+ inputs=[session_hash_code],
 
289
  )
290
 
291
  def stop_task_fn(session_hash_code):
292
+ remove_active_task_flag_file(session_hash_code)
 
 
293
  yield "Task stopped by user."
294
 
295
 
 
304
  streaming_policy, alignatt_thr, waitk_lagging,
305
  exclude_sink_frames, xatt_scores_layer, hallucinations_detector]
306
  def start_transcription(
307
+ session_hash_code,
308
  task_type, lang_source, lang_target,
309
  chunk_secs, left_context_secs, right_context_secs,
310
  streaming_policy, alignatt_thr, waitk_lagging,
 
334
  """Stream transcription or translation results in real time."""
335
  accumulated = ""
336
  # Boucle sur le générateur de `task2()`
337
+ for result, status, current_chunk in task(
338
  session_hash_code,
339
  task_type, lang_source, lang_target,
340
  chunk_secs, left_context_secs, right_context_secs,
 
344
  if status == "success":
345
  yield accumulated + result, gr.update(visible=True,value=current_chunk , elem_classes=["info"]), gr.update(visible=False), gr.update(visible=True)
346
  accumulated += result
347
+ elif status in ["warning","info" ]:
348
+ yield accumulated, gr.update(visible=True,value=result , elem_classes=[status]), gr.update(visible=False), gr.update(visible=True)
349
+ elif status in ["error", "done"]:
350
  yield accumulated, gr.update(visible=True,value=result , elem_classes=[status]), gr.update(visible=True), gr.update(visible=False)
351
 
352
 
 
366
  # start_task_button.click(
367
  # fn=start_task,
368
  # inputs=[
369
+ # session_hash_code,
370
  # task_type, lang_source, lang_target,
371
  # chunk_secs, left_context_secs, right_context_secs,
372
  # streaming_policy, alignatt_thr, waitk_lagging,
app/canary_speech_engine.py CHANGED
@@ -373,7 +373,7 @@ class CanarySpeechEngine(IStreamingSpeechEngine):
373
  # logging.info(f"--- transcribe_chunk: took {duration_ms:.2f} ms ---")
374
 
375
  # Return both the full segment transcription and the new diff
376
- return current_transcription, new_text
377
 
378
  def finalize_segment(self):
379
  """
 
373
  # logging.info(f"--- transcribe_chunk: took {duration_ms:.2f} ms ---")
374
 
375
  # Return both the full segment transcription and the new diff
376
+ yield current_transcription, new_text
377
 
378
  def finalize_segment(self):
379
  """
app/session_utils.py CHANGED
@@ -9,6 +9,7 @@ import gradio as gr
9
 
10
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
11
  ACTIVE_SESSIONS_HASH_FILE = os.path.join(TMP_DIR, "active_session_hash_code.json")
 
12
  ACTIVE_TASK_FLAG="task_active_"
13
  NAME_FOLDER_CHUNKS="chunks_"
14
 
@@ -58,17 +59,17 @@ def on_load(request: gr.Request):
58
  # ---------------------------
59
  def on_unload(request: gr.Request):
60
  """Called when the visitor closes or refreshes the app."""
61
- sid = request.session_hash_code
62
  sessions = _read_session_hash_code()
63
 
64
- if sid in sessions:
65
- sessions.pop(sid)
66
  _write_session_hash_code(sessions)
67
- remove_session_hash_code_data(sid)
68
- unregister_session_hash_code_hash(sid)
69
- logging.info(f"[{sid}] session_hash_code removed (on_unload).")
70
  else:
71
- logging.info(f"[{sid}] No active session_hash_code found to remove.")
72
 
73
  def ensure_tmp_dir():
74
  """Ensures the base temporary directory exists."""
@@ -92,7 +93,8 @@ def reset_all_active_session_hash_code():
92
  # --- Clean all flag files (stream + transcribe) ---
93
  for f in os.listdir(TMP_DIR):
94
  if (
95
- f.startswith(f"{ACTIVE_TASK_FLAG}")
 
96
  ) and f.endswith(".txt"):
97
  path = os.path.join(TMP_DIR, f)
98
  try:
@@ -138,6 +140,7 @@ def remove_session_hash_code_data(session_hash_code: str):
138
  # --- Define all possible session_hash_code file patterns ---
139
  files_to_remove = [
140
  get_active_task_flag_file(session_hash_code),
 
141
  ]
142
 
143
  # --- Remove all temporary files ---
@@ -164,11 +167,6 @@ def remove_session_hash_code_data(session_hash_code: str):
164
  except Exception as e:
165
  logging.error(f"[{session_hash_code}] Error during reset_session: {e}")
166
 
167
- def generate_session_id() -> str:
168
- """Generates a unique session_hash_code ID."""
169
- sid = str(uuid.uuid4())
170
- logging.debug(f"[{sid}] New session_hash_code created.")
171
- return sid
172
 
173
 
174
  def register_session_hash_code(session_hash_code: str, filepath: str):
@@ -239,6 +237,27 @@ def get_active_session_hash_code():
239
  def get_active_task_flag_file(session_hash_code: str):
240
  return os.path.join(TMP_DIR, f"{ACTIVE_TASK_FLAG}{session_hash_code}.txt")
241
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
242
 
243
  def get_folder_chunks(session_hash_code: str):
244
  return os.path.join(TMP_DIR, f"{NAME_FOLDER_CHUNKS}{session_hash_code}")
 
9
 
10
  TMP_DIR = os.getenv("TMP_DIR", "/tmp/canary_aed_streaming")
11
  ACTIVE_SESSIONS_HASH_FILE = os.path.join(TMP_DIR, "active_session_hash_code.json")
12
+ ACTIVE_STREAM_FLAG="stream_active_"
13
  ACTIVE_TASK_FLAG="task_active_"
14
  NAME_FOLDER_CHUNKS="chunks_"
15
 
 
59
  # ---------------------------
60
  def on_unload(request: gr.Request):
61
  """Called when the visitor closes or refreshes the app."""
62
+ session_hash_code = request.session_hash
63
  sessions = _read_session_hash_code()
64
 
65
+ if session_hash_code in sessions:
66
+ sessions.pop(session_hash_code)
67
  _write_session_hash_code(sessions)
68
+ remove_session_hash_code_data(session_hash_code)
69
+ unregister_session_hash_code_hash(session_hash_code)
70
+ logging.info(f"[{session_hash_code}] session_hash_code removed (on_unload).")
71
  else:
72
+ logging.info(f"[{session_hash_code}] No active session_hash_code found to remove.")
73
 
74
  def ensure_tmp_dir():
75
  """Ensures the base temporary directory exists."""
 
93
  # --- Clean all flag files (stream + transcribe) ---
94
  for f in os.listdir(TMP_DIR):
95
  if (
96
+ f.startswith(f"{ACTIVE_TASK_FLAG}")
97
+ or f.startswith(f"{ACTIVE_STREAM_FLAG}")
98
  ) and f.endswith(".txt"):
99
  path = os.path.join(TMP_DIR, f)
100
  try:
 
140
  # --- Define all possible session_hash_code file patterns ---
141
  files_to_remove = [
142
  get_active_task_flag_file(session_hash_code),
143
+ get_active_stream_flag_file(session_hash_code),
144
  ]
145
 
146
  # --- Remove all temporary files ---
 
167
  except Exception as e:
168
  logging.error(f"[{session_hash_code}] Error during reset_session: {e}")
169
 
 
 
 
 
 
170
 
171
 
172
  def register_session_hash_code(session_hash_code: str, filepath: str):
 
237
  def get_active_task_flag_file(session_hash_code: str):
238
  return os.path.join(TMP_DIR, f"{ACTIVE_TASK_FLAG}{session_hash_code}.txt")
239
 
240
+ def get_active_stream_flag_file(session_hash_code: str):
241
+ return os.path.join(TMP_DIR, f"{ACTIVE_STREAM_FLAG}{session_hash_code}.txt")
242
+
243
+
244
+ def remove_active_stream_flag_file(session_hash_code: str):
245
+ fname = os.path.join(TMP_DIR, f"{ACTIVE_STREAM_FLAG}{session_hash_code}.txt")
246
+ if os.path.exists(fname):
247
+ try:
248
+ os.remove(fname)
249
+ logging.debug(f"[{session_hash_code}] Removed file: {fname}")
250
+ except Exception as e:
251
+ logging.warning(f"[{session_hash_code}] Failed to remove file {fname}: {e}")
252
+
253
+ def remove_active_task_flag_file(session_hash_code: str):
254
+ fname = os.path.join(TMP_DIR, f"{ACTIVE_TASK_FLAG}{session_hash_code}.txt")
255
+ if os.path.exists(fname):
256
+ try:
257
+ os.remove(fname)
258
+ logging.debug(f"[{session_hash_code}] Removed file: {fname}")
259
+ except Exception as e:
260
+ logging.warning(f"[{session_hash_code}] Failed to remove file {fname}: {e}")
261
 
262
  def get_folder_chunks(session_hash_code: str):
263
  return os.path.join(TMP_DIR, f"{NAME_FOLDER_CHUNKS}{session_hash_code}")
app/streaming_audio_processor.py CHANGED
@@ -84,18 +84,19 @@ class StreamingAudioProcessor:
84
  Flushes the remaining buffer to the transcriber, resets the state,
85
  and returns the last transcribed text.
86
  """
87
- new_text = ""
88
  if len(self.internal_buffer) > 0:
89
  # Buffer is already a numpy array
90
  final_segment_chunk = self.internal_buffer
91
  logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
92
- seg, new_text = self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True)
 
93
  else:
94
  # Buffer is empty, but send a silent "flush"
95
  # to force the transcriber to finalize its internal state.
96
  logging.info("Buffer empty, sending silent flush to finalize segment.")
97
  flush_chunk = np.zeros(self.logical_chunk_size, dtype='int16')
98
- seg, new_text = self.speech_engine.transcribe_chunk(flush_chunk, is_last_chunk=True)
 
99
 
100
  # Full state reset
101
  logging.debug("Resetting speech engine state...")
@@ -106,7 +107,7 @@ class StreamingAudioProcessor:
106
  self.is_first_logical_chunk = True
107
  self.silent_chunks_count = 0
108
 
109
- return new_text
110
 
111
  def process_chunk(self, chunk: np.ndarray):
112
  """
@@ -142,10 +143,12 @@ class StreamingAudioProcessor:
142
 
143
  if asr_chunk_np is not None:
144
  logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
145
- seg, new_text = self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False)
146
- if new_text:
147
  logging.info(f"Received new text segment: '{new_text}'")
148
  new_text_segments.append(new_text)
 
 
 
149
  self.is_first_logical_chunk = False
150
 
151
  # --- 3. VAD Reset Logic ---
@@ -153,12 +156,14 @@ class StreamingAudioProcessor:
153
  logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
154
 
155
  # Flush the buffer, reset state, and get final text
156
- reset_text = self._flush_and_reset()
157
- if reset_text:
158
  logging.info(f"Received final reset text: '{reset_text}'")
159
  new_text_segments.append(reset_text)
160
-
161
- return new_text_segments
 
 
 
162
 
163
  def finalize_stream(self):
164
  """
 
84
  Flushes the remaining buffer to the transcriber, resets the state,
85
  and returns the last transcribed text.
86
  """
 
87
  if len(self.internal_buffer) > 0:
88
  # Buffer is already a numpy array
89
  final_segment_chunk = self.internal_buffer
90
  logging.info(f"Flushing segment remainder of {len(final_segment_chunk)} samples.")
91
+ for seg, new_text in self.speech_engine.transcribe_chunk(final_segment_chunk, is_last_chunk=True) :
92
+ yield new_text
93
  else:
94
  # Buffer is empty, but send a silent "flush"
95
  # to force the transcriber to finalize its internal state.
96
  logging.info("Buffer empty, sending silent flush to finalize segment.")
97
  flush_chunk = np.zeros(self.logical_chunk_size, dtype='int16')
98
+ for seg, new_text in self.speech_engine.transcribe_chunk(flush_chunk, is_last_chunk=True) :
99
+ yield new_text
100
 
101
  # Full state reset
102
  logging.debug("Resetting speech engine state...")
 
107
  self.is_first_logical_chunk = True
108
  self.silent_chunks_count = 0
109
 
110
+ yield ""
111
 
112
  def process_chunk(self, chunk: np.ndarray):
113
  """
 
143
 
144
  if asr_chunk_np is not None:
145
  logging.debug(f"Sending logical chunk (size: {len(asr_chunk_np)}) to speech engine...")
146
+ for seg, new_text in self.speech_engine.transcribe_chunk(asr_chunk_np, is_last_chunk=False) :
 
147
  logging.info(f"Received new text segment: '{new_text}'")
148
  new_text_segments.append(new_text)
149
+ yield new_text
150
+ else :
151
+ yield ""
152
  self.is_first_logical_chunk = False
153
 
154
  # --- 3. VAD Reset Logic ---
 
156
  logging.info(f"\n[VAD RESET: SILENCE detected ({self.silent_chunks_count} empty chunks) at {(self.chunks_count * (self.read_size/self.VAD_SAMPLE_RATE)):.2f}s]")
157
 
158
  # Flush the buffer, reset state, and get final text
159
+ for reset_text in self._flush_and_reset() :
 
160
  logging.info(f"Received final reset text: '{reset_text}'")
161
  new_text_segments.append(reset_text)
162
+ yield reset_text
163
+ else :
164
+ yield ""
165
+
166
+ yield ""
167
 
168
  def finalize_stream(self):
169
  """
app/utils.py CHANGED
@@ -16,6 +16,9 @@ import torch
16
  from app.streaming_audio_processor import StreamingAudioProcessor
17
  from app.session_utils import (
18
  get_active_task_flag_file,
 
 
 
19
  get_folder_chunks
20
  )
21
  from app.ui_utils import (
@@ -65,73 +68,73 @@ def generate_coturn_config():
65
 
66
 
67
 
68
- def read_and_stream_audio(filepath_to_stream: str, session_id: str, stop_streaming_flags: dict,read_size:int =8000, sample_rate:int =16000):
69
  """
70
  Read an audio file and stream it chunk by chunk (1s per chunk).
71
  Handles errors safely and reports structured messages to the client.
72
  """
73
- if not session_id:
74
- yield from handle_stream_error("unknown", "No session_id provided.", stop_streaming_flags)
75
  return
76
 
77
  if not filepath_to_stream or not os.path.exists(filepath_to_stream):
78
- yield from handle_stream_error(session_id, f"Audio file not found: {filepath_to_stream}", stop_streaming_flags)
79
  return
80
- transcribe_flag = get_active_task_flag_file(session_id)
81
  try:
82
  segment = AudioSegment.from_file(filepath_to_stream)
83
  chunk_duration_ms = int((read_size/sample_rate)*1000)
84
  total_chunks = len(segment) // chunk_duration_ms + 1
85
- logging.info(f"[{session_id}] Starting audio streaming {filepath_to_stream} ({total_chunks} chunks).")
 
86
 
87
  for i, chunk in enumerate(segment[::chunk_duration_ms]):
88
-
89
 
90
  frame_rate = chunk.frame_rate
91
  samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
92
  progress = round(((i + 1) / total_chunks) * 100, 2)
93
- if _is_stop_requested(stop_streaming_flags):
94
- logging.info(f"[{session_id}] Stop signal received. Terminating stream.")
95
  yield ((frame_rate, samples), AdditionalOutputs({"stoped": True, "value": "STREAM_STOPED"} ) )
96
  break
97
 
98
  yield ((frame_rate, samples), AdditionalOutputs({"progressed": True, "value": progress} ))
99
- # logging.debug(f"[{session_id}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
100
 
101
  time.sleep(chunk_duration_ms/1000)
102
  # Save only if transcription is active
103
- if os.path.exists(transcribe_flag) :
104
- chunk_dir = get_folder_chunks(session_id)
105
  if not os.path.exists(chunk_dir) :
106
  os.makedirs(chunk_dir, exist_ok=True)
107
  npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
108
  chunk_array = np.array(chunk.get_array_of_samples(), dtype=np.int16)
109
- np.savez_compressed(npz_path, data=chunk_array, rate=frame_rate)
110
- logging.debug(f"[{session_id}] Saved chunk {i}/{total_chunks} (transcribe active) ({progress}%) ({npz_path}).")
 
111
 
112
  # raise_function() # Optional injected test exception
113
 
114
- logging.info(f"[{session_id}] Audio streaming completed successfully.")
115
 
116
  except asyncio.CancelledError:
117
- yield from handle_stream_error(session_id, "Streaming cancelled by user.", stop_streaming_flags)
118
  except FileNotFoundError as e:
119
- yield from handle_stream_error(session_id, e, stop_streaming_flags)
120
  except Exception as e:
121
- yield from handle_stream_error(session_id, e, stop_streaming_flags)
122
-
123
  finally:
124
- if isinstance(stop_streaming_flags, dict):
125
- stop_streaming_flags["stop"] = False
126
- logging.info(f"[{session_id}] Stop flag reset.")
127
 
128
 
129
 
130
- # asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
131
- asr_model = None
132
 
133
  @spaces.GPU
134
- def task_fake(session_id: str,
135
  task_type, lang_source, lang_target,
136
  chunk_secs, left_context_secs, right_context_secs,
137
  streaming_policy, alignatt_thr, waitk_lagging,
@@ -158,21 +161,21 @@ def task_fake(session_id: str,
158
  # streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,vad_engine=silero_vad_engine,cfg=streaming_audio_processor_config)
159
  ##-----------
160
  yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
161
- yield (f"Task started for session {session_id}", "info", None)
162
 
163
- active_flag = get_active_task_flag_file(session_id)
164
  with open(active_flag, "w") as f:
165
  f.write("1")
166
- chunk_dir = get_folder_chunks(session_id)
167
- logging.info(f"[{session_id}] task started. {chunk_dir}")
168
 
169
  try:
170
- logging.info(f"[{session_id}] task loop started.")
171
- yield (f"Task started for session {session_id}", "info", None)
172
 
173
  while os.path.exists(active_flag):
174
  if not os.path.exists(chunk_dir):
175
- logging.warning(f"[{session_id}] No chunk directory found for task.")
176
  yield ("No audio chunks yet... waiting for stream.", "warning", None)
177
  time.sleep(0.1)
178
  continue
@@ -193,15 +196,15 @@ def task_fake(session_id: str,
193
  # for text in new_texts:
194
  # print(text, end='', flush=True)
195
  # yield (text, "success", text)
196
- # logging.debug(f"[{session_id}] {new_texts}")
197
  ##-----------
198
  ### TODO
199
  text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
200
  yield (text, "success", fname)
201
  os.remove(fpath)
202
- logging.debug(f"[{session_id}] Deleted processed chunk: {fname}")
203
  except Exception as e:
204
- logging.warning(f"[{session_id}] Error processing {fname}: {e}")
205
  yield (f"Error processing {fname}: {e}", "warning", fname)
206
  continue
207
  time.sleep(0.1)
@@ -212,31 +215,31 @@ def task_fake(session_id: str,
212
  # yield (text, "success", final_text)
213
  ##-----------
214
  yield ("DONE", "done", None)
215
- logging.info(f"[{session_id}] task loop ended (flag removed).")
216
 
217
  except Exception as e:
218
- logging.error(f"[{session_id}] task error: {e}", exc_info=True)
219
  yield (f"Unexpected error: {e}", "error", None)
220
 
221
  finally:
222
  if os.path.exists(active_flag):
223
  os.remove(active_flag)
224
- logging.info(f"[{session_id}] task stopped.")
225
 
226
  try:
227
  if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
228
  os.rmdir(chunk_dir)
229
- logging.debug(f"[{session_id}] Cleaned up empty chunk dir.")
230
  except Exception as e:
231
- logging.error(f"[{session_id}] Cleanup error: {e}")
232
  yield (f"Cleanup error: {e}", "error", None)
233
 
234
- logging.info(f"[{session_id}] Exiting task loop.")
235
  yield ("Task finished and cleaned up.", "done", None)
236
 
237
 
238
-
239
- def task(session_id: str,
240
  task_type, lang_source, lang_target,
241
  chunk_secs, left_context_secs, right_context_secs,
242
  streaming_policy, alignatt_thr, waitk_lagging,
@@ -260,21 +263,21 @@ def task(session_id: str,
260
  )
261
  streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,vad_engine=silero_vad_engine,cfg=streaming_audio_processor_config)
262
  yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
263
- yield (f"Task started for session {session_id}", "info", None)
264
 
265
- active_flag = get_active_task_flag_file(session_id)
266
  with open(active_flag, "w") as f:
267
  f.write("1")
268
- chunk_dir = get_folder_chunks(session_id)
269
- logging.info(f"[{session_id}] task started. {chunk_dir}")
270
 
271
  try:
272
- logging.info(f"[{session_id}] task loop started.")
273
- yield (f"Task started for session {session_id}", "info", None)
274
 
275
  while os.path.exists(active_flag):
276
  if not os.path.exists(chunk_dir):
277
- logging.warning(f"[{session_id}] No chunk directory found for task.")
278
  yield ("No audio chunks yet... waiting for stream.", "warning", None)
279
  time.sleep(0.1)
280
  continue
@@ -290,54 +293,52 @@ def task(session_id: str,
290
  npz = np.load(fpath)
291
  samples = npz["data"]
292
  rate = int(npz["rate"])
293
- new_texts = streamer.process_chunk(samples)
294
- for text in new_texts:
295
- print(text, end='', flush=True)
296
  yield (text, "success", text)
297
- logging.debug(f"[{session_id}] {new_texts}")
298
  ### TODO
299
  # text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
300
  # yield (text, "success", fname)
301
  os.remove(fpath)
302
- logging.debug(f"[{session_id}] Deleted processed chunk: {fname}")
303
  except Exception as e:
304
- logging.warning(f"[{session_id}] Error processing {fname}: {e}")
305
  yield (f"Error processing {fname}: {e}", "warning", fname)
306
  continue
307
  time.sleep(0.1)
308
 
309
  # TODO
310
- final_text = streamer.finalize_stream()
311
- yield (text, "success", final_text)
312
  # if final_text:
313
  # print(final_text, end='', flush=True)
314
  # yield f"\n{final_text}"
315
  ##
316
  yield ("DONE", "done", None)
317
- logging.info(f"[{session_id}] task loop ended (flag removed).")
318
 
319
  except Exception as e:
320
- logging.error(f"[{session_id}] task error: {e}", exc_info=True)
321
  yield (f"Unexpected error: {e}", "error", None)
322
 
323
  finally:
324
  if os.path.exists(active_flag):
325
  os.remove(active_flag)
326
- logging.info(f"[{session_id}] task stopped.")
327
 
328
  try:
329
  if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
330
  os.rmdir(chunk_dir)
331
- logging.debug(f"[{session_id}] Cleaned up empty chunk dir.")
332
  except Exception as e:
333
- logging.error(f"[{session_id}] Cleanup error: {e}")
334
  yield (f"Cleanup error: {e}", "error", None)
335
 
336
- logging.info(f"[{session_id}] Exiting task loop.")
337
  yield ("Task finished and cleaned up.", "done", None)
338
 
339
 
340
- def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_flags: dict | None = None):
341
  """
342
  Handle streaming errors:
343
  - Log the error
@@ -349,20 +350,14 @@ def handle_stream_error(session_id: str, error: Exception | str, stop_streaming_
349
  else:
350
  msg = str(error)
351
 
352
- logging.error(f"[{session_id}] Streaming error: {msg}", exc_info=isinstance(error, Exception))
353
 
354
- if isinstance(stop_streaming_flags, dict):
355
- stop_streaming_flags["stop"] = False
356
 
357
  yield ((16000,np.zeros(16000, dtype=np.float32).reshape(1, -1)), AdditionalOutputs({"errored": True, "value": msg}))
358
 
359
 
360
 
361
- def _is_stop_requested(stop_streaming_flags: dict) -> bool:
362
- """Check if the stop signal was requested."""
363
- if not isinstance(stop_streaming_flags, dict):
364
- return False
365
- return bool(stop_streaming_flags.get("stop", False))
366
 
367
  # --- Decorator compatibility layer ---
368
  if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
@@ -378,14 +373,23 @@ else:
378
 
379
 
380
 
381
- def stop_streaming(session_id: str, stop_streaming_flags: dict):
382
  """Trigger the stop flag for active streaming."""
383
- logging.info(f"[{session_id}] Stop button clicked — sending stop signal.")
384
- if not isinstance(stop_streaming_flags, dict):
385
- stop_streaming_flags = {"stop": True}
386
- else:
387
- stop_streaming_flags["stop"] = True
388
- return stop_streaming_flags
 
 
 
 
 
 
 
 
 
389
 
390
 
391
  def raise_function():
 
16
  from app.streaming_audio_processor import StreamingAudioProcessor
17
  from app.session_utils import (
18
  get_active_task_flag_file,
19
+ get_active_stream_flag_file,
20
+ remove_active_stream_flag_file,
21
+ remove_active_task_flag_file,
22
  get_folder_chunks
23
  )
24
  from app.ui_utils import (
 
68
 
69
 
70
 
71
+ def read_and_stream_audio(filepath_to_stream: str, session_hash_code: str,read_size:int =8000, sample_rate:int =16000):
72
  """
73
  Read an audio file and stream it chunk by chunk (1s per chunk).
74
  Handles errors safely and reports structured messages to the client.
75
  """
76
+ if not session_hash_code:
77
+ yield from handle_stream_error("unknown", "No session_hash_code provided.")
78
  return
79
 
80
  if not filepath_to_stream or not os.path.exists(filepath_to_stream):
81
+ yield from handle_stream_error(session_hash_code, f"Audio file not found: {filepath_to_stream}")
82
  return
83
+ task_active_flag = get_active_task_flag_file(session_hash_code)
84
  try:
85
  segment = AudioSegment.from_file(filepath_to_stream)
86
  chunk_duration_ms = int((read_size/sample_rate)*1000)
87
  total_chunks = len(segment) // chunk_duration_ms + 1
88
+ start_streaming(session_hash_code)
89
+ logging.info(f"[{session_hash_code}] Starting audio streaming {filepath_to_stream} ({total_chunks} chunks).")
90
 
91
  for i, chunk in enumerate(segment[::chunk_duration_ms]):
92
+
93
 
94
  frame_rate = chunk.frame_rate
95
  samples = np.array(chunk.get_array_of_samples()).reshape(1, -1)
96
  progress = round(((i + 1) / total_chunks) * 100, 2)
97
+ if _is_stop_requested(session_hash_code):
98
+ logging.info(f"[{session_hash_code}] Stop signal received. Terminating stream.")
99
  yield ((frame_rate, samples), AdditionalOutputs({"stoped": True, "value": "STREAM_STOPED"} ) )
100
  break
101
 
102
  yield ((frame_rate, samples), AdditionalOutputs({"progressed": True, "value": progress} ))
103
+ logging.debug(f"[{session_hash_code}] Sent chunk {i+1}/{total_chunks} ({progress}%).")
104
 
105
  time.sleep(chunk_duration_ms/1000)
106
  # Save only if transcription is active
107
+ if os.path.exists(task_active_flag) :
108
+ chunk_dir = get_folder_chunks(session_hash_code)
109
  if not os.path.exists(chunk_dir) :
110
  os.makedirs(chunk_dir, exist_ok=True)
111
  npz_path = os.path.join(chunk_dir, f"chunk_{i:05d}.npz")
112
  chunk_array = np.array(chunk.get_array_of_samples(), dtype=np.int16)
113
+ if os.path.exists(task_active_flag):
114
+ np.savez_compressed(npz_path, data=chunk_array, rate=frame_rate)
115
+ logging.debug(f"[{session_hash_code}] Saved chunk {i}/{total_chunks} (transcribe active) ({progress}%) ({npz_path}).")
116
 
117
  # raise_function() # Optional injected test exception
118
 
119
+ logging.info(f"[{session_hash_code}] Audio streaming completed successfully.")
120
 
121
  except asyncio.CancelledError:
122
+ yield from handle_stream_error(session_hash_code, "Streaming cancelled by user.")
123
  except FileNotFoundError as e:
124
+ yield from handle_stream_error(session_hash_code, e)
125
  except Exception as e:
126
+ yield from handle_stream_error(session_hash_code, e)
 
127
  finally:
128
+ remove_active_stream_flag_file(session_hash_code)
129
+ logging.info(f"[{session_hash_code}] Stop flag reset.")
 
130
 
131
 
132
 
133
+ asr_model = nemo_asr.models.ASRModel.from_pretrained("nvidia/canary-1b-v2")
134
+ # asr_model = None
135
 
136
  @spaces.GPU
137
+ def task_fake(session_hash_code: str,
138
  task_type, lang_source, lang_target,
139
  chunk_secs, left_context_secs, right_context_secs,
140
  streaming_policy, alignatt_thr, waitk_lagging,
 
161
  # streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,vad_engine=silero_vad_engine,cfg=streaming_audio_processor_config)
162
  ##-----------
163
  yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
164
+ yield (f"Task started for session {session_hash_code}", "info", None)
165
 
166
+ active_flag = get_active_task_flag_file(session_hash_code)
167
  with open(active_flag, "w") as f:
168
  f.write("1")
169
+ chunk_dir = get_folder_chunks(session_hash_code)
170
+ logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
171
 
172
  try:
173
+ logging.info(f"[{session_hash_code}] task loop started.")
174
+ yield (f"Task started for session {session_hash_code}", "info", None)
175
 
176
  while os.path.exists(active_flag):
177
  if not os.path.exists(chunk_dir):
178
+ logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
179
  yield ("No audio chunks yet... waiting for stream.", "warning", None)
180
  time.sleep(0.1)
181
  continue
 
196
  # for text in new_texts:
197
  # print(text, end='', flush=True)
198
  # yield (text, "success", text)
199
+ # logging.debug(f"[{session_hash_code}] {new_texts}")
200
  ##-----------
201
  ### TODO
202
  text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
203
  yield (text, "success", fname)
204
  os.remove(fpath)
205
+ logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
206
  except Exception as e:
207
+ logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
208
  yield (f"Error processing {fname}: {e}", "warning", fname)
209
  continue
210
  time.sleep(0.1)
 
215
  # yield (text, "success", final_text)
216
  ##-----------
217
  yield ("DONE", "done", None)
218
+ logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
219
 
220
  except Exception as e:
221
+ logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
222
  yield (f"Unexpected error: {e}", "error", None)
223
 
224
  finally:
225
  if os.path.exists(active_flag):
226
  os.remove(active_flag)
227
+ logging.info(f"[{session_hash_code}] task stopped.")
228
 
229
  try:
230
  if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
231
  os.rmdir(chunk_dir)
232
+ logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
233
  except Exception as e:
234
+ logging.error(f"[{session_hash_code}] Cleanup error: {e}")
235
  yield (f"Cleanup error: {e}", "error", None)
236
 
237
+ logging.info(f"[{session_hash_code}] Exiting task loop.")
238
  yield ("Task finished and cleaned up.", "done", None)
239
 
240
 
241
+ @spaces.GPU
242
+ def task(session_hash_code: str,
243
  task_type, lang_source, lang_target,
244
  chunk_secs, left_context_secs, right_context_secs,
245
  streaming_policy, alignatt_thr, waitk_lagging,
 
263
  )
264
  streamer = StreamingAudioProcessor(speech_engine=canary_speech_engine,vad_engine=silero_vad_engine,cfg=streaming_audio_processor_config)
265
  yield ("initialized the CanarySpeechEngine and Silero_Vad_Engine", "info", None)
266
+ yield (f"Task started for session {session_hash_code}", "info", None)
267
 
268
+ active_flag = get_active_task_flag_file(session_hash_code)
269
  with open(active_flag, "w") as f:
270
  f.write("1")
271
+ chunk_dir = get_folder_chunks(session_hash_code)
272
+ logging.info(f"[{session_hash_code}] task started. {chunk_dir}")
273
 
274
  try:
275
+ logging.info(f"[{session_hash_code}] task loop started.")
276
+ yield (f"Task started for session {session_hash_code}", "info", None)
277
 
278
  while os.path.exists(active_flag):
279
  if not os.path.exists(chunk_dir):
280
+ logging.warning(f"[{session_hash_code}] No chunk directory found for task.")
281
  yield ("No audio chunks yet... waiting for stream.", "warning", None)
282
  time.sleep(0.1)
283
  continue
 
293
  npz = np.load(fpath)
294
  samples = npz["data"]
295
  rate = int(npz["rate"])
296
+ for text in streamer.process_chunk(samples) :
 
 
297
  yield (text, "success", text)
298
+ logging.debug(f"[{session_hash_code}] {text}")
299
  ### TODO
300
  # text = f"Transcribed {fname}: {len(samples)} samples @ {rate}Hz\n"
301
  # yield (text, "success", fname)
302
  os.remove(fpath)
303
+ logging.debug(f"[{session_hash_code}] Deleted processed chunk: {fname}")
304
  except Exception as e:
305
+ logging.warning(f"[{session_hash_code}] Error processing {fname}: {e}")
306
  yield (f"Error processing {fname}: {e}", "warning", fname)
307
  continue
308
  time.sleep(0.1)
309
 
310
  # TODO
311
+ # final_text = streamer.finalize_stream()
312
+ # yield (text, "success", final_text)
313
  # if final_text:
314
  # print(final_text, end='', flush=True)
315
  # yield f"\n{final_text}"
316
  ##
317
  yield ("DONE", "done", None)
318
+ logging.info(f"[{session_hash_code}] task loop ended (flag removed).")
319
 
320
  except Exception as e:
321
+ logging.error(f"[{session_hash_code}] task error: {e}", exc_info=True)
322
  yield (f"Unexpected error: {e}", "error", None)
323
 
324
  finally:
325
  if os.path.exists(active_flag):
326
  os.remove(active_flag)
327
+ logging.info(f"[{session_hash_code}] task stopped.")
328
 
329
  try:
330
  if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
331
  os.rmdir(chunk_dir)
332
+ logging.debug(f"[{session_hash_code}] Cleaned up empty chunk dir.")
333
  except Exception as e:
334
+ logging.error(f"[{session_hash_code}] Cleanup error: {e}")
335
  yield (f"Cleanup error: {e}", "error", None)
336
 
337
+ logging.info(f"[{session_hash_code}] Exiting task loop.")
338
  yield ("Task finished and cleaned up.", "done", None)
339
 
340
 
341
+ def handle_stream_error(session_hash_code: str, error: Exception):
342
  """
343
  Handle streaming errors:
344
  - Log the error
 
350
  else:
351
  msg = str(error)
352
 
353
+ logging.error(f"[{session_hash_code}] Streaming error: {msg}", exc_info=isinstance(error, Exception))
354
 
355
+ remove_active_stream_flag_file(session_hash_code)
 
356
 
357
  yield ((16000,np.zeros(16000, dtype=np.float32).reshape(1, -1)), AdditionalOutputs({"errored": True, "value": msg}))
358
 
359
 
360
 
 
 
 
 
 
361
 
362
  # --- Decorator compatibility layer ---
363
  if os.environ.get("SPACE_ID", "").startswith("zero-gpu"):
 
373
 
374
 
375
 
376
+ def stop_streaming(session_hash_code: str):
377
  """Trigger the stop flag for active streaming."""
378
+ logging.info(f"[{session_hash_code}] Stop button clicked — sending stop signal.")
379
+ remove_active_stream_flag_file(session_hash_code)
380
+ remove_active_task_flag_file(session_hash_code)
381
+
382
+ def start_streaming(session_hash_code: str):
383
+ """Trigger the start flag for active streaming."""
384
+ logging.info(f"[{session_hash_code}] Start button clicked — sending start signal.")
385
+ active_stream_flag = get_active_stream_flag_file(session_hash_code)
386
+ with open(active_stream_flag, "w") as f:
387
+ f.write("1")
388
+
389
+ def _is_stop_requested(session_hash_code) -> bool:
390
+ """Check if the stop signal was requested."""
391
+ return not os.path.exists(get_active_stream_flag_file(session_hash_code))
392
+
393
 
394
 
395
  def raise_function():