From 8a804c56f36585d58508cc31aee1a835f4b82e62 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 28 Jun 2017 15:51:24 +0800 Subject: [PATCH] Monitor/Unmonitor membership events --- src/emqttd_router.erl | 13 +++++++------ src/emqttd_sm_helper.erl | 9 +++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index e4a42b85d..9efe8b612 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -216,7 +216,7 @@ stop() -> gen_server:call(?ROUTER, stop). %%-------------------------------------------------------------------- init([]) -> - ekka:subscribe(membership), + ekka:monitor(membership), ets:new(mqtt_local_route, [set, named_table, protected]), {ok, TRef} = timer:send_interval(timer:seconds(1), stats), {ok, #state{stats_timer = TRef}}. @@ -239,14 +239,15 @@ handle_cast({del_local_route, Topic}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({membership, {member_up, _Node}}, State) -> - {noreply, State}; - -handle_info({membership, {member_down, Node}}, State) -> +handle_info({membership, {mnesia, down, Node}}, State) -> clean_routes_(Node), update_stats_(), {noreply, State, hibernate}; +handle_info({membership, _Event}, State) -> + %% ignore + {noreply, State}; + handle_info(stats, State) -> update_stats_(), {noreply, State, hibernate}; @@ -256,7 +257,7 @@ handle_info(_Info, State) -> terminate(_Reason, #state{stats_timer = TRef}) -> timer:cancel(TRef), - ekka:unsubscribe(membership). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 652c363f0..2081ea9e5 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -42,7 +42,7 @@ start_link(StatsFun) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []). init([StatsFun]) -> - ekka:subscribe(membership), + ekka:monitor(membership), {ok, TRef} = timer:send_interval(timer:seconds(1), tick), {ok, #state{stats_fun = StatsFun, ticker = TRef}}. @@ -52,7 +52,7 @@ handle_call(Req, _From, State) -> handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). -handle_info({membership, {mnesia_down, Node}}, State) -> +handle_info({membership, {mnesia, down, Node}}, State) -> Fun = fun() -> ClientIds = mnesia:select(mqtt_session, [{#mqtt_session{client_id = '$1', sess_pid = '$2', _ = '_'}, @@ -62,9 +62,6 @@ handle_info({membership, {mnesia_down, Node}}, State) -> mnesia:async_dirty(Fun), {noreply, State}; -handle_info({membership, {mnesia_up, _Node}}, State) -> - {noreply, State}; - handle_info({membership, _Event}, State) -> {noreply, State}; @@ -76,7 +73,7 @@ handle_info(Info, State) -> terminate(_Reason, _State = #state{ticker = TRef}) -> timer:cancel(TRef), - ekka:unsubscribe(membership). + ekka:unmonitor(membership). code_change(_OldVsn, State, _Extra) -> {ok, State}.