Skip to content

Commit b0c2a7f

Browse files
committed
fix: improve file cache of regions in concurrent scenarios
1 parent fca7849 commit b0c2a7f

File tree

4 files changed

+210
-54
lines changed

4 files changed

+210
-54
lines changed

qiniu/compat.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@
1414
# because of u'...' Unicode literals.
1515
import json # noqa
1616

17+
# -------
18+
# Platform
19+
# -------
20+
21+
# Platform flags derived from `sys.platform`; at most one of them is True.
is_windows = sys.platform == 'win32'
# Python 2 reports 'linux2' rather than 'linux', so match on the prefix
# instead of comparing for equality.
is_linux = sys.platform.startswith('linux')
is_macos = sys.platform == 'darwin'
1724

1825
# -------
1926
# Pythons

qiniu/http/regions_provider.py

Lines changed: 162 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@
66
import tempfile
77
import os
88
import shutil
9+
import threading
910

10-
from qiniu.compat import json, b as to_bytes
11+
from qiniu.compat import json, b as to_bytes, is_windows, is_linux, is_macos
1112
from qiniu.utils import io_md5, dt2ts
1213

1314
from .endpoint import Endpoint
@@ -24,7 +25,7 @@ def __iter__(self):
2425
"""
2526
Returns
2627
-------
27-
list[Region]
28+
Generator[Region, None, None]
2829
"""
2930

3031

@@ -137,27 +138,112 @@ def __init__(self, message):
137138
super(FileAlreadyLocked, self).__init__(message)
138139

139140

140-
class _FileLocker:
141-
def __init__(self, origin_file):
142-
self._origin_file = origin_file
141+
_file_threading_lockers_lock = threading.Lock()
142+
_file_threading_lockers = {}
143+
144+
145+
class _FileThreadingLocker:
146+
def __init__(self, fd):
147+
self._fd = fd
143148

144149
def __enter__(self):
145-
if os.access(self.lock_file_path, os.R_OK | os.W_OK):
146-
raise FileAlreadyLocked('File {0} already locked'.format(self._origin_file))
147-
with open(self.lock_file_path, 'w'):
148-
pass
150+
with _file_threading_lockers_lock:
151+
global _file_threading_lockers
152+
threading_lock = _file_threading_lockers.get(self._file_path, threading.Lock())
153+
# Could use the keyword form `acquire(blocking=False)` once the minimum supported Python version is >= 3
154+
if not threading_lock.acquire(False):
155+
raise FileAlreadyLocked('File {0} already locked'.format(self._file_path))
156+
_file_threading_lockers[self._file_path] = threading_lock
149157

150158
def __exit__(self, exc_type, exc_val, exc_tb):
151-
os.remove(self.lock_file_path)
159+
with _file_threading_lockers_lock:
160+
global _file_threading_lockers
161+
threading_lock = _file_threading_lockers.get(self._file_path)
162+
if threading_lock and threading_lock.locked():
163+
threading_lock.release()
164+
del _file_threading_lockers[self._file_path]
152165

