diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 66029d4ca..f27fa414e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -57,6 +57,12 @@ target_set/2 ]). +%% Subscriptions to changes: +-export([ + subscribe/2, + unsubscribe/1 +]). + %% gen_server -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -125,6 +131,9 @@ | {error, {nonexistent_sites, [site()]}} | {error, _}. +%% Subject of the subscription: +-type subject() :: emqx_ds:db(). + %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). @@ -336,11 +345,21 @@ target_set(DB, Shard) -> undefined end. +%%================================================================================ + +subscribe(Pid, Subject) -> + gen_server:call(?SERVER, {subscribe, Pid, Subject}, infinity). + +unsubscribe(Pid) -> + gen_server:call(?SERVER, {unsubscribe, Pid}, infinity). + %%================================================================================ %% behavior callbacks %%================================================================================ --record(s, {}). +-record(s, { + subs = #{} :: #{pid() => {subject(), _Monitor :: reference()}} +}). init([]) -> process_flag(trap_exit, true), @@ -348,14 +367,24 @@ init([]) -> ensure_tables(), ensure_site(), S = #s{}, + {ok, _Node} = mnesia:subscribe({table, ?SHARD_TAB, simple}), {ok, S}. +handle_call({subscribe, Pid, Subject}, _From, S) -> + {reply, ok, handle_subscribe(Pid, Subject, S)}; +handle_call({unsubscribe, Pid}, _From, S) -> + {reply, ok, handle_unsubscribe(Pid, S)}; handle_call(_Call, _From, S) -> {reply, {error, unknown_call}, S}. handle_cast(_Cast, S) -> {noreply, S}. +handle_info({mnesia_table_event, {write, #?SHARD_TAB{shard = {DB, Shard}}, _}}, S) -> + ok = notify_subscribers(DB, {shard, DB, Shard}, S), + {noreply, S}; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, S) -> + {noreply, handle_unsubscribe(Pid, S)}; handle_info(_Info, S) -> {noreply, S}. @@ -613,6 +642,38 @@ transaction(Fun, Args) -> {error, Reason} end. +%%==================================================================== + +handle_subscribe(Pid, Subject, S = #s{subs = Subs0}) -> + case maps:is_key(Pid, Subs0) of + false -> + MRef = erlang:monitor(process, Pid), + Subs = Subs0#{Pid => {Subject, MRef}}, + S#s{subs = Subs}; + true -> + S + end. + +handle_unsubscribe(Pid, S = #s{subs = Subs0}) -> + case maps:take(Pid, Subs0) of + {{_Subject, MRef}, Subs} -> + _ = erlang:demonitor(MRef, [flush]), + S#s{subs = Subs}; + error -> + S + end. + +notify_subscribers(EventSubject, Event, #s{subs = Subs}) -> + maps:foreach( + fun(Pid, {Subject, _MRef}) -> + Subject == EventSubject andalso + erlang:send(Pid, {changed, Event}) + end, + Subs + ). + +%%==================================================================== + %% @doc Intersperse elements of two lists. %% Example: intersperse([1, 2], [3, 4, 5]) -> [1, 3, 2, 4, 5]. -spec intersperse([X], [Y]) -> [X | Y].