mirror of
https://github.com/jupyter/notebook.git
synced 2025-01-24 12:05:22 +08:00
Merge pull request #5372 from kevin-bates/cull-terminals
Add ability to cull terminals and track last activity
This commit is contained in:
commit
3ec00ad2ad
@ -91,6 +91,7 @@ from .gateway.managers import GatewayKernelManager, GatewayKernelSpecManager, Ga
|
||||
from .auth.login import LoginHandler
|
||||
from .auth.logout import LogoutHandler
|
||||
from .base.handlers import FileFindHandler
|
||||
from .terminal import TerminalManager
|
||||
|
||||
from traitlets.config import Config
|
||||
from traitlets.config.application import catch_config_error, boolean_flag
|
||||
@ -659,7 +660,7 @@ class NotebookApp(JupyterApp):
|
||||
|
||||
classes = [
|
||||
KernelManager, Session, MappingKernelManager, KernelSpecManager,
|
||||
ContentsManager, FileContentsManager, NotebookNotary,
|
||||
ContentsManager, FileContentsManager, NotebookNotary, TerminalManager,
|
||||
GatewayKernelManager, GatewayKernelSpecManager, GatewaySessionManager, GatewayClient,
|
||||
]
|
||||
flags = Dict(flags)
|
||||
@ -1757,7 +1758,7 @@ class NotebookApp(JupyterApp):
|
||||
|
||||
try:
|
||||
from .terminal import initialize
|
||||
initialize(self.web_app, self.notebook_dir, self.connection_url, self.terminado_settings)
|
||||
initialize(nb_app=self)
|
||||
self.web_app.settings['terminals_available'] = True
|
||||
except ImportError as e:
|
||||
self.log.warning(_("Terminals not available (error was %s)"), e)
|
||||
@ -1993,6 +1994,22 @@ class NotebookApp(JupyterApp):
|
||||
self.log.info(kernel_msg % n_kernels)
|
||||
run_sync(self.kernel_manager.shutdown_all())
|
||||
|
||||
def cleanup_terminals(self):
|
||||
"""Shutdown all terminals.
|
||||
|
||||
The terminals will shutdown themselves when this process no longer exists,
|
||||
but explicit shutdown allows the TerminalManager to cleanup.
|
||||
"""
|
||||
try:
|
||||
terminal_manager = self.web_app.settings['terminal_manager']
|
||||
except KeyError:
|
||||
return # Terminals not enabled
|
||||
|
||||
n_terminals = len(terminal_manager.list())
|
||||
terminal_msg = trans.ngettext('Shutting down %d terminal', 'Shutting down %d terminals', n_terminals)
|
||||
self.log.info(terminal_msg % n_terminals)
|
||||
run_sync(terminal_manager.terminate_all())
|
||||
|
||||
def notebook_info(self, kernel_count=True):
|
||||
"Return the current working directory and the server url information"
|
||||
info = self.contents_manager.info_string() + "\n"
|
||||
@ -2183,6 +2200,7 @@ class NotebookApp(JupyterApp):
|
||||
self.remove_server_info_file()
|
||||
self.remove_browser_open_file()
|
||||
self.cleanup_kernels()
|
||||
self.cleanup_terminals()
|
||||
|
||||
def stop(self):
|
||||
def _stop():
|
||||
|
@ -563,7 +563,7 @@ paths:
|
||||
schema:
|
||||
type: array
|
||||
items:
|
||||
$ref: '#/definitions/Terminal_ID'
|
||||
$ref: '#/definitions/Terminal'
|
||||
403:
|
||||
description: Forbidden to access
|
||||
404:
|
||||
@ -577,7 +577,7 @@ paths:
|
||||
200:
|
||||
description: Succesfully created a new terminal
|
||||
schema:
|
||||
$ref: '#/definitions/Terminal_ID'
|
||||
$ref: '#/definitions/Terminal'
|
||||
403:
|
||||
description: Forbidden to access
|
||||
404:
|
||||
@ -594,7 +594,7 @@ paths:
|
||||
200:
|
||||
description: Terminal session with given id
|
||||
schema:
|
||||
$ref: '#/definitions/Terminal_ID'
|
||||
$ref: '#/definitions/Terminal'
|
||||
403:
|
||||
description: Forbidden to access
|
||||
404:
|
||||
@ -840,12 +840,18 @@ definitions:
|
||||
type: string
|
||||
description: Last modified timestamp
|
||||
format: dateTime
|
||||
Terminal_ID:
|
||||
description: A Terminal_ID object
|
||||
Terminal:
|
||||
description: A Terminal object
|
||||
type: object
|
||||
required:
|
||||
- name
|
||||
properties:
|
||||
name:
|
||||
type: string
|
||||
description: name of terminal ID
|
||||
description: name of terminal
|
||||
last_activity:
|
||||
type: string
|
||||
description: |
|
||||
ISO 8601 timestamp for the last-seen activity on this terminal. Use
|
||||
this to identify which terminals have been inactive since a given time.
|
||||
Timestamps will be UTC, indicated 'Z' suffix.
|
||||
|
@ -7,31 +7,33 @@ if not check_version(terminado.__version__, '0.8.1'):
|
||||
raise ImportError("terminado >= 0.8.1 required, found %s" % terminado.__version__)
|
||||
|
||||
from ipython_genutils.py3compat import which
|
||||
from terminado import NamedTermManager
|
||||
from tornado.log import app_log
|
||||
from notebook.utils import url_path_join as ujoin
|
||||
from .terminalmanager import TerminalManager
|
||||
from .handlers import TerminalHandler, TermSocket
|
||||
from . import api_handlers
|
||||
|
||||
def initialize(webapp, notebook_dir, connection_url, settings):
|
||||
|
||||
def initialize(nb_app):
|
||||
if os.name == 'nt':
|
||||
default_shell = 'powershell.exe'
|
||||
else:
|
||||
default_shell = which('sh')
|
||||
shell = settings.get('shell_command',
|
||||
[os.environ.get('SHELL') or default_shell]
|
||||
)
|
||||
shell = nb_app.terminado_settings.get('shell_command',
|
||||
[os.environ.get('SHELL') or default_shell]
|
||||
)
|
||||
# Enable login mode - to automatically source the /etc/profile script
|
||||
if os.name != 'nt':
|
||||
shell.append('-l')
|
||||
terminal_manager = webapp.settings['terminal_manager'] = NamedTermManager(
|
||||
terminal_manager = nb_app.web_app.settings['terminal_manager'] = TerminalManager(
|
||||
shell_command=shell,
|
||||
extra_env={'JUPYTER_SERVER_ROOT': notebook_dir,
|
||||
'JUPYTER_SERVER_URL': connection_url,
|
||||
extra_env={'JUPYTER_SERVER_ROOT': nb_app.notebook_dir,
|
||||
'JUPYTER_SERVER_URL': nb_app.connection_url,
|
||||
},
|
||||
parent=nb_app,
|
||||
)
|
||||
terminal_manager.log = app_log
|
||||
base_url = webapp.settings['base_url']
|
||||
terminal_manager.log = nb_app.log
|
||||
base_url = nb_app.web_app.settings['base_url']
|
||||
handlers = [
|
||||
(ujoin(base_url, r"/terminals/(\w+)"), TerminalHandler),
|
||||
(ujoin(base_url, r"/terminals/websocket/(\w+)"), TermSocket,
|
||||
@ -39,4 +41,4 @@ def initialize(webapp, notebook_dir, connection_url, settings):
|
||||
(ujoin(base_url, r"/api/terminals"), api_handlers.TerminalRootHandler),
|
||||
(ujoin(base_url, r"/api/terminals/(\w+)"), api_handlers.TerminalHandler),
|
||||
]
|
||||
webapp.add_handlers(".*$", handlers)
|
||||
nb_app.web_app.add_handlers(".*$", handlers)
|
||||
|
@ -1,29 +1,19 @@
|
||||
import json
|
||||
from tornado import web, gen
|
||||
from ..base.handlers import APIHandler
|
||||
from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL
|
||||
|
||||
|
||||
class TerminalRootHandler(APIHandler):
|
||||
@web.authenticated
|
||||
def get(self):
|
||||
tm = self.terminal_manager
|
||||
terms = [{'name': name} for name in tm.terminals]
|
||||
self.finish(json.dumps(terms))
|
||||
|
||||
# Update the metric below to the length of the list 'terms'
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.set(
|
||||
len(terms)
|
||||
)
|
||||
models = self.terminal_manager.list()
|
||||
self.finish(json.dumps(models))
|
||||
|
||||
@web.authenticated
|
||||
def post(self):
|
||||
"""POST /terminals creates a new terminal and redirects to it"""
|
||||
name, _ = self.terminal_manager.new_named_terminal()
|
||||
self.finish(json.dumps({'name': name}))
|
||||
|
||||
# Increase the metric by one because a new terminal was created
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.inc()
|
||||
model = self.terminal_manager.create()
|
||||
self.finish(json.dumps(model))
|
||||
|
||||
|
||||
class TerminalHandler(APIHandler):
|
||||
@ -31,24 +21,12 @@ class TerminalHandler(APIHandler):
|
||||
|
||||
@web.authenticated
|
||||
def get(self, name):
|
||||
tm = self.terminal_manager
|
||||
if name in tm.terminals:
|
||||
self.finish(json.dumps({'name': name}))
|
||||
else:
|
||||
raise web.HTTPError(404, "Terminal not found: %r" % name)
|
||||
model = self.terminal_manager.get(name)
|
||||
self.finish(json.dumps(model))
|
||||
|
||||
@web.authenticated
|
||||
@gen.coroutine
|
||||
def delete(self, name):
|
||||
tm = self.terminal_manager
|
||||
if name in tm.terminals:
|
||||
yield tm.terminate(name, force=True)
|
||||
self.set_status(204)
|
||||
self.finish()
|
||||
|
||||
# Decrease the metric below by one
|
||||
# because a terminal has been shutdown
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.dec()
|
||||
|
||||
else:
|
||||
raise web.HTTPError(404, "Terminal not found: %r" % name)
|
||||
yield self.terminal_manager.terminate(name, force=True)
|
||||
self.set_status(204)
|
||||
self.finish()
|
||||
|
@ -35,8 +35,14 @@ class TermSocket(WebSocketMixin, IPythonHandler, terminado.TermSocket):
|
||||
|
||||
def on_message(self, message):
|
||||
super(TermSocket, self).on_message(message)
|
||||
self.application.settings['terminal_last_activity'] = utcnow()
|
||||
self._update_activity()
|
||||
|
||||
def write_message(self, message, binary=False):
|
||||
super(TermSocket, self).write_message(message, binary=binary)
|
||||
self._update_activity()
|
||||
|
||||
def _update_activity(self):
|
||||
self.application.settings['terminal_last_activity'] = utcnow()
|
||||
# terminal may not be around on deletion/cull
|
||||
if self.term_name in self.terminal_manager.terminals:
|
||||
self.terminal_manager.terminals[self.term_name].last_activity = utcnow()
|
||||
|
151
notebook/terminal/terminalmanager.py
Normal file
151
notebook/terminal/terminalmanager.py
Normal file
@ -0,0 +1,151 @@
|
||||
"""A MultiTerminalManager for use in the notebook webserver
|
||||
- raises HTTPErrors
|
||||
- creates REST API models
|
||||
"""
|
||||
|
||||
# Copyright (c) Jupyter Development Team.
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
|
||||
import warnings
|
||||
|
||||
from datetime import timedelta
|
||||
from notebook._tz import utcnow, isoformat
|
||||
from terminado import NamedTermManager
|
||||
from tornado import web
|
||||
from tornado.ioloop import IOLoop, PeriodicCallback
|
||||
from traitlets import Integer, validate
|
||||
from traitlets.config import LoggingConfigurable
|
||||
from ..prometheus.metrics import TERMINAL_CURRENTLY_RUNNING_TOTAL
|
||||
|
||||
|
||||
class TerminalManager(LoggingConfigurable, NamedTermManager):
|
||||
""" """
|
||||
|
||||
_culler_callback = None
|
||||
|
||||
_initialized_culler = False
|
||||
|
||||
cull_inactive_timeout = Integer(0, config=True,
|
||||
help="""Timeout (in seconds) in which a terminal has been inactive and ready to be culled.
|
||||
Values of 0 or lower disable culling."""
|
||||
)
|
||||
|
||||
cull_interval_default = 300 # 5 minutes
|
||||
cull_interval = Integer(cull_interval_default, config=True,
|
||||
help="""The interval (in seconds) on which to check for terminals exceeding the inactive timeout value."""
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Methods for managing terminals
|
||||
# -------------------------------------------------------------------------
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TerminalManager, self).__init__(*args, **kwargs)
|
||||
|
||||
def create(self):
|
||||
"""Create a new terminal."""
|
||||
name, term = self.new_named_terminal()
|
||||
# Monkey-patch last-activity, similar to kernels. Should we need
|
||||
# more functionality per terminal, we can look into possible sub-
|
||||
# classing or containment then.
|
||||
term.last_activity = utcnow()
|
||||
model = self.get_terminal_model(name)
|
||||
# Increase the metric by one because a new terminal was created
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.inc()
|
||||
# Ensure culler is initialized
|
||||
self._initialize_culler()
|
||||
return model
|
||||
|
||||
def get(self, name):
|
||||
"""Get terminal 'name'."""
|
||||
model = self.get_terminal_model(name)
|
||||
return model
|
||||
|
||||
def list(self):
|
||||
"""Get a list of all running terminals."""
|
||||
models = [self.get_terminal_model(name) for name in self.terminals]
|
||||
|
||||
# Update the metric below to the length of the list 'terms'
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.set(
|
||||
len(models)
|
||||
)
|
||||
return models
|
||||
|
||||
async def terminate(self, name, force=False):
|
||||
"""Terminate terminal 'name'."""
|
||||
self._check_terminal(name)
|
||||
await super(TerminalManager, self).terminate(name, force=force)
|
||||
|
||||
# Decrease the metric below by one
|
||||
# because a terminal has been shutdown
|
||||
TERMINAL_CURRENTLY_RUNNING_TOTAL.dec()
|
||||
|
||||
async def terminate_all(self):
|
||||
"""Terminate all terminals."""
|
||||
terms = [name for name in self.terminals]
|
||||
for term in terms:
|
||||
await self.terminate(term, force=True)
|
||||
|
||||
def get_terminal_model(self, name):
|
||||
"""Return a JSON-safe dict representing a terminal.
|
||||
For use in representing terminals in the JSON APIs.
|
||||
"""
|
||||
self._check_terminal(name)
|
||||
term = self.terminals[name]
|
||||
model = {
|
||||
"name": name,
|
||||
"last_activity": isoformat(term.last_activity),
|
||||
}
|
||||
return model
|
||||
|
||||
def _check_terminal(self, name):
|
||||
"""Check a that terminal 'name' exists and raise 404 if not."""
|
||||
if name not in self.terminals:
|
||||
raise web.HTTPError(404, u'Terminal not found: %s' % name)
|
||||
|
||||
def _initialize_culler(self):
|
||||
"""Start culler if 'cull_inactive_timeout' is greater than zero.
|
||||
Regardless of that value, set flag that we've been here.
|
||||
"""
|
||||
if not self._initialized_culler and self.cull_inactive_timeout > 0:
|
||||
if self._culler_callback is None:
|
||||
loop = IOLoop.current()
|
||||
if self.cull_interval <= 0: # handle case where user set invalid value
|
||||
self.log.warning("Invalid value for 'cull_interval' detected (%s) - using default value (%s).",
|
||||
self.cull_interval, self.cull_interval_default)
|
||||
self.cull_interval = self.cull_interval_default
|
||||
self._culler_callback = PeriodicCallback(
|
||||
self._cull_terminals, 1000 * self.cull_interval)
|
||||
self.log.info("Culling terminals with inactivity > %s seconds at %s second intervals ...",
|
||||
self.cull_inactive_timeout, self.cull_interval)
|
||||
self._culler_callback.start()
|
||||
|
||||
self._initialized_culler = True
|
||||
|
||||
async def _cull_terminals(self):
|
||||
self.log.debug("Polling every %s seconds for terminals inactive for > %s seconds...",
|
||||
self.cull_interval, self.cull_inactive_timeout)
|
||||
# Create a separate list of terminals to avoid conflicting updates while iterating
|
||||
for name in list(self.terminals):
|
||||
try:
|
||||
await self._cull_inactive_terminal(name)
|
||||
except Exception as e:
|
||||
self.log.exception("The following exception was encountered while checking the "
|
||||
"activity of terminal {}: {}".format(name, e))
|
||||
|
||||
async def _cull_inactive_terminal(self, name):
|
||||
try:
|
||||
term = self.terminals[name]
|
||||
except KeyError:
|
||||
return # KeyErrors are somewhat expected since the terminal can be terminated as the culling check is made.
|
||||
|
||||
self.log.debug("name=%s, last_activity=%s", name, term.last_activity)
|
||||
if hasattr(term, 'last_activity'):
|
||||
dt_now = utcnow()
|
||||
dt_inactive = dt_now - term.last_activity
|
||||
# Compute idle properties
|
||||
is_time = dt_inactive > timedelta(seconds=self.cull_inactive_timeout)
|
||||
# Cull the kernel if all three criteria are met
|
||||
if (is_time):
|
||||
inactivity = int(dt_inactive.total_seconds())
|
||||
self.log.warning("Culling terminal '%s' due to %s seconds of inactivity.", name, inactivity)
|
||||
await self.terminate(name, force=True)
|
0
notebook/terminal/tests/__init__.py
Normal file
0
notebook/terminal/tests/__init__.py
Normal file
169
notebook/terminal/tests/test_terminals_api.py
Normal file
169
notebook/terminal/tests/test_terminals_api.py
Normal file
@ -0,0 +1,169 @@
|
||||
"""Test the terminal service API."""
|
||||
|
||||
import time
|
||||
|
||||
from requests import HTTPError
|
||||
from traitlets.config import Config
|
||||
|
||||
from notebook.utils import url_path_join
|
||||
from notebook.tests.launchnotebook import NotebookTestBase, assert_http_error
|
||||
|
||||
|
||||
class TerminalAPI(object):
|
||||
"""Wrapper for terminal REST API requests"""
|
||||
def __init__(self, request, base_url, headers):
|
||||
self.request = request
|
||||
self.base_url = base_url
|
||||
self.headers = headers
|
||||
|
||||
def _req(self, verb, path, body=None):
|
||||
response = self.request(verb, path, data=body)
|
||||
|
||||
if 400 <= response.status_code < 600:
|
||||
try:
|
||||
response.reason = response.json()['message']
|
||||
except:
|
||||
pass
|
||||
response.raise_for_status()
|
||||
|
||||
return response
|
||||
|
||||
def list(self):
|
||||
return self._req('GET', 'api/terminals')
|
||||
|
||||
def get(self, name):
|
||||
return self._req('GET', url_path_join('api/terminals', name))
|
||||
|
||||
def start(self):
|
||||
return self._req('POST', 'api/terminals')
|
||||
|
||||
def shutdown(self, name):
|
||||
return self._req('DELETE', url_path_join('api/terminals', name))
|
||||
|
||||
|
||||
class TerminalAPITest(NotebookTestBase):
|
||||
"""Test the terminals web service API"""
|
||||
def setUp(self):
|
||||
self.term_api = TerminalAPI(self.request,
|
||||
base_url=self.base_url(),
|
||||
headers=self.auth_headers(),
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
for k in self.term_api.list().json():
|
||||
self.term_api.shutdown(k['name'])
|
||||
|
||||
def test_no_terminals(self):
|
||||
# Make sure there are no terminals running at the start
|
||||
terminals = self.term_api.list().json()
|
||||
self.assertEqual(terminals, [])
|
||||
|
||||
def test_create_terminal(self):
|
||||
# POST request
|
||||
r = self.term_api._req('POST', 'api/terminals')
|
||||
term1 = r.json()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertIsInstance(term1, dict)
|
||||
|
||||
def test_terminal_root_handler(self):
|
||||
# POST request
|
||||
r = self.term_api.start()
|
||||
term1 = r.json()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
self.assertIsInstance(term1, dict)
|
||||
|
||||
# GET request
|
||||
r = self.term_api.list()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert isinstance(r.json(), list)
|
||||
self.assertEqual(r.json()[0]['name'], term1['name'])
|
||||
|
||||
# create another terminal and check that they both are added to the
|
||||
# list of terminals from a GET request
|
||||
term2 = self.term_api.start().json()
|
||||
assert isinstance(term2, dict)
|
||||
r = self.term_api.list()
|
||||
terminals = r.json()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert isinstance(terminals, list)
|
||||
self.assertEqual(len(terminals), 2)
|
||||
|
||||
def test_terminal_handler(self):
|
||||
# GET terminal with given name
|
||||
term = self.term_api.start().json()['name']
|
||||
r = self.term_api.get(term)
|
||||
term1 = r.json()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
assert isinstance(term1, dict)
|
||||
self.assertIn('name', term1)
|
||||
self.assertEqual(term1['name'], term)
|
||||
|
||||
# Request a bad terminal id and check that a JSON
|
||||
# message is returned!
|
||||
bad_term = 'nonExistentTerm'
|
||||
with assert_http_error(404, 'Terminal not found: ' + bad_term):
|
||||
self.term_api.get(bad_term)
|
||||
|
||||
# DELETE terminal with name
|
||||
r = self.term_api.shutdown(term)
|
||||
self.assertEqual(r.status_code, 204)
|
||||
terminals = self.term_api.list().json()
|
||||
self.assertEqual(terminals, [])
|
||||
|
||||
# Request to delete a non-existent terminal name
|
||||
bad_term = 'nonExistentTerm'
|
||||
with assert_http_error(404, 'Terminal not found: ' + bad_term):
|
||||
self.term_api.shutdown(bad_term)
|
||||
|
||||
|
||||
class TerminalCullingTest(NotebookTestBase):
|
||||
|
||||
# Configure culling
|
||||
config = Config({
|
||||
'NotebookApp': {
|
||||
'TerminalManager': {
|
||||
'cull_interval': 3,
|
||||
'cull_inactive_timeout': 2
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
def setUp(self):
|
||||
self.term_api = TerminalAPI(self.request,
|
||||
base_url=self.base_url(),
|
||||
headers=self.auth_headers(),
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
for k in self.term_api.list().json():
|
||||
self.term_api.shutdown(k['name'])
|
||||
|
||||
# Sanity check verifying that the configurable was properly set.
|
||||
def test_config(self):
|
||||
self.assertEqual(self.config.NotebookApp.TerminalManager.cull_inactive_timeout, 2)
|
||||
self.assertEqual(self.config.NotebookApp.TerminalManager.cull_interval, 3)
|
||||
terminal_mgr = self.notebook.web_app.settings['terminal_manager']
|
||||
self.assertEqual(terminal_mgr.cull_inactive_timeout, 2)
|
||||
self.assertEqual(terminal_mgr.cull_interval, 3)
|
||||
|
||||
def test_culling(self):
|
||||
# POST request
|
||||
r = self.term_api.start()
|
||||
self.assertEqual(r.status_code, 200)
|
||||
body = r.json()
|
||||
term1 = body['name']
|
||||
last_activity = body['last_activity']
|
||||
|
||||
culled = False
|
||||
for i in range(10): # Culling should occur in a few seconds
|
||||
try:
|
||||
r = self.term_api.get(term1)
|
||||
except HTTPError as e:
|
||||
self.assertEqual(e.response.status_code, 404)
|
||||
culled = True
|
||||
break
|
||||
else:
|
||||
self.assertEqual(r.status_code, 200)
|
||||
time.sleep(1)
|
||||
|
||||
self.assertTrue(culled)
|
Loading…
Reference in New Issue
Block a user