from __future__ import division, print_function, unicode_literals
+import codecs
import hashlib
import itertools
import os
import re
import sqlite3
+import subprocess
import sys
import tarfile
import tempfile
import cumulus.store
import cumulus.store.file
-if sys.version < '3':
+if sys.version < "3":
StringTypes = (str, unicode)
else:
StringTypes = (str,)
("", None),
]
+def to_lines(data):
+ """Decode binary data from a file into a sequence of lines.
+
+ Newline markers are retained."""
+ return list(codecs.iterdecode(data.splitlines(True), "utf-8"))
+
def uri_decode(s):
"""Decode a URI-encoded (%xx escapes) string."""
def hex_decode(m): return chr(int(m.group(1), 16))
store may either be a Store object or URL.
"""
if type(backend) in StringTypes:
- if backend.find(":") >= 0:
- self._backend = cumulus.store.open(backend)
- else:
- self._backend = cumulus.store.file.FileStore(backend)
+ self._backend = cumulus.store.open(backend)
else:
self._backend = backend
def load_snapshot(self, snapshot):
snapshot_file = self.backend.open_snapshot(snapshot)[0]
- return snapshot_file.read().splitlines(True)
+ return to_lines(snapshot_file.read())
@staticmethod
def filter_data(filehandle, filter_cmd):
if filter_cmd is None:
return filehandle
- (input, output) = os.popen2(filter_cmd)
+ p = subprocess.Popen(filter_cmd, shell=True, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, close_fds=True)
+ input, output = p.stdin, p.stdout
def copy_thread(src, dst):
BLOCK_SIZE = 4096
while True:
dst.write(block)
src.close()
dst.close()
+ p.wait()
_thread.start_new_thread(copy_thread, (filehandle, input))
return output
def follow_ref(refstr):
if len(stack) >= MAX_RECURSION_DEPTH: raise OverflowError
- lines = object_store.get(refstr).splitlines(True)
+ lines = to_lines(object_store.get(refstr))
lines.reverse()
stack.append(lines)