Skip to content

Commit fecb221

Browse files
authored
Add key-based partitioning to duplicate registries (#14654)
1 parent e4d0a0d commit fecb221

File tree

4 files changed

+1163
-985
lines changed

4 files changed

+1163
-985
lines changed

lib/elixir/lib/registry.ex

Lines changed: 119 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,15 @@ defmodule Registry do
187187
Note that the registry uses one ETS table plus two ETS tables per partition.
188188
"""
189189

190-
@keys [:unique, :duplicate]
190+
@keys [:unique, :duplicate, {:duplicate, :key}, {:duplicate, :pid}]
191191
@all_info -1
192192
@key_info -2
193193

194194
@typedoc "The registry identifier"
195195
@type registry :: atom
196196

197197
@typedoc "The type of the registry"
198-
@type keys :: :unique | :duplicate
198+
@type keys :: :unique | :duplicate | {:duplicate, :key} | {:duplicate, :pid}
199199

200200
@typedoc "The type of keys allowed on registration"
201201
@type key :: term
@@ -266,8 +266,8 @@ defmodule Registry do
266266
:undefined
267267
end
268268

269-
{kind, _, _} ->
270-
raise ArgumentError, ":via is not supported for #{kind} registries"
269+
{{:duplicate, _}, _, _} ->
270+
raise ArgumentError, ":via is not supported for duplicate registries"
271271
end
272272
end
273273

@@ -329,11 +329,24 @@ defmodule Registry do
329329
{Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()}
330330
], strategy: :one_for_one)
331331
332+
For `:duplicate` registries with many different keys (e.g., many topics with
333+
few subscribers each), you can optimize key-based lookups by partitioning by key:
334+
335+
Registry.start_link(
336+
keys: {:duplicate, :key},
337+
name: MyApp.TopicRegistry,
338+
partitions: System.schedulers_online()
339+
)
340+
341+
This allows key-based lookups to check only a single partition instead of
342+
searching all partitions. Use the default `:pid` partitioning when you have
343+
fewer keys with many entries each (e.g., one topic with many subscribers).
344+
332345
## Options
333346
334347
The registry requires the following keys:
335348
336-
* `:keys` - chooses if keys are `:unique` or `:duplicate`
349+
* `:keys` - chooses if keys are `:unique`, `:duplicate`, `{:duplicate, :key}`, or `{:duplicate, :pid}`
337350
* `:name` - the name of the registry and its tables
338351
339352
The following keys are optional:
@@ -345,16 +358,40 @@ defmodule Registry do
345358
crashes. Messages sent to listeners are of type `t:listener_message/0`.
346359
* `:meta` - a keyword list of metadata to be attached to the registry.
347360
361+
For `:duplicate` registries, you can specify the partitioning strategy
362+
directly in the `:keys` option:
363+
364+
* `:duplicate` or `{:duplicate, :pid}` - Use `:pid` partitioning (default)
365+
when you have keys with many entries (e.g., one topic with many subscribers).
366+
This is the traditional behavior and groups all entries from the same process together.
367+
368+
* `{:duplicate, :key}` - Use `:key` partitioning when entries are spread across
369+
many different keys (e.g., many topics with few subscribers each). This makes
370+
key-based lookups more efficient as they only need to check a single partition
371+
instead of all partitions.
372+
348373
"""
349374
@doc since: "1.5.0"
350375
@spec start_link([start_option]) :: {:ok, pid} | {:error, term}
351376
def start_link(options) do
352377
keys = Keyword.get(options, :keys)
353378

354-
if keys not in @keys do
355-
raise ArgumentError,
356-
"expected :keys to be given and be one of :unique or :duplicate, got: #{inspect(keys)}"
357-
end
379+
# Validate and normalize keys format
380+
kind =
381+
case keys do
382+
{:duplicate, partition_strategy} when partition_strategy in [:key, :pid] ->
383+
{:duplicate, partition_strategy}
384+
385+
:unique ->
386+
:unique
387+
388+
:duplicate ->
389+
{:duplicate, :pid}
390+
391+
_ ->
392+
raise ArgumentError,
393+
"expected :keys to be given and be one of :unique, :duplicate, {:duplicate, :key}, or {:duplicate, :pid}, got: #{inspect(keys)}"
394+
end
358395

359396
name =
360397
case Keyword.fetch(options, :name) do
@@ -397,11 +434,18 @@ defmodule Registry do
397434

398435
# The @info format must be kept in sync with Registry.Partition optimization.
399436
entries = [
400-
{@all_info, {keys, partitions, nil, nil, listeners}},
401-
{@key_info, {keys, partitions, nil}} | meta
437+
{@all_info, {kind, partitions, nil, nil, listeners}},
438+
{@key_info, {kind, partitions, nil}} | meta
402439
]
403440

404-
Registry.Supervisor.start_link(keys, name, partitions, listeners, entries, compressed)
441+
Registry.Supervisor.start_link(
442+
kind,
443+
name,
444+
partitions,
445+
listeners,
446+
entries,
447+
compressed
448+
)
405449
end
406450

407451
@doc false
@@ -468,7 +512,8 @@ defmodule Registry do
468512
end
469513

470514
{kind, _, _} ->
471-
raise ArgumentError, "Registry.update_value/3 is not supported for #{kind} registries"
515+
raise ArgumentError,
516+
"Registry.update_value/3 is not supported for #{inspect(kind)} registries"
472517
end
473518
end
474519

@@ -508,12 +553,12 @@ defmodule Registry do
508553
|> List.wrap()
509554
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)
510555

