- libuuid (sometimes part of e2fsprogs)
- sqlite3
- Python (2.7 or later, or 3.2 or later)
+ - Python six, a Python 2/3 compatibility library
+ https://pypi.python.org/pypi/six
- boto, the python interface to Amazon's Web Services (for S3 storage)
http://code.google.com/p/boto
- paramiko, SSH2 protocol for python (for sftp storage)
import os
import posixpath
import re
+import six
import sqlite3
import subprocess
import sys
import cumulus.store
import cumulus.store.file
-
-if sys.version < "3":
- StringTypes = (str, unicode)
-else:
- StringTypes = (str,)
+import cumulus.util
# The largest supported snapshot format that can be understood.
FORMAT_VERSION = (0, 11) # Cumulus Snapshot v0.11
Newline markers are retained."""
return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
-def uri_decode(s):
- """Decode a URI-encoded (%xx escapes) string."""
- def hex_decode(m): return chr(int(m.group(1), 16))
- return re.sub(r"%([0-9a-f]{2})", hex_decode, s)
-def uri_encode(s):
- """Encode a string to URI-encoded (%xx escapes) form."""
- def hex_encode(c):
- if c > '+' and c < '\x7f' and c != '@':
- return c
- else:
- return "%%%02x" % (ord(c),)
- return ''.join(hex_encode(c) for c in s)
-
class Struct:
"""A class which merely acts as a data container.
store may either be a Store object or URL.
"""
- if type(backend) in StringTypes:
+ if isinstance(backend, six.string_types):
self._backend = cumulus.store.open(backend)
else:
self._backend = backend
@staticmethod
def decode_str(s):
"""Decode a URI-encoded (%xx escapes) string."""
- return uri_decode(s)
+ return cumulus.util.uri_decode_pathname(s)
@staticmethod
def raw_str(s):
import time
import cumulus
+from cumulus import util
CHECKSUM_ALGORITHM = "sha224"
CHUNKER_PROGRAM = "cumulus-chunker-standalone"
data_size += tarinfo.size
object_count += 1
- return {"segment": cumulus.uri_encode(segment_name),
- "path": cumulus.uri_encode(relative_path),
+ return {"segment": util.uri_encode_pathname(segment_name),
+ "path": util.uri_encode_pathname(relative_path),
"checksum": checksum,
"data_size": data_size,
"disk_size": disk_size,
--- /dev/null
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2014 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Collected utility functions for use by Cumulus."""
+
+from __future__ import division, print_function, unicode_literals
+
+import re
+import six
+
+# The encoding assumed when interpreting path names.
+ENCODING="utf-8"
+
+# In both Python 2 and Python 3, pathnames are represented using the str type.
+# For Python 2, this means that converting from a bytestring to a pathname
+# is a no-op. For Python 3, the conversion assumes a utf-8 encoding, but the
+# surrogateescape encoding error handler is used to allow other byte sequences
+# to be passed through.
+if six.PY2:
+ def bytes_to_pathname(b): return b
+ def pathname_to_bytes(p):
+ if isinstance(p, unicode):
+ return p.encode(encoding=ENCODING, errors="replace")
+ else:
+ return p
+elif six.PY3:
+ def bytes_to_pathname(b):
+ """Decodes a byte string to a pathname.
+
+ The input is assumed to be encoded using ENCODING (defaults to
+ utf-8)."""
+ return b.decode(encoding=ENCODING, errors="surrogateescape")
+
+ def pathname_to_bytes(p):
+ """Converts a pathname to encoded bytes.
+
+ The input is encoded to ENCODING (defaults to utf-8)."""
+ return p.encode(encoding=ENCODING, errors="surrogateescape")
+else:
+ raise AssertionError("Unsupported Python version")
+
+def uri_decode_raw(s):
+ """Decode a URI-encoded (%xx escapes) string.
+
+ The input should be a string, preferably only using ASCII characters. The
+ output will be of type bytes."""
+ def hex_decode(m): return six.int2byte(int(m.group(1), 16))
+ return re.sub(br"%([0-9a-fA-F]{2})", hex_decode, pathname_to_bytes(s))
+
+def uri_encode_raw(s):
+ """Encode a bytes array to URI-encoded (%xx escapes) form."""
+ def hex_encode(c):
+ # Allow certain literal characters: c > "+" and c < "\x7f" and c != "@"
+ if c > 0x2b and c < 0x7f and c != 0x40:
+ return chr(c)
+ else:
+ return "%%%02x" % c
+
+ return "".join(hex_encode(c) for c in six.iterbytes(s))
+
+def uri_decode_pathname(s):
+ """Decodes a URI-encoded string to a pathname."""
+ return bytes_to_pathname(uri_decode_raw(s))
+
+def uri_encode_pathname(p):
+ """Encodes a pathname to a URI-encoded string."""
+ return uri_encode_raw(pathname_to_bytes(p))
--- /dev/null
+#!/usr/bin/python
+# coding: utf-8
+#
+# Cumulus: Efficient Filesystem Backup to the Cloud
+# Copyright (C) 2014 The Cumulus Developers
+# See the AUTHORS file for a list of contributors.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+"""Unit tests for the cumulus.util module."""
+
+from __future__ import division, print_function, unicode_literals
+
+import six
+import unittest
+
+from cumulus import util
+
+class UtilCodecs(unittest.TestCase):
+ def test_pathnames(self):
+ self.assertEqual(util.ENCODING, "utf-8")
+ if six.PY2:
+ self.assertEqual(util.bytes_to_pathname(b"ext\xc3\xa9nsion"),
+ b"ext\xc3\xa9nsion")
+ self.assertEqual(util.pathname_to_bytes(b"ext\xc3\xa9nsion"),
+ b"ext\xc3\xa9nsion")
+ self.assertEqual(util.pathname_to_bytes(u"exténsion"),
+ b"ext\xc3\xa9nsion")
+ elif six.PY3:
+ self.assertEqual(util.bytes_to_pathname(b"ext\xc3\xa9nsion"),
+ "exténsion")
+ self.assertEqual(util.pathname_to_bytes("exténsion"),
+ b"ext\xc3\xa9nsion")
+ self.assertEqual(util.bytes_to_pathname(b"inv\xe1lid"),
+ "inv\udce1lid")
+ self.assertEqual(util.pathname_to_bytes("inv\udce1lid"),
+ b"inv\xe1lid")
+
+ def test_uri_encode_raw(self):
+ self.assertEqual(util.uri_encode_raw(b"sample ASCII"), "sample%20ASCII")
+ self.assertEqual(util.uri_encode_raw(b"sample ext\xc3\xa9nded"),
+ "sample%20ext%c3%a9nded")
+
+ def test_uri_decode_raw(self):
+ self.assertEqual(util.uri_decode_raw("sample%20ASCII"), b"sample ASCII")
+ self.assertEqual(util.uri_decode_raw("sample%20ext%c3%a9nded"),
+ b"sample ext\xc3\xa9nded")
+
+ def test_uri_decode_pathname(self):
+ if six.PY2:
+ self.assertEqual(util.uri_decode_pathname("sample%20ext%c3%a9nded"),
+ b"sample ext\xc3\xa9nded")
+ self.assertEqual(util.uri_decode_pathname("sample%20exténded"),
+ b"sample ext\xc3\xa9nded")
+ # In Python 2, non-UTF-8 sequences are just passed through as
+ # bytestrings.
+ self.assertEqual(util.uri_decode_pathname(b"inv%e1lid"),
+ b"inv\xe1lid")
+ self.assertEqual(util.uri_decode_pathname(b"inv\xe1lid"),
+ b"inv\xe1lid")
+ elif six.PY3:
+ self.assertEqual(util.uri_decode_pathname("sample%20ext%c3%a9nded"),
+ "sample exténded")
+ self.assertEqual(util.uri_decode_pathname("sample%20exténded"),
+ "sample exténded")
+ # In Python 3, non-UTF-8 sequences are represented using surrogate
+ # escapes to allow lossless conversion back to the appropriate
+ # bytestring.
+ self.assertEqual(util.uri_decode_pathname("inv%e1lid"),
+ "inv\udce1lid")
+ self.assertEqual(
+ util.pathname_to_bytes(util.uri_decode_pathname("inv%e1lid")),
+ b"inv\xe1lid")
+
+
+if __name__ == "__main__":
+ unittest.main()