Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions _tests/test_pput.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from cStringIO import StringIO
from io import BytesIO
from datetime import datetime
from Queue import Queue
from queue import Queue
from uuid import uuid4
import hashlib

Expand Down Expand Up @@ -39,19 +39,19 @@ def sample_data():
"""
global _cached_sample_data
if _cached_sample_data is None:
data = StringIO()
chars = "".join(chr(i) for i in xrange(256))
for count in xrange(6):
data = BytesIO()
chars = "".join(chr(i) for i in range(256))
for count in range(6):
cc = chr(count)
for _ in xrange(2 * 1024):
for _ in range(2 * 1024):
# each iteration adds 1MB
# each 1MB chunk is made up of an alternation of the block's index (zero based)
# and an incrementing counter (overflows to 0 several times)
# the first block will be: 00 00 00 01 00 02 ... 00 ff 00 00 ... 00 ff
data.write(
"".join(cc+chars[i] for i in xrange(256))
"".join(cc+chars[i] for i in range(256)).encode("latin1")
)
print "wrote {} MB" .format(data.tell() / 1024.0 / 1024.0)
print("wrote {} MB" .format(data.tell() / 1024.0 / 1024.0))
# give the test a read-only file to avoid accidentally modifying the data between tests
_cached_sample_data = ReadOnlyFile(data)
_cached_sample_data.seek(0)
Expand All @@ -71,12 +71,12 @@ def test_multipart_etag(sample_data):


def test_stream_handler():
stream_handler = StreamHandler(StringIO("aabbccdde"), chunk_size=2)
stream_handler = StreamHandler(BytesIO(b"aabbccdde"), chunk_size=2)
chunks = []
while not stream_handler.finished:
chunk = stream_handler.get_chunk()
chunks.append(chunk)
assert chunks == ['aa', 'bb', 'cc', 'dd', 'e']
assert chunks == [b'aa', b'bb', b'cc', b'dd', b'e']


def test_handle_results():
Expand Down Expand Up @@ -134,7 +134,7 @@ def test_supervisor_loop(sample_data):


def test_zero_data(sample_data):
stream_handler = StreamHandler(StringIO())
stream_handler = StreamHandler(BytesIO())
bucket = FakeBucket()
sup = UploadSupervisor(stream_handler, 'test', bucket=bucket)
with pytest.raises(UploadException):
Expand All @@ -149,6 +149,7 @@ def upload_part(self, index, chunk):
return hashlib.md5(chunk).hexdigest()


@pytest.mark.filterwarnings("ignore:Exception in thread")
def test_supervisor_loop_with_worker_crash(sample_data):
stream_handler = StreamHandler(sample_data)
bucket = FakeBucket()
Expand All @@ -174,7 +175,7 @@ def call(self):
def test_retry_decorator():
boom = Boom()
with pytest.raises(BoomException) as excp_info:
for _ in xrange(3):
for _ in range(3):
boom.call()
assert boom.count == 3

Expand Down
22 changes: 11 additions & 11 deletions _tests/test_snap.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# pylint: disable=redefined-outer-name,protected-access
from collections import OrderedDict
from cStringIO import StringIO
from io import StringIO
import contextlib
import string
import sys
Expand Down Expand Up @@ -30,7 +30,7 @@ def __init__(self, name, metadata=None):


class FakeBucket(object):
rand_prefix = 'test-' + ''.join([random.choice(string.ascii_letters) for _ in xrange(8)]) + '/'
rand_prefix = 'test-' + ''.join([random.choice(string.ascii_letters) for _ in range(8)]) + '/'
fake_data = {
"pool/fs@snap_0": {'parent': 'pool/fs@snap_expired'},
"pool/fs@snap_1_f": {'isfull': 'true', 'compressor': 'pigz1'},
Expand All @@ -44,7 +44,7 @@ class FakeBucket(object):

def list(self, *a, **kwa):
# boto bucket.list gives you keys without metadata, let's emulate that
return (FakeKey(os.path.join(self.rand_prefix, name)) for name in self.fake_data.iterkeys())
return (FakeKey(os.path.join(self.rand_prefix, name)) for name in self.fake_data.keys())

def get_key(self, key):
name = key[len(self.rand_prefix):]
Expand All @@ -60,9 +60,9 @@ def write_s3_data():
cfg = get_config()
bucket = boto.connect_s3(
cfg['S3_KEY_ID'], cfg['S3_SECRET']).get_bucket(cfg['BUCKET'])
for name, metadata in FakeBucket.fake_data.iteritems():
for name, metadata in FakeBucket.fake_data.items():
key = bucket.new_key(os.path.join(FakeBucket.rand_prefix, name))
headers = {("x-amz-meta-" + k): v for k, v in metadata.iteritems()}
headers = {("x-amz-meta-" + k): v for k, v in metadata.items()}
key.set_contents_from_string("spam", headers=headers)
return bucket

Expand Down Expand Up @@ -204,8 +204,8 @@ def test_list_local_snapshots():
}
snapshots = zfs._parse_snapshots()
# comparing .items() because we care about the sorting in the OrderedDict's
assert snapshots['pool'].items() == expected['pool'].items()
assert snapshots['pool/fs'].items() == expected['pool/fs'].items()
assert list(snapshots['pool'].items()) == list(expected['pool'].items())
assert list(snapshots['pool/fs'].items()) == list(expected['pool/fs'].items())


@pytest.mark.parametrize("fs_name, expected", [
Expand Down Expand Up @@ -324,7 +324,7 @@ def test_backup_incremental_missing_parent(s3_manager):
pair_manager = PairManager(s3_manager, zfs_manager, command_executor=fake_cmd)
with pytest.raises(IntegrityError) as excp_info:
pair_manager.backup_incremental()
assert excp_info.value.message == \
assert str(excp_info.value) == \
"Broken snapshot detected pool/fs@snap_5, reason: 'parent broken'"
assert fake_cmd._called_commands == []

Expand All @@ -345,7 +345,7 @@ def test_backup_incremental_cycle(s3_manager):
pair_manager = PairManager(s3_manager, zfs_manager, command_executor=fake_cmd)
with pytest.raises(IntegrityError) as excp_info:
pair_manager.backup_incremental()
assert excp_info.value.message == \
assert str(excp_info.value) == \
"Broken snapshot detected pool/fs@snap_7_cycle, reason: 'cycle detected'"
assert fake_cmd._called_commands == []

Expand Down Expand Up @@ -456,7 +456,7 @@ def test_restore_broken(s3_manager):
pair_manager = PairManager(s3_manager, zfs_manager, command_executor=fake_cmd)
with pytest.raises(IntegrityError) as excp_info:
pair_manager.restore('pool/fs@snap_4_mp')
assert excp_info.value.message == \
assert str(excp_info.value) == \
"Broken snapshot detected pool/fs@snap_4_mp, reason: 'missing parent'"


Expand Down Expand Up @@ -501,7 +501,7 @@ def test_get_latest():
fake_cmd = FakeCommandExecutor()
with pytest.raises(SoftError) as excp_info:
zfs_manager.get_latest()
assert excp_info.value.message == \
assert str(excp_info.value) == \
'Nothing to backup for filesystem "None". Are you sure ' \
'SNAPSHOT_PREFIX="zfs-auto-snap:daily" is correct?'
assert fake_cmd._called_commands == []
Expand Down
10 changes: 5 additions & 5 deletions _tests/test_ssh_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)


@pytest.mark.parametrize("pair, expected", HAPPY_PATH.values(), ids=HAPPY_PATH.keys())
@pytest.mark.parametrize("pair, expected", list(HAPPY_PATH.values()), ids=list(HAPPY_PATH.keys()))
def test_snapshots_to_send(pair, expected):
local, remote = pair
assert snapshots_to_send(local, remote) == expected
Expand All @@ -36,12 +36,12 @@ def test_snapshots_to_send(pair, expected):
)


@pytest.mark.parametrize('pair, err_msg', ERRORS.values(), ids=ERRORS.keys())
@pytest.mark.parametrize('pair, err_msg', list(ERRORS.values()), ids=list(ERRORS.keys()))
def test_snapshots_to_send_error(pair, err_msg):
local, remote = pair
with pytest.raises(AssertionError) as err:
snapshots_to_send(local, remote)
assert err_msg == err.value.message
assert err_msg == str(err.value)


PULL_HAPPY_PATH = dict(
Expand All @@ -59,7 +59,7 @@ def test_snapshots_to_send_error(pair, err_msg):
)


@pytest.mark.parametrize('pair, expected', PULL_HAPPY_PATH.values(), ids=PULL_HAPPY_PATH.keys())
@pytest.mark.parametrize('pair, expected', list(PULL_HAPPY_PATH.values()), ids=list(PULL_HAPPY_PATH.keys()))
def test_pull_command(pair, expected):
commands = sync_snapshots(
pair,
Expand Down Expand Up @@ -87,7 +87,7 @@ def test_pull_command(pair, expected):
)


@pytest.mark.parametrize('pair, expected', PUSH_HAPPY_PATH.values(), ids=PUSH_HAPPY_PATH.keys())
@pytest.mark.parametrize('pair, expected', list(PUSH_HAPPY_PATH.values()), ids=list(PUSH_HAPPY_PATH.keys()))
def test_push_command(pair, expected):
commands = sync_snapshots(
pair,
Expand Down
4 changes: 2 additions & 2 deletions z3/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ConfigParser
import configparser
import os
import os.path

Expand Down Expand Up @@ -50,7 +50,7 @@ def get(self, key, default=None, section=None):
def get_config():
global _settings
if _settings is None:
_config = ConfigParser.ConfigParser()
_config = configparser.ConfigParser()
default = os.path.join(z3.__path__[0], "z3.conf")
_config.read(default)
_config.read("/etc/z3_backup/z3.conf")
Expand Down
2 changes: 1 addition & 1 deletion z3/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main():
else:
s3 = boto3.client('s3', **extra_config)
try:
s3.download_fileobj(cfg['BUCKET'], args.name, sys.stdout, Config=config)
s3.download_fileobj(cfg['BUCKET'], args.name, sys.stdout.buffer, Config=config)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
print("The object does not exist.")
Expand Down
20 changes: 10 additions & 10 deletions z3/pput.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
pput bucket_name/filename
"""

from Queue import Queue
from cStringIO import StringIO
from queue import Queue
from io import BytesIO
from collections import namedtuple
from threading import Thread
import argparse
Expand Down Expand Up @@ -47,7 +47,7 @@ def multipart_etag(digests):


def parse_size(size):
if isinstance(size, (int, long)):
if isinstance(size, int):
return size
size = size.strip().upper()
last = size[-1]
Expand All @@ -66,7 +66,7 @@ class StreamHandler(object):
def __init__(self, input_stream, chunk_size=5*1024*1024):
self.input_stream = input_stream
self.chunk_size = chunk_size
self._partial_chunk = ""
self._partial_chunk = b""
self._eof_reached = False

@property
Expand All @@ -82,7 +82,7 @@ def get_chunk(self):
self._partial_chunk += read
if len(self._partial_chunk) == self.chunk_size or self._eof_reached:
chunk = self._partial_chunk
self._partial_chunk = ""
self._partial_chunk = b""
return chunk
# else:
# print "partial", len(self._partial_chunk)
Expand All @@ -92,7 +92,7 @@ def retry(times=int(CFG['MAX_RETRIES'])):
def decorator(func):
@functools.wraps(func)
def wrapped(*a, **kwa):
for attempt in xrange(1, times+1):
for attempt in range(1, times+1):
try:
return func(*a, **kwa)
except: # pylint: disable=bare-except
Expand Down Expand Up @@ -123,7 +123,7 @@ def upload_part(self, index, chunk):
part.id = self.multipart.id
part.key_name = self.multipart.key_name
return part.upload_part_from_file(
StringIO(chunk), index, replace=True).md5
BytesIO(chunk), index, replace=True).md5

def start(self):
self._thread = Thread(target=self.main_loop)
Expand Down Expand Up @@ -179,7 +179,7 @@ def _start_workers(self, concurrency, worker_class):
inbox=work_queue,
outbox=result_queue,
).start()
for _ in xrange(concurrency)]
for _ in range(concurrency)]
return workers

def _begin_upload(self):
Expand Down Expand Up @@ -321,7 +321,7 @@ def parse_args():

def main():
args = parse_args()
input_fd = os.fdopen(args.file_descriptor, 'r') if args.file_descriptor else sys.stdin
input_fd = os.fdopen(args.file_descriptor, 'rb') if args.file_descriptor else sys.stdin.buffer
if args.estimated is not None:
chunk_size = optimize_chunksize(parse_size(args.estimated))
else:
Expand Down Expand Up @@ -359,7 +359,7 @@ def main():
sys.stderr.write("{}\n".format(excp))
return 1
if verbosity >= VERB_NORMAL:
print json.dumps({'status': 'success', 'etag': etag})
print(json.dumps({'status': 'success', 'etag': etag}))


if __name__ == '__main__':
Expand Down
6 changes: 3 additions & 3 deletions z3/s3_mp_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ def cleanup_multipart(bucket, max_days=1, dry_run=False):
max_age_seconds = max_days * 24 * 3600
now = datetime.utcnow()
fmt = "{} | {:30} | {:20}"
print fmt.format("A", "key", "initiated")
print(fmt.format("A", "key", "initiated"))
for multi in bucket.list_multipart_uploads():
delta = now-boto.utils.parse_ts(multi.initiated)
if delta.total_seconds() >= max_age_seconds:
print fmt.format("X", multi.key_name, multi.initiated)
print(fmt.format("X", multi.key_name, multi.initiated))
if not dry_run:
multi.cancel_upload()
else:
print fmt.format(" ", multi.key_name, multi.initiated)
print(fmt.format(" ", multi.key_name, multi.initiated))


def main():
Expand Down
12 changes: 5 additions & 7 deletions z3/snap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function

import argparse
import functools
import logging
Expand Down Expand Up @@ -145,7 +143,7 @@ def _snapshots(self):
return snapshots

def list(self):
return sorted(self._snapshots.values(), key=operator.attrgetter('name'))
return sorted(list(self._snapshots.values()), key=operator.attrgetter('name'))

def get(self, name):
return self._snapshots.get(name)
Expand All @@ -171,7 +169,7 @@ def _list_snapshots(self):
# see FakeZFSManager
return subprocess.check_output(
['zfs', 'list', '-Ht', 'snap', '-o',
'name,used,refer,mountpoint,written'])
'name,used,refer,mountpoint,written'], universal_newlines=True)

def _parse_snapshots(self):
"""Returns all snapshots grouped by filesystem, a dict of OrderedDict's
Expand Down Expand Up @@ -206,7 +204,7 @@ def _build_snapshots(self, fs_name):
# for fs_name, fs_snaps in self._parse_snapshots().iteritems():
fs_snaps = self._parse_snapshots().get(fs_name, {})
parent = None
for snap_name, data in fs_snaps.iteritems():
for snap_name, data in fs_snaps.items():
if not snap_name.startswith(self._snapshot_prefix):
continue
full_name = '{}@{}'.format(fs_name, snap_name)
Expand All @@ -226,7 +224,7 @@ def _snapshots(self):
return self._build_snapshots(self._fs_name)

def list(self):
return self._snapshots.values()
return list(self._snapshots.values())

def get_latest(self):
if len(self._snapshots) == 0:
Expand All @@ -235,7 +233,7 @@ def get_latest(self):
'Nothing to backup for filesystem "{}". Are you sure '
'SNAPSHOT_PREFIX="{}" is correct?'.format(
cfg.get('FILESYSTEM'), cfg.get('SNAPSHOT_PREFIX')))
return self._snapshots.values()[-1]
return list(self._snapshots.values())[-1]

def get(self, name):
return self._snapshots.get(name)
Expand Down
2 changes: 0 additions & 2 deletions z3/ssh_sync.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import print_function

import argparse
import subprocess
import sys
Expand Down