DEV: Implement os.walk analog for ContentsManagers.

This commit is contained in:
Scott Sanderson 2015-01-06 23:59:31 -05:00
parent 1e2e86dcca
commit 2b73f1e620
2 changed files with 119 additions and 0 deletions

View File

@ -30,6 +30,21 @@ from IPython.utils.py3compat import string_types
copy_pat = re.compile(r'\-Copy\d*\.')
def _separate_dirs_files(models):
"""
Split an iterable of models into a list of file paths and a list of
directory paths.
"""
dirs = []
files = []
for model in models:
if model['type'] == 'directory':
dirs.append(model['path'])
else:
files.append(model['path'])
return dirs, files
class CheckpointManager(LoggingConfigurable):
"""
Base class for managing checkpoints for a ContentsManager.
@ -499,6 +514,31 @@ class ContentsManager(LoggingConfigurable):
"""Should this file/directory name be displayed in a listing?"""
return not any(fnmatch(name, glob) for glob in self.hide_globs)
def walk(self):
"""
Like os.walk, but written in terms of the ContentsAPI.
Returns a generator of tuples of the form:
(directory name, [subdirectories], [files in directory])
"""
return self._walk([''])
def _walk(self, dirs):
"""
Recursive helper for walk.
"""
for directory in dirs:
children = self.get(
directory,
content=True,
type='directory',
)['content']
dirs, files = map(sorted, _separate_dirs_files(children))
yield (directory, dirs, files)
if dirs:
for entry in self._walk(dirs):
yield(entry)
# Part 3: Checkpoints API
def create_checkpoint(self, path):
"""Create a checkpoint."""

View File

@ -614,3 +614,82 @@ class APITest(NotebookTestBase):
with TemporaryDirectory() as td:
with self.patch_cp_root(td):
self.test_file_checkpoints()
def test_walk(self):
"""
Test ContentsManager.walk.
"""
results = list(self.notebook.contents_manager.walk())
expected = [
(
'',
[
'Directory with spaces in',
'foo',
'ordering',
u'unicodé',
u'å b',
],
['inroot.blob', 'inroot.ipynb', 'inroot.txt'],
),
(
'Directory with spaces in',
[],
['inspace.blob', 'inspace.ipynb', 'inspace.txt'],
),
(
'foo',
['bar'],
[
'a.blob', 'a.ipynb', 'a.txt',
'b.blob', 'b.ipynb', 'b.txt',
'name with spaces.blob',
'name with spaces.ipynb',
'name with spaces.txt',
u'unicodé.blob', u'unicodé.ipynb', u'unicodé.txt'
]
),
(
'foo/bar',
[],
['baz.blob', 'baz.ipynb', 'baz.txt'],
),
(
'ordering',
[],
[
'A.blob', 'A.ipynb', 'A.txt',
'C.blob', 'C.ipynb', 'C.txt',
'b.blob', 'b.ipynb', 'b.txt',
],
),
(
u'unicodé',
[],
['innonascii.blob', 'innonascii.ipynb', 'innonascii.txt'],
),
(
u'å b',
[],
[u'ç d.blob', u'ç d.ipynb', u'ç d.txt'],
),
]
for idx, (dname, subdirs, files) in enumerate(expected):
result_dname, result_subdirs, result_files = results[idx]
if dname == '':
sep = ''
else:
sep = '/'
self.assertEqual(
dname,
result_dname,
)
self.assertEqual(
[sep.join([dname, sub]) for sub in subdirs],
result_subdirs,
)
self.assertEqual(
[sep.join([dname, fname]) for fname in files],
result_files,
)