VibecoderMcSwaggins commited on
Commit
e85ccf5
·
unverified ·
1 Parent(s): 32fc7aa

refactor(orchestrator): Introduce WorkflowState dataclass (Priority 7) (#129)

Browse files

Priority 7: WorkflowState Dataclass + ConfigurationError Fix

- Introduced WorkflowState dataclass in advanced.py
- Fixed CodeRabbit feedback: Use ConfigurationError instead of ValueError in registry.py

✅ All checks passed

src/clients/registry.py CHANGED
@@ -10,6 +10,7 @@ import structlog
10
  from agent_framework import BaseChatClient
11
 
12
  from src.utils.config import Settings
 
13
 
14
  logger = structlog.get_logger()
15
 
@@ -87,4 +88,4 @@ class ProviderRegistry:
87
  logger.info(f"Using {p.name} Chat Client")
88
  return p.create(settings, api_key, model_id, **kwargs)
89
 
90
- raise ValueError(f"No suitable provider found for provider={provider}")
 
10
  from agent_framework import BaseChatClient
11
 
12
  from src.utils.config import Settings
13
+ from src.utils.exceptions import ConfigurationError
14
 
15
  logger = structlog.get_logger()
16
 
 
88
  logger.info(f"Using {p.name} Chat Client")
89
  return p.create(settings, api_key, model_id, **kwargs)
90
 
91
+ raise ConfigurationError(f"No suitable provider found for provider={provider}")
src/orchestrators/advanced.py CHANGED
@@ -17,6 +17,7 @@ Design Patterns:
17
 
18
  import asyncio
19
  from collections.abc import AsyncGenerator
 
20
  from typing import TYPE_CHECKING, Any, Literal
21
 
22
  import structlog
@@ -56,6 +57,18 @@ JUDGE_AGENT_ID = "judge"
56
  HYPOTHESIZER_AGENT_ID = "hypothesizer"
57
 
58
 
 
 
 
 
 
 
 
 
 
 
 
 
59
  class AdvancedOrchestrator(OrchestratorProtocol):
60
  """
61
  Advanced orchestrator using Microsoft Agent Framework ChatAgent pattern.
@@ -305,16 +318,7 @@ The final output should be a structured research report."""
305
  iteration=0,
306
  )
307
 
308
- iteration = 0
309
- final_event_received = False
310
- reporter_ran = False # P1 FIX: Track if ReportAgent produced output
311
-
312
- # ACCUMULATOR PATTERN: Track streaming content to bypass upstream Repr Bug
313
- # Upstream bug in _magentic.py flattens message.contents and sets message.text
314
- # to repr(message) if text is empty. We must reconstruct text from Deltas.
315
- current_message_buffer: str = ""
316
- current_agent_id: str | None = None
317
- last_streamed_length: int = 0 # Track for P2 Duplicate Report Bug Fix
318
 
319
  try:
320
  async with asyncio.timeout(self._timeout_seconds):
