Run events every given number of seconds (#2512)

* WIP

* Fix implementation

* Fix units

* Rename

* Delete dead code

* This works

* Fix tests

* Final fix - still rough

* Reduce code duplication

* Fixing bugs + test

* Changelog

* Add gif to changelog

* Update demo and remove unused imports

* undo accidental diff

* Linting

* Add to guides

* Address comments

* formatting changelog

* Fix warnings
This commit is contained in:
Freddy Boulton 2022-10-28 17:53:06 -04:00 committed by GitHub
parent a9f6d53016
commit a23eb53e3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 372 additions and 103 deletions

View File

@ -1,7 +1,50 @@
# Upcoming Release
## New Features:
No changes to highlight.
### Running Events Continuously
Gradio now supports the ability to run an event continuously on a fixed schedule. To use this feature,
pass `every=# of seconds` to the event definition. This will run the event every given number of seconds!
This can be used to:
* Create live visualizations that show the most up to date data
* Refresh the state of the frontend automatically in response to changes in the backend
Here is an example of a live plot that refreshes every half second:
```python
import math
import gradio as gr
import plotly.express as px
import numpy as np
plot_end = 2 * math.pi
def get_plot(period=1):
global plot_end
x = np.arange(plot_end - 2 * math.pi, plot_end, 0.02)
y = np.sin(2*math.pi*period * x)
fig = px.line(x=x, y=y)
plot_end += 2 * math.pi
return fig
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
gr.Markdown("Change the value of the slider to automatically update the plot")
period = gr.Slider(label="Period of plot", value=1, minimum=0, maximum=10, step=1)
plot = gr.Plot(label="Plot (updates every half second)")
dep = demo.load(get_plot, None, plot, every=0.5)
period.change(get_plot, period, plot, every=0.5, cancels=[dep])
demo.queue().launch()
```
![live_demo](https://user-images.githubusercontent.com/41651716/198357377-633ce460-4e31-47bd-8202-1440cdd6fe19.gif)
## Bug Fixes:
No changes to highlight.
@ -42,6 +85,7 @@ No changes to highlight.
No changes to highlight.
## Full Changelog:
* Added the `every` keyword to event listeners that runs events on a fixed schedule by [@freddyaboulton](https://github.com/freddyaboulton) in [PR 2512](https://github.com/gradio-app/gradio/pull/2512)
* Fix whitespace issue when using plotly. [@dawoodkhan82](https://github.com/dawoodkhan82) in [PR 2548](https://github.com/gradio-app/gradio/pull/2548)
* Apply appropriate alt text to all gallery images. [@camenduru](https://github.com/camenduru) in [PR 2358](https://github.com/gradio-app/gradio/pull/2538)

View File

@ -0,0 +1 @@
plotly

View File

@ -0,0 +1,42 @@
import math
import gradio as gr
import datetime
import plotly.express as px
import numpy as np
def get_time():
return datetime.datetime.now()
plot_end = 2 * math.pi
def get_plot(period=1):
global plot_end
x = np.arange(plot_end - 2 * math.pi, plot_end, 0.02)
y = np.sin(2*math.pi*period * x)
fig = px.line(x=x, y=y)
plot_end += 2 * math.pi
return fig
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
c_time2 = gr.Textbox(label="Current Time refreshed every second")
gr.Markdown("Change the value of the slider to automatically update the plot")
period = gr.Slider(label="Period of plot", value=1, minimum=0, maximum=10, step=1)
plot = gr.Plot(label="Plot (updates every half second)")
with gr.Column():
name = gr.Textbox(label="Enter your name")
greeting = gr.Textbox(label="Greeting")
button = gr.Button(value="Greet")
button.click(lambda s: f"Hello {s}", name, greeting)
demo.load(lambda: datetime.datetime.now(), None, c_time2, every=1)
dep = demo.load(get_plot, None, plot, every=0.5)
period.change(get_plot, period, plot, every=0.5, cancels=[dep])
if __name__ == "__main__":
demo.queue().launch()

View File

@ -0,0 +1 @@
plotly

31
demo/sine_curve/run.py Normal file
View File

@ -0,0 +1,31 @@
import math
import gradio as gr
import plotly.express as px
import numpy as np
plot_end = 2 * math.pi
def get_plot(period=1):
global plot_end
x = np.arange(plot_end - 2 * math.pi, plot_end, 0.02)
y = np.sin(2*math.pi*period * x)
fig = px.line(x=x, y=y)
plot_end += 2 * math.pi
return fig
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
gr.Markdown("Change the value of the slider to automatically update the plot")
period = gr.Slider(label="Period of plot", value=1, minimum=0, maximum=10, step=1)
plot = gr.Plot(label="Plot (updates every half second)")
dep = demo.load(get_plot, None, plot, every=1)
period.change(get_plot, period, plot, every=1, cancels=[dep])
if __name__ == "__main__":
demo.queue().launch()

View File

@ -51,6 +51,7 @@ from gradio.utils import (
component_or_layout_class,
delete_none,
get_cancel_function,
get_continuous_fn,
)
set_documentation_group("blocks")
@ -139,6 +140,7 @@ class Block:
batch: bool = False,
max_batch_size: int = 4,
cancels: List[int] | None = None,
every: float | None = None,
) -> Dict[str, Any]:
"""
Adds an event to the component's dependencies.
@ -178,13 +180,24 @@ class Block:
elif not isinstance(outputs, list):
outputs = [outputs]
if fn is not None:
if fn is not None and not cancels:
check_function_inputs_match(fn, inputs, inputs_as_dict)
if Context.root_block is None:
raise AttributeError(
f"{event_name}() and other events can only be called within a Blocks context."
)
if every is not None and every <= 0:
raise ValueError("Parameter every must be positive or None")
if every and batch:
raise ValueError(
f"Cannot run {event_name} event in a batch and every {every} seconds. "
"Either batch is True or every is non-zero but not both."
)
if every:
fn = get_continuous_fn(fn, every)
Context.root_block.fns.append(
BlockFunction(fn, inputs, outputs, preprocess, postprocess, inputs_as_dict)
)
@ -197,6 +210,7 @@ class Block:
"api_name {} already exists, using {}".format(api_name, api_name_)
)
api_name = api_name_
dependency = {
"targets": [self._id] if not no_target else [],
"trigger": event_name,
@ -208,6 +222,7 @@ class Block:
"api_name": api_name,
"scroll_to_output": scroll_to_output,
"show_progress": show_progress,
"every": every,
"batch": batch,
"max_batch_size": max_batch_size,
"cancels": cancels or [],
@ -929,7 +944,6 @@ class Blocks(BlockContext):
data = [self.postprocess_data(fn_index, o, state) for o in zip(*preds)]
data = list(zip(*data))
is_generating, iterator = None, None
else:
inputs = self.preprocess_data(fn_index, inputs, state)
iterator = iterators.get(fn_index, None) if iterators else None
@ -1027,8 +1041,9 @@ class Blocks(BlockContext):
api_key: Optional[str] = None,
alias: Optional[str] = None,
_js: Optional[str] = None,
every: None | int = None,
**kwargs,
) -> Blocks | None:
) -> Blocks | Dict[str, Any] | None:
"""
For reverse compatibility reasons, this is both a class method and an instance
method, the two of which, confusingly, do two completely different things.
@ -1046,6 +1061,7 @@ class Blocks(BlockContext):
fn: Instance Method - Callable function
inputs: Instance Method - input list
outputs: Instance Method - output list
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
Example:
import gradio as gr
import datetime
@ -1070,13 +1086,14 @@ class Blocks(BlockContext):
kwargs["outputs"] = outputs
return external.load_blocks_from_repo(name, src, api_key, alias, **kwargs)
else:
self_or_cls.set_event_trigger(
return self_or_cls.set_event_trigger(
event_name="load",
fn=fn,
inputs=inputs,
outputs=outputs,
no_target=True,
js=_js,
no_target=True,
every=every,
)
def clear(self):

View File

@ -44,6 +44,7 @@ class Changeable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -63,27 +64,28 @@ class Changeable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
warnings.warn(
"The 'status_tracker' parameter has been deprecated and has no effect."
)
dep = self.set_event_trigger(
"change",
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "change", cancels)
return dep
@ -105,6 +107,7 @@ class Clickable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -124,6 +127,7 @@ class Clickable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -136,15 +140,16 @@ class Clickable(Block):
fn,
inputs,
outputs,
api_name=api_name,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
every=every,
)
set_cancel_events(self, "click", cancels)
return dep
@ -166,6 +171,7 @@ class Submittable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -186,6 +192,7 @@ class Submittable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -198,15 +205,16 @@ class Submittable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "submit", cancels)
return dep
@ -228,6 +236,7 @@ class Editable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -247,6 +256,7 @@ class Editable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -259,15 +269,16 @@ class Editable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "edit", cancels)
return dep
@ -289,6 +300,7 @@ class Clearable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -308,6 +320,7 @@ class Clearable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -320,15 +333,16 @@ class Clearable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "submit", cancels)
return dep
@ -350,6 +364,7 @@ class Playable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -369,6 +384,7 @@ class Playable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -381,15 +397,16 @@ class Playable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "play", cancels)
return dep
@ -409,6 +426,7 @@ class Playable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -428,6 +446,7 @@ class Playable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -440,15 +459,16 @@ class Playable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "pause", cancels)
return dep
@ -468,6 +488,7 @@ class Playable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -487,6 +508,7 @@ class Playable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
if status_tracker:
@ -499,15 +521,16 @@ class Playable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "stop", cancels)
return dep
@ -529,6 +552,7 @@ class Streamable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -548,6 +572,7 @@ class Streamable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
self.streaming = True
@ -562,15 +587,16 @@ class Streamable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "stream", cancels)
return dep
@ -591,6 +617,7 @@ class Blurrable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: Dict[str, Any] | List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -609,6 +636,7 @@ class Blurrable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
@ -617,15 +645,16 @@ class Blurrable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "blur", cancels)
@ -645,6 +674,7 @@ class Uploadable(Block):
preprocess: bool = True,
postprocess: bool = True,
cancels: List[Dict[str, Any]] | None = None,
every: float | None = None,
_js: Optional[str] = None,
):
"""
@ -663,6 +693,7 @@ class Uploadable(Block):
preprocess: If False, will not run preprocessing of component data before running 'fn' (e.g. leaving it as a base64 string if this method is called with the `Image` component).
postprocess: If False, will not run postprocessing of component data before returning 'fn' output to the browser.
cancels: A list of other events to cancel when this event is triggered. For example, setting cancels=[click_event] will cancel the click_event, where click_event is the return value of another components .click method.
every: Run this event 'every' number of seconds. Interpreted in seconds. Queue must be enabled.
"""
# _js: Optional frontend js method to run before running 'fn'. Input arguments for js method are values of 'inputs' and 'outputs', return should be a list of values for output components.
@ -671,14 +702,15 @@ class Uploadable(Block):
fn,
inputs,
outputs,
api_name=api_name,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
js=_js,
preprocess=preprocess,
postprocess=postprocess,
scroll_to_output=scroll_to_output,
show_progress=show_progress,
api_name=api_name,
js=_js,
queue=queue,
batch=batch,
max_batch_size=max_batch_size,
every=every,
)
set_cancel_events(self, "upload", cancels)

