Update queue to use deque & update requirements (#2428)

* Update queue.py

* Update queue.py

* Update CHANGELOG.md

* Update CHANGELOG.md

* Update queue.py

* Update requirements.txt

websockets versions <=9.1 will raise TypeError: WebSocketCommonProtocol.__init__() got an unexpected keyword argument 'logger'

* Fix issues after rebase

Co-authored-by: Freddy Boulton <alfonsoboulton@gmail.com>
Authored by GLGDLY on 2022-11-05 04:53:23 +08:00, committed by GitHub
parent f795a4d3b1
commit b1cc5be78d
4 changed files with 43 additions and 29 deletions

CHANGELOG.md

@@ -56,6 +56,7 @@ No changes to highlight.
## Full Changelog:
* Add `api_name` to `Blocks.__call__` by [@freddyaboulton](https://github.com/freddyaboulton) in [PR 2593](https://github.com/gradio-app/gradio/pull/2593)
* Update queue to use deque & update requirements by [@GLGDLY](https://github.com/GLGDLY) in [PR 2428](https://github.com/gradio-app/gradio/pull/2428)
## Contributors Shoutout:

queue.py

@@ -4,7 +4,9 @@ import asyncio
import copy
import sys
import time
from typing import Dict, List, Optional, Tuple
from collections import deque
from itertools import islice
from typing import Deque, Dict, List, Optional, Tuple
import fastapi
from pydantic import BaseModel
@@ -45,7 +47,7 @@ class Queue:
max_size: Optional[int],
blocks_dependencies: List,
):
self.event_queue: List[Event] = []
self.event_queue: Deque[Event] = deque()
self.events_pending_reconnection = []
self.stopped = False
self.max_thread_count = concurrency_count
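
The core change: the pending-event container switches from a list to collections.deque, so events can be removed from the front in O(1) instead of the O(n) shift that list.pop(0) performs. typing.Deque is used for the annotation because subscripting the built-in deque (deque[Event]) only works at runtime on Python 3.9+. A minimal standalone sketch of the annotated attribute, with a hypothetical Event stand-in for gradio's class:

```python
from collections import deque
from typing import Deque


class Event:
    """Hypothetical stand-in for gradio's queue Event."""

    def __init__(self, fn_index: int) -> None:
        self.fn_index = fn_index


class Queue:
    def __init__(self) -> None:
        # deque gives O(1) appends and pops at both ends; a plain list
        # would shift every remaining element on each pop(0).
        self.event_queue: Deque[Event] = deque()
```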
@@ -93,7 +95,7 @@
if not (self.event_queue):
return None, False
first_event = self.event_queue.pop(0)
first_event = self.event_queue.popleft()
events = [first_event]
event_fn_index = first_event.fn_index
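
popleft() returns exactly what pop(0) returned before, so behaviour is unchanged; only the cost differs. A small illustrative comparison (integers stand in for events):

```python
from collections import deque

pending = [10, 20, 30]

as_list = list(pending)
as_deque = deque(pending)

# The same element comes off the front either way.
assert as_list.pop(0) == as_deque.popleft() == 10

# The difference is cost: list.pop(0) shifts the n-1 remaining elements
# on every call, while deque.popleft() is O(1) regardless of queue size.
```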
@@ -136,10 +138,11 @@
Returns:
rank of submitted Event
"""
if self.max_size is not None and len(self.event_queue) >= self.max_size:
queue_len = len(self.event_queue)
if self.max_size is not None and queue_len >= self.max_size:
return None
self.event_queue.append(event)
return len(self.event_queue) - 1
return queue_len
async def clean_event(self, event: Event) -> None:
if event in self.event_queue:
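
push() now reads the queue length once, uses it both for the max_size check and as the returned rank, and only then appends; the returned value is the same zero-based position as the old len(queue) - 1 computed after the append. A hedged standalone sketch of that pattern (the function signature here is simplified relative to the real method):

```python
from collections import deque
from typing import Deque, Optional


def push(event_queue: Deque[object], event: object, max_size: Optional[int]) -> Optional[int]:
    """Append an event; return its zero-based rank, or None if the queue is full."""
    queue_len = len(event_queue)
    if max_size is not None and queue_len >= max_size:
        return None
    event_queue.append(event)
    # There were queue_len events before the append, so the new event sits
    # at index queue_len -- identical to len(event_queue) - 1 at this point.
    return queue_len


q: Deque[object] = deque()
assert push(q, "first", max_size=2) == 0
assert push(q, "second", max_size=2) == 1
assert push(q, "third", max_size=2) is None  # queue is full
```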
@@ -161,7 +164,7 @@
await asyncio.gather(
*[
self.gather_event_data(event)
for event in self.event_queue[: self.data_gathering_start]
for event in islice(self.event_queue, self.data_gathering_start)
]
)
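
A deque cannot be sliced, so the old event_queue[: self.data_gathering_start] expression would raise TypeError on the new container; itertools.islice yields the same leading items lazily without copying or converting the deque. A quick demonstration:

```python
from collections import deque
from itertools import islice

event_queue = deque(["a", "b", "c", "d"])

# event_queue[:2] would raise TypeError: deques do not support slicing.
first_two = list(islice(event_queue, 2))
assert first_two == ["a", "b"]

# islice also handles a short queue gracefully, just like a slice would:
assert list(islice(deque(["only"]), 2)) == ["only"]
```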

requirements.txt

@@ -19,4 +19,4 @@ Jinja2
fsspec
httpx
pydantic
websockets
websockets>=10.0
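
The pin to websockets>=10.0 matches the note in the commit message: the logger keyword argument was added to the websockets protocol classes in version 10.0, so 9.1 and earlier reject it with the TypeError quoted above. A minimal, purely illustrative sketch of a runtime guard that would surface the mismatch early (not part of this PR):

```python
# Illustrative only: fail fast with a clear message if an old websockets
# release is installed, instead of hitting the TypeError at runtime.
import websockets

_major = int(websockets.__version__.split(".")[0])
if _major < 10:
    raise RuntimeError(
        f"websockets>=10.0 is required (found {websockets.__version__}); "
        "older releases do not accept the 'logger' keyword argument."
    )
```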

test_queue.py

@@ -1,5 +1,6 @@
import os
import sys
from collections import deque
from unittest.mock import MagicMock, patch
import pytest
@@ -336,18 +337,21 @@ class TestQueueBatch:
class TestGetEventsInBatch:
def test_empty_event_queue(self, queue: Queue):
queue.event_queue = []
queue.event_queue = deque()
events, _ = queue.get_events_in_batch()
assert events is None
def test_single_type_of_event(self, queue: Queue):
queue.blocks_dependencies = [{"batch": True, "max_batch_size": 3}]
queue.event_queue = [
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
]
queue.event_queue = deque()
queue.event_queue.extend(
[
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
]
)
events, batch = queue.get_events_in_batch()
assert batch
assert [e.fn_index for e in events] == [0, 0, 0]
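
In the tests, each list literal is replaced by an empty deque() followed by extend(). Passing the iterable straight to the constructor is equivalent and slightly shorter; a sketch of that alternative, with a hypothetical Event stand-in for the real class used in the tests:

```python
from collections import deque
from unittest.mock import MagicMock


class Event:
    """Hypothetical stand-in mirroring the Event(websocket=..., fn_index=...) calls above."""

    def __init__(self, websocket, fn_index):
        self.websocket = websocket
        self.fn_index = fn_index


# deque(iterable) builds the same queue as deque() followed by extend(iterable).
via_extend = deque()
via_extend.extend([Event(websocket=MagicMock(), fn_index=0) for _ in range(3)])

direct = deque(Event(websocket=MagicMock(), fn_index=0) for _ in range(3))

assert len(via_extend) == len(direct) == 3
assert all(e.fn_index == 0 for e in direct)
```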
@@ -361,14 +365,17 @@ class TestGetEventsInBatch:
{"batch": True, "max_batch_size": 3},
{"batch": True, "max_batch_size": 2},
]
queue.event_queue = [
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
]
queue.event_queue = deque()
queue.event_queue.extend(
[
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=0),
]
)
events, batch = queue.get_events_in_batch()
assert batch
assert [e.fn_index for e in events] == [0, 0, 0]
@@ -386,13 +393,16 @@ class TestGetEventsInBatch:
{"batch": True, "max_batch_size": 3},
{"batch": False},
]
queue.event_queue = [
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=1),
]
queue.event_queue = deque()
queue.event_queue.extend(
[
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=0),
Event(websocket=MagicMock(), fn_index=1),
Event(websocket=MagicMock(), fn_index=1),
]
)
events, batch = queue.get_events_in_batch()
assert batch
assert [e.fn_index for e in events] == [0, 0]
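
Taken together, the assertions pin down the batching rule: the first event is popped from the front, and further queued events that share its fn_index are collected in order until that function's max_batch_size is reached. The following is a simplified, hedged sketch of that selection logic over a deque; it mirrors what the tests check but is not the actual gradio implementation:

```python
from collections import deque
from typing import Deque, List, Optional, Tuple


class Event:
    """Hypothetical stand-in for gradio's queue Event."""

    def __init__(self, fn_index: int) -> None:
        self.fn_index = fn_index


def get_events_in_batch(
    event_queue: Deque[Event], blocks_dependencies: List[dict]
) -> Tuple[Optional[List[Event]], bool]:
    """Simplified sketch of the batching behaviour the tests above exercise."""
    if not event_queue:
        return None, False

    first_event = event_queue.popleft()
    events = [first_event]

    dependency = blocks_dependencies[first_event.fn_index]
    batch = dependency.get("batch", False)
    if batch:
        max_batch_size = dependency["max_batch_size"]
        # Collect further events with the same fn_index, preserving queue
        # order, until the batch is full or no matching events remain.
        same_fn = [e for e in event_queue if e.fn_index == first_event.fn_index]
        events.extend(same_fn[: max_batch_size - 1])
        for e in events[1:]:
            event_queue.remove(e)

    return events, batch


# Matches the last test above: fn 0 allows a batch of 3, but only two
# fn_index 0 events are queued, so the batch is [0, 0].
queue = deque(Event(i) for i in [0, 1, 0, 1, 1])
events, batch = get_events_in_batch(
    queue, [{"batch": True, "max_batch_size": 3}, {"batch": False}]
)
assert batch
assert [e.fn_index for e in events] == [0, 0]
```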