From 1f3bab2bcbf08fe69440085b37c87a0889891542 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 16 Jun 2017 16:02:36 +0800 Subject: [PATCH] Integrate with ekka library, use ekka:subscribe/1 to replace mnesia:subscribe/1 --- src/emqttd_router.erl | 10 ++++------ src/emqttd_sm_helper.erl | 15 +++++++-------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index d3aee421d..e51657285 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- init([]) -> - mnesia:subscribe(system), + ekka:subscribe(membership), ets:new(mqtt_local_route, [set, named_table, protected]), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. @@ -239,12 +239,10 @@ handle_cast({del_local_route, Topic}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({mnesia_system_event, {mnesia_up, Node}}, State) -> - lager:error("Mnesia up: ~p~n", [Node]), +handle_info({member_up, _Node}, State) -> {noreply, State}; -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("Mnesia down: ~p~n", [Node]), +handle_info({member_down, Node}, State) -> clean_routes_(Node), update_stats_(), {noreply, State, hibernate}; @@ -271,7 +269,7 @@ handle_info(_Info, State) -> terminate(_Reason, #state{stats_timer = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unsubscribe(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 0f7cb7ef1..2a32fba58 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -34,7 +34,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {stats_fun, tick_tref}). +-record(state, {stats_fun, ticker}). %% @doc Start a session helper -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, any()}). @@ -42,9 +42,9 @@ start_link(StatsFun) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). init([StatsFun]) -> - mnesia:subscribe(system), + ekka:subscribe(membership), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), - {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. + {ok, #state{stats_fun = StatsFun, ticker = TRef}}. handle_call(Req, _From, State) -> ?UNEXPECTED_REQ(Req, State). @@ -52,8 +52,7 @@ handle_call(Req, _From, State) -> handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - lager:error("!!!Mnesia node down: ~s", [Node]), +handle_info({member_down, Node}, State) -> Fun = fun() -> ClientIds = mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, @@ -63,7 +62,7 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> mnesia:async_dirty(Fun), {noreply, State}; -handle_info({mnesia_system_event, {mnesia_up, _Node}}, State) -> +handle_info({member_up, _Node}, State) -> {noreply, State}; handle_info(tick, State) -> @@ -72,9 +71,9 @@ handle_info(tick, State) -> handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, _State = #state{tick_tref = TRef}) -> +terminate(_Reason, _State = #state{ticker = TRef}) -> timer:cancel(TRef), - mnesia:unsubscribe(system). + ekka:unsubscribe(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}.