Registry

A local, decentralized and scalable key-value process storage.

It allows developers to lookup one or more processes with a given key. If the registry has :unique keys, a key points to 0 or 1 processes. If the registry allows :duplicate keys, a single key may point to any number of processes. In both cases, different keys could identify the same process.

Each entry in the registry is associated to the process that has registered the key. If the process crashes, the keys associated to that process are automatically removed. All key comparisons in the registry are done using the match operation (===).

The registry can be used for different purposes, such as name lookups (using the :via option), storing properties, custom dispatching rules, or a pubsub implementation. We explore some of those use cases below.

The registry may also be transparently partitioned, which provides more scalable behaviour for running registries on highly concurrent environments with thousands or millions of entries.

Using in :via

Once the registry is started with a given name (using Registry.start_link/2), it can be used to register and access named processes using the {:via, Registry, {registry, key}} tuple:

{:ok, _} = Registry.start_link(:unique, Registry.ViaTest)
name = {:via, Registry, {Registry.ViaTest, "agent"}}
{:ok, _} = Agent.start_link(fn -> 0 end, name: name)
Agent.get(name, & &1)
#=> 0
Agent.update(name, & &1 + 1)
Agent.get(name, & &1)
#=> 1

Typically the registry is started as part of a supervision tree though:

supervisor(Registry, [:unique, Registry.ViaTest])

Only registries with unique keys can be used in :via. If the name is already taken, the case-specific start_link function (Agent.start_link/2 in the example above) will return {:error, {:already_started, current_pid}}.

Using as a dispatcher

Registry has a dispatch mechanism that allows developers to implement custom dispatch logic triggered from the caller. For example, let’s say we have a duplicate registry started as so:

{:ok, _} = Registry.start_link(:duplicate, Registry.DispatcherTest)

By calling register/3, different processes can register under a given key and associate any value under that key. In this case, let’s register the current process under the key "hello" and attach the {IO, :inspect} tuple to it:

{:ok, _} = Registry.register(Registry.DispatcherTest, "hello", {IO, :inspect})

Now, an entity interested in dispatching events for a given key may call dispatch/3 passing in the key and a callback. This callback will be invoked with a list of all the values registered under the requested key, alongside the pid of the process that registered each value, in the form of {pid, value} tuples. In our example, value will be the {module, function} tuple in the code above:

Registry.dispatch(Registry.DispatcherTest, "hello", fn entries ->
  for {pid, {module, function}} <- entries, do: apply(module, function, [pid])
end)
# Prints #PID<...> where the pid is for the process that called register/3 above
#=> :ok

Dispatching happens in the process that calls dispatch/3 either serially or concurrently in case of multiple partitions (via spawned tasks). The registered processes are not involved in dispatching unless involving them is done explicitly (for example, by sending them a message in the callback).

Furthermore, if there is a failure when dispatching, due to a bad registration, dispatching will always fail and the registered process will not be notified. Therefore let’s make sure we at least wrap and report those errors:

require Logger
Registry.dispatch(Registry.DispatcherTest, "hello", fn entries ->
  for {pid, {module, function}} <- entries do
    try do
      apply(module, function, [pid])
    catch
      kind, reason ->
        formatted = Exception.format(kind, reason, System.stacktrace)
        Logger.error "Registry.dispatch/3 failed with #{formatted}"
    end
  end
end)
# Prints #PID<...>
#=> :ok

You could also replace the whole apply system by explicitly sending messages. That’s the example we will see next.

Using as a PubSub

Registries can also be used to implement a local, non-distributed, scalable PubSub by relying on the dispatch/3 function, similarly to the previous section: in this case, however, we will send messages to each associated process, instead of invoking a given module-function.

In this example, we will also set the number of partitions to the number of schedulers online, which will make the registry more performant on highly concurrent environments as each partition will spawn a new process, allowing dispatching to happen in parallel:

{:ok, _} = Registry.start_link(:duplicate, Registry.PubSubTest,
                               partitions: System.schedulers_online)
{:ok, _} = Registry.register(Registry.PubSubTest, "hello", [])
Registry.dispatch(Registry.PubSubTest, "hello", fn entries ->
  for {pid, _} <- entries, do: send(pid, {:broadcast, "world"})
end)
#=> :ok

