Skip to content
154 changes: 119 additions & 35 deletions lib/elixir/lib/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ defmodule Registry do
Note that the registry uses one ETS table plus two ETS tables per partition.
"""

@keys [:unique, :duplicate]
@keys [:unique, :duplicate, {:duplicate, :key}, {:duplicate, :pid}]
@all_info -1
@key_info -2

@typedoc "The registry identifier"
@type registry :: atom

@typedoc "The type of the registry"
@type keys :: :unique | :duplicate
@type keys :: :unique | :duplicate | {:duplicate, :key} | {:duplicate, :pid}

@typedoc "The type of keys allowed on registration"
@type key :: term
Expand Down Expand Up @@ -266,8 +266,8 @@ defmodule Registry do
:undefined
end

{kind, _, _} ->
raise ArgumentError, ":via is not supported for #{kind} registries"
{{:duplicate, _}, _, _} ->
raise ArgumentError, ":via is not supported for duplicate registries"
end
end

Expand Down Expand Up @@ -329,11 +329,24 @@ defmodule Registry do
{Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()}
], strategy: :one_for_one)

For `:duplicate` registries with many different keys (e.g., many topics with
few subscribers each), you can optimize key-based lookups by partitioning by key:

Registry.start_link(
keys: {:duplicate, :key},
name: MyApp.TopicRegistry,
partitions: System.schedulers_online()
)

This allows key-based lookups to check only a single partition instead of
searching all partitions. Use the default `:pid` partitioning when you have
fewer keys with many entries each (e.g., one topic with many subscribers).

## Options

The registry requires the following keys:

* `:keys` - chooses if keys are `:unique` or `:duplicate`
* `:keys` - chooses if keys are `:unique`, `:duplicate`, `{:duplicate, :key}`, or `{:duplicate, :pid}`
* `:name` - the name of the registry and its tables

The following keys are optional:
Expand All @@ -345,16 +358,40 @@ defmodule Registry do
crashes. Messages sent to listeners are of type `t:listener_message/0`.
* `:meta` - a keyword list of metadata to be attached to the registry.

For `:duplicate` registries, you can specify the partitioning strategy
directly in the `:keys` option:

* `:duplicate` or `{:duplicate, :pid}` - Use `:pid` partitioning (default)
when you have keys with many entries (e.g., one topic with many subscribers).
This is the traditional behavior and groups all entries from the same process together.

* `{:duplicate, :key}` - Use `:key` partitioning when entries are spread across
many different keys (e.g., many topics with few subscribers each). This makes
key-based lookups more efficient as they only need to check a single partition
instead of all partitions.

"""
@doc since: "1.5.0"
@spec start_link([start_option]) :: {:ok, pid} | {:error, term}
def start_link(options) do
keys = Keyword.get(options, :keys)

if keys not in @keys do
raise ArgumentError,
"expected :keys to be given and be one of :unique or :duplicate, got: #{inspect(keys)}"
end
# Validate and normalize keys format
kind =
case keys do
{:duplicate, partition_strategy} when partition_strategy in [:key, :pid] ->
{:duplicate, partition_strategy}

:unique ->
:unique

:duplicate ->
{:duplicate, :pid}

_ ->
raise ArgumentError,
"expected :keys to be given and be one of :unique, :duplicate, {:duplicate, :key}, or {:duplicate, :pid}, got: #{inspect(keys)}"
end

name =
case Keyword.fetch(options, :name) do
Expand Down Expand Up @@ -397,11 +434,18 @@ defmodule Registry do

# The @info format must be kept in sync with Registry.Partition optimization.
entries = [
{@all_info, {keys, partitions, nil, nil, listeners}},
{@key_info, {keys, partitions, nil}} | meta
{@all_info, {kind, partitions, nil, nil, listeners}},
{@key_info, {kind, partitions, nil}} | meta
]

Registry.Supervisor.start_link(keys, name, partitions, listeners, entries, compressed)
Registry.Supervisor.start_link(
kind,
name,
partitions,
listeners,
entries,
compressed
)
end

@doc false
Expand Down Expand Up @@ -468,7 +512,8 @@ defmodule Registry do
end

{kind, _, _} ->
raise ArgumentError, "Registry.update_value/3 is not supported for #{kind} registries"
raise ArgumentError,
"Registry.update_value/3 is not supported for #{inspect(kind)} registries"
end
end

Expand Down Expand Up @@ -508,12 +553,12 @@ defmodule Registry do
|> List.wrap()
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)

{:duplicate, 1, key_ets} ->
{{:duplicate, _}, 1, key_ets} ->
key_ets
|> safe_lookup_second(key)
|> apply_non_empty_to_mfa_or_fun(mfa_or_fun)

{:duplicate, partitions, _} ->
{{:duplicate, _}, partitions, _} ->
if Keyword.get(opts, :parallel, false) do
registry
|> dispatch_parallel(key, mfa_or_fun, partitions)
Expand Down Expand Up @@ -625,10 +670,14 @@ defmodule Registry do
[]
end

{:duplicate, 1, key_ets} ->
{{:duplicate, _}, 1, key_ets} ->
safe_lookup_second(key_ets, key)

{:duplicate, partitions, _key_ets} ->
{{:duplicate, :key}, partitions, _key_ets} ->
partition = hash(key, partitions)
safe_lookup_second(key_ets!(registry, partition), key)