511-
{:duplicate, 1, key_ets} ->
556+
{{:duplicate, _}, 1, key_ets} ->
512557
key_ets
513558
|> safe_lookup_second(key)
514559
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)
515560

516-
{:duplicate, partitions, _} ->
561+
{{:duplicate, _}, partitions, _} ->
517562
if Keyword.get(opts, :parallel, false) do
518563
registry
519564
|> dispatch_parallel(key, mfa_or_fun, partitions)
@@ -625,10 +670,14 @@ defmodule Registry do
625670
[]
626671
end
627672

628-
{:duplicate, 1, key_ets} ->
673+
{{:duplicate, _}, 1, key_ets} ->
629674
safe_lookup_second(key_ets, key)
630675

631-
{:duplicate, partitions, _key_ets} ->
676+
{{:duplicate, :key}, partitions, _key_ets} ->
677+
partition = hash(key, partitions)
678+
safe_lookup_second(key_ets!(registry, partition), key)
679+
680+
{{:duplicate, :pid}, partitions, _key_ets} ->
632681
for partition <- 0..(partitions - 1),
633682
pair <- safe_lookup_second(key_ets!(registry, partition), key),
634683
do: pair
@@ -749,10 +798,10 @@ defmodule Registry do
749798
key_ets = key_ets || key_ets!(registry, key, partitions)
750799
:ets.select(key_ets, spec)
751800

752-
{:duplicate, 1, key_ets} ->
801+
{{:duplicate, _}, 1, key_ets} ->
753802
:ets.select(key_ets, spec)
754803

755-
{:duplicate, partitions, _key_ets} ->
804+
{{:duplicate, _}, partitions, _key_ets} ->
756805
for partition <- 0..(partitions - 1),
757806
pair <- :ets.select(key_ets!(registry, partition), spec),
758807
do: pair
@@ -795,16 +844,35 @@ defmodule Registry do
795844
@spec keys(registry, pid) :: [key]
796845
def keys(registry, pid) when is_atom(registry) and is_pid(pid) do
797846
{kind, partitions, _, pid_ets, _} = info!(registry)
798-
{_, pid_ets} = pid_ets || pid_ets!(registry, pid, partitions)
799847