View File

@ -10,7 +10,7 @@ import fastapi
from pydantic import BaseModel
from gradio.dataclasses import PredictBody
from gradio.utils import Request, run_coro_in_background
from gradio.utils import Request, run_coro_in_background, set_task_name
class Estimation(BaseModel):
@ -118,7 +118,6 @@ class Queue:
if not (None in self.active_jobs):
await asyncio.sleep(self.sleep_when_free)
continue
# Using mutex to avoid editing a list in use
async with self.delete_lock:
events, batch = self.get_events_in_batch()
@ -127,10 +126,7 @@ class Queue:
self.active_jobs[self.active_jobs.index(None)] = events
task = run_coro_in_background(self.process_events, events, batch)
run_coro_in_background(self.broadcast_live_estimations)
if sys.version_info >= (3, 8) and not (
batch
): # You shouldn't be able to cancel a task if it's part of a batch
task.set_name(f"{events[0].session_hash}_{events[0].fn_index}")
set_task_name(task, events[0].session_hash, events[0].fn_index, batch)
def push(self, event: Event) -> int | None:
"""
@ -349,14 +345,7 @@ class Queue:
# If the job finished successfully, this has no effect
# If the job is cancelled, this will enable future runs
# to start "from scratch"
await Request(
method=Request.Method.POST,
url=f"{self.server_path}reset",
json={
"session_hash": event.session_hash,
"fn_index": event.fn_index,
},
)
await self.reset_iterators(event.session_hash, event.fn_index)
async def send_message(self, event, data: Dict) -> bool:
try:
@ -373,3 +362,13 @@ class Queue:
except:
await self.clean_event(event)
return None
async def reset_iterators(self, session_hash: str, fn_index: int):
await Request(
method=Request.Method.POST,
url=f"{self.server_path}reset",
json={
"session_hash": session_hash,
"fn_index": fn_index,
},
)

View File

@ -34,6 +34,7 @@ from gradio.dataclasses import PredictBody, ResetBody
from gradio.documentation import document, set_documentation_group
from gradio.exceptions import Error
from gradio.queue import Estimation, Event
from gradio.utils import cancel_tasks, run_coro_in_background, set_task_name
mimetypes.init()
@ -354,14 +355,26 @@ class App(FastAPI):
event.session_hash = session_hash["session_hash"]
event.fn_index = session_hash["fn_index"]
rank = app.blocks._queue.push(event)
# Continuous events are not put in the queue so that they do not
# occupy the queue's resource as they are expected to run forever
if app.blocks.dependencies[event.fn_index].get("every", 0):
await cancel_tasks([f"{event.session_hash}_{event.fn_index}"])
await app.blocks._queue.reset_iterators(
event.session_hash, event.fn_index
)
task = run_coro_in_background(
app.blocks._queue.process_events, [event], False
)
set_task_name(task, event.session_hash, event.fn_index, batch=False)
else:
rank = app.blocks._queue.push(event)
if rank is None:
await app.blocks._queue.send_message(event, {"msg": "queue_full"})
await event.disconnect()
return
estimation = app.blocks._queue.get_estimation()
await app.blocks._queue.send_estimation(event, estimation, rank)
if rank is None:
await app.blocks._queue.send_message(event, {"msg": "queue_full"})
await event.disconnect()
return
estimation = app.blocks._queue.get_estimation()
await app.blocks._queue.send_estimation(event, estimation, rank)
while True:
await asyncio.sleep(60)
if websocket.application_state == WebSocketState.DISCONNECTED:

View File

@ -192,6 +192,7 @@ XRAY_CONFIG = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
{
"targets": [39],
@ -207,6 +208,7 @@ XRAY_CONFIG = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
{
"targets": [],
@ -222,6 +224,7 @@ XRAY_CONFIG = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
],
}
@ -421,6 +424,7 @@ XRAY_CONFIG_DIFF_IDS = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
{
"targets": [933],
@ -436,6 +440,7 @@ XRAY_CONFIG_DIFF_IDS = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
{
"targets": [],
@ -451,6 +456,7 @@ XRAY_CONFIG_DIFF_IDS = {
"batch": False,
"max_batch_size": 4,
"cancels": [],
"every": None,
},
],
}

View File

@ -11,6 +11,7 @@ import os
import pkgutil
import random
import sys
import time
import warnings
from contextlib import contextmanager
from distutils.version import StrictVersion
@ -689,6 +690,55 @@ def is_update(val):
return type(val) is dict and "update" in val.get("__type__", "")
def get_continuous_fn(fn, every):
def continuous_fn(*args):
while True:
output = fn(*args)
yield output
time.sleep(every)
return continuous_fn
async def cancel_tasks(task_ids: List[str]):
if sys.version_info < (3, 8):
return None
matching_tasks = [
task for task in asyncio.all_tasks() if task.get_name() in task_ids
]
for task in matching_tasks:
task.cancel()
await asyncio.gather(*matching_tasks, return_exceptions=True)
def set_task_name(task, session_hash: str, fn_index: int, batch: bool):
if sys.version_info >= (3, 8) and not (
batch
): # You shouldn't be able to cancel a task if it's part of a batch
task.set_name(f"{session_hash}_{fn_index}")
def get_cancel_function(
dependencies: List[Dict[str, Any]]
) -> Tuple[Callable, List[int]]:
fn_to_comp = {}
for dep in dependencies:
fn_index = next(
i for i, d in enumerate(Context.root_block.dependencies) if d == dep
)
fn_to_comp[fn_index] = [Context.root_block.blocks[o] for o in dep["outputs"]]
async def cancel(session_hash: str) -> None:
task_ids = set([f"{session_hash}_{fn}" for fn in fn_to_comp])
await cancel_tasks(task_ids)
return (
cancel,
list(fn_to_comp.keys()),
)
def check_function_inputs_match(fn: Callable, inputs: List, inputs_as_dict: bool):
"""
Checks if the input component set matches the function
@ -721,32 +771,3 @@ def check_function_inputs_match(fn: Callable, inputs: List, inputs_as_dict: bool
warnings.warn(
f"Expected maximum {max_args} arguments for function {fn}, received {arg_count}."
)
def get_cancel_function(
dependencies: List[Dict[str, Any]]
) -> Tuple[Callable, List[int]]:
fn_to_comp = {}
for dep in dependencies:
fn_index = next(
i for i, d in enumerate(Context.root_block.dependencies) if d == dep
)
fn_to_comp[fn_index] = [Context.root_block.blocks[o] for o in dep["outputs"]]
async def cancel(session_hash: str) -> None:
if sys.version_info < (3, 8):
return None
task_ids = set([f"{session_hash}_{fn}" for fn in fn_to_comp])
matching_tasks = [
task for task in asyncio.all_tasks() if task.get_name() in task_ids
]
for task in matching_tasks:
task.cancel()
await asyncio.gather(*matching_tasks, return_exceptions=True)
return (
cancel,
list(fn_to_comp.keys()),
)

View File

@ -30,6 +30,17 @@ $demo_blocks_hello
Instead of being triggered by a click, the `welcome` function is triggered by typing in the Textbox `inp`. This is due to the `change()` event listener. Different Components support different event listeners. For example, the `Video` Commponent supports a `play()` event listener, triggered when a user presses play. See the [Docs](http://gradio.app/docs) for the event listeners for each Component.
## Running Events Continuously
You can run events on a fixed schedule using the `every` parameter of the event listener. This will run the event
`every` number of seconds. Note that this does not take into account the runtime of the event itself. So a function
with a 1 second runtime running with `every=5`, would actually run every 6 seconds.
Here is an example of a sine curve that updates every second!
$code_sine_curve
$demo_sine_curve
## Multiple Data Flows
A Blocks app is not limited to a single data flow the way Interfaces are. Take a look at the demo below:

View File

@ -1,5 +1,6 @@
import asyncio
import io
import json
import os
import random
import sys
@ -14,10 +15,10 @@ from unittest.mock import patch
import mlflow
import pytest
import wandb
import websockets
from fastapi.testclient import TestClient
import gradio as gr
import gradio.events
import gradio.utils
from gradio.exceptions import DuplicateBlockError
from gradio.routes import PredictBody
from gradio.test_data.blocks_configs import XRAY_CONFIG
@ -839,6 +840,56 @@ class TestCancel:
demo.queue().launch(prevent_thread_lock=True)
class TestEvery:
def test_raise_exception_if_parameters_invalid(self):
with pytest.raises(
ValueError, match="Cannot run change event in a batch and every 0.5 seconds"
):
with gr.Blocks():
num = gr.Number()
num.change(
lambda s: s + 1, inputs=[num], outputs=[num], every=0.5, batch=True
)
with pytest.raises(
ValueError, match="Parameter every must be positive or None"
):
with gr.Blocks():
num = gr.Number()
num.change(lambda s: s + 1, inputs=[num], outputs=[num], every=-0.1)
@pytest.mark.asyncio
async def test_every_does_not_block_queue(self):
with gr.Blocks() as demo:
num = gr.Number(value=0)
name = gr.Textbox()
greeting = gr.Textbox()
button = gr.Button(value="Greet")
name.change(lambda n: n + random.random(), num, num, every=0.5)
button.click(lambda s: f"Hello, {s}!", name, greeting)
app, _, _ = demo.queue(max_size=1).launch(prevent_thread_lock=True)
client = TestClient(app)
async with websockets.connect(
f"{demo.local_url.replace('http', 'ws')}queue/join"
) as ws:
completed = False
while not completed:
msg = json.loads(await ws.recv())
if msg["msg"] == "send_data":
await ws.send(json.dumps({"data": [0], "fn_index": 0}))
if msg["msg"] == "send_hash":
await ws.send(json.dumps({"fn_index": 0, "session_hash": "shdce"}))
status = client.get("/queue/status")
# If the continuous event got pushed to the queue, the size would be nonzero
# asserting false will terminate the test
if status.json()["queue_size"] != 0:
assert False
else:
break
def test_queue_enabled_for_fn():
with gr.Blocks() as demo:
input = gr.Textbox()