153166
@property
154-
def lock_file_path(self):
155-
"""
156-
Returns
157-
-------
158-
str
159-
"""
160-
return self._origin_file + '.lock'
167+
def _file_path(self):
168+
return self._fd.name
169+
170+
171+
if is_linux or is_macos:
    import fcntl

    # Could subclass _FileThreadingLocker once the minimum supported Python version is >= 3
    class _FileLocker:
        """
        Context manager that takes an exclusive, non-blocking `fcntl.lockf`
        lock on an open file and releases it on exit.

        Parameters
        ----------
        fd: file object
            An open file object; its `name` is used in error messages.

        Raises
        ------
        FileAlreadyLocked
            On enter, if the lock cannot be acquired immediately.
        """
        def __init__(self, fd):
            self._fd = fd

        def __enter__(self):
            try:
                fcntl.lockf(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                # Use `raise ... from ...` once the minimum supported Python version is >= 3
                raise FileAlreadyLocked('File {0} already locked'.format(self._file_path))

        def __exit__(self, exc_type, exc_val, exc_tb):
            fcntl.lockf(self._fd, fcntl.LOCK_UN)

        @property
        def _file_path(self):
            return self._fd.name

elif is_windows:
    import msvcrt

    class _FileLocker:
        """
        Context manager that takes a non-blocking `msvcrt.locking` lock on
        one byte of an open file and releases it on exit.

        Parameters
        ----------
        fd: file object
            An open file object; its `name` is used in error messages.

        Raises
        ------
        FileAlreadyLocked
            On enter, if the lock cannot be acquired immediately.
        """
        def __init__(self, fd):
            self._fd = fd

        def __enter__(self):
            try:
                # TODO(lihs): set `_nbyte` bigger?
                # `msvcrt.locking` takes an integer file descriptor, not a file
                # object, hence `fileno()`. The mode constants are enumerated
                # values rather than bit flags: `LK_NBLCK` alone is the
                # non-blocking mode, while `LK_LOCK | LK_NBLCK` evaluates to a
                # different (blocking, retrying) mode.
                # NOTE(review): the locked region starts at the current file
                # position - confirm the position is the same on lock and unlock.
                msvcrt.locking(self._fd.fileno(), msvcrt.LK_NBLCK, 1)
            except (IOError, OSError):
                # Python 2 raises IOError here, Python 3 raises OSError.
                raise FileAlreadyLocked('File {0} already locked'.format(self._file_path))

        def __exit__(self, exc_type, exc_val, exc_tb):
            msvcrt.locking(self._fd.fileno(), msvcrt.LK_UNLCK, 1)

        @property
        def _file_path(self):
            return self._fd.name

else:
    import errno

    class _FileLocker:
        """
        Fallback context manager for other platforms: the lock is a sibling
        `<file>.lock` file created atomically on enter and removed on exit.

        Parameters
        ----------
        fd: file object
            An open file object; its `name` determines the lock file path.

        Raises
        ------
        FileAlreadyLocked
            On enter, if the lock file already exists.
        """
        def __init__(self, fd):
            self._fd = fd

        def __enter__(self):
            try:
                # Atomic file creation
                open_flags = os.O_EXCL | os.O_RDWR | os.O_CREAT
                fd = os.open(self.lock_file_path, open_flags)
            except OSError as err:
                # `FileExistsError` does not exist on Python 2; check errno instead.
                if err.errno != errno.EEXIST:
                    raise
                raise FileAlreadyLocked('File {0} already locked'.format(self._file_path))
            os.close(fd)

        def __exit__(self, exc_type, exc_val, exc_tb):
            try:
                os.remove(self.lock_file_path)
            except OSError as err:
                # `FileNotFoundError` does not exist on Python 2; check errno instead.
                # A lock file that is already gone is not an error.
                if err.errno != errno.ENOENT:
                    raise

        @property
        def _file_path(self):
            return self._fd.name

        @property
        def lock_file_path(self):
            """
            Returns
            -------
            str
            """
            return self._file_path + '.lock'
161247

162248

163249
# Use a dataclass instead of a namedtuple once the minimum supported Python version is 3.7
@@ -168,7 +254,8 @@ def lock_file_path(self):
168254
'persist_path',
169255
'last_shrink_at',
170256
'shrink_interval',
171-
'should_shrink_expired_regions'
257+
'should_shrink_expired_regions',
258+
'memo_cache_lock'
172259
]
173260
)
174261

@@ -177,11 +264,12 @@ def lock_file_path(self):
177264
memo_cache={},
178265
persist_path=os.path.join(
179266
tempfile.gettempdir(),
180-
'qn-regions-cache.jsonl'
267+
'qn-py-sdk-regions-cache.jsonl'
181268
),
182269
last_shrink_at=datetime.datetime.fromtimestamp(0),
183-
shrink_interval=datetime.timedelta(-1), # useless for now
184-
should_shrink_expired_regions=False
270+
shrink_interval=datetime.timedelta(days=1),
271+
should_shrink_expired_regions=False,
272+
memo_cache_lock=threading.Lock()
185273
)
186274

187275

@@ -323,7 +411,7 @@ def _parse_persisted_regions(persisted_data):
323411
return parsed_data.get('cacheKey'), regions
324412

325413

326-
def _walk_persist_cache_file(persist_path, ignore_parse_error=False):
414+
def _walk_persist_cache_file(persist_path, ignore_parse_error=True):
327415
"""
328416
Parameters
329417
----------
@@ -394,23 +482,24 @@ def __init__(
394482
self.base_regions_provider = base_regions_provider
395483

396484
persist_path = kwargs.get('persist_path', None)
485+
last_shrink_at = datetime.datetime.fromtimestamp(0)
397486
if persist_path is None:
398487
persist_path = _global_cache_scope.persist_path
488+
last_shrink_at = _global_cache_scope.last_shrink_at
399489

400490
shrink_interval = kwargs.get('shrink_interval', None)
401491
if shrink_interval is None:
402-
shrink_interval = datetime.timedelta(days=1)
492+
shrink_interval = _global_cache_scope.shrink_interval
403493

404494
should_shrink_expired_regions = kwargs.get('should_shrink_expired_regions', None)
405495
if should_shrink_expired_regions is None:
406-
should_shrink_expired_regions = False
496+
should_shrink_expired_regions = _global_cache_scope.should_shrink_expired_regions
407497

408-
self._cache_scope = CacheScope(
409-
memo_cache=_global_cache_scope.memo_cache,
498+
self._cache_scope = _global_cache_scope._replace(
410499
persist_path=persist_path,
411-
last_shrink_at=datetime.datetime.fromtimestamp(0),
500+
last_shrink_at=last_shrink_at,
412501
shrink_interval=shrink_interval,
413-
should_shrink_expired_regions=should_shrink_expired_regions,
502+
should_shrink_expired_regions=should_shrink_expired_regions
414503
)
415504

416505
def __iter__(self):
@@ -423,7 +512,7 @@ def __iter__(self):
423512
self.__get_regions_from_base_provider
424513
]
425514

426-
regions = None
515+
regions = []
427516
for get_regions in get_regions_fns:
428517
regions = get_regions(fallback=regions)
429518
if regions and all(r.is_live for r in regions):
@@ -439,7 +528,8 @@ def set_regions(self, regions):
439528
----------
440529
regions: list[Region]
441530
"""
442-
self._cache_scope.memo_cache[self.cache_key] = regions
531+
with self._cache_scope.memo_cache_lock:
532+
self._cache_scope.memo_cache[self.cache_key] = regions
443533

