Skip to content

Introduce stream SAC status instead of active flag #13672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features di
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator

PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_prop rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
Expand Down
1 change: 1 addition & 0 deletions deps/rabbit/ct.test.spec
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
, rabbit_local_random_exchange_SUITE
, rabbit_msg_interceptor_SUITE
, rabbit_stream_coordinator_SUITE
, rabbit_stream_sac_coordinator_v4_SUITE
, rabbit_stream_sac_coordinator_SUITE
, rabbitmq_4_0_deprecations_SUITE
, rabbitmq_queues_cli_integration_SUITE
Expand Down
177 changes: 118 additions & 59 deletions deps/rabbit/src/rabbit_stream_coordinator.erl

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_stream_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
listeners = #{} :: undefined | #{stream_id() =>
#{pid() := queue_ref()}},
single_active_consumer = undefined :: undefined |
rabbit_stream_sac_coordinator_v4:state() |
rabbit_stream_sac_coordinator:state(),
%% future extensibility
reserved_2}).
814 changes: 654 additions & 160 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Large diffs are not rendered by default.

20 changes: 15 additions & 5 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,28 @@
-type subscription_id() :: byte().
-type group_id() :: {vhost(), stream(), consumer_name()}.
-type owner() :: binary().
-type consumer_status() :: active | waiting | deactivating.
-type consumer_connectivity() :: connected | disconnected | forgotten.

-record(consumer,
{pid :: pid(),
subscription_id :: subscription_id(),
owner :: owner(), %% just a label
active :: boolean()}).
status :: {consumer_connectivity(), consumer_status()}}).
-record(group,
{consumers :: [#consumer{}], partition_index :: integer()}).
-record(rabbit_stream_sac_coordinator,
{groups :: #{group_id() => #group{}},
pids_groups ::
#{connection_pid() =>
#{group_id() => true}}, %% inner map acts as a set
{groups :: groups(),
pids_groups :: pids_groups(),
%% future extensibility
reserved_1,
reserved_2}).

-type group() :: #group{}.
-type groups() :: #{group_id() => group()}.
%% inner map acts as a set
-type pids_groups() :: #{connection_pid() => #{group_id() => true}}.

%% commands
-record(command_register_consumer,
{vhost :: vhost(),
Expand All @@ -56,3 +62,7 @@
-record(command_activate_consumer,
{vhost :: vhost(), stream :: stream(),
consumer_name :: consumer_name()}).
-record(command_connection_reconnected,
{pid :: connection_pid()}).
-record(command_purge_nodes,
{nodes :: [node()]}).
Loading
Loading