mirror of
https://github.com/jupyter/notebook.git
synced 2025-01-24 12:05:22 +08:00
commit
9d17ea997d
@ -11,6 +11,7 @@ import errno
|
||||
import io
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from tornado.web import HTTPError
|
||||
|
||||
@ -19,10 +20,91 @@ from IPython.html.utils import (
|
||||
to_os_path,
|
||||
)
|
||||
from IPython import nbformat
|
||||
from IPython.utils.io import atomic_writing
|
||||
from IPython.utils.py3compat import str_to_unicode
|
||||
|
||||
|
||||
def _copy_metadata(src, dst):
|
||||
"""Copy the set of metadata we want for atomic_writing.
|
||||
|
||||
Permission bits and flags. We'd like to copy file ownership as well, but we
|
||||
can't do that.
|
||||
"""
|
||||
shutil.copymode(src, dst)
|
||||
st = os.stat(src)
|
||||
if hasattr(os, 'chflags') and hasattr(st, 'st_flags'):
|
||||
os.chflags(dst, st.st_flags)
|
||||
|
||||
@contextmanager
|
||||
def atomic_writing(path, text=True, encoding='utf-8', **kwargs):
|
||||
"""Context manager to write to a file only if the entire write is successful.
|
||||
|
||||
This works by creating a temporary file in the same directory, and renaming
|
||||
it over the old file if the context is exited without an error. If other
|
||||
file names are hard linked to the target file, this relationship will not be
|
||||
preserved.
|
||||
|
||||
On Windows, there is a small chink in the atomicity: the target file is
|
||||
deleted before renaming the temporary file over it. This appears to be
|
||||
unavoidable.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : str
|
||||
The target file to write to.
|
||||
|
||||
text : bool, optional
|
||||
Whether to open the file in text mode (i.e. to write unicode). Default is
|
||||
True.
|
||||
|
||||
encoding : str, optional
|
||||
The encoding to use for files opened in text mode. Default is UTF-8.
|
||||
|
||||
**kwargs
|
||||
Passed to :func:`io.open`.
|
||||
"""
|
||||
# realpath doesn't work on Windows: http://bugs.python.org/issue9949
|
||||
# Luckily, we only need to resolve the file itself being a symlink, not
|
||||
# any of its directories, so this will suffice:
|
||||
if os.path.islink(path):
|
||||
path = os.path.join(os.path.dirname(path), os.readlink(path))
|
||||
|
||||
dirname, basename = os.path.split(path)
|
||||
tmp_dir = tempfile.mkdtemp(prefix=basename, dir=dirname)
|
||||
tmp_path = os.path.join(tmp_dir, basename)
|
||||
if text:
|
||||
fileobj = io.open(tmp_path, 'w', encoding=encoding, **kwargs)
|
||||
else:
|
||||
fileobj = io.open(tmp_path, 'wb', **kwargs)
|
||||
|
||||
try:
|
||||
yield fileobj
|
||||
except:
|
||||
fileobj.close()
|
||||
shutil.rmtree(tmp_dir)
|
||||
raise
|
||||
|
||||
# Flush to disk
|
||||
fileobj.flush()
|
||||
os.fsync(fileobj.fileno())
|
||||
|
||||
# Written successfully, now rename it
|
||||
fileobj.close()
|
||||
|
||||
# Copy permission bits, access time, etc.
|
||||
try:
|
||||
_copy_metadata(path, tmp_path)
|
||||
except OSError:
|
||||
# e.g. the file didn't already exist. Ignore any failure to copy metadata
|
||||
pass
|
||||
|
||||
if os.name == 'nt' and os.path.exists(path):
|
||||
# Rename over existing file doesn't work on Windows
|
||||
os.remove(path)
|
||||
|
||||
os.rename(tmp_path, path)
|
||||
shutil.rmtree(tmp_dir)
|
||||
|
||||
|
||||
class FileManagerMixin(object):
|
||||
"""
|
||||
Mixin for ContentsAPI classes that interact with the filesystem.
|
||||
|
131
IPython/html/services/contents/tests/test_fileio.py
Normal file
131
IPython/html/services/contents/tests/test_fileio.py
Normal file
@ -0,0 +1,131 @@
|
||||
# encoding: utf-8
|
||||
"""Tests for file IO"""
|
||||
|
||||
# Copyright (c) IPython Development Team.
|
||||
# Distributed under the terms of the Modified BSD License.
|
||||
|
||||
import io as stdlib_io
|
||||
import os.path
|
||||
import stat
|
||||
|
||||
import nose.tools as nt
|
||||
|
||||
from IPython.testing.decorators import skip_win32
|
||||
from ..fileio import atomic_writing
|
||||
|
||||
from IPython.utils.tempdir import TemporaryDirectory
|
||||
|
||||
umask = 0
|
||||
|
||||
def test_atomic_writing():
|
||||
class CustomExc(Exception): pass
|
||||
|
||||
with TemporaryDirectory() as td:
|
||||
f1 = os.path.join(td, 'penguin')
|
||||
with stdlib_io.open(f1, 'w') as f:
|
||||
f.write(u'Before')
|
||||
|
||||
if os.name != 'nt':
|
||||
os.chmod(f1, 0o701)
|
||||
orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
|
||||
f2 = os.path.join(td, 'flamingo')
|
||||
try:
|
||||
os.symlink(f1, f2)
|
||||
have_symlink = True
|
||||
except (AttributeError, NotImplementedError, OSError):
|
||||
# AttributeError: Python doesn't support it
|
||||
# NotImplementedError: The system doesn't support it
|
||||
# OSError: The user lacks the privilege (Windows)
|
||||
have_symlink = False
|
||||
|
||||
with nt.assert_raises(CustomExc):
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'Failing write')
|
||||
raise CustomExc
|
||||
|
||||
# Because of the exception, the file should not have been modified
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'Before')
|
||||
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'Overwritten')
|
||||
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'Overwritten')
|
||||
|
||||
if os.name != 'nt':
|
||||
mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
nt.assert_equal(mode, orig_mode)
|
||||
|
||||
if have_symlink:
|
||||
# Check that writing over a file preserves a symlink
|
||||
with atomic_writing(f2) as f:
|
||||
f.write(u'written from symlink')
|
||||
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'written from symlink')
|
||||
|
||||
def _save_umask():
|
||||
global umask
|
||||
umask = os.umask(0)
|
||||
os.umask(umask)
|
||||
|
||||
def _restore_umask():
|
||||
os.umask(umask)
|
||||
|
||||
@skip_win32
|
||||
@nt.with_setup(_save_umask, _restore_umask)
|
||||
def test_atomic_writing_umask():
|
||||
with TemporaryDirectory() as td:
|
||||
os.umask(0o022)
|
||||
f1 = os.path.join(td, '1')
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'1')
|
||||
mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
|
||||
|
||||
os.umask(0o057)
|
||||
f2 = os.path.join(td, '2')
|
||||
with atomic_writing(f2) as f:
|
||||
f.write(u'2')
|
||||
mode = stat.S_IMODE(os.stat(f2).st_mode)
|
||||
nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
|
||||
|
||||
|
||||
def test_atomic_writing_newlines():
|
||||
with TemporaryDirectory() as td:
|
||||
path = os.path.join(td, 'testfile')
|
||||
|
||||
lf = u'a\nb\nc\n'
|
||||
plat = lf.replace(u'\n', os.linesep)
|
||||
crlf = lf.replace(u'\n', u'\r\n')
|
||||
|
||||
# test default
|
||||
with stdlib_io.open(path, 'w') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, plat)
|
||||
|
||||
# test newline=LF
|
||||
with stdlib_io.open(path, 'w', newline='\n') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, lf)
|
||||
|
||||
# test newline=CRLF
|
||||
with atomic_writing(path, newline='\r\n') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, crlf)
|
||||
|
||||
# test newline=no convert
|
||||
text = u'crlf\r\ncr\rlf\n'
|
||||
with atomic_writing(path, newline='') as f:
|
||||
f.write(text)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, text)
|
@ -18,9 +18,7 @@ import unittest
|
||||
import nose.tools as nt
|
||||
|
||||
from IPython.testing.decorators import skipif, skip_win32
|
||||
from IPython.utils.io import (Tee, capture_output, unicode_std_stream,
|
||||
atomic_writing,
|
||||
)
|
||||
from IPython.utils.io import Tee, capture_output
|
||||
from IPython.utils.py3compat import doctest_refactor_print, PY3
|
||||
from IPython.utils.tempdir import TemporaryDirectory
|
||||
|
||||
@ -86,146 +84,4 @@ def test_capture_output():
|
||||
nt.assert_equal(io.stdout, 'hi, stdout\n')
|
||||
nt.assert_equal(io.stderr, 'hi, stderr\n')
|
||||
|
||||
def test_UnicodeStdStream():
|
||||
# Test wrapping a bytes-level stdout
|
||||
if PY3:
|
||||
stdoutb = stdlib_io.BytesIO()
|
||||
stdout = stdlib_io.TextIOWrapper(stdoutb, encoding='ascii')
|
||||
else:
|
||||
stdout = stdoutb = stdlib_io.BytesIO()
|
||||
|
||||
orig_stdout = sys.stdout
|
||||
sys.stdout = stdout
|
||||
try:
|
||||
sample = u"@łe¶ŧ←"
|
||||
unicode_std_stream().write(sample)
|
||||
|
||||
output = stdoutb.getvalue().decode('utf-8')
|
||||
nt.assert_equal(output, sample)
|
||||
assert not stdout.closed
|
||||
finally:
|
||||
sys.stdout = orig_stdout
|
||||
|
||||
@skipif(not PY3, "Not applicable on Python 2")
|
||||
def test_UnicodeStdStream_nowrap():
|
||||
# If we replace stdout with a StringIO, it shouldn't get wrapped.
|
||||
orig_stdout = sys.stdout
|
||||
sys.stdout = StringIO()
|
||||
try:
|
||||
nt.assert_is(unicode_std_stream(), sys.stdout)
|
||||
assert not sys.stdout.closed
|
||||
finally:
|
||||
sys.stdout = orig_stdout
|
||||
|
||||
def test_atomic_writing():
|
||||
class CustomExc(Exception): pass
|
||||
|
||||
with TemporaryDirectory() as td:
|
||||
f1 = os.path.join(td, 'penguin')
|
||||
with stdlib_io.open(f1, 'w') as f:
|
||||
f.write(u'Before')
|
||||
|
||||
if os.name != 'nt':
|
||||
os.chmod(f1, 0o701)
|
||||
orig_mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
|
||||
f2 = os.path.join(td, 'flamingo')
|
||||
try:
|
||||
os.symlink(f1, f2)
|
||||
have_symlink = True
|
||||
except (AttributeError, NotImplementedError, OSError):
|
||||
# AttributeError: Python doesn't support it
|
||||
# NotImplementedError: The system doesn't support it
|
||||
# OSError: The user lacks the privilege (Windows)
|
||||
have_symlink = False
|
||||
|
||||
with nt.assert_raises(CustomExc):
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'Failing write')
|
||||
raise CustomExc
|
||||
|
||||
# Because of the exception, the file should not have been modified
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'Before')
|
||||
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'Overwritten')
|
||||
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'Overwritten')
|
||||
|
||||
if os.name != 'nt':
|
||||
mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
nt.assert_equal(mode, orig_mode)
|
||||
|
||||
if have_symlink:
|
||||
# Check that writing over a file preserves a symlink
|
||||
with atomic_writing(f2) as f:
|
||||
f.write(u'written from symlink')
|
||||
|
||||
with stdlib_io.open(f1, 'r') as f:
|
||||
nt.assert_equal(f.read(), u'written from symlink')
|
||||
|
||||
def _save_umask():
|
||||
global umask
|
||||
umask = os.umask(0)
|
||||
os.umask(umask)
|
||||
|
||||
def _restore_umask():
|
||||
os.umask(umask)
|
||||
|
||||
@skip_win32
|
||||
@nt.with_setup(_save_umask, _restore_umask)
|
||||
def test_atomic_writing_umask():
|
||||
with TemporaryDirectory() as td:
|
||||
os.umask(0o022)
|
||||
f1 = os.path.join(td, '1')
|
||||
with atomic_writing(f1) as f:
|
||||
f.write(u'1')
|
||||
mode = stat.S_IMODE(os.stat(f1).st_mode)
|
||||
nt.assert_equal(mode, 0o644, '{:o} != 644'.format(mode))
|
||||
|
||||
os.umask(0o057)
|
||||
f2 = os.path.join(td, '2')
|
||||
with atomic_writing(f2) as f:
|
||||
f.write(u'2')
|
||||
mode = stat.S_IMODE(os.stat(f2).st_mode)
|
||||
nt.assert_equal(mode, 0o620, '{:o} != 620'.format(mode))
|
||||
|
||||
|
||||
def test_atomic_writing_newlines():
|
||||
with TemporaryDirectory() as td:
|
||||
path = os.path.join(td, 'testfile')
|
||||
|
||||
lf = u'a\nb\nc\n'
|
||||
plat = lf.replace(u'\n', os.linesep)
|
||||
crlf = lf.replace(u'\n', u'\r\n')
|
||||
|
||||
# test default
|
||||
with stdlib_io.open(path, 'w') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, plat)
|
||||
|
||||
# test newline=LF
|
||||
with stdlib_io.open(path, 'w', newline='\n') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, lf)
|
||||
|
||||
# test newline=CRLF
|
||||
with atomic_writing(path, newline='\r\n') as f:
|
||||
f.write(lf)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, crlf)
|
||||
|
||||
# test newline=no convert
|
||||
text = u'crlf\r\ncr\rlf\n'
|
||||
with atomic_writing(path, newline='') as f:
|
||||
f.write(text)
|
||||
with stdlib_io.open(path, 'r', newline='') as f:
|
||||
read = f.read()
|
||||
nt.assert_equal(read, text)
|
||||
|
Loading…
Reference in New Issue
Block a user