444534
if not self._cache_scope.persist_path:
445535
return
@@ -469,8 +559,11 @@ def persist_path(self, value):
469559
----------
470560
value: str
471561
"""
562+
if value == self._cache_scope.persist_path:
563+
return
472564
self._cache_scope = self._cache_scope._replace(
473-
persist_path=value
565+
persist_path=value,
566+
last_shrink_at=datetime.datetime.fromtimestamp(0)
474567
)
475568

476569
@property
@@ -586,7 +679,6 @@ def __get_regions_from_base_provider(self, fallback=None):
586679
def __flush_file_cache_to_memo(self):
587680
for cache_key, regions in _walk_persist_cache_file(
588681
persist_path=self._cache_scope.persist_path
589-
# ignore_parse_error=True
590682
):
591683
if cache_key not in self._cache_scope.memo_cache:
592684
self._cache_scope.memo_cache[cache_key] = regions
@@ -609,12 +701,18 @@ def __should_shrink(self):
609701
def __shrink_cache(self):
610702
# shrink memory cache
611703
if self._cache_scope.should_shrink_expired_regions:
612-
kept_memo_cache = {}
613-
for k, regions in self._cache_scope.memo_cache.items():
614-
live_regions = [r for r in regions if r.is_live]
615-
if live_regions:
616-
kept_memo_cache[k] = live_regions
617-
self._cache_scope = self._cache_scope._replace(memo_cache=kept_memo_cache)
704+
memo_cache_old = self._cache_scope.memo_cache.copy()
705+
# Could use the keyword form `acquire(blocking=False)` once the minimum supported Python version is >= 3
706+
if self._cache_scope.memo_cache_lock.acquire(False):
707+
try:
708+
for k, regions in memo_cache_old.items():
709+
live_regions = [r for r in regions if r.is_live]
710+
if live_regions:
711+
self._cache_scope.memo_cache[k] = live_regions
712+
else:
713+
del self._cache_scope.memo_cache[k]
714+
finally:
715+
self._cache_scope.memo_cache_lock.release()
618716

619717
# shrink file cache
620718
if not self._cache_scope.persist_path:
@@ -625,7 +723,7 @@ def __shrink_cache(self):
625723

626724
shrink_file_path = self._cache_scope.persist_path + '.shrink'
627725
try:
628-
with _FileLocker(shrink_file_path):
726+
with open(shrink_file_path, 'a') as f, _FileThreadingLocker(f), _FileLocker(f):
629727
# filter data
630728
shrunk_cache = {}
631729
for cache_key, regions in _walk_persist_cache_file(
@@ -646,25 +744,36 @@ def __shrink_cache(self):
646744
)
647745

648746
# write data
649-
with open(shrink_file_path, 'a') as f:
650-
for cache_key, regions in shrunk_cache.items():
651-
f.write(
652-
json.dumps(
653-
{
654-
'cacheKey': cache_key,
655-
'regions': [_persist_region(r) for r in regions]
656-
}
657-
) + os.linesep
658-
)
747+
for cache_key, regions in shrunk_cache.items():
748+
f.write(
749+
json.dumps(
750+
{
751+
'cacheKey': cache_key,
752+
'regions': [_persist_region(r) for r in regions]
753+
}
754+
) + os.linesep
755+
)
756+
757+
# make the cache file available for all users
758+
if is_linux or is_macos:
759+
os.chmod(shrink_file_path, 0o666)
659760

660761
# rename file
661762
shutil.move(shrink_file_path, self._cache_scope.persist_path)
763+
764+
# update last shrink time
765+
self._cache_scope = self._cache_scope._replace(
766+
last_shrink_at=datetime.datetime.now()
767+
)
768+
global _global_cache_scope
769+
if _global_cache_scope.persist_path == self._cache_scope.persist_path:
770+
_global_cache_scope = _global_cache_scope._replace(
771+
last_shrink_at=self._cache_scope.last_shrink_at
772+
)
773+
662774
except FileAlreadyLocked:
775+
# Skip: the file is already being shrunk by another thread or process
663776
pass
664-
finally:
665-
self._cache_scope = self._cache_scope._replace(
666-
last_shrink_at=datetime.datetime.now()
667-
)
668777

669778

670779
def get_default_regions_provider(

qiniu/http/response.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def need_retry(self):
5757
]):
5858
return False
5959
# https://developer.qiniu.com/fusion/kb/1352/the-http-request-return-a-status-code
60+
# https://developer.qiniu.com/kodo/3928/error-responses
6061
if self.status_code in [
6162
501, 509, 573, 579, 608, 612, 614, 616, 618, 630, 631, 632, 640, 701
6263
]:

0 commit comments

Comments
 (0)