The example above broadcasted the message {:broadcast, "world"} to all processes registered under the “topic” (or “key” as we called it until now) "hello".

The third argument given to register/3 is a value associated to the current process. While in the previous section we used it when dispatching, in this particular example we are not interested in it, so we have set it to an empty list. You could store a more meaningful value if necessary.

Registrations

Looking up, dispatching and registering are efficient and immediate at the cost of delayed unsubscription. For example, if a process crashes, its keys are automatically removed from the registry but the change may not propagate immediately. This means certain operations may return processes that are already dead. When such may happen, it will be explicitly stated in the function documentation.

However, keep in mind those cases are typically not an issue. After all, a process referenced by a pid may crash at any time, including between getting the value from the registry and sending it a message. Many parts of the standard library are designed to cope with that, such as Process.monitor/1 which will deliver the :DOWN message immediately if the monitored process is already dead and Kernel.send/2 which acts as a no-op for dead processes.

ETS

Note that the registry uses one ETS table plus two ETS tables per partition.

Summary

Types

key()

The type of keys allowed on registration

kind()

The type of the registry

meta_key()

The type of registry metadata keys

meta_value()

The type of registry metadata values

registry()

The registry identifier

value()

The type of values allowed on registration

Functions

dispatch(registry, key, mfa_or_fun)

Invokes the callback with all entries under key in each partition for the given registry

keys(registry, pid)

Returns the known keys for the given pid in registry in no particular order

lookup(registry, key)

Finds the {pid, value} pair for the given key in registry in no particular order

match(registry, key, pattern)

Returns {pid, value} pairs under the given key in registry that match pattern

meta(registry, key)

Reads registry metadata given on start_link/3

put_meta(registry, key, value)

Stores registry metadata

register(registry, key, value)

Registers the current process under the given key in registry

start_link(kind, registry, options \\ [])

Starts the registry as a supervisor process

unregister(registry, key)

Unregisters all entries for the given key associated to the current process in registry

update_value(registry, key, callback)

Updates the value for key for the current process in the unique registry

Types

key()

key() :: term()

The type of keys allowed on registration

kind()

kind() :: :unique | :duplicate

The type of the registry

meta_key()

meta_key() :: atom() | tuple()

The type of registry metadata keys

meta_value()

meta_value() :: term()

The type of registry metadata values

registry()

registry() :: atom()

The registry identifier

value()

value() :: term()

The type of values allowed on registration

Functions

dispatch(registry, key, mfa_or_fun)

dispatch(registry(), key(), (entries :: [{pid(), value()}] -> term())) :: :ok

Invokes the callback with all entries under key in each partition for the given registry.

The list of entries is a non-empty list of two-element tuples where the first element is the pid and the second element is the value associated to the pid. If there are no entries for the given key, the callback is never invoked.

If the registry is not partitioned, the callback is invoked in the process that calls dispatch/3. If the registry is partitioned, the callback is invoked concurrently per partition by starting a task linked to the caller. The callback, however, is only invoked if there are entries for that partition.

See the module documentation for examples of using the dispatch/3 function for building custom dispatching or a pubsub system.

keys(registry, pid)

keys(registry(), pid()) :: [key()]

Returns the known keys for the given pid in registry in no particular order.

If the registry is unique, the keys are unique. Otherwise they may contain duplicates if the process was registered under the same key multiple times. The list will be empty if the process is dead or it has no keys in this registry.

Examples

Registering under a unique registry does not allow multiple entries:

iex> Registry.start_link(:unique, Registry.UniqueKeysTest)
iex> Registry.keys(Registry.UniqueKeysTest, self())
[]
iex> {:ok, _} = Registry.register(Registry.UniqueKeysTest, "hello", :world)
iex> Registry.register(Registry.UniqueKeysTest, "hello", :later) # registry is :unique
{:error, {:already_registered, self()}}
iex> Registry.keys(Registry.UniqueKeysTest, self())
["hello"]

Such is possible for duplicate registries though:

iex> Registry.start_link(:duplicate, Registry.DuplicateKeysTest)
iex> Registry.keys(Registry.DuplicateKeysTest, self())
[]
iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world)
iex> {:ok, _} = Registry.register(Registry.DuplicateKeysTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateKeysTest, self())
["hello", "hello"]

lookup(registry, key)