@@ -322,92 +326,99 @@ The final output should be a structured research report."""
322
  # 1. Handle Streaming (Source of Truth for Content)
323
  if isinstance(event, MagenticAgentDeltaEvent):
324
  # Detect agent switch to clear buffer
325
- if event.agent_id != current_agent_id:
326
- current_message_buffer = ""
327
- current_agent_id = event.agent_id
328
 
329
  if event.text:
330
- current_message_buffer += event.text
331
  yield AgentEvent(
332
  type="streaming",
333
  message=event.text,
334
  data={"agent_id": event.agent_id},
335
- iteration=iteration,
336
  )
337
  continue
338
 
339
  # 2. Handle Completion Signal
340
  # We use our accumulated buffer instead of the corrupted event.message
341
  if isinstance(event, MagenticAgentMessageEvent):
342
- iteration += 1
343
 
344
  # P1 FIX: Track if ReportAgent produced output
345
  agent_name = (event.agent_id or "").lower()
346
  if REPORTER_AGENT_ID in agent_name:
347
- reporter_ran = True
348
 
349
  comp_event, prog_event = self._handle_completion_event(
350
- event, current_message_buffer, iteration
351
  )
352
  yield comp_event
353
  yield prog_event
354
 
355
  # P2 BUG FIX: Save length before clearing
356
- last_streamed_length = len(current_message_buffer)
357
  # Clear buffer after consuming
358
- current_message_buffer = ""
359
  continue
360
 
361
  # 3. Handle Final Events Inline (P2 Duplicate Report Fix + P1 Forced Synthesis)
362
  if isinstance(event, (MagenticFinalResultEvent, WorkflowOutputEvent)):
363
- if final_event_received:
364
  continue # Skip duplicate final events
365
- final_event_received = True
366
 
367
  # P1 FIX: Force synthesis if ReportAgent never ran
368
- if not reporter_ran:
369
  logger.warning(
370
  "ReportAgent never ran - forcing synthesis",
371
- iterations=iteration,
372
  )
373
  async for synth_event in self._synthesize_fallback(
374
- iteration, "no_reporter"
375
  ):
376
  yield synth_event
377
  else:
378
- yield self._handle_final_event(event, iteration, last_streamed_length)
 
 
379
  continue
380
 
381
  # 4. Handle other events normally
382
- agent_event = self._process_event(event, iteration)
383
  if agent_event:
384
  yield agent_event
385
 
386
  # GUARANTEE: Always emit termination event if stream ends without one
387
  # (e.g., max rounds reached)
388
- if not final_event_received:
389
  logger.warning(
390
  "Workflow ended without final event",
391
- iterations=iteration,
392
  )
393
  # P1 FIX: Force synthesis if ReportAgent never ran
394
- if not reporter_ran:
395
- async for synth_event in self._synthesize_fallback(iteration, "max_rounds"):
 
 
396
  yield synth_event
397
  else:
398
  yield AgentEvent(
399
  type="complete",
400
  message=(
401
- f"Research completed after {iteration} agent rounds. "
402
  "Max iterations reached - results may be partial. "
403
  "Try a more specific query for better results."
404
  ),
405
- data={"iterations": iteration, "reason": "max_rounds_reached"},
406
- iteration=iteration,
 
 
 
407
  )
408
 
409
  except TimeoutError:
410
- async for event in self._synthesize_fallback(iteration, "timeout"):
411
  yield event
412
 
413
  except Exception as e:
@@ -415,7 +426,7 @@ The final output should be a structured research report."""
415
  yield AgentEvent(
416
  type="error",
417
  message=f"Workflow error: {e!s}",
418
- iteration=iteration,
419
  )
420
 
