1 change: 1 addition & 0 deletions changelog.d/18960.bugfix
@@ -0,0 +1 @@
+Fix a bug in the database function for fetching state deltas that could result in unnecessarily long query times.
2 changes: 1 addition & 1 deletion synapse/storage/controllers/state.py
@@ -683,7 +683,7 @@ async def get_current_state_deltas(
         # https://github.com/matrix-org/synapse/issues/13008
 
         return await self.stores.main.get_partial_current_state_deltas(
-            prev_stream_id, max_stream_id
+            prev_stream_id, max_stream_id, limit=100
         )
 
     @trace
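The only caller change is in `get_current_state_deltas` above, which preserves the previous hard-coded cap by passing `limit=100` explicitly. A minimal sketch of how a paginated API like this is typically consumed (hypothetical `store` and `handle_delta` names, for illustration only, not code from this PR):

```python
# Hypothetical consumer loop: the returned stream id feeds the next call's
# `prev_stream_id`, so deltas are drained in bounded chunks. The first-group
# rule below guarantees each call makes progress, so this terminates.
async def drain_state_deltas(store, prev_stream_id: int, max_stream_id: int) -> int:
    while prev_stream_id < max_stream_id:
        clipped_stream_id, deltas = await store.get_partial_current_state_deltas(
            prev_stream_id, max_stream_id, limit=100
        )
        for delta in deltas:
            handle_delta(delta)  # application-specific processing (stand-in)
        prev_stream_id = clipped_stream_id
    return prev_stream_id
```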
97 changes: 64 additions & 33 deletions synapse/storage/databases/main/state_deltas.py
@@ -78,27 +78,41 @@ def __init__(
         )
 
     async def get_partial_current_state_deltas(
-        self, prev_stream_id: int, max_stream_id: int
+        self, prev_stream_id: int, max_stream_id: int, limit: int = 100
     ) -> tuple[int, list[StateDelta]]:
-        """Fetch a list of room state changes since the given stream id
+        """Fetch a list of room state changes since the given stream id.
 
         This may be the partial state if we're lazy joining the room.
 
+        This method takes care to handle state deltas that share the same
+        `stream_id`. That can happen when persisting state in a batch,
+        potentially as the result of state resolution (both adding new state and
+        undoing previous state).
+
+        State deltas are grouped by `stream_id`. When hitting the given `limit`
+        would return only part of a "group" of state deltas, that entire group
+        is omitted. Thus, this function may return *up to* `limit` state deltas,
+        or slightly more when a single group itself exceeds `limit`.
+
         Args:
             prev_stream_id: point to get changes since (exclusive)
             max_stream_id: the point that we know has been correctly persisted
                - ie, an upper limit to return changes from.
+            limit: the maximum number of rows to return.
 
         Returns:
             A tuple consisting of:
                 - the stream id which these results go up to
                 - list of current_state_delta_stream rows. If it is empty, we are
                   up to date.
-
-        A maximum of 100 rows will be returned.
         """
         prev_stream_id = int(prev_stream_id)
 
+        if limit <= 0:
+            raise ValueError(
+                "Invalid `limit` passed to `get_partial_current_state_deltas`"
+            )
+
         # check we're not going backwards
         assert prev_stream_id <= max_stream_id, (
             f"New stream id {max_stream_id} is smaller than prev stream id {prev_stream_id}"
@@ -115,45 +129,62 @@ async def get_partial_current_state_deltas(
         def get_current_state_deltas_txn(
             txn: LoggingTransaction,
         ) -> tuple[int, list[StateDelta]]:
-            # First we calculate the max stream id that will give us less than
-            # N results.
-            # We arbitrarily limit to 100 stream_id entries to ensure we don't
-            # select toooo many.
-            sql = """
-                SELECT stream_id, count(*)
+            # 1) Group state deltas by `stream_id` and calculate which groups
+            # can be returned without exceeding the provided `limit`.
+            sql_grouped = """
+                SELECT stream_id, COUNT(*) AS c
                 FROM current_state_delta_stream
                 WHERE stream_id > ? AND stream_id <= ?
                 GROUP BY stream_id
-                ORDER BY stream_id ASC
-                LIMIT 100
+                ORDER BY stream_id
+                LIMIT ?
             """
-            txn.execute(sql, (prev_stream_id, max_stream_id))
-
-            total = 0
-
-            for stream_id, count in txn:
-                total += count
-                if total > 100:
-                    # We arbitrarily limit to 100 entries to ensure we don't
-                    # select toooo many.
-                    logger.debug(
-                        "Clipping current_state_delta_stream rows to stream_id %i",
-                        stream_id,
-                    )
-                    clipped_stream_id = stream_id
+            group_limit = limit + 1
+            txn.execute(sql_grouped, (prev_stream_id, max_stream_id, group_limit))
+            grouped_rows = txn.fetchall()
+
+            if not grouped_rows:
+                # Nothing to return in the range; we are up to date through max_stream_id.
+                return max_stream_id, []
+
+            # Always retrieve the first group, at the bare minimum. This ensures the
+            # caller always makes progress, even if a single group exceeds `limit`.
+            fetch_upto_stream_id, included_rows = grouped_rows[0]
+
+            # Determine which other groups we can retrieve at the same time,
+            # without blowing the budget.
+            included_all_groups = True
+            for stream_id, count in grouped_rows[1:]:
+                if included_rows + count > limit:
+                    included_all_groups = False
                     break
-            else:
-                # if there's no problem, we may as well go right up to the max_stream_id
-                clipped_stream_id = max_stream_id
+                included_rows += count
+                fetch_upto_stream_id = stream_id
+
+            # If we retrieved fewer groups than the limit *and* we didn't hit the
+            # `LIMIT ?` cap on the grouping query, we know we've caught up with
+            # the stream.
+            caught_up_with_stream = (
+                included_all_groups and len(grouped_rows) < group_limit
+            )
+
+            # At this point we should have advanced, or bailed out early above.
+            assert fetch_upto_stream_id != prev_stream_id
 
-            # Now actually get the deltas
-            sql = """
+            # 2) Fetch the actual rows for only the included stream_id groups.
+            sql_rows = """
                 SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
                 FROM current_state_delta_stream
                 WHERE ? < stream_id AND stream_id <= ?
                 ORDER BY stream_id ASC
             """
-            txn.execute(sql, (prev_stream_id, clipped_stream_id))
+            txn.execute(sql_rows, (prev_stream_id, fetch_upto_stream_id))
+            rows = txn.fetchall()
+
+            clipped_stream_id = (
+                max_stream_id if caught_up_with_stream else fetch_upto_stream_id
+            )
 
             return clipped_stream_id, [
                 StateDelta(
                     stream_id=row[0],
@@ -163,7 +194,7 @@ def get_current_state_deltas_txn(
                     event_id=row[4],
                     prev_event_id=row[5],
                 )
-                for row in txn.fetchall()
+                for row in rows
             ]
 
         return await self.db_pool.runInteraction(
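To make the clipping rule concrete, the group-inclusion logic from the transaction can be restated as a standalone function with a worked example (an illustrative paraphrase, not the PR's exact code). Note the `limit + 1` trick: the grouping query asks for one more group than could possibly fit, so receiving fewer than `group_limit` groups proves the range was exhausted.

```python
# Illustrative paraphrase of the new group-inclusion logic. Given
# (stream_id, count) pairs in ascending order, decide how far we can fetch
# without exceeding `limit`, and whether we have caught up with the stream.
def clip_groups(
    grouped_rows: list[tuple[int, int]], limit: int, hit_group_cap: bool
) -> tuple[int, bool]:
    # The first group is always included so the caller makes progress,
    # even when that single group alone exceeds `limit`.
    fetch_upto_stream_id, included_rows = grouped_rows[0]
    included_all_groups = True
    for stream_id, count in grouped_rows[1:]:
        if included_rows + count > limit:
            included_all_groups = False
            break
        included_rows += count
        fetch_upto_stream_id = stream_id
    # Caught up only if every listed group fit *and* the grouping query's
    # own LIMIT of `limit + 1` groups was not exhausted.
    caught_up = included_all_groups and not hit_group_cap
    return fetch_upto_stream_id, caught_up

# With limit=100: groups (5, 40), (6, 50), (7, 30) clip at stream_id 6,
# since 40 + 50 = 90 fits but adding 30 would exceed 100.
assert clip_groups([(5, 40), (6, 50), (7, 30)], 100, False) == (6, False)
# A single oversized group is still returned in full, and since it was the
# only group in range, the caller advances all the way to max_stream_id.
assert clip_groups([(5, 150)], 100, False) == (5, True)
```

Whole groups are kept or dropped atomically, so callers never observe half of a batched state-resolution update at one `stream_id`.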