lookup(registry(), key()) :: [{pid(), value()}]

Finds the {pid, value} pair for the given key in registry in no particular order.

An empty list if there is no match.

For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.

Examples

In the example below we register the current process and look it up both from itself and other processes:

iex> Registry.start_link(:unique, Registry.UniqueLookupTest)
iex> Registry.lookup(Registry.UniqueLookupTest, "hello")
[]
iex> {:ok, _} = Registry.register(Registry.UniqueLookupTest, "hello", :world)
iex> Registry.lookup(Registry.UniqueLookupTest, "hello")
[{self(), :world}]
iex> Task.async(fn -> Registry.lookup(Registry.UniqueLookupTest, "hello") end) |> Task.await
[{self(), :world}]

The same applies to duplicate registries:

iex> Registry.start_link(:duplicate, Registry.DuplicateLookupTest)
iex> Registry.lookup(Registry.DuplicateLookupTest, "hello")
[]
iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :world)
iex> Registry.lookup(Registry.DuplicateLookupTest, "hello")
[{self(), :world}]
iex> {:ok, _} = Registry.register(Registry.DuplicateLookupTest, "hello", :another)
iex> Enum.sort(Registry.lookup(Registry.DuplicateLookupTest, "hello"))
[{self(), :another}, {self(), :world}]

match(registry, key, pattern)

match(registry(), key(), match_pattern :: atom() | tuple()) :: [{pid(), term()}]

Returns {pid, value} pairs under the given key in registry that match pattern.

Pattern must be an atom or a tuple that will match the structure of the value stored in the registry. The atom :_ can be used to ignore a given value or tuple element, while :”$1” can be used to temporarily assign part of pattern to a variable for a subsequent comparison.

An empty list will be returned if there is no match.

For unique registries, a single partition lookup is necessary. For duplicate registries, all partitions must be looked up.

Examples

In the example below we register the current process under the same key in a duplicate registry but with different values:

iex> Registry.start_link(:duplicate, Registry.MatchTest)
iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {1, :atom, 1})
iex> {:ok, _} = Registry.register(Registry.MatchTest, "hello", {2, :atom, 2})
iex> Registry.match(Registry.MatchTest, "hello", {1, :_, :_})
[{self(), {1, :atom, 1}}]
iex> Registry.match(Registry.MatchTest, "hello", {2, :_, :_})
[{self(), {2, :atom, 2}}]
iex> Registry.match(Registry.MatchTest, "hello", {:_, :atom, :_}) |> Enum.sort()
[{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}]
iex> Registry.match(Registry.MatchTest, "hello", {:"$1", :_, :"$1"}) |> Enum.sort()
[{self(), {1, :atom, 1}}, {self(), {2, :atom, 2}}]

meta(registry, key)

meta(registry(), meta_key()) :: {:ok, meta_value()} | :error

Reads registry metadata given on start_link/3.

Atoms and tuples are allowed as keys.

Examples

iex> Registry.start_link(:unique, Registry.MetaTest, meta: [custom_key: "custom_value"])
iex> Registry.meta(Registry.MetaTest, :custom_key)
{:ok, "custom_value"}
iex> Registry.meta(Registry.MetaTest, :unknown_key)
:error

put_meta(registry, key, value)

put_meta(registry(), meta_key(), meta_value()) :: :ok

Stores registry metadata.

Atoms and tuples are allowed as keys.

Examples

iex> Registry.start_link(:unique, Registry.PutMetaTest)
iex> Registry.put_meta(Registry.PutMetaTest, :custom_key, "custom_value")
:ok
iex> Registry.meta(Registry.PutMetaTest, :custom_key)
{:ok, "custom_value"}
iex> Registry.put_meta(Registry.PutMetaTest, {:tuple, :key}, "tuple_value")
:ok
iex> Registry.meta(Registry.PutMetaTest, {:tuple, :key})
{:ok, "tuple_value"}

register(registry, key, value)

register(registry(), key(), value()) ::
  {:ok, pid()} |
  {:error, {:already_registered, pid()}}

Registers the current process under the given key in registry.

A value to be associated with this registration must also be given. This value will be retrieved whenever dispatching or doing a key lookup.

This function returns {:ok, owner} or {:error, reason}. The owner is the pid is the registry partition responsible for the pid. The owner is automatically linked to the caller.

