DEV: Separate FileCheckpointManager and GenericFileCheckpointManager.

- Adds a `GenericCheckpointMixin` as a helper for implementing the two
  boundary-traversing Checkpoint API methods, `create_checkpoint` and
  `restore_checkpoint`.

- `GenericFileCheckpointManager` is implemented as a subclass of
  `FileCheckpointManager` using `GenericCheckpointMixin`.  Note that
  this is the safe subtyping relationship because of method
  signature *contra*variance: `FileCheckpointManager` accepts
  `FileContentsManager` in its method signatures type, whereas
  `GenericFileCheckpointManager` accepts any `ContentsManager`.

- Moved Checkpoint-related classes to their own files.
This commit is contained in:
Scott Sanderson 2015-01-08 13:49:58 -05:00
parent 12fe97e2af
commit 021e2da495
6 changed files with 494 additions and 445 deletions

View File

@ -0,0 +1,112 @@
"""
Classes for managing Checkpoints.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
from IPython.config.configurable import LoggingConfigurable
class CheckpointManager(LoggingConfigurable):
"""
Base class for managing checkpoints for a ContentsManager.
Subclasses are required to implement:
create_checkpoint(self, contents_mgr, path)
restore_checkpoint(self, contents_mgr, checkpoint_id, path)
rename_checkpoint(self, checkpoint_id, old_path, new_path)
delete_checkpoint(self, checkpoint_id, path)
list_checkpoints(self, path)
"""
def create_checkpoint(self, contents_mgr, path):
"""Create a checkpoint."""
raise NotImplementedError("must be implemented in a subclass")
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint"""
raise NotImplementedError("must be implemented in a subclass")
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a single checkpoint from old_path to new_path."""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, path):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, path):
"""Return a list of checkpoints for a given file"""
raise NotImplementedError("must be implemented in a subclass")
def rename_all_checkpoints(self, old_path, new_path):
"""Rename all checkpoints for old_path to new_path."""
for cp in self.list_checkpoints(old_path):
self.rename_checkpoint(cp['id'], old_path, new_path)
def delete_all_checkpoints(self, path):
"""Delete all checkpoints for the given path."""
for checkpoint in self.list_checkpoints(path):
self.delete_checkpoint(checkpoint['id'], path)
class GenericCheckpointMixin(object):
"""
Helper for creating CheckpointManagers that can be used with any
ContentsManager.
Provides an implementation of `create_checkpoint` and `restore_checkpoint`
in terms of the following operations:
create_file_checkpoint(self, content, format, path)
create_notebook_checkpoint(self, nb, path)
get_checkpoint(self, checkpoint_id, path, type)
**Any** valid CheckpointManager implementation should also be valid when
this mixin is applied.
"""
def create_checkpoint(self, contents_mgr, path):
model = contents_mgr.get(path, content=True)
type = model['type']
if type == 'notebook':
return self.create_notebook_checkpoint(
model['content'],
path,
)
elif type == 'file':
return self.create_file_checkpoint(
model['content'],
model['format'],
path,
)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
type = contents_mgr.get(path, content=False)['type']
model = self.get_checkpoint(checkpoint_id, path, type)
contents_mgr.save(model, path)
# Required Methods
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def get_checkpoint(self, checkpoint_id, path, type):
"""Get the content of a checkpoint.
Returns an unvalidated model with the same structure as
the return value of ContentsManager.get
"""
raise NotImplementedError("must be implemented in a subclass")

View File

@ -0,0 +1,198 @@
"""
File-based CheckpointManagers.
"""
import os
import shutil
from tornado.web import HTTPError
from .checkpoints import (
CheckpointManager,
GenericCheckpointMixin,
)
from .fileio import FileManagerMixin
from IPython.utils import tz
from IPython.utils.path import ensure_dir_exists
from IPython.utils.py3compat import getcwd
from IPython.utils.traitlets import Unicode
class FileCheckpointManager(FileManagerMixin, CheckpointManager):
"""
A CheckpointManager that caches checkpoints for files in adjacent
directories.
Only works with FileContentsManager. Use GenericFileCheckpointManager if
you want file-based checkpoints with another ContentsManager.
"""
checkpoint_dir = Unicode(
'.ipynb_checkpoints',
config=True,
help="""The directory name in which to keep file checkpoints
This is a path relative to the file's own directory.
By default, it is .ipynb_checkpoints
""",
)
root_dir = Unicode(config=True)
def _root_dir_default(self):
try:
return self.parent.root_dir
except AttributeError:
return getcwd()
# ContentsManager-dependent checkpoint API
def create_checkpoint(self, contents_mgr, path):
"""Create a checkpoint."""
checkpoint_id = u'checkpoint'
src_path = contents_mgr._get_os_path(path)
dest_path = self.checkpoint_path(checkpoint_id, path)
self._copy(src_path, dest_path)
return self.checkpoint_model(checkpoint_id, dest_path)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
src_path = self.checkpoint_path(checkpoint_id, path)
dest_path = contents_mgr._get_os_path(path)
self._copy(src_path, dest_path)
# ContentsManager-independent checkpoint API
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a checkpoint from old_path to new_path."""
old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
if os.path.isfile(old_cp_path):
self.log.debug(
"Renaming checkpoint %s -> %s",
old_cp_path,
new_cp_path,
)
with self.perm_to_403():
shutil.move(old_cp_path, new_cp_path)
def delete_checkpoint(self, checkpoint_id, path):
"""delete a file's checkpoint"""
path = path.strip('/')
cp_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(cp_path):
self.no_such_checkpoint(path, checkpoint_id)
self.log.debug("unlinking %s", cp_path)
with self.perm_to_403():
os.unlink(cp_path)
def list_checkpoints(self, path):
"""list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_path):
return []
else:
return [self.checkpoint_model(checkpoint_id, os_path)]
# Checkpoint-related utilities
def checkpoint_path(self, checkpoint_id, path):
"""find the path to a checkpoint"""
path = path.strip('/')
parent, name = ('/' + path).rsplit('/', 1)
parent = parent.strip('/')
basename, ext = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=ext,
)
os_path = self._get_os_path(path=parent)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
with self.perm_to_403():
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def checkpoint_model(self, checkpoint_id, os_path):
"""construct the info dict for a given checkpoint"""
stats = os.stat(os_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id=checkpoint_id,
last_modified=last_modified,
)
return info
# Error Handling
def no_such_checkpoint(self, path, checkpoint_id):
raise HTTPError(
404,
u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
)
class GenericFileCheckpointManager(GenericCheckpointMixin,
FileCheckpointManager):
"""
Local filesystem CheckpointManager that works with any conforming
ContentsManager.
"""
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_file(os_checkpoint_path, content, format=format)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_notebook(os_checkpoint_path, nb)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def get_checkpoint(self, checkpoint_id, path, type):
"""Get the content of a checkpoint.
Returns a model suitable for passing to ContentsManager.save.
"""
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_checkpoint_path):
self.no_such_checkpoint(path, checkpoint_id)
if type == 'notebook':
return {
'type': type,
'content': self._read_notebook(
os_checkpoint_path,
as_version=4,
),
}
elif type == 'file':
content, format = self._read_file(os_checkpoint_path, format=None)
return {
'type': type,
'content': content,
'format': format,
}
else:
raise HTTPError(500, u'Unexpected type %s' % type)

View File

@ -0,0 +1,166 @@
"""
Utilities for file-based Contents/Checkpoints managers.
"""
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
import errno
import io
import os
import shutil
from tornado.web import HTTPError
from IPython.html.utils import (
to_api_path,
to_os_path,
)
from IPython import nbformat
from IPython.utils.io import atomic_writing
from IPython.utils.py3compat import str_to_unicode
class FileManagerMixin(object):
"""
Mixin for ContentsAPI classes that interact with the filesystem.
Provides facilities for reading, writing, and copying both notebooks and
generic files.
Shared by FileContentsManager and FileCheckpointManager.
Note
----
Classes using this mixin must provide the following attributes:
root_dir : unicode
A directory against against which API-style paths are to be resolved.
log : logging.Logger
"""
@contextmanager
def open(self, os_path, *args, **kwargs):
"""wrapper around io.open that turns permission errors into 403"""
with self.perm_to_403(os_path):
with io.open(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def atomic_writing(self, os_path, *args, **kwargs):
"""wrapper around atomic_writing that turns permission errors to 403"""
with self.perm_to_403(os_path):
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def perm_to_403(self, os_path=''):
"""context manager for turning permission errors into 403."""
try:
yield
except OSError as e:
if e.errno in {errno.EPERM, errno.EACCES}:
# make 403 error message without root prefix
# this may not work perfectly on unicode paths on Python 2,
# but nobody should be doing that anyway.
if not os_path:
os_path = str_to_unicode(e.filename or 'unknown file')
path = to_api_path(os_path, root=self.root_dir)
raise HTTPError(403, u'Permission denied: %s' % path)
else:
raise
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, path):
"""Given an API path, return its file system path.
Parameters
----------
path : string
The relative API path to the named file.
Returns
-------
path : string
Native, absolute OS path to for a file.
"""
return to_os_path(path, self.root_dir)
def _read_notebook(self, os_path, as_version=4):
"""Read a notebook from an os path."""
with self.open(os_path, 'r', encoding='utf-8') as f:
try:
return nbformat.read(f, as_version=as_version)
except Exception as e:
raise HTTPError(
400,
u"Unreadable Notebook: %s %r" % (os_path, e),
)
def _save_notebook(self, os_path, nb):
"""Save a notebook to an os_path."""
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
def _read_file(self, os_path, format):
"""Read a non-notebook file.
os_path: The path to be read.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
if not os.path.isfile(os_path):
raise HTTPError(400, "Cannot read non-file %s" % os_path)
with self.open(os_path, 'rb') as f:
bcontent = f.read()
if format is None or format == 'text':
# Try to interpret as unicode if format is unknown or if unicode
# was explicitly requested.
try:
return bcontent.decode('utf8'), 'text'
except UnicodeError:
if format == 'text':
raise HTTPError(
400,
"%s is not UTF-8 encoded" % os_path,
reason='bad format',
)
return base64.encodestring(bcontent).decode('ascii'), 'base64'
def _save_file(self, os_path, content, format):
"""Save content of a generic file."""
if format not in {'text', 'base64'}:
raise HTTPError(
400,
"Must specify format of file contents as 'text' or 'base64'",
)
try:
if format == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise HTTPError(
400, u'Encoding error saving %s: %s' % (os_path, e)
)
with self.atomic_writing(os_path, text=False) as f:
f.write(bcontent)

