Adds a monitoring dashboard to Gradio apps that can be used to view usage (#8478)

* changes

* add changeset

* changes

* changes

* changes

* add changeset

* changes

---------

Co-authored-by: Ali Abid <aliabid94@gmail.com>
Co-authored-by: gradio-pr-bot <gradio-pr-bot@users.noreply.github.com>
Co-authored-by: Abubakar Abid <abubakar@huggingface.co>
This commit is contained in:
aliabid94 2024-06-06 08:31:22 -07:00 committed by GitHub
parent f9406b46e3
commit 73e11087a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 155 additions and 2 deletions

View File

@ -0,0 +1,5 @@
---
"gradio": minor
---
feat:Adds a monitoring dashboard to Gradio apps that can be used to view usage

View File

@ -0,0 +1,91 @@
import random
import time
import pandas as pd
import gradio as gr
# Shared store of analytics records, keyed by event id. The host app swaps
# ``data["data"]`` for the queue's live analytics mapping when it mounts
# this dashboard; run as a script, it is filled with random sample rows.
data = {"data": {}}

with gr.Blocks() as demo:
    with gr.Row():
        selected_function = gr.Dropdown(
            ["All"],
            value="All",
            label="Endpoint",
            info="Select the function to see analytics for, or 'All' for aggregate.",
            scale=2,
        )
        # On page load, rebuild the endpoint choices from the function names
        # currently present in the recorded data.
        demo.load(
            lambda: gr.Dropdown(
                choices=["All"]
                + list({row["function"] for row in data["data"].values()})  # type: ignore
            ),
            None,
            selected_function,
        )
        timespan = gr.Dropdown(
            ["All Time", "24 hours", "1 hours", "10 minutes"],
            value="All Time",
            label="Timespan",
            info="Duration to see data for.",
        )
    with gr.Group():
        with gr.Row():
            unique_users = gr.Label(label="Unique Users")
            unique_requests = gr.Label(label="Unique Requests")
            process_time = gr.Label(label="Avg Process Time")
    plot = gr.BarPlot(
        x="time",
        y="count",
        color="status",
        title="Requests over Time",
        y_title="Requests",
        width=600,
    )

    @gr.on(
        [demo.load, selected_function.change, timespan.change],
        inputs=[selected_function, timespan],
        outputs=[unique_users, unique_requests, process_time, plot],
    )
    def load_dfs(function, timespan):
        """Compute the summary labels and the requests-over-time plot data.

        Args:
            function: Endpoint name to filter on, or "All" for every endpoint.
            timespan: "All Time", or a duration string that ``pd.Timedelta``
                can parse (e.g. "24 hours"), limiting rows to that recent
                window.

        Returns:
            A 4-tuple of: number of distinct session hashes, number of
            matching requests, mean process time of successful requests
            rounded to two decimals (0 when there are none), and a DataFrame
            with the request count per (minute, status) pair. When no data
            has been recorded at all, returns zeros and leaves the plot
            untouched.
        """
        df = pd.DataFrame(data["data"].values())
        if df.empty:
            return 0, 0, 0, gr.skip()

        # Epoch seconds are converted to timezone-naive timestamps in UTC.
        df["time"] = pd.to_datetime(df["time"], unit="s")
        df_filtered = df if function == "All" else df[df["function"] == function]

        if timespan != "All Time":
            # "Now" must also be naive UTC; a naive local-time "now" would
            # shift the window by the server's UTC offset.
            now_utc = pd.Timestamp.now(tz="UTC").tz_localize(None)
            cutoff = now_utc - pd.Timedelta(timespan)
            df_filtered = df_filtered[df_filtered["time"] > cutoff]

        # Copy before assigning a column so we never write into a slice of
        # ``df`` (avoids pandas' SettingWithCopy warning).
        df_filtered = df_filtered.copy()
        df_filtered["time"] = df_filtered["time"].dt.floor("min")
        plot = df_filtered.groupby(["time", "status"]).size().reset_index(name="count")  # type: ignore

        mean_process_time_for_success = df_filtered.loc[
            df_filtered["status"] == "success", "process_time"
        ].mean()
        # The mean is NaN when no successful request matches the filters;
        # report 0, as the no-data path above does.
        if pd.isna(mean_process_time_for_success):
            avg_process_time = 0
        else:
            avg_process_time = round(mean_process_time_for_success, 2)

        return (
            df_filtered["session_hash"].nunique(),
            df_filtered.shape[0],
            avg_process_time,
            plot,
        )
if __name__ == "__main__":
    # Standalone preview: fill the store with random sample events spread
    # over the last three days, then serve the dashboard.
    status_pool = ["success", "success", "failure", "pending", "queued"]
    function_pool = ["predict", "chat", "chat"]
    three_days_in_seconds = 60 * 60 * 24 * 3

    sample_count = random.randint(100, 200)
    sample_events = {}
    for _ in range(sample_count):
        # Keys are random ids; a repeated id simply overwrites the earlier row.
        event_id = random.randint(0, 1000000)
        sample_events[event_id] = {
            "time": time.time() - random.randint(0, three_days_in_seconds),
            "status": random.choice(status_pool),
            "function": random.choice(function_pool),
            "process_time": random.randint(0, 10),
            "session_hash": str(random.randint(0, 4)),
        }
    data["data"] = sample_events

    demo.launch()

View File

@ -2178,6 +2178,7 @@ Received outputs:
auth_dependency: Callable[[fastapi.Request], str | None] | None = None,
max_file_size: str | int | None = None,
_frontend: bool = True,
enable_monitoring: bool = False,
) -> tuple[FastAPI, str, str]:
"""
Launches a simple web server that serves the demo. Can also be used to create a
@ -2397,6 +2398,11 @@ Received outputs:
else:
self.share = share
if enable_monitoring:
print(
f"Monitoring URL: {self.local_url}monitoring/{self.app.analytics_key}"
)
# If running in a colab or not able to access localhost,
# a shareable link must be created.
if (

View File

@ -114,6 +114,7 @@ class Queue:
self.default_concurrency_limit = self._resolve_concurrency_limit(
default_concurrency_limit
)
self.event_analytics: dict[str, dict[str, float | str | None]] = {}
def start(self):
self.active_jobs = [None] * self.max_thread_count
@ -227,6 +228,13 @@ class Queue:
"Event not found in queue. If you are deploying this Gradio app with multiple replicas, please enable stickiness to ensure that all requests from the same user are routed to the same instance."
) from e
event_queue.queue.append(event)
self.event_analytics[event._id] = {
"time": time.time(),
"status": "queued",
"process_time": None,
"function": fn.api_name,
"session_hash": body.session_hash,
}
self.broadcast_estimations(event.concurrency_id, len(event_queue.queue) - 1)
@ -294,6 +302,8 @@ class Queue:
event_queue.current_concurrency += 1
start_time = time.time()
event_queue.start_times_per_fn[events[0].fn].add(start_time)
for event in events:
self.event_analytics[event._id]["status"] = "processing"
process_event_task = run_coro_in_background(
self.process_events, events, batch, start_time
)
@ -470,6 +480,7 @@ class Queue:
) -> None:
awake_events: list[Event] = []
fn = events[0].fn
success = False
try:
for event in events:
if event.alive:
@ -587,16 +598,20 @@ class Queue:
for e, event in enumerate(awake_events):
if batch and "data" in output:
output["data"] = list(zip(*response.get("data")))[e]
success = response is not None
self.send_message(
event,
ProcessCompletedMessage(
output=output,
success=response is not None,
success=success,
),
)
end_time = time.time()
if response is not None:
self.process_time_per_fn[events[0].fn].add(end_time - begin_time)
duration = end_time - begin_time
self.process_time_per_fn[events[0].fn].add(duration)
for event in events:
self.event_analytics[event._id]["process_time"] = duration
except Exception as e:
traceback.print_exc()
finally:
@ -620,6 +635,13 @@ class Queue:
# to start "from scratch"
await self.reset_iterators(event._id)
if event in awake_events:
self.event_analytics[event._id]["status"] = (
"success" if success else "failed"
)
else:
self.event_analytics[event._id]["status"] = "cancelled"
async def reset_iterators(self, event_id: str):
# Do the same thing as the /reset route
app = self.server_app

View File

@ -164,6 +164,8 @@ class App(FastAPI):
):
self.tokens = {}
self.auth = None
self.analytics_key = secrets.token_urlsafe(16)
self.analytics_enabled = False
self.blocks: gradio.Blocks | None = None
self.state_holder = StateHolder()
self.iterators: dict[str, AsyncIterator] = {}
@ -1165,6 +1167,32 @@ class App(FastAPI):
else:
return "User-agent: *\nDisallow: "
@app.get("/monitoring")
async def analytics_login():
    """Print the secret monitoring URL to the server console.

    The URL is only written to stdout; the HTTP response just tells the
    caller to look at the console.
    """
    monitoring_url = f"{app.get_blocks().local_url}monitoring/{app.analytics_key}"
    print(f"Monitoring URL: {monitoring_url}")
    return HTMLResponse("See console for monitoring URL.")
@app.get("/monitoring/{key}")
async def analytics_dashboard(key: str):
    """Redirect to the monitoring dashboard when the secret key matches.

    On the first successful request the dashboard app is mounted under the
    keyed URL, its queue is started, and its data store is pointed at the
    main app's queue analytics. Later requests only redirect.

    Args:
        key: Secret taken from the request path.

    Raises:
        HTTPException: 403 if ``key`` does not match ``app.analytics_key``.
    """
    # ``key`` comes straight from the request, so compare in constant time.
    # Comparing bytes also accepts non-ASCII input, which the str form of
    # compare_digest rejects with a TypeError.
    key_matches = secrets.compare_digest(
        key.encode("utf-8"), app.analytics_key.encode("utf-8")
    )
    if not key_matches:
        raise HTTPException(status_code=403, detail="Invalid key.")

    analytics_url = f"/monitoring/{app.analytics_key}/dashboard"
    if not app.analytics_enabled:
        # Imported lazily so the dashboard is only loaded once someone
        # actually opens it.
        from gradio.analytics_dashboard import data
        from gradio.analytics_dashboard import demo as dashboard

        mount_gradio_app(app, dashboard, path=analytics_url)
        dashboard._queue.start()
        analytics = app.get_blocks()._queue.event_analytics
        # Share the live mapping so the dashboard sees new events.
        data["data"] = analytics
        app.analytics_enabled = True
    return RedirectResponse(
        url=analytics_url, status_code=status.HTTP_302_FOUND
    )
return app

View File

@ -13,3 +13,4 @@ This limits the number of requests processed for this event listener at a single
See the [docs on queueing](/docs/gradio/interface#interface-queue) for more details on configuring the queuing parameters.
You can see analytics on the number and status of all requests processed by the queue by visiting the `/monitoring` endpoint of your app. This endpoint will print a secret URL to your console that links to the full analytics dashboard.