Stream with event queue #38996

Open · wants to merge 1 commit into base: feature/azure-ai-projects-beta5
91 changes: 75 additions & 16 deletions sdk/ai/azure-ai-projects/azure/ai/projects/models/_patch.py
@@ -1230,6 +1230,7 @@ def __init__(self) -> None:
         )
         self.done: bool
         self.buffer: str
+        self._event_queue: list[T] = []
 
     def initialize(
         self,
@@ -1244,23 +1245,52 @@ def initialize(
         self.buffer = ""
 
     async def __anext__(self) -> T:
+        if self._event_queue:
+            return self._event_queue.pop(0)
+
         if self.response_iterator is None:
             raise ValueError("The response handler was not initialized.")
 
+        # Otherwise, keep reading from the SSE stream until we find at least one event
         async for chunk in self.response_iterator:
-            self.buffer += chunk.decode("utf-8")
-            split_buffer = self.buffer.split("\n\n")
-            self.buffer = split_buffer[-1]
-            for ln in split_buffer[:-1]:
+            self.buffer += chunk.decode("utf-8", errors="replace")
+
+            # Split on "\n\n", the delimiter between SSE events
+            parts = self.buffer.split("\n\n")
+            self.buffer = parts[-1]  # leftover (possibly partial event)
+            complete_parts = parts[:-1]  # fully delimited events
+
+            for ln in complete_parts:
                 event = await self._process_event(ln)
                 if event:
-                    return event
+                    self._event_queue.append(event)
+
+            if self._event_queue:
+                # Return the first newly parsed event
+                return self._event_queue.pop(0)
+
+        # If we exit the for loop, the stream is exhausted (StopAsyncIteration on the upstream)
+        # We might still have leftover data in self.buffer that may contain 1 or more events
         if self.buffer:
-            event = await self._process_event(self.buffer)
-            if event:
-                return event
+            parts = self.buffer.split("\n\n")
+            self.buffer = parts[-1]
+            for ln in parts[:-1]:
+                event = await self._process_event(ln)
+                if event:
+                    self._event_queue.append(event)
+
+        # If there's a leftover piece that might be a complete event
+        if self.buffer:
+            event = await self._process_event(self.buffer)
+            self.buffer = ""  # we've parsed it
+            if event:
[Review comment — Member]
I think you can modify _process_event to return without Optional and eliminate the if event check here.

+                self._event_queue.append(event)
+
+        # If we now have events queued from parsing leftover, yield the first
+        if self._event_queue:
+            return self._event_queue.pop(0)
+
+        # Otherwise, there's nothing left, so end iteration
         raise StopAsyncIteration()
 
     async def _process_event(self, event_data_str: str) -> Optional[T]:
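
One possible shape for the refactor suggested in the review comment above — a minimal sketch only, since the body of _process_event is not shown in this diff, and the helper name _enqueue_event is hypothetical:

    # Hypothetical helper: centralize the Optional check so the __anext__ /
    # __next__ loops no longer need their own "if event:" after each parse.
    async def _enqueue_event(self, event_data_str: str) -> None:
        event = await self._process_event(event_data_str)  # may return None
        if event is not None:
            self._event_queue.append(event)

Call sites would then shrink to await self._enqueue_event(ln), with no conditional.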
@@ -1286,6 +1316,7 @@ def __init__(self) -> None:
         self.submit_tool_outputs: Optional[Callable[[ThreadRun, "BaseAgentEventHandler[T]"], Awaitable[None]]] = None
         self.done: bool
         self.buffer: str
+        self._event_queue: list[T] = []
 
     def initialize(
         self,
@@ -1300,22 +1331,50 @@ def initialize(
         self.buffer = ""
 
     def __next__(self) -> T:
+        if self._event_queue:
+            return self._event_queue.pop(0)
+
         if self.response_iterator is None:
             raise ValueError("The response handler was not initialized.")
 
+        # Otherwise, keep reading from the SSE stream until we find at least one event
         for chunk in self.response_iterator:
-            self.buffer += chunk.decode("utf-8")
-            split_buffer = self.buffer.split("\n\n")
-            self.buffer = split_buffer[-1]
-            for ln in split_buffer[:-1]:
+            self.buffer += chunk.decode("utf-8", errors="replace")
+
+            # Split on "\n\n", the delimiter between SSE events
+            parts = self.buffer.split("\n\n")
+            self.buffer = parts[-1]  # leftover (possibly partial event)
+            complete_parts = parts[:-1]  # fully delimited events
+
+            for ln in complete_parts:
                 event = self._process_event(ln)
                 if event:
-                    return event
+                    self._event_queue.append(event)
+
+            if self._event_queue:
+                # Return the first newly parsed event
+                return self._event_queue.pop(0)
+
+        # If we exit the for loop, the stream is exhausted (StopIteration on the upstream)
+        # We might still have leftover data in self.buffer that may contain 1 or more events
         if self.buffer:
-            event = self._process_event(self.buffer)
-            if event:
-                return event
+            parts = self.buffer.split("\n\n")
+            self.buffer = parts[-1]
+            for ln in parts[:-1]:
+                event = self._process_event(ln)
+                if event:
+                    self._event_queue.append(event)
+
+        # If there's a leftover piece that might be a complete event
+        if self.buffer:
+            event = self._process_event(self.buffer)
+            self.buffer = ""  # we've parsed it
+            if event:
+                self._event_queue.append(event)
+
+        # If we now have events queued from parsing leftover, yield the first
+        if self._event_queue:
+            return self._event_queue.pop(0)
+
         raise StopIteration()
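
For reference, the pattern both methods implement — buffer raw bytes, split on the "\n\n" SSE delimiter, queue every parsed event, and pop one per iteration — can be sketched standalone. This is a minimal sketch, not the SDK's API; the class name, the placeholder parser, and the sample byte chunks are all illustrative:

    from collections import deque
    from typing import Deque, Iterable, Iterator, Optional

    class SSEEventQueueIterator:
        """Sketch: buffer SSE bytes, split on blank lines, queue parsed events."""

        def __init__(self, chunks: Iterable[bytes]) -> None:
            self._chunks: Iterator[bytes] = iter(chunks)
            self._buffer = ""
            self._queue: Deque[str] = deque()  # popleft() is O(1)

        def _process_event(self, raw: str) -> Optional[str]:
            # Placeholder parser; real code would handle "event:"/"data:" fields.
            return raw.strip() or None

        def __iter__(self) -> "SSEEventQueueIterator":
            return self

        def __next__(self) -> str:
            if self._queue:
                return self._queue.popleft()
            for chunk in self._chunks:
                self._buffer += chunk.decode("utf-8", errors="replace")
                # Keep the trailing (possibly partial) piece in the buffer.
                *complete, self._buffer = self._buffer.split("\n\n")
                for part in complete:
                    event = self._process_event(part)
                    if event:
                        self._queue.append(event)
                if self._queue:
                    return self._queue.popleft()
            # Stream exhausted: flush whatever is still buffered.
            if self._buffer:
                event = self._process_event(self._buffer)
                self._buffer = ""
                if event:
                    self._queue.append(event)
            if self._queue:
                return self._queue.popleft()
            raise StopIteration

    # Usage: one event split across two reads, then a second event.
    chunks = [b"data: part", b"ial\n\ndata: second\n\n"]
    for evt in SSEEventQueueIterator(chunks):
        print(evt)  # -> "data: partial", then "data: second"

Using deque here is a deliberate tweak over the patch: popleft() is O(1) where list.pop(0) is O(n) per event, though for the handful of events parsed per chunk either choice is fine in practice.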