View File

@ -3,9 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
import base64
from contextlib import contextmanager
import errno
import io
import os
import shutil
@ -13,25 +11,23 @@ import mimetypes
from tornado import web
from .manager import (
CheckpointManager,
ContentsManager,
)
from .filecheckpoints import FileCheckpointManager
from .fileio import FileManagerMixin
from .manager import ContentsManager
from IPython import nbformat
from IPython.utils.io import atomic_writing
from IPython.utils.importstring import import_item
from IPython.utils.path import ensure_dir_exists
from IPython.utils.traitlets import Any, Unicode, Bool, TraitError
from IPython.utils.py3compat import getcwd, string_types, str_to_unicode
from IPython.utils.py3compat import getcwd, string_types
from IPython.utils import tz
from IPython.html.utils import (
is_hidden,
to_api_path,
to_os_path,
)
_script_exporter = None
def _post_save_script(model, os_path, contents_manager, **kwargs):
"""convert notebooks to Python script after save with nbconvert
@ -56,346 +52,6 @@ def _post_save_script(model, os_path, contents_manager, **kwargs):
f.write(script)
class FileManagerMixin(object):
"""
Mixin for ContentsAPI classes that interact with the filesystem.
Provides facilities for reading, writing, and copying both notebooks and
generic files.
Shared by FileContentsManager and FileCheckpointManager.
Note
----
Classes using this mixin must provide the following attributes:
root_dir : unicode
A directory against against which API-style paths are to be resolved.
log : logging.Logger
"""
@contextmanager
def open(self, os_path, *args, **kwargs):
"""wrapper around io.open that turns permission errors into 403"""
with self.perm_to_403(os_path):
with io.open(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def atomic_writing(self, os_path, *args, **kwargs):
"""wrapper around atomic_writing that turns permission errors into 403"""
with self.perm_to_403(os_path):
with atomic_writing(os_path, *args, **kwargs) as f:
yield f
@contextmanager
def perm_to_403(self, os_path=''):
"""context manager for turning permission errors into 403."""
try:
yield
except OSError as e:
if e.errno in {errno.EPERM, errno.EACCES}:
# make 403 error message without root prefix
# this may not work perfectly on unicode paths on Python 2,
# but nobody should be doing that anyway.
if not os_path:
os_path = str_to_unicode(e.filename or 'unknown file')
path = to_api_path(os_path, root=self.root_dir)
raise web.HTTPError(403, u'Permission denied: %s' % path)
else:
raise
def _copy(self, src, dest):
"""copy src to dest
like shutil.copy2, but log errors in copystat
"""
shutil.copyfile(src, dest)
try:
shutil.copystat(src, dest)
except OSError:
self.log.debug("copystat on %s failed", dest, exc_info=True)
def _get_os_path(self, path):
"""Given an API path, return its file system path.
Parameters
----------
path : string
The relative API path to the named file.
Returns
-------
path : string
Native, absolute OS path to for a file.
"""
return to_os_path(path, self.root_dir)
def _read_notebook(self, os_path, as_version=4):
"""Read a notebook from an os path."""
with self.open(os_path, 'r', encoding='utf-8') as f:
try:
return nbformat.read(f, as_version=as_version)
except Exception as e:
raise web.HTTPError(
400,
u"Unreadable Notebook: %s %r" % (os_path, e),
)
def _save_notebook(self, os_path, nb):
"""Save a notebook to an os_path."""
with self.atomic_writing(os_path, encoding='utf-8') as f:
nbformat.write(nb, f, version=nbformat.NO_CONVERT)
def _read_file(self, os_path, format):
"""Read a non-notebook file.
os_path: The path to be read.
format:
If 'text', the contents will be decoded as UTF-8.
If 'base64', the raw bytes contents will be encoded as base64.
If not specified, try to decode as UTF-8, and fall back to base64
"""
if not os.path.isfile(os_path):
raise web.HTTPError(400, "Cannot read non-file %s" % os_path)
with self.open(os_path, 'rb') as f:
bcontent = f.read()
if format is None or format == 'text':
# Try to interpret as unicode if format is unknown or if unicode
# was explicitly requested.
try:
return bcontent.decode('utf8'), 'text'
except UnicodeError as e:
if format == 'text':
raise web.HTTPError(
400,
"%s is not UTF-8 encoded" % os_path,
reason='bad format',
)
return base64.encodestring(bcontent).decode('ascii'), 'base64'
def _save_file(self, os_path, content, format):
"""Save content of a generic file."""
if format not in {'text', 'base64'}:
raise web.HTTPError(
400,
"Must specify format of file contents as 'text' or 'base64'",
)
try:
if format == 'text':
bcontent = content.encode('utf8')
else:
b64_bytes = content.encode('ascii')
bcontent = base64.decodestring(b64_bytes)
except Exception as e:
raise web.HTTPError(400, u'Encoding error saving %s: %s' % (os_path, e))
with self.atomic_writing(os_path, text=False) as f:
f.write(bcontent)
class FileCheckpointManager(FileManagerMixin, CheckpointManager):
"""
A CheckpointManager that caches checkpoints for files in adjacent
directories.
"""
checkpoint_dir = Unicode(
'.ipynb_checkpoints',
config=True,
help="""The directory name in which to keep file checkpoints
This is a path relative to the file's own directory.
By default, it is .ipynb_checkpoints
""",
)
root_dir = Unicode(config=True)
def _root_dir_default(self):
try:
return self.parent.root_dir
except AttributeError:
return getcwd()
# ContentsManager-dependent checkpoint API
def create_checkpoint(self, contents_mgr, path):
"""
Create a checkpoint.
If contents_mgr is backed by the local filesystem, just copy the
appropriate file to the checkpoint directory. Otherwise, ask the
ContentsManager for a model and write it ourselves.
"""
if contents_mgr.backend == 'local_file':
# We know that the file is in the local filesystem, so just copy
# from the base location to our location.
checkpoint_id = u'checkpoint'
src_path = contents_mgr._get_os_path(path)
dest_path = self.checkpoint_path(checkpoint_id, path)
self._copy(src_path, dest_path)
return self.checkpoint_model(checkpoint_id, dest_path)
else:
return super(FileCheckpointManager, self).create_checkpoint(
contents_mgr, path,
)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""
Restore a checkpoint.
If contents_mgr is backed by the local filesystem, just copy the
appropriate file from the checkpoint directory. Otherwise, load the
model and pass it to ContentsManager.save.
"""
if contents_mgr.backend == 'local_file':
# We know that the file is in the local filesystem, so just copy
# from our base location to the location expected by content
src_path = self.checkpoint_path(checkpoint_id, path)
dest_path = contents_mgr._get_os_path(path)
self._copy(src_path, dest_path)
else:
super(FileCheckpointManager, self).restore_checkpoint(
contents_mgr, checkpoint_id, path
)
# ContentsManager-independent checkpoint API
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a checkpoint from old_path to new_path."""
old_cp_path = self.checkpoint_path(checkpoint_id, old_path)
new_cp_path = self.checkpoint_path(checkpoint_id, new_path)
if os.path.isfile(old_cp_path):
self.log.debug(
"Renaming checkpoint %s -> %s",
old_cp_path,
new_cp_path,
)
with self.perm_to_403():
shutil.move(old_cp_path, new_cp_path)
def delete_checkpoint(self, checkpoint_id, path):
"""delete a file's checkpoint"""
path = path.strip('/')
cp_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(cp_path):
self.no_such_checkpoint(path, checkpoint_id)
self.log.debug("unlinking %s", cp_path)
with self.perm_to_403():
os.unlink(cp_path)
def list_checkpoints(self, path):
"""list the checkpoints for a given file
This contents manager currently only supports one checkpoint per file.
"""
path = path.strip('/')
checkpoint_id = "checkpoint"
os_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_path):
return []
else:
return [self.checkpoint_model(checkpoint_id, os_path)]
# Checkpoint-related utilities
def checkpoint_path(self, checkpoint_id, path):
"""find the path to a checkpoint"""
path = path.strip('/')
parent, name = ('/' + path).rsplit('/', 1)
parent = parent.strip('/')
basename, ext = os.path.splitext(name)
filename = u"{name}-{checkpoint_id}{ext}".format(
name=basename,
checkpoint_id=checkpoint_id,
ext=ext,
)
os_path = self._get_os_path(path=parent)
cp_dir = os.path.join(os_path, self.checkpoint_dir)
with self.perm_to_403():
ensure_dir_exists(cp_dir)
cp_path = os.path.join(cp_dir, filename)
return cp_path
def checkpoint_model(self, checkpoint_id, os_path):
"""construct the info dict for a given checkpoint"""
stats = os.stat(os_path)
last_modified = tz.utcfromtimestamp(stats.st_mtime)
info = dict(
id=checkpoint_id,
last_modified=last_modified,
)
return info
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_file(os_checkpoint_path, content, format=format)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint from the current content of a notebook."""
path = path.strip('/')
# only the one checkpoint ID:
checkpoint_id = u"checkpoint"
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
self.log.debug("creating checkpoint for %s", path)
with self.perm_to_403():
self._save_notebook(os_checkpoint_path, nb)
# return the checkpoint info
return self.checkpoint_model(checkpoint_id, os_checkpoint_path)
def get_checkpoint(self, checkpoint_id, path, type):
"""Get the content of a checkpoint.
Returns a model suitable for passing to ContentsManager.save.
"""
path = path.strip('/')
self.log.info("restoring %s from checkpoint %s", path, checkpoint_id)
os_checkpoint_path = self.checkpoint_path(checkpoint_id, path)
if not os.path.isfile(os_checkpoint_path):
self.no_such_checkpoint(path, checkpoint_id)
if type == 'notebook':
return {
'type': type,
'content': self._read_notebook(
os_checkpoint_path,
as_version=4,
),
}
elif type == 'file':
content, format = self._read_file(os_checkpoint_path, format=None)
return {
'type': type,
'content': content,
'format': format,
}
else:
raise web.HTTPError(
500,
u'Unexpected type %s' % type
)
# Error Handling
def no_such_checkpoint(self, path, checkpoint_id):
raise web.HTTPError(
404,
u'Checkpoint does not exist: %s@%s' % (path, checkpoint_id)
)
class FileContentsManager(FileManagerMixin, ContentsManager):
root_dir = Unicode(config=True)
@ -468,9 +124,6 @@ class FileContentsManager(FileManagerMixin, ContentsManager):
def _checkpoint_manager_class_default(self):
return FileCheckpointManager
def _backend_default(self):
return 'local_file'
def is_hidden(self, path):
"""Does the API style path correspond to a hidden directory or file?

