From 99b0afd16ec724e46f9983bcabc3df986f4de261 Mon Sep 17 00:00:00 2001 From: Kevin Bates Date: Fri, 27 Mar 2020 10:53:32 -0700 Subject: [PATCH] Apply changes per review Add comments and rename self.super to self.pinned_superclass to clarify intent. Add run_sync() util method to clean up shutdown_all() invocation. --- notebook/notebookapp.py | 8 ++--- notebook/services/kernels/kernelmanager.py | 26 +++++++------- notebook/utils.py | 40 ++++++++++++++++++++++ 3 files changed, 55 insertions(+), 19 deletions(-) diff --git a/notebook/notebookapp.py b/notebook/notebookapp.py index 9b7319e71..a8535c35d 100755 --- a/notebook/notebookapp.py +++ b/notebook/notebookapp.py @@ -107,7 +107,7 @@ from jupyter_core.paths import jupyter_runtime_dir, jupyter_path from notebook._sysinfo import get_sys_info from ._tz import utcnow, utcfromtimestamp -from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url +from .utils import url_path_join, check_pid, url_escape, urljoin, pathname2url, run_sync # Check if we can use async kernel management try: @@ -1801,11 +1801,7 @@ class NotebookApp(JupyterApp): n_kernels = len(self.kernel_manager.list_kernel_ids()) kernel_msg = trans.ngettext('Shutting down %d kernel', 'Shutting down %d kernels', n_kernels) self.log.info(kernel_msg % n_kernels) - # If we're using async kernel management, we need to invoke the async method via the event loop. - if isinstance(self.kernel_manager, AsyncMappingKernelManager): - asyncio.get_event_loop().run_until_complete(self.kernel_manager.shutdown_all()) - else: - self.kernel_manager.shutdown_all() + run_sync(self.kernel_manager.shutdown_all()) def notebook_info(self, kernel_count=True): "Return the current working directory and the server url information" diff --git a/notebook/services/kernels/kernelmanager.py b/notebook/services/kernels/kernelmanager.py index 6cb5d20f9..c9c6d18a4 100644 --- a/notebook/services/kernels/kernelmanager.py +++ b/notebook/services/kernels/kernelmanager.py @@ -122,10 +122,6 @@ class MappingKernelManager(MultiKernelManager): last_kernel_activity = Instance(datetime, help="The last activity on any kernel, including shutting down a kernel") - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.last_kernel_activity = utcnow() - allowed_message_types = List(trait=Unicode(), config=True, help="""White list of allowed kernel message types. When the list is empty, all message types are allowed. @@ -137,8 +133,11 @@ class MappingKernelManager(MultiKernelManager): #------------------------------------------------------------------------- def __init__(self, **kwargs): - self.super = MultiKernelManager - self.super.__init__(self, **kwargs) + # Pin the superclass to better control the MRO. This is needed by + # AsyncMappingKernelManager so that it can give priority to methods + # on AsyncMultiKernelManager over this superclass. + self.pinned_superclass = MultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) self.last_kernel_activity = utcnow() def _handle_kernel_died(self, kernel_id): @@ -173,7 +172,7 @@ class MappingKernelManager(MultiKernelManager): if kernel_id is None: if path is not None: kwargs['cwd'] = self.cwd_for_path(path) - kernel_id = await maybe_future(self.super.start_kernel(self, **kwargs)) + kernel_id = await maybe_future(self.pinned_superclass.start_kernel(self, **kwargs)) self._kernel_connections[kernel_id] = 0 self.start_watching_activity(kernel_id) self.log.info("Kernel started: %s" % kernel_id) @@ -302,12 +301,12 @@ class MappingKernelManager(MultiKernelManager): type=self._kernels[kernel_id].kernel_name ).dec() - return self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) + return self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) async def restart_kernel(self, kernel_id, now=False): """Restart a kernel by kernel_id""" self._check_kernel_id(kernel_id) - await maybe_future(self.super.restart_kernel(self, kernel_id, now=now)) + await maybe_future(self.pinned_superclass.restart_kernel(self, kernel_id, now=now)) kernel = self.get_kernel(kernel_id) # return a Future that will resolve when the kernel has successfully restarted channel = kernel.connect_shell() @@ -374,7 +373,7 @@ class MappingKernelManager(MultiKernelManager): def list_kernels(self): """Returns a list of kernel_id's of kernels running.""" kernels = [] - kernel_ids = self.super.list_kernel_ids(self) + kernel_ids = self.pinned_superclass.list_kernel_ids(self) for kernel_id in kernel_ids: model = self.kernel_model(kernel_id) kernels.append(model) @@ -485,8 +484,9 @@ class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): return "jupyter_client.ioloop.AsyncIOLoopKernelManager" def __init__(self, **kwargs): - self.super = AsyncMultiKernelManager - self.super.__init__(self, **kwargs) + # Pin the superclass to better control the MRO. + self.pinned_superclass = AsyncMultiKernelManager + self.pinned_superclass.__init__(self, **kwargs) self.last_kernel_activity = utcnow() async def shutdown_kernel(self, kernel_id, now=False, restart=False): @@ -505,4 +505,4 @@ class AsyncMappingKernelManager(MappingKernelManager, AsyncMultiKernelManager): type=self._kernels[kernel_id].kernel_name ).dec() - return await self.super.shutdown_kernel(self, kernel_id, now=now, restart=restart) + return await self.pinned_superclass.shutdown_kernel(self, kernel_id, now=now, restart=restart) diff --git a/notebook/utils.py b/notebook/utils.py index 69f3586f1..9ec10773f 100644 --- a/notebook/utils.py +++ b/notebook/utils.py @@ -327,3 +327,43 @@ def maybe_future(obj): f.set_result(obj) return f + +def run_sync(maybe_async): + """If async, runs maybe_async and blocks until it has executed, + possibly creating an event loop. + If not async, just returns maybe_async as it is the result of something + that has already executed. + Parameters + ---------- + maybe_async : async or non-async object + The object to be executed, if it is async. + Returns + ------- + result : + Whatever the async object returns, or the object itself. + """ + if not inspect.isawaitable(maybe_async): + # that was not something async, just return it + return maybe_async + # it is async, we need to run it in an event loop + + def wrapped(): + create_new_event_loop = False + try: + loop = asyncio.get_event_loop() + except RuntimeError: + create_new_event_loop = True + else: + if loop.is_closed(): + create_new_event_loop = True + if create_new_event_loop: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + result = loop.run_until_complete(maybe_async) + except RuntimeError as e: + if str(e) == 'This event loop is already running': + # just return a Future, hoping that it will be awaited + result = asyncio.ensure_future(maybe_async) + return result + return wrapped()