feat(dsrepl): allow to subscribe to DB metadata changes

Currently, only shard metadata changes are announced to the
subscribers.
This commit is contained in:
Andrew Mayorov 2024-04-05 17:36:57 +02:00
parent a07295d3bc
commit d6058b7f51
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 62 additions and 1 deletions

View File

@ -57,6 +57,12 @@
target_set/2
]).
%% Subscriptions to changes:
-export([
subscribe/2,
unsubscribe/1
]).
%% gen_server
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
@ -125,6 +131,9 @@
| {error, {nonexistent_sites, [site()]}}
| {error, _}.
%% Subject of the subscription:
-type subject() :: emqx_ds:db().
%% Peristent term key:
-define(emqx_ds_builtin_site, emqx_ds_builtin_site).
@ -336,11 +345,21 @@ target_set(DB, Shard) ->
undefined
end.
%%================================================================================
subscribe(Pid, Subject) ->
gen_server:call(?SERVER, {subscribe, Pid, Subject}, infinity).
unsubscribe(Pid) ->
gen_server:call(?SERVER, {unsubscribe, Pid}, infinity).
%%================================================================================
%% behavior callbacks
%%================================================================================
-record(s, {}).
-record(s, {
subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}}
}).
init([]) ->
process_flag(trap_exit, true),
@ -348,14 +367,24 @@ init([]) ->
ensure_tables(),
ensure_site(),
S = #s{},
{ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}),
{ok, S}.
handle_call({subscribe, Pid, Subject}, _From, S) ->
{reply, ok, handle_subscribe(Pid, Subject, S)};
handle_call({unsubscribe, Pid}, _From, S) ->
{reply, ok, handle_unsubscribe(Pid, S)};
handle_call(_Call, _From, S) ->
{reply, {error, unknown_call}, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, S) ->
ok = notify_subscribers(DB, {shard, DB, Shard}, S),
{noreply, S};
handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) ->
{noreply, handle_unsubscribe(Pid, S)};
handle_info(_Info, S) ->
{noreply, S}.
@ -613,6 +642,38 @@ transaction(Fun, Args) ->
{error, Reason}
end.
%%====================================================================
handle_subscribe(Pid, Subject, S = #s{subs = Subs0}) ->
case maps:is_key(Pid, Subs0) of
false ->
MRef = erlang:monitor(process, Pid),
Subs = Subs0#{Pid => {Subject, MRef}},
S#s{subs = Subs};
true ->
S
end.
handle_unsubscribe(Pid, S = #s{subs = Subs0}) ->
case maps:take(Pid, Subs0) of
{{_Subject, MRef}, Subs} ->
_ = erlang:demonitor(MRef, [flush]),
S#s{subs = Subs};
error ->
S
end.
notify_subscribers(EventSubject, Event, #s{subs = Subs}) ->
maps:foreach(
fun(Pid, {Subject, _MRef}) ->
Subject == EventSubject andalso
erlang:send(Pid, {changed, Event})
end,
Subs
).
%%====================================================================
%% @doc Intersperse elements of two lists.
%% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5].
-spec intersperse([X], [Y]) -> [X | Y].