View File

@ -11,6 +11,7 @@ import re
from tornado.web import HTTPError
from .checkpoints import CheckpointManager
from IPython.config.configurable import LoggingConfigurable
from IPython.nbformat import sign, validate, ValidationError
from IPython.nbformat.v4 import new_notebook
@ -29,77 +30,6 @@ from IPython.utils.py3compat import string_types
copy_pat = re.compile(r'\-Copy\d*\.')
class CheckpointManager(LoggingConfigurable):
"""
Base class for managing checkpoints for a ContentsManager.
"""
def create_checkpoint(self, contents_mgr, path):
model = contents_mgr.get(path, content=True)
type = model['type']
if type == 'notebook':
return self.create_notebook_checkpoint(
model['content'],
path,
)
elif type == 'file':
return self.create_file_checkpoint(
model['content'],
model['format'],
path,
)
def restore_checkpoint(self, contents_mgr, checkpoint_id, path):
"""Restore a checkpoint."""
type = contents_mgr.get(path, content=False)['type']
model = self.get_checkpoint(checkpoint_id, path, type)
contents_mgr.save(model, path)
def create_file_checkpoint(self, content, format, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def create_notebook_checkpoint(self, nb, path):
"""Create a checkpoint of the current state of a file
Returns a checkpoint model for the new checkpoint.
"""
raise NotImplementedError("must be implemented in a subclass")
def get_checkpoint(self, checkpoint_id, path, type):
"""Get the content of a checkpoint.
Returns an unvalidated model with the same structure as
the return value of ContentsManager.get
"""
raise NotImplementedError("must be implemented in a subclass")
def rename_checkpoint(self, checkpoint_id, old_path, new_path):
"""Rename a single checkpoint from old_path to new_path."""
raise NotImplementedError("must be implemented in a subclass")
def delete_checkpoint(self, checkpoint_id, path):
"""delete a checkpoint for a file"""
raise NotImplementedError("must be implemented in a subclass")
def list_checkpoints(self, path):
"""Return a list of checkpoints for a given file"""
raise NotImplementedError("must be implemented in a subclass")
def rename_all_checkpoints(self, old_path, new_path):
"""Rename all checkpoints for old_path to new_path."""
for cp in self.list_checkpoints(old_path):
self.rename_checkpoint(cp['id'], old_path, new_path)
def delete_all_checkpoints(self, path):
"""Delete all checkpoints for the given path."""
for checkpoint in self.list_checkpoints(path):
self.delete_checkpoint(checkpoint['id'], path)
class ContentsManager(LoggingConfigurable):
"""Base class for serving files and directories.
@ -180,7 +110,6 @@ class ContentsManager(LoggingConfigurable):
checkpoint_manager_class = Type(CheckpointManager, config=True)
checkpoint_manager = Instance(CheckpointManager, config=True)
checkpoint_manager_kwargs = Dict(allow_none=False, config=True)
backend = Unicode(default_value="")
def _checkpoint_manager_default(self):
return self.checkpoint_manager_class(**self.checkpoint_manager_kwargs)

View File

@ -13,6 +13,9 @@ pjoin = os.path.join
import requests
from ..filecheckpoints import GenericFileCheckpointManager
from IPython.config import Config
from IPython.html.utils import url_path_join, url_escape, to_os_path
from IPython.html.tests.launchnotebook import NotebookTestBase, assert_http_error
from IPython.nbformat import read, write, from_dict
@ -615,24 +618,12 @@ class APITest(NotebookTestBase):
with self.patch_cp_root(td):
self.test_file_checkpoints()
@contextmanager
def patch_cm_backend(self):
"""
Temporarily patch our ContentsManager to present a different backend.
"""
mgr = self.notebook.contents_manager
old_backend = mgr.backend
mgr.backend = ""
try:
yield
finally:
mgr.backend = old_backend
def test_checkpoints_empty_backend(self):
with self.patch_cm_backend():
self.test_checkpoints()
with self.patch_cm_backend():
self.test_file_checkpoints()
class GenericFileCheckpointsAPITest(APITest):
"""
Run the tests from APITest with GenericFileCheckpointManager.
"""
config = Config()
config.FileContentsManager.checkpoint_manager_class = \
GenericFileCheckpointManager