If the registry has unique keys, it will return {:ok, owner} unless the key is already associated to a pid, in which case it returns {:error, {:already_registered, pid}}.

If the registry has duplicate keys, multiple registrations from the current process under the same key are allowed.

Examples

Registering under a unique registry does not allow multiple entries:

iex> Registry.start_link(:unique, Registry.UniqueRegisterTest)
iex> {:ok, _} = Registry.register(Registry.UniqueRegisterTest, "hello", :world)
iex> Registry.register(Registry.UniqueRegisterTest, "hello", :later)
{:error, {:already_registered, self()}}
iex> Registry.keys(Registry.UniqueRegisterTest, self())
["hello"]

Such is possible for duplicate registries though:

iex> Registry.start_link(:duplicate, Registry.DuplicateRegisterTest)
iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world)
iex> {:ok, _} = Registry.register(Registry.DuplicateRegisterTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateRegisterTest, self())
["hello", "hello"]

start_link(kind, registry, options \\ [])

start_link(kind(), registry(), options) ::
  {:ok, pid()} |
  {:error, term()} when options: [partitions: pos_integer(), listeners: [atom()], meta: [{meta_key(), meta_value()}]]

Starts the registry as a supervisor process.

Manually it can be started as:

Registry.start_link(:unique, MyApp.Registry)

In your supervisor tree, you would write:

supervisor(Registry, [:unique, MyApp.Registry])

For intensive workloads, the registry may also be partitioned (by specifying the :partitions option). If partitioning is required then a good default is to set the number of partitions to the number of schedulers available:

Registry.start_link(:unique, MyApp.Registry, partitions: System.schedulers_online())

or:

supervisor(Registry, [:unique, MyApp.Registry, [partitions: System.schedulers_online()]])

Options

The registry supports the following options:

  • :partitions - the number of partitions in the registry. Defaults to 1.
  • :listeners - a list of named processes which are notified of :register and :unregister events. The registered process must be monitored by the listener if the listener wants to be notified if the registered process crashes.
  • :meta - a keyword list of metadata to be attached to the registry.

unregister(registry, key)

unregister(registry(), key()) :: :ok

Unregisters all entries for the given key associated to the current process in registry.

Always returns :ok and automatically unlinks the current process from the owner if there are no more keys associated to the current process. See also register/3 to read more about the “owner”.

Examples

It unregister all entries for key for unique registries:

iex> Registry.start_link(:unique, Registry.UniqueUnregisterTest)
iex> Registry.register(Registry.UniqueUnregisterTest, "hello", :world)
iex> Registry.keys(Registry.UniqueUnregisterTest, self())
["hello"]
iex> Registry.unregister(Registry.UniqueUnregisterTest, "hello")
:ok
iex> Registry.keys(Registry.UniqueUnregisterTest, self())
[]

As well as duplicate registries:

iex> Registry.start_link(:duplicate, Registry.DuplicateUnregisterTest)
iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world)
iex> Registry.register(Registry.DuplicateUnregisterTest, "hello", :world)
iex> Registry.keys(Registry.DuplicateUnregisterTest, self())
["hello", "hello"]
iex> Registry.unregister(Registry.DuplicateUnregisterTest, "hello")
:ok
iex> Registry.keys(Registry.DuplicateUnregisterTest, self())
[]

update_value(registry, key, callback)

update_value(registry(), key(), (value() -> value())) ::
  {new_value :: term(), old_value :: term()} |
  :error

Updates the value for key for the current process in the unique registry.

Returns a {new_value, old_value} tuple or :error if there is no such key assigned to the current process.

If a non-unique registry is given, an error is raised.

Examples

iex> Registry.start_link(:unique, Registry.UpdateTest)
iex> {:ok, _} = Registry.register(Registry.UpdateTest, "hello", 1)
iex> Registry.lookup(Registry.UpdateTest, "hello")
[{self(), 1}]
iex> Registry.update_value(Registry.UpdateTest, "hello", & &1 + 1)
{2, 1}
iex> Registry.lookup(Registry.UpdateTest, "hello")
[{self(), 2}]

© 2012 Plataformatec
Licensed under the Apache License, Version 2.0.
https://hexdocs.pm/elixir/1.4.5/Registry.html