Merge pull request #787 from emqtt/emq20
Stop plugins before the broker stopped, clean routes when a node down
This commit is contained in:
commit
82d017440d
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% Facade Module for The EMQTT Broker
|
%% Facade Module for The EMQ Broker
|
||||||
|
|
||||||
-module(emqttd).
|
-module(emqttd).
|
||||||
|
|
||||||
|
@ -22,7 +22,7 @@
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-export([start/0, env/1, env/2, is_running/1]).
|
-export([start/0, env/1, env/2, is_running/1, stop/0]).
|
||||||
|
|
||||||
%% PubSub API
|
%% PubSub API
|
||||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
|
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
|
||||||
|
@ -57,6 +57,10 @@
|
||||||
-spec(start() -> ok | {error, any()}).
|
-spec(start() -> ok | {error, any()}).
|
||||||
start() -> application:start(?APP).
|
start() -> application:start(?APP).
|
||||||
|
|
||||||
|
%% @doc Stop emqttd application.
|
||||||
|
-spec(stop() -> ok | {error, any()}).
|
||||||
|
stop() -> application:stop(?APP).
|
||||||
|
|
||||||
%% @doc Environment
|
%% @doc Environment
|
||||||
-spec(env(Key:: atom()) -> {ok, any()} | undefined).
|
-spec(env(Key:: atom()) -> {ok, any()} | undefined).
|
||||||
env(Key) -> application:get_env(?APP, Key).
|
env(Key) -> application:get_env(?APP, Key).
|
||||||
|
|
|
@ -284,11 +284,11 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
clean_routes_(Node) ->
|
clean_routes_(Node) ->
|
||||||
Pattern = #mqtt_route{_ = '_', node = Node},
|
Pattern = #mqtt_route{_ = '_', node = Node},
|
||||||
Clean = fun() ->
|
Clean = fun() ->
|
||||||
[mnesia:delete_object(route, R, write) ||
|
[mnesia:delete_object(mqtt_route, R, write) ||
|
||||||
R <- mnesia:match_object(route, Pattern, write)]
|
R <- mnesia:match_object(mqtt_route, Pattern, write)]
|
||||||
end,
|
end,
|
||||||
mnesia:transaction(Clean).
|
mnesia:transaction(Clean).
|
||||||
|
|
||||||
update_stats_() ->
|
update_stats_() ->
|
||||||
emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(route, size)).
|
emqttd_stats:setstats('routes/count', 'routes/max', mnesia:table_info(mqtt_route, size)).
|
||||||
|
|
||||||
|
|
|
@ -332,10 +332,14 @@ handle_cast({unsubscribe, TopicTable}, Session = #session{client_id = Client
|
||||||
end, Subscriptions, TopicTable),
|
end, Subscriptions, TopicTable),
|
||||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||||
|
|
||||||
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
|
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId, client_pid = undefined}) ->
|
||||||
?LOG(warning, "destroyed", [], Session),
|
?LOG(warning, "destroyed", [], Session),
|
||||||
shutdown(destroy, Session);
|
shutdown(destroy, Session);
|
||||||
|
|
||||||
|
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId, client_pid = OldClientPid}) ->
|
||||||
|
?LOG(warning, "kickout ~p", [OldClientPid], Session),
|
||||||
|
shutdown(conflict, Session);
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId,
|
handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId,
|
||||||
client_pid = OldClientPid,
|
client_pid = OldClientPid,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
|
|
Loading…
Reference in New Issue