421
  def _handle_completion_event(
 
17
 
18
  import asyncio
19
  from collections.abc import AsyncGenerator
20
+ from dataclasses import dataclass
21
  from typing import TYPE_CHECKING, Any, Literal
22
 
23
  import structlog
 
57
  HYPOTHESIZER_AGENT_ID = "hypothesizer"
58
 
59
 
60
+ @dataclass
61
+ class WorkflowState:
62
+ """Tracks mutable state during workflow execution."""
63
+
64
+ iteration: int = 0
65
+ reporter_ran: bool = False
66
+ current_message_buffer: str = ""
67
+ current_agent_id: str | None = None
68
+ last_streamed_length: int = 0
69
+ final_event_received: bool = False
70
+
71
+
72
  class AdvancedOrchestrator(OrchestratorProtocol):
73
  """
74
  Advanced orchestrator using Microsoft Agent Framework ChatAgent pattern.
 
318
  iteration=0,
319
  )
320
 
321
+ state = WorkflowState()
 
 
 
 
 
 
 
 
 
322
 
323
  try:
324
  async with asyncio.timeout(self._timeout_seconds):
 
326
  # 1. Handle Streaming (Source of Truth for Content)
327
  if isinstance(event, MagenticAgentDeltaEvent):
328
  # Detect agent switch to clear buffer
329
+ if event.agent_id != state.current_agent_id:
330
+ state.current_message_buffer = ""
331
+ state.current_agent_id = event.agent_id
332
 
333
  if event.text:
334
+ state.current_message_buffer += event.text
335
  yield AgentEvent(
336
  type="streaming",
337
  message=event.text,
338
  data={"agent_id": event.agent_id},
339
+ iteration=state.iteration,
340
  )
341
  continue
342
 
343
  # 2. Handle Completion Signal
344
  # We use our accumulated buffer instead of the corrupted event.message
345
  if isinstance(event, MagenticAgentMessageEvent):
346
+ state.iteration += 1
347
 
348
  # P1 FIX: Track if ReportAgent produced output
349
  agent_name = (event.agent_id or "").lower()
350
  if REPORTER_AGENT_ID in agent_name:
351
+ state.reporter_ran = True
352
 
353
  comp_event, prog_event = self._handle_completion_event(
354
+ event, state.current_message_buffer, state.iteration
355
  )
356
  yield comp_event
357
  yield prog_event
358
 
359
  # P2 BUG FIX: Save length before clearing
360
+ state.last_streamed_length = len(state.current_message_buffer)
361
  # Clear buffer after consuming
362
+ state.current_message_buffer = ""
363
  continue
364
 
365
  # 3. Handle Final Events Inline (P2 Duplicate Report Fix + P1 Forced Synthesis)
366
  if isinstance(event, (MagenticFinalResultEvent, WorkflowOutputEvent)):
367
+ if state.final_event_received:
368
  continue # Skip duplicate final events
369
+ state.final_event_received = True
370
 
371
  # P1 FIX: Force synthesis if ReportAgent never ran
372
+ if not state.reporter_ran:
373
  logger.warning(
374
  "ReportAgent never ran - forcing synthesis",
375
+ iterations=state.iteration,
376
  )
377
  async for synth_event in self._synthesize_fallback(
378
+ state.iteration, "no_reporter"
379
  ):
380
  yield synth_event
381
  else:
382
+ yield self._handle_final_event(
383
+ event, state.iteration, state.last_streamed_length
384
+ )
385
  continue
386
 
387
  # 4. Handle other events normally
388
+ agent_event = self._process_event(event, state.iteration)
389
  if agent_event:
390
  yield agent_event
391
 
392
  # GUARANTEE: Always emit termination event if stream ends without one
393
  # (e.g., max rounds reached)
394
+ if not state.final_event_received:
395
  logger.warning(
396
  "Workflow ended without final event",
397
+ iterations=state.iteration,
398
  )
399
  # P1 FIX: Force synthesis if ReportAgent never ran
400
+ if not state.reporter_ran:
401
+ async for synth_event in self._synthesize_fallback(
402
+ state.iteration, "max_rounds"
403
+ ):
404
  yield synth_event
405
  else:
406
  yield AgentEvent(
407
  type="complete",
408
  message=(
409
+ f"Research completed after {state.iteration} agent rounds. "
410
  "Max iterations reached - results may be partial. "
411
  "Try a more specific query for better results."
412
  ),
413
+ data={
414
+ "iterations": state.iteration,
415
+ "reason": "max_rounds_reached",
416
+ },
417
+ iteration=state.iteration,
418
  )
419
 
420
  except TimeoutError:
421
+ async for event in self._synthesize_fallback(state.iteration, "timeout"):
422
  yield event
423
 
424
  except Exception as e:
 
426
  yield AgentEvent(
427
  type="error",
428
  message=f"Workflow error: {e!s}",
429
+ iteration=state.iteration,
430
  )
431
 
432
  def _handle_completion_event(
tests/unit/clients/test_chat_client_factory.py CHANGED
@@ -4,6 +4,8 @@ from unittest.mock import MagicMock, patch
4
 
5
  import pytest
6
 
 
 
7
  # Skip if agent-framework-core not installed
8
  pytest.importorskip("agent_framework")
9
 
@@ -71,15 +73,15 @@ class TestChatClientFactory:
71
 
72
  assert "HuggingFace" in type(client).__name__
73
 
74
- def test_unsupported_provider_raises_value_error(self) -> None:
75
- """Unsupported provider should raise ValueError, not silently fallback."""
76
  with patch("src.clients.factory.settings") as mock_settings:
77
  mock_settings.has_openai_key = False
78
  mock_settings.has_gemini_key = False
79
 
80
  from src.clients.factory import get_chat_client
81
 
82
- with pytest.raises(ValueError, match="No suitable provider found"):
83
  get_chat_client(provider="invalid_provider")
84
 
85
  def test_byok_auto_detects_openai_from_key_prefix(self) -> None:
 
4
 
5
  import pytest
6
 
7
+ from src.utils.exceptions import ConfigurationError
8
+
9
  # Skip if agent-framework-core not installed
10
  pytest.importorskip("agent_framework")
11
 
 
73
 
74
  assert "HuggingFace" in type(client).__name__
75
 
76
+ def test_unsupported_provider_raises_configuration_error(self) -> None:
77
+ """Unsupported provider should raise ConfigurationError, not silently fallback."""
78
  with patch("src.clients.factory.settings") as mock_settings:
79
  mock_settings.has_openai_key = False
80
  mock_settings.has_gemini_key = False
81
 
82
  from src.clients.factory import get_chat_client
83
 
84
+ with pytest.raises(ConfigurationError, match="No suitable provider found"):
85
  get_chat_client(provider="invalid_provider")
86
 
87
  def test_byok_auto_detects_openai_from_key_prefix(self) -> None: