forked from redis-collections/redis-collections
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
239 lines (193 loc) · 7.71 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
"""
base
~~~~
"""
import abc
from decimal import Decimal
from fractions import Fraction
import pickle
import uuid
import redis
NUMERIC_TYPES = (int,) + (float, Decimal, Fraction, complex)
class RedisCollection(metaclass=abc.ABCMeta):
"""Abstract class providing backend functionality for all the other
Redis collections.
"""
not_impl_msg = ('Cannot be implemented efficiently or atomically '
'due to limitations in Redis command set.')
@abc.abstractmethod
def __init__(
self, redis=None, key=None, pickle_protocol=pickle.HIGHEST_PROTOCOL
):
"""
:param data: Initial data.
:param redis: Redis client instance. If not provided, a new Redis
connection is created.
:type redis: :class:`redis.StrictRedis`
:param key: The key at which the collection will be stored in Redis.
Collections with the same key point to the same data.
If not provided a random key is generated.
:type key: str
:param pickle_protocol: The version number of the pickle protocol to
use. The default is the highest version
supported by the current Python process.
:type key: str
"""
self.redis = self._create_redis() if redis is None else redis
self._redis_version = None # Determined if needed and cached
self.key = key or self._create_key()
self.pickle_protocol = pickle_protocol
def _create_redis(self):
"""
Creates a new Redis connection when none is specified during
initialization.
:rtype: :class:`redis.StrictRedis`
"""
return redis.StrictRedis()
def _create_key(self):
"""
Creates a random Redis key for storing this collection's data.
:rtype: string
.. note::
:func:`uuid.uuid4` is used. If you are not satisfied with its
`collision
probability <http://stackoverflow.com/a/786541/325365>`_,
make your own implementation by subclassing and overriding this
method.
"""
return uuid.uuid4().hex
@abc.abstractmethod
def _data(self, pipe=None):
"""Helper for getting the collection's data within a transaction.
:param pipe: Redis pipe in case creation is performed as a part
of transaction.
:type pipe: :class:`redis.client.StrictPipeline` or
:class:`redis.client.StrictRedis`
"""
def _pickle(self, data):
"""Converts given data to a bytes string.
:param data: Data to be serialized.
:type data: anything serializable
:rtype: bytes
"""
return pickle.dumps(data, protocol=self.pickle_protocol)
def _pickle_3(self, data):
# Several numeric types are equal, have the same hash, but nonetheless
# pickle to different byte strings. This method reduces them down to
# integers to help match with Python's behavior.
# len({1.0, 1, complex(1, 0)}) == 1
if isinstance(data, complex):
int_data = int(data.real)
if data == int_data:
data = int_data
elif isinstance(data, NUMERIC_TYPES):
int_data = int(data)
if data == int_data:
data = int_data
return pickle.dumps(data, protocol=self.pickle_protocol)
def _unpickle(self, pickled_data):
"""Convert *pickled_data* to a Python object and return it.
:param pickled_data: Serialized data.
:type pickled_data: bytes
:rtype: anything serializable
"""
return pickle.loads(pickled_data) if pickled_data else None
def _clear(self, pipe=None):
"""Helper for clear operations.
:param pipe: Redis pipe in case update is performed as a part
of transaction.
:type pipe: :class:`redis.client.StrictPipeline` or
:class:`redis.client.StrictRedis`
"""
redis = self.redis if pipe is None else pipe
redis.delete(self.key)
@property
def redis_version(self):
# Set the Redis version if it's not already set.
if self._redis_version is None:
self._redis_version = tuple(
int(x) for x in self.redis.info()['redis_version'].split('.')
)
return self._redis_version
def _same_redis(self, other, cls=None):
cls = cls or self.__class__
if not isinstance(other, cls):
return False
self_kwargs = self.redis.connection_pool.connection_kwargs
other_kwargs = other.redis.connection_pool.connection_kwargs
return (
self_kwargs.get('host') == other_kwargs.get('host') and
self_kwargs.get('port') == other_kwargs.get('port') and
self_kwargs.get('path') == other_kwargs.get('path') and
self_kwargs.get('db', 0) == other_kwargs.get('db', 0)
)
def _normalize_index(self, index, pipe=None):
"""Convert negative indexes into their positive equivalents."""
pipe = self.redis if pipe is None else pipe
len_self = self.__len__(pipe)
positive_index = index if index >= 0 else len_self + index
return len_self, positive_index
def _normalize_slice(self, index, pipe=None):
"""Given a :obj:`slice` *index*, return a 4-tuple
``(start, stop, step, fowrward)``. The first three items can be used
with the ``range`` function to retrieve the values associated with the
slice; the last item indicates the direction.
"""
if index.step == 0:
raise ValueError
pipe = self.redis if pipe is None else pipe
len_self = self.__len__(pipe)
step = index.step or 1
forward = step > 0
step = abs(step)
if index.start is None:
start = 0 if forward else len_self - 1
elif index.start < 0:
start = max(len_self + index.start, 0)
else:
start = min(index.start, len_self)
if index.stop is None:
stop = len_self if forward else -1
elif index.stop < 0:
stop = max(len_self + index.stop, 0)
else:
stop = min(index.stop, len_self)
if not forward:
start, stop = min(stop + 1, len_self), min(start + 1, len_self)
return start, stop, step, forward, len_self
def _transaction(self, fn, *extra_keys):
"""Helper simplifying code within watched transaction.
Takes *fn*, function treated as a transaction. Returns whatever
*fn* returns. ``self.key`` is watched. *fn* takes *pipe* as the
only argument.
:param fn: Closure treated as a transaction.
:type fn: function *fn(pipe)*
:param extra_keys: Optional list of additional keys to watch.
:type extra_keys: list
:rtype: whatever *fn* returns
"""
results = []
def trans(pipe):
results.append(fn(pipe))
self.redis.transaction(trans, self.key, *extra_keys)
return results[0]
def __enter__(self):
self.writeback = True
return self
def __exit__(self, exc_type, exc_value, traceback):
self.sync()
def sync(self):
pass
@abc.abstractmethod
def _repr_data(self):
"""
Abstract method for subclasses to implement.
Return a string appropriate for displaying the contents of the
collection. Called by __repr__.
"""
def __repr__(self):
cls_name = self.__class__.__name__
data = self._repr_data()
return '<redis_collections.{} at {} {}>'.format(
cls_name, self.key, data
)