{{:duplicate, :pid}, partitions, _key_ets} ->
for partition <- 0..(partitions - 1),
pair <- safe_lookup_second(key_ets!(registry, partition), key),
do: pair
Expand Down Expand Up @@ -749,10 +798,10 @@ defmodule Registry do
key_ets = key_ets || key_ets!(registry, key, partitions)
:ets.select(key_ets, spec)

{:duplicate, 1, key_ets} ->
{{:duplicate, _}, 1, key_ets} ->
:ets.select(key_ets, spec)

{:duplicate, partitions, _key_ets} ->
{{:duplicate, _}, partitions, _key_ets} ->
for partition <- 0..(partitions - 1),
pair <- :ets.select(key_ets!(registry, partition), spec),
do: pair
Expand Down Expand Up @@ -795,16 +844,35 @@ defmodule Registry do
@spec keys(registry, pid) :: [key]
def keys(registry, pid) when is_atom(registry) and is_pid(pid) do
{kind, partitions, _, pid_ets, _} = info!(registry)
{_, pid_ets} = pid_ets || pid_ets!(registry, pid, partitions)

keys =
try do
spec = [{{pid, :"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
:ets.select(pid_ets, spec)
catch
:error, :badarg -> []
pid_etses =
if pid_ets do
{_, pid_ets} = pid_ets
[pid_ets]
else
case kind do
{:duplicate, :key} ->
for partition <- 0..(partitions - 1) do
{_, pid_ets} = pid_ets!(registry, partition)
pid_ets
end

_ ->
{_, pid_ets} = pid_ets!(registry, pid, partitions)
[pid_ets]
end
end

keys =
Enum.flat_map(pid_etses, fn pid_ets ->
try do
spec = [{{pid, :"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}]
:ets.select(pid_ets, spec)
catch
:error, :badarg -> []
end
end)

# Handle the possibility of fake keys
keys = gather_keys(keys, [], false)

Expand Down Expand Up @@ -882,8 +950,17 @@ defmodule Registry do
[]
end

{:duplicate, partitions, key_ets} ->
key_ets = key_ets || key_ets!(registry, pid, partitions)
{{:duplicate, _}, 1, key_ets} ->
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value

{{:duplicate, :key}, partitions, _key_ets} ->
partition = hash(key, partitions)
key_ets = key_ets!(registry, partition)
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value

{{:duplicate, :pid}, partitions, _key_ets} ->
partition = hash(pid, partitions)
key_ets = key_ets!(registry, partition)
for {^pid, value} <- safe_lookup_second(key_ets, key), do: value
end
end
Expand Down Expand Up @@ -1121,7 +1198,7 @@ defmodule Registry do
end
end

defp register_key(:duplicate, key_ets, _key, entry) do
defp register_key({:duplicate, _}, key_ets, _key, entry) do
true = :ets.insert(key_ets, entry)
:ok
end
Expand Down Expand Up @@ -1339,10 +1416,10 @@ defmodule Registry do
key_ets = key_ets || key_ets!(registry, key, partitions)
:ets.select_count(key_ets, spec)

{:duplicate, 1, key_ets} ->
{{:duplicate, _}, 1, key_ets} ->
:ets.select_count(key_ets, spec)

{:duplicate, partitions, _key_ets} ->
{{:duplicate, _}, partitions, _key_ets} ->
Enum.sum_by(0..(partitions - 1), fn partition_index ->
:ets.select_count(key_ets!(registry, partition_index), spec)
end)
Expand Down Expand Up @@ -1512,7 +1589,12 @@ defmodule Registry do
{hash(key, partitions), hash(pid, partitions)}
end

defp partitions(:duplicate, _key, pid, partitions) do
defp partitions({:duplicate, :key}, key, _pid, partitions) do
partition = hash(key, partitions)
{partition, partition}
end

defp partitions({:duplicate, :pid}, _key, pid, partitions) do
partition = hash(pid, partitions)
{partition, partition}
end
Expand Down Expand Up @@ -1576,9 +1658,10 @@ defmodule Registry.Supervisor do
defp strategy_for_kind(:unique), do: :one_for_all

# Duplicate registries have both key and pid partitions hashed
# by pid. This means that, if a PID partition crashes, all of
# by key ({:duplicate, :key}) or pid ({:duplicate, :pid}).
# This means that, if a PID or key partition crashes, all of
# its associated entries are in its sibling table, so we crash one.
defp strategy_for_kind(:duplicate), do: :one_for_one
defp strategy_for_kind({:duplicate, _}), do: :one_for_one
end

defmodule Registry.Partition do
Expand Down Expand Up @@ -1633,6 +1716,7 @@ defmodule Registry.Partition do

def init({kind, registry, i, partitions, key_partition, pid_partition, listeners, compressed}) do
Process.flag(:trap_exit, true)

key_ets = init_key_ets(kind, key_partition, compressed)
pid_ets = init_pid_ets(kind, pid_partition)

Expand All @@ -1659,7 +1743,7 @@ defmodule Registry.Partition do
:ets.new(key_partition, compression_opt(opts, compressed))
end

defp init_key_ets(:duplicate, key_partition, compressed) do
defp init_key_ets({:duplicate, _}, key_partition, compressed) do
opts = [:duplicate_bag, :public, read_concurrency: true, write_concurrency: true]
:ets.new(key_partition, compression_opt(opts, compressed))
end
Expand Down
Loading