Integrate with ekka library, use ekka:subscribe/1 to replace mnesia:subscribe/1

This commit is contained in:
Feng Lee 2017-06-16 16:02:36 +08:00
parent 1e205720cc
commit 1f3bab2bcb
2 changed files with 11 additions and 14 deletions

View File

@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
mnesia:subscribe(system), ekka:subscribe(membership),
ets:new(mqtt_local_route, [set, named_table, protected]), ets:new(mqtt_local_route, [set, named_table, protected]),
{ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, TRef} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{stats_timer = TRef}}. {ok, #state{stats_timer = TRef}}.
@ -239,12 +239,10 @@ handle_cast({del_local_route, Topic}, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({mnesia_system_event, {mnesia_up, Node}}, State) -> handle_info({member_up, _Node}, State) ->
lager:error("Mnesia up: ~p~n", [Node]),
{noreply, State}; {noreply, State};
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> handle_info({member_down, Node}, State) ->
lager:error("Mnesia down: ~p~n", [Node]),
clean_routes_(Node), clean_routes_(Node),
update_stats_(), update_stats_(),
{noreply, State, hibernate}; {noreply, State, hibernate};
@ -271,7 +269,7 @@ handle_info(_Info, State) ->
terminate(_Reason, #state{stats_timer = TRef}) -> terminate(_Reason, #state{stats_timer = TRef}) ->
timer:cancel(TRef), timer:cancel(TRef),
mnesia:unsubscribe(system). ekka:unsubscribe(membership).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.

View File

@ -34,7 +34,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {stats_fun, tick_tref}). -record(state, {stats_fun, ticker}).
%% @doc Start a session helper %% @doc Start a session helper
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}).
@ -42,9 +42,9 @@ start_link(StatsFun) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
init([StatsFun]) -> init([StatsFun]) ->
mnesia:subscribe(system), ekka:subscribe(membership),
{ok, TRef} = timer:send_interval(timer:seconds(1), tick), {ok, TRef} = timer:send_interval(timer:seconds(1), tick),
{ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. {ok, #state{stats_fun = StatsFun, ticker = TRef}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State). ?UNEXPECTED_REQ(Req, State).
@ -52,8 +52,7 @@ handle_call(Req, _From, State) ->
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State). ?UNEXPECTED_MSG(Msg, State).
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> handle_info({member_down, Node}, State) ->
lager:error("!!!Mnesia node down: ~s", [Node]),
Fun = fun() -> Fun = fun() ->
ClientIds = ClientIds =
mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'},
@ -63,7 +62,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
mnesia:async_dirty(Fun), mnesia:async_dirty(Fun),
{noreply, State}; {noreply, State};
handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) -> handle_info({member_up, _Node}, State) ->
{noreply, State}; {noreply, State};
handle_info(tick, State) -> handle_info(tick, State) ->
@ -72,9 +71,9 @@ handle_info(tick, State) ->
handle_info(Info, State) -> handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State). ?UNEXPECTED_INFO(Info, State).
terminate(_Reason, _State = #state{tick_tref = TRef}) -> terminate(_Reason, _State = #state{ticker = TRef}) ->
timer:cancel(TRef), timer:cancel(TRef),
mnesia:unsubscribe(system). ekka:unsubscribe(membership).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.