mirrored logic from https://github.com/jupyter/nb2kg/pull/45 to notebook/gateway

This commit is contained in:
sd 2021-01-05 13:19:22 +02:00
parent 16727777f5
commit ba0f490c94
2 changed files with 45 additions and 4 deletions

View File

@ -4,6 +4,7 @@
import os import os
import logging import logging
import mimetypes import mimetypes
import random
from ..base.handlers import APIHandler, IPythonHandler from ..base.handlers import APIHandler, IPythonHandler
from ..utils import url_path_join from ..utils import url_path_join
@ -134,6 +135,7 @@ class GatewayWebSocketClient(LoggingConfigurable):
self.ws = None self.ws = None
self.ws_future = Future() self.ws_future = Future()
self.disconnected = False self.disconnected = False
self.retry = 0
@gen.coroutine @gen.coroutine
def _connect(self, kernel_id): def _connect(self, kernel_id):
@ -155,6 +157,7 @@ class GatewayWebSocketClient(LoggingConfigurable):
def _connection_done(self, fut): def _connection_done(self, fut):
if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError if not self.disconnected and fut.exception() is None: # prevent concurrent.futures._base.CancelledError
self.ws = fut.result() self.ws = fut.result()
self.retry = 0
self.log.debug("Connection is ready: ws: {}".format(self.ws)) self.log.debug("Connection is ready: ws: {}".format(self.ws))
else: else:
self.log.warning("Websocket connection has been closed via client disconnect or due to error. " self.log.warning("Websocket connection has been closed via client disconnect or due to error. "
@ -189,8 +192,15 @@ class GatewayWebSocketClient(LoggingConfigurable):
else: # ws cancelled - stop reading else: # ws cancelled - stop reading
break break
if not self.disconnected: # if websocket is not disconnected by client, attept to reconnect to Gateway # NOTE(esevan): if websocket is not disconnected by client, try to reconnect.
self.log.info("Attempting to re-establish the connection to Gateway: {}".format(self.kernel_id)) if not self.disconnected and self.retry < GatewayClient.instance().gateway_retry_max:
jitter = random.randint(10, 100) * 0.01
retry_interval = min(GatewayClient.instance().gateway_retry_interval * (2 ** self.retry),
GatewayClient.instance().gateway_retry_interval_max) + jitter
self.retry += 1
self.log.info("Attempting to re-establish the connection to Gateway in %s secs (%s/%s): %s",
retry_interval, self.retry, GatewayClient.instance().gateway_retry_max, self.kernel_id)
yield gen.sleep(retry_interval)
self._connect(self.kernel_id) self._connect(self.kernel_id)
loop = IOLoop.current() loop = IOLoop.current()
loop.add_future(self.ws_future, lambda future: self._read_messages(callback)) loop.add_future(self.ws_future, lambda future: self._read_messages(callback))

View File

@ -22,7 +22,7 @@ from traitlets.config import SingletonConfigurable
class GatewayClient(SingletonConfigurable): class GatewayClient(SingletonConfigurable):
"""This class manages the configuration. It's its own singleton class so that we """This class manages the configuration. It's its own singleton class so that we
can share these values across all objects. It also contains some helper methods can share these values across all objects. It also contains some helper methods
to build request arguments out of the various config options. to build request arguments out of the various config options.
""" """
@ -220,6 +220,38 @@ class GatewayClient(SingletonConfigurable):
def _env_whitelist_default(self): def _env_whitelist_default(self):
return os.environ.get(self.env_whitelist_env, self.env_whitelist_default_value) return os.environ.get(self.env_whitelist_env, self.env_whitelist_default_value)
gateway_retry_interval_default_value = 1.0
gateway_retry_interval_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL'
gateway_retry_interval = Float(default_value=gateway_retry_interval_default_value, config=True,
help="""The time allowed for HTTP reconnection with the Gateway server for the first time.
Next will be JUPYTER_GATEWAY_RETRY_INTERVAL multiplied by two in factor of numbers of retries
but less than JUPYTER_GATEWAY_RETRY_INTERVAL_MAX.
(JUPYTER_GATEWAY_RETRY_INTERVAL env var)""")
@default('gateway_retry_interval')
def gateway_retry_interval_default(self):
return float(os.environ.get('JUPYTER_GATEWAY_RETRY_INTERVAL', self.gateway_retry_interval_default_value))
gateway_retry_interval_max_default_value = 30.0
gateway_retry_interval_max_env = 'JUPYTER_GATEWAY_RETRY_INTERVAL_MAX'
gateway_retry_interval_max = Float(default_value=gateway_retry_interval_max_default_value, config=True,
help="""The maximum time allowed for HTTP reconnection retry with the Gateway server.
(JUPYTER_GATEWAY_RETRY_INTERVAL_MAX env var)""")
@default('gateway_retry_interval_max')
def gateway_retry_interval_max_default(self):
return float(os.environ.get('JUPYTER_GATEWAY_RETRY_INTERVAL_MAX', self.gateway_retry_interval_max_default_value))
gateway_retry_max_default_value = 5
gateway_retry_max_env = 'JUPYTER_GATEWAY_RETRY_MAX'
gateway_retry_max = Float(default_value=gateway_retry_max_default_value, config=True,
help="""The maximum numbers allowed for HTTP reconnection retries with the Gateway server.
(JUPYTER_GATEWAY_RETRY_MAX env var)""")
@default('gateway_retry_max')
def gateway_retry_max_default(self):
return int(os.environ.get('JUPYTER_GATEWAY_RETRY_MAX', self.gateway_retry_max_default_value))
@property @property
def gateway_enabled(self): def gateway_enabled(self):
return bool(self.url is not None and len(self.url) > 0) return bool(self.url is not None and len(self.url) > 0)
@ -503,7 +535,6 @@ class GatewayKernelManager(MappingKernelManager):
self.remove_kernel(kernel_id) self.remove_kernel(kernel_id)
class GatewayKernelSpecManager(KernelSpecManager): class GatewayKernelSpecManager(KernelSpecManager):
def __init__(self, **kwargs): def __init__(self, **kwargs):