Optimize connection and session management
This commit is contained in:
parent
2a747c9d53
commit
8d50c62a94
|
@ -36,7 +36,7 @@
|
||||||
-define(CM, ?MODULE).
|
-define(CM, ?MODULE).
|
||||||
|
|
||||||
%% ETS Tables.
|
%% ETS Tables.
|
||||||
-define(CONN_TAB, emqx_conn).
|
-define(CONN_TAB, emqx_conn).
|
||||||
-define(CONN_ATTRS_TAB, emqx_conn_attrs).
|
-define(CONN_ATTRS_TAB, emqx_conn_attrs).
|
||||||
-define(CONN_STATS_TAB, emqx_conn_stats).
|
-define(CONN_STATS_TAB, emqx_conn_stats).
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ register_connection(ClientId) when is_binary(ClientId) ->
|
||||||
register_connection({ClientId, self()});
|
register_connection({ClientId, self()});
|
||||||
|
|
||||||
register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
_ = ets:insert(?CONN_TAB, Conn),
|
true = ets:insert(?CONN_TAB, Conn),
|
||||||
notify({registered, ClientId, ConnPid}).
|
notify({registered, ClientId, ConnPid}).
|
||||||
|
|
||||||
-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
|
-spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
|
||||||
|
@ -87,10 +87,13 @@ unregister_connection(ClientId) when is_binary(ClientId) ->
|
||||||
unregister_connection({ClientId, self()});
|
unregister_connection({ClientId, self()});
|
||||||
|
|
||||||
unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
|
||||||
_ = ets:delete(?CONN_STATS_TAB, Conn),
|
do_unregister_connection(Conn),
|
||||||
_ = ets:delete(?CONN_ATTRS_TAB, Conn),
|
notify({unregistered, ConnPid}).
|
||||||
_ = ets:delete_object(?CONN_TAB, Conn),
|
|
||||||
notify({unregistered, ClientId, ConnPid}).
|
do_unregister_connection(Conn) ->
|
||||||
|
true = ets:delete(?CONN_STATS_TAB, Conn),
|
||||||
|
true = ets:delete(?CONN_ATTRS_TAB, Conn),
|
||||||
|
true = ets:delete_object(?CONN_TAB, Conn).
|
||||||
|
|
||||||
%% @doc Lookup connection pid
|
%% @doc Lookup connection pid
|
||||||
-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
|
-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
|
||||||
|
@ -138,7 +141,7 @@ handle_call(Req, _From, State) ->
|
||||||
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
||||||
{noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
|
{noreply, State#{conn_pmon := emqx_pmon:monitor(ConnPid, ClientId, PMon)}};
|
||||||
|
|
||||||
handle_cast({notify, {unregistered, _ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
||||||
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
|
@ -150,7 +153,12 @@ handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := P
|
||||||
undefined ->
|
undefined ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
ClientId ->
|
ClientId ->
|
||||||
unregister_connection({ClientId, ConnPid}),
|
Conn = {ClientId, ConnPid},
|
||||||
|
case ets:member(?CONN_ATTRS_TAB, Conn) of
|
||||||
|
true ->
|
||||||
|
ok = emqx_pool:async_submit(fun do_unregister_connection/1, [Conn]);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
{noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
|
{noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|
|
@ -849,14 +849,13 @@ shutdown(_Reason, #pstate{client_id = undefined}) ->
|
||||||
ok;
|
ok;
|
||||||
shutdown(_Reason, #pstate{connected = false}) ->
|
shutdown(_Reason, #pstate{connected = false}) ->
|
||||||
ok;
|
ok;
|
||||||
shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
|
shutdown(conflict, _PState) ->
|
||||||
Reason =:= discard ->
|
ok;
|
||||||
emqx_cm:unregister_connection(ClientId);
|
shutdown(discard, _PState) ->
|
||||||
shutdown(Reason, PState = #pstate{connected = true,
|
ok;
|
||||||
client_id = ClientId}) ->
|
shutdown(Reason, PState) ->
|
||||||
?LOG(info, "Shutdown for ~p", [Reason]),
|
?LOG(info, "Shutdown for ~p", [Reason]),
|
||||||
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
|
emqx_hooks:run('client.disconnected', [credentials(PState), Reason]).
|
||||||
emqx_cm:unregister_connection(ClientId).
|
|
||||||
|
|
||||||
start_keepalive(0, _PState) ->
|
start_keepalive(0, _PState) ->
|
||||||
ignore;
|
ignore;
|
||||||
|
|
|
@ -645,16 +645,14 @@ handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{will_msg = WillMsg, client_id = ClientId, conn_pid = ConnPid}) ->
|
terminate(Reason, #state{will_msg = WillMsg, conn_pid = ConnPid}) ->
|
||||||
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
%% Should not run hooks here.
|
||||||
|
%% emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
||||||
send_willmsg(WillMsg),
|
send_willmsg(WillMsg),
|
||||||
%% Ensure to shutdown the connection
|
%% Ensure to shutdown the connection
|
||||||
if
|
(ConnPid =:= undefined) orelse ConnPid ! {shutdown, Reason}.
|
||||||
ConnPid =/= undefined ->
|
%% Let it crash.
|
||||||
ConnPid ! {shutdown, Reason};
|
%% emqx_sm:unregister_session(ClientId).
|
||||||
true -> ok
|
|
||||||
end,
|
|
||||||
emqx_sm:unregister_session(ClientId).
|
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
@ -1011,3 +1009,4 @@ noreply(State) ->
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, {shutdown, Reason}, State}.
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ init([]) ->
|
||||||
[#{id => session,
|
[#{id => session,
|
||||||
start => {emqx_session, start_link, []},
|
start => {emqx_session, start_link, []},
|
||||||
restart => temporary,
|
restart => temporary,
|
||||||
shutdown => 5000,
|
shutdown => brutal_kill,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_session]}]}}.
|
modules => [emqx_session]}]}}.
|
||||||
|
|
||||||
|
|
|
@ -40,9 +40,9 @@
|
||||||
|
|
||||||
-define(SM, ?MODULE).
|
-define(SM, ?MODULE).
|
||||||
|
|
||||||
%% ETS Tables
|
%% ETS Tables for session management.
|
||||||
-define(SESSION_TAB, emqx_session).
|
-define(SESSION_TAB, emqx_session).
|
||||||
-define(SESSION_P_TAB, emqx_persistent_session).
|
-define(SESSION_P_TAB, emqx_session_p).
|
||||||
-define(SESSION_ATTRS_TAB, emqx_session_attrs).
|
-define(SESSION_ATTRS_TAB, emqx_session_attrs).
|
||||||
-define(SESSION_STATS_TAB, emqx_session_stats).
|
-define(SESSION_STATS_TAB, emqx_session_stats).
|
||||||
|
|
||||||
|
@ -59,8 +59,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
|
||||||
end,
|
end,
|
||||||
emqx_sm_locker:trans(ClientId, CleanStart);
|
emqx_sm_locker:trans(ClientId, CleanStart);
|
||||||
|
|
||||||
open_session(SessAttrs = #{clean_start := false,
|
open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
|
||||||
client_id := ClientId}) ->
|
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case resume_session(ClientId, SessAttrs) of
|
case resume_session(ClientId, SessAttrs) of
|
||||||
{ok, SPid} ->
|
{ok, SPid} ->
|
||||||
|
@ -77,13 +76,14 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
||||||
discard_session(ClientId, self()).
|
discard_session(ClientId, self()).
|
||||||
|
|
||||||
discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
||||||
lists:foreach(fun({_ClientId, SPid}) ->
|
lists:foreach(
|
||||||
case catch emqx_session:discard(SPid, ConnPid) of
|
fun({_ClientId, SPid}) ->
|
||||||
{Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
|
case catch emqx_session:discard(SPid, ConnPid) of
|
||||||
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
|
{Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
|
||||||
ok -> ok
|
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
|
||||||
end
|
ok -> ok
|
||||||
end, lookup_session(ClientId)).
|
end
|
||||||
|
end, lookup_session(ClientId)).
|
||||||
|
|
||||||
%% @doc Try to resume a session.
|
%% @doc Try to resume a session.
|
||||||
-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
|
-spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
|
||||||
|
@ -116,19 +116,18 @@ close_session(SPid) when is_pid(SPid) ->
|
||||||
register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
|
register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
|
||||||
register_session({ClientId, self()}, SessAttrs);
|
register_session({ClientId, self()}, SessAttrs);
|
||||||
|
|
||||||
register_session(Session = {ClientId, SPid}, SessAttrs)
|
register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
when is_binary(ClientId), is_pid(SPid) ->
|
true = ets:insert(?SESSION_TAB, Session),
|
||||||
ets:insert(?SESSION_TAB, Session),
|
true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
|
||||||
ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
|
true = proplists:get_value(clean_start, SessAttrs, true)
|
||||||
proplists:get_value(clean_start, SessAttrs, true)
|
orelse ets:insert(?SESSION_P_TAB, Session),
|
||||||
andalso ets:insert(?SESSION_P_TAB, Session),
|
ok = emqx_sm_registry:register_session(Session),
|
||||||
emqx_sm_registry:register_session(Session),
|
|
||||||
notify({registered, ClientId, SPid}).
|
notify({registered, ClientId, SPid}).
|
||||||
|
|
||||||
%% @doc Get session attrs
|
%% @doc Get session attrs
|
||||||
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
|
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
|
||||||
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
|
emqx_tables:lookup_value(?SESSION_ATTRS_TAB, Session, []).
|
||||||
|
|
||||||
%% @doc Set session attrs
|
%% @doc Set session attrs
|
||||||
-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
|
-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
|
||||||
|
@ -144,17 +143,21 @@ unregister_session(ClientId) when is_binary(ClientId) ->
|
||||||
unregister_session({ClientId, self()});
|
unregister_session({ClientId, self()});
|
||||||
|
|
||||||
unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
emqx_sm_registry:unregister_session(Session),
|
ok = do_unregister_session(Session),
|
||||||
ets:delete(?SESSION_STATS_TAB, Session),
|
|
||||||
ets:delete(?SESSION_ATTRS_TAB, Session),
|
|
||||||
ets:delete_object(?SESSION_P_TAB, Session),
|
|
||||||
ets:delete_object(?SESSION_TAB, Session),
|
|
||||||
notify({unregistered, ClientId, SPid}).
|
notify({unregistered, ClientId, SPid}).
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
do_unregister_session(Session) ->
|
||||||
|
true = ets:delete(?SESSION_STATS_TAB, Session),
|
||||||
|
true = ets:delete(?SESSION_ATTRS_TAB, Session),
|
||||||
|
true = ets:delete_object(?SESSION_P_TAB, Session),
|
||||||
|
true = ets:delete_object(?SESSION_TAB, Session),
|
||||||
|
emqx_sm_registry:unregister_session(Session).
|
||||||
|
|
||||||
%% @doc Get session stats
|
%% @doc Get session stats
|
||||||
-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
|
-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
|
||||||
get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
|
||||||
safe_lookup_element(?SESSION_STATS_TAB, Session, []).
|
emqx_tables:lookup_value(?SESSION_STATS_TAB, Session, []).
|
||||||
|
|
||||||
%% @doc Set session stats
|
%% @doc Set session stats
|
||||||
-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
|
-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
|
||||||
|
@ -168,7 +171,7 @@ set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), i
|
||||||
-spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
|
-spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
|
||||||
lookup_session(ClientId) ->
|
lookup_session(ClientId) ->
|
||||||
case emqx_sm_registry:is_enabled() of
|
case emqx_sm_registry:is_enabled() of
|
||||||
true -> emqx_sm_registry:lookup_session(ClientId);
|
true -> emqx_sm_registry:lookup_session(ClientId);
|
||||||
false -> ets:lookup(?SESSION_TAB, ClientId)
|
false -> ets:lookup(?SESSION_TAB, ClientId)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -185,13 +188,7 @@ dispatch(ClientId, Topic, Msg) ->
|
||||||
%% @doc Lookup session pid.
|
%% @doc Lookup session pid.
|
||||||
-spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
|
-spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
|
||||||
lookup_session_pid(ClientId) ->
|
lookup_session_pid(ClientId) ->
|
||||||
safe_lookup_element(?SESSION_TAB, ClientId, undefined).
|
emqx_tables:lookup_value(?SESSION_TAB, ClientId).
|
||||||
|
|
||||||
safe_lookup_element(Tab, Key, Default) ->
|
|
||||||
try ets:lookup_element(Tab, Key, 2)
|
|
||||||
catch
|
|
||||||
error:badarg -> Default
|
|
||||||
end.
|
|
||||||
|
|
||||||
notify(Event) ->
|
notify(Event) ->
|
||||||
gen_server:cast(?SM, {notify, Event}).
|
gen_server:cast(?SM, {notify, Event}).
|
||||||
|
@ -207,29 +204,34 @@ init([]) ->
|
||||||
ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
||||||
ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
||||||
ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
|
ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
|
||||||
{ok, #{session_pmon => emqx_pmon:new()}}.
|
{ok, #{sess_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
|
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({notify, {registered, ClientId, SPid}}, State = #{session_pmon := PMon}) ->
|
handle_cast({notify, {registered, ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
|
||||||
{noreply, State#{session_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
|
{noreply, State#{sess_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
|
||||||
|
|
||||||
handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{session_pmon := PMon}) ->
|
handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
|
||||||
{noreply, State#{session_pmon := emqx_pmon:demonitor(SPid, PMon)}};
|
{noreply, State#{sess_pmon := emqx_pmon:demonitor(SPid, PMon)}};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{session_pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{sess_pmon := PMon}) ->
|
||||||
case emqx_pmon:find(DownPid, PMon) of
|
case emqx_pmon:find(DownPid, PMon) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
ClientId ->
|
ClientId ->
|
||||||
unregister_session({ClientId, DownPid}),
|
Session = {ClientId, DownPid},
|
||||||
{noreply, State#{session_pmon := emqx_pmon:erase(DownPid, PMon)}}
|
case ets:member(?SESSION_ATTRS_TAB, Session) of
|
||||||
|
true ->
|
||||||
|
ok = emqx_pool:async_submit(fun do_unregister_session/1, [Session]);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
|
{noreply, State#{sess_pmon := emqx_pmon:erase(DownPid, PMon)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
-export([trans/2, trans/3]).
|
-export([trans/2, trans/3]).
|
||||||
-export([lock/1, lock/2, unlock/1]).
|
-export([lock/1, lock/2, unlock/1]).
|
||||||
|
|
||||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
-spec(start_link() -> emqx_types:startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
ekka_locker:start_link(?MODULE).
|
ekka_locker:start_link(?MODULE).
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,7 @@ start_link() ->
|
||||||
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
|
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
|
||||||
|
|
||||||
-spec(is_enabled() -> boolean()).
|
-spec(is_enabled() -> boolean()).
|
||||||
is_enabled() ->
|
is_enabled() -> ets:info(?TAB, name) =/= undefined.
|
||||||
ets:info(?TAB, name) =/= undefined.
|
|
||||||
|
|
||||||
-spec(lookup_session(emqx_types:client_id())
|
-spec(lookup_session(emqx_types:client_id())
|
||||||
-> list({emqx_types:client_id(), session_pid()})).
|
-> list({emqx_types:client_id(), session_pid()})).
|
||||||
|
@ -73,7 +72,7 @@ init([]) ->
|
||||||
{storage_properties, [{ets, [{read_concurrency, true},
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
{write_concurrency, true}]}]}]),
|
{write_concurrency, true}]}]}]),
|
||||||
ok = ekka_mnesia:copy_table(?TAB),
|
ok = ekka_mnesia:copy_table(?TAB),
|
||||||
_ = ekka:monitor(membership),
|
ok = ekka:monitor(membership),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
Loading…
Reference in New Issue