Skip to content

Commit

Permalink
use key_may_exist() from rocksdict (#479)
Browse files (browse the repository at this point in the history)
* use key_may_exist() from rocksdict

* clean up tests and lint

* remove raw_mode=True kwarg from ReadOptions

* rename the USE_ROCKSDICT instance attribute to lowercase use_rocksdict
  • Loading branch information
wbarnha authored Mar 22, 2023
1 parent d19bd4a commit 811e032
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 98 deletions.
38 changes: 12 additions & 26 deletions faust/stores/rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def open(self, path: Path, *, read_only: bool = False) -> DB:
[rocksdict.DBPath(str(path), self.target_file_size_base)]
)
db = DB(str(path), options=self.as_options())
db.set_read_options(rocksdict.ReadOptions(raw_mode=True))
db.set_read_options(rocksdict.ReadOptions())
return db
else:
return rocksdb.DB(str(path), self.as_options(), read_only=read_only)
Expand Down Expand Up @@ -241,11 +241,11 @@ def __init__(

self.driver = self.options.pop("driver", driver)
if self.driver == "rocksdict":
self.USE_ROCKSDICT = True
self.use_rocksdict = True
elif self.driver == "python-rocksdb":
self.USE_ROCKSDICT = False
self.use_rocksdict = False
else:
self.USE_ROCKSDICT = USE_ROCKSDICT
self.use_rocksdict = USE_ROCKSDICT

self.rocksdb_options = RocksDBOptions(**self.options)
if key_index_size is None:
Expand Down Expand Up @@ -413,7 +413,7 @@ def apply_changelog_batch(
of a changelog event.
"""
batches: DefaultDict[int, WriteBatch]
if self.USE_ROCKSDICT:
if self.use_rocksdict:
batches = defaultdict(lambda: WriteBatch(raw_mode=True))
else:
batches = defaultdict(rocksdb.WriteBatch)
Expand All @@ -424,12 +424,12 @@ def apply_changelog_batch(
offset if tp not in tp_offsets else max(offset, tp_offsets[tp])
)
msg = event.message
if self.USE_ROCKSDICT:
if self.use_rocksdict:
msg.key = msg.key.encode()
if msg.value is None:
batches[msg.partition].delete(msg.key)
else:
if self.USE_ROCKSDICT:
if self.use_rocksdict:
msg.value = msg.value.encode()
batches[msg.partition].put(msg.key, msg.value)

Expand Down Expand Up @@ -494,18 +494,11 @@ def _get_bucket_for_key(self, key: bytes) -> Optional[_DBValueTuple]:
dbs = cast(Iterable[PartitionDB], self._dbs.items())

for partition, db in dbs:
if self.USE_ROCKSDICT:
# TODO: Remove this once key_may_exist is added
if db.key_may_exist(key)[0]:
value = db.get(key)
if value is not None:
self._key_index[key] = partition
return _DBValueTuple(db, value)
else:
if db.key_may_exist(key)[0]:
value = db.get(key)
if value is not None:
self._key_index[key] = partition
return _DBValueTuple(db, value)
return None

def _del(self, key: bytes) -> None:
Expand Down Expand Up @@ -629,15 +622,8 @@ def _contains(self, key: bytes) -> bool:
else:
for db in self._dbs_for_key(key):
# bloom filter: false positives possible, but not false negatives
if self.USE_ROCKSDICT:
# TODO: Remove once key_may_exist is added
if db.get(key) is not None:
return True
else:
return False
else:
if db.key_may_exist(key)[0] and db.get(key) is not None:
return True
if db.key_may_exist(key)[0] and db.get(key) is not None:
return True
return False

def _dbs_for_key(self, key: bytes) -> Iterable[DB]:
Expand All @@ -662,7 +648,7 @@ def _size(self) -> int:
return sum(self._size1(db) for db in self._dbs_for_actives())

def _visible_keys(self, db: DB) -> Iterator[bytes]:
if self.USE_ROCKSDICT:
if self.use_rocksdict:
it = db.keys()
iter = db.iter()
iter.seek_to_first()
Expand All @@ -674,7 +660,7 @@ def _visible_keys(self, db: DB) -> Iterator[bytes]:
yield key

def _visible_items(self, db: DB) -> Iterator[Tuple[bytes, bytes]]:
if self.USE_ROCKSDICT:
if self.use_rocksdict:
it = db.items()
else:
it = db.iteritems() # noqa: B301
Expand Down
2 changes: 1 addition & 1 deletion requirements/extras/rocksdict.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rocksdict>=0.3.8
rocksdict==0.3.9
71 changes: 0 additions & 71 deletions tests/unit/stores/test_rocksdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,46 +757,6 @@ class Test_Store_Rocksdict(Test_Store_RocksDB):
def store(self, *, app, rocks, table):
return Store("rocksdb://", app, table, driver="rocksdict")

def test__get__has_event(self, *, store, current_event):
partition = 1
message = Mock(name="message")
message.partition.return_value = partition

current_event.return_value = message

db = Mock(name="db")
store._db_for_partition = Mock("_db_for_partition")
store._db_for_partition.return_value = db
db.get.return_value = b"value"
db.__getitem__ = Mock()
db.__getitem__.return_value = b"value"
store.table = Mock(name="table")
store.table.is_global = False
store.table.synchronize_all_active_partitions = False
store.table.use_partitioner = False

assert store._get(b"key") == b"value"

db.get.return_value = None
db.__getitem__ = Mock()
db.__getitem__.return_value = None
assert store._get(b"key2") is None

@pytest.mark.skip("key_may_exist not available in rocksdict yet")
def test_get_bucket_for_key__is_in_index(self, *, store):
store._key_index[b"key"] = 30
db = store._dbs[30] = Mock(name="db-p30")

db.key_may_exist.return_value = [False]
assert store._get_bucket_for_key(b"key") is None

db.key_may_exist.return_value = [True]
db.get.return_value = None
assert store._get_bucket_for_key(b"key") is None

db.get.return_value = b"value"
assert store._get_bucket_for_key(b"key") == (db, b"value")

def test__iteritems(self, *, store):
dbs = self._setup_items(
db1=[
Expand All @@ -823,37 +783,6 @@ def test__iteritems(self, *, store):
# iteritems not available in rocksdict yet
db.items.assert_called_once_with()

def new_db(self, name, exists=False):
db = Mock(name=name)
db.key_may_exist.return_value = [exists]
db.get.return_value = name
return db

@pytest.mark.skip("key_may_exist not available in rocksdict yet")
def test_get_bucket_for_key__not_in_index(self, *, store):
dbs = {
1: self.new_db(name="db1"),
2: self.new_db(name="db2"),
3: self.new_db(name="db3", exists=True),
4: self.new_db(name="db4", exists=True),
}
store._dbs.update(dbs)

assert store._get_bucket_for_key(b"key") == (dbs[3], "db3")

@pytest.mark.skip("key_may_exist not available in rocksdict yet")
def test__contains(self, *, store):
db1 = self.new_db("db1", exists=False)
db2 = self.new_db("db2", exists=True)
dbs = {b"key": [db1, db2]}
store._dbs_for_key = Mock(side_effect=dbs.get)

db2.get.return_value = None
assert not store._contains(b"key")

db2.get.return_value = b"value"
assert store._contains(b"key")

def test__iterkeys(self, *, store):
dbs = self._setup_keys(
db1=[
Expand Down

0 comments on commit 811e032

Please sign in to comment.