from __future__ import division, print_function, unicode_literals
+import codecs
import hashlib
import itertools
import os
import cumulus.store
import cumulus.store.file
-if sys.version < '3':
+if sys.version < "3":
StringTypes = (str, unicode)
else:
StringTypes = (str,)
("", None),
]
+def to_lines(data):
+ """Decode binary data from a file into a sequence of lines.
+
+ Newline markers are retained."""
+ return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
+
def uri_decode(s):
"""Decode a URI-encoded (%xx escapes) string."""
def hex_decode(m): return chr(int(m.group(1), 16))
def load_snapshot(self, snapshot):
snapshot_file = self.backend.open_snapshot(snapshot)[0]
- return snapshot_file.read().splitlines(True)
+ return to_lines(snapshot_file.read())
@staticmethod
def filter_data(filehandle, filter_cmd):
def follow_ref(refstr):
if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
- lines = object_store.get(refstr).splitlines(True)
+ lines = to_lines(object_store.get(refstr))
lines.reverse()
stack.append(lines)
def get(self, path):
try:
- return open(os.path.join(self.prefix, path), 'rb')
+ return open(os.path.join(self.prefix, path), "rb")
except IOError:
raise cumulus.store.NotFoundError(path)