800-
keys =
801-
try do
802-
spec = [{{pid, :"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
803-
:ets.select(pid_ets, spec)
804-
catch
805-
:error, :badarg -> []
848+
pid_etses =
849+
if pid_ets do
850+
{_, pid_ets} = pid_ets
851+
[pid_ets]
852+
else
853+
case kind do
854+
{:duplicate, :key} ->
855+
for partition <- 0..(partitions - 1) do
856+
{_, pid_ets} = pid_ets!(registry, partition)
857+
pid_ets
858+
end
859+
860+
_ ->
861+
{_, pid_ets} = pid_ets!(registry, pid, partitions)
862+
[pid_ets]
863+
end
806864
end
807865

866+
keys =
867+
Enum.flat_map(pid_etses, fn pid_ets ->
868+
try do
869+
spec = [{{pid, :"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
870+
:ets.select(pid_ets, spec)
871+
catch
872+
:error, :badarg -> []
873+
end
874+
end)
875+
808876
# Handle the possibility of fake keys
809877
keys = gather_keys(keys, [], false)
810878

@@ -882,8 +950,17 @@ defmodule Registry do
882950
[]
883951
end
884952

885-
{:duplicate, partitions, key_ets} ->
886-
key_ets = key_ets || key_ets!(registry, pid, partitions)
953+
{{:duplicate, _}, 1, key_ets} ->
954+
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value
955+
956+
{{:duplicate, :key}, partitions, _key_ets} ->
957+
partition = hash(key, partitions)
958+
key_ets = key_ets!(registry, partition)
959+
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value
960+
961+
{{:duplicate, :pid}, partitions, _key_ets} ->
962+
partition = hash(pid, partitions)
963+
key_ets = key_ets!(registry, partition)
887964
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value
888965
end
889966
end
@@ -1121,7 +1198,7 @@ defmodule Registry do
11211198
end
11221199
end
11231200

1124-
defp register_key(:duplicate, key_ets, _key, entry) do
1201+
defp register_key({:duplicate, _}, key_ets, _key, entry) do
11251202
true = :ets.insert(key_ets, entry)
11261203
:ok
11271204
end
@@ -1339,10 +1416,10 @@ defmodule Registry do
13391416
key_ets = key_ets || key_ets!(registry, key, partitions)
13401417
:ets.select_count(key_ets, spec)
13411418

1342-
{:duplicate, 1, key_ets} ->
1419+
{{:duplicate, _}, 1, key_ets} ->
13431420
:ets.select_count(key_ets, spec)
13441421

1345-
{:duplicate, partitions, _key_ets} ->
1422+
{{:duplicate, _}, partitions, _key_ets} ->
13461423
Enum.sum_by(0..(partitions - 1), fn partition_index ->
13471424
:ets.select_count(key_ets!(registry, partition_index), spec)
13481425
end)
@@ -1512,7 +1589,12 @@ defmodule Registry do
15121589
{hash(key, partitions), hash(pid, partitions)}
15131590
end
15141591

1515-
defp partitions(:duplicate, _key, pid, partitions) do
1592+
defp partitions({:duplicate, :key}, key, _pid, partitions) do
1593+
partition = hash(key, partitions)
1594+
{partition, partition}
1595+
end
1596+
1597+
defp partitions({:duplicate, :pid}, _key, pid, partitions) do
15161598
partition = hash(pid, partitions)
15171599
{partition, partition}
15181600
end
@@ -1576,9 +1658,10 @@ defmodule Registry.Supervisor do
15761658
defp strategy_for_kind(:unique), do: :one_for_all
15771659

15781660
# Duplicate registries have both key and pid partitions hashed
1579-
# by pid. This means that, if a PID partition crashes, all of
1661+
# by key ({:duplicate, :key}) or pid ({:duplicate, :pid}).
1662+
# This means that, if a PID or key partition crashes, all of
15801663
# its associated entries are in its sibling table, so we crash one.
1581-
defp strategy_for_kind(:duplicate), do: :one_for_one
1664+
defp strategy_for_kind({:duplicate, _}), do: :one_for_one
15821665
end
15831666

15841667
defmodule Registry.Partition do
@@ -1633,6 +1716,7 @@ defmodule Registry.Partition do
16331716

16341717
def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do
16351718
Process.flag(:trap_exit, true)
1719+
16361720
key_ets = init_key_ets(kind, key_partition, compressed)
16371721
pid_ets = init_pid_ets(kind, pid_partition)
16381722

@@ -1659,7 +1743,7 @@ defmodule Registry.Partition do
16591743
:ets.new(key_partition, compression_opt(opts, compressed))
16601744
end
16611745

1662-
defp init_key_ets(:duplicate, key_partition, compressed) do
1746+
defp init_key_ets({:duplicate, _}, key_partition, compressed) do
16631747
opts = [:duplicate_bag, :public, read_concurrency: true, write_concurrency: true]
16641748
:ets.new(key_partition, compression_opt(opts, compressed))
16651749
end

0 commit comments

Comments
 (0)