Merge pull request #6145 from emqx/dev/e4.2.9
Auto-pull-request-on-2021-11-12
This commit is contained in:
commit
47b2642423
|
@ -7,7 +7,7 @@
|
||||||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
||||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}},
|
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.10"}}},
|
||||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
||||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
%% -*-: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
|
|
||||||
{VSN,
|
{VSN,
|
||||||
[
|
[
|
||||||
|
@ -92,6 +92,7 @@
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
@ -110,6 +111,7 @@
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[6-7]">>, [
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
@ -120,8 +122,11 @@
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.6">>, [
|
{<<"4.2.8">>, [
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_cm, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
],
|
],
|
||||||
|
@ -216,6 +221,7 @@
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.5">>, [
|
{<<"4.2.5">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
{load_module, emqx_session, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
{load_module, emqx_congestion, brutal_purge, soft_purge, []},
|
||||||
|
@ -234,6 +240,7 @@
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.[6-7]">>, [
|
{<<"4.2.[6-7]">>, [
|
||||||
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||||
|
@ -244,8 +251,11 @@
|
||||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||||
]},
|
]},
|
||||||
{<<"4.2.6">>, [
|
{<<"4.2.8">>, [
|
||||||
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||||
|
{load_module, emqx_cm, brutal_purge, soft_purge, []}
|
||||||
]},
|
]},
|
||||||
{<<".*">>, []}
|
{<<".*">>, []}
|
||||||
]
|
]
|
||||||
|
|
|
@ -945,8 +945,11 @@ handle_info({sock_closed, Reason}, Channel =
|
||||||
Shutdown -> Shutdown
|
Shutdown -> Shutdown
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) ->
|
handle_info({sock_closed, _Reason}, Channel = #channel{conn_state = disconnected}) ->
|
||||||
?LOG(error, "Unexpected sock_closed: ~p", [Reason]),
|
%% Since sock_closed messages can be generated multiple times,
|
||||||
|
%% we can simply ignore errors of this type in the disconnected state.
|
||||||
|
%% e.g. when the socket send function returns an error, there is already
|
||||||
|
%% a tcp_closed delivered to the process mailbox
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(clean_acl_cache, Channel) ->
|
handle_info(clean_acl_cache, Channel) ->
|
||||||
|
|
150
src/emqx_cm.erl
150
src/emqx_cm.erl
|
@ -70,7 +70,10 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% Internal export
|
%% Internal export
|
||||||
-export([stats_fun/0]).
|
-export([stats_fun/0, clean_down/1]).
|
||||||
|
|
||||||
|
%% Test export
|
||||||
|
-export([register_channel_/3]).
|
||||||
|
|
||||||
-type(chan_pid() :: pid()).
|
-type(chan_pid() :: pid()).
|
||||||
|
|
||||||
|
@ -88,11 +91,13 @@
|
||||||
%% Batch drain
|
%% Batch drain
|
||||||
-define(BATCH_SIZE, 100000).
|
-define(BATCH_SIZE, 100000).
|
||||||
|
|
||||||
-define(T_TAKEOVER, 15000).
|
|
||||||
|
|
||||||
%% Server name
|
%% Server name
|
||||||
-define(CM, ?MODULE).
|
-define(CM, ?MODULE).
|
||||||
|
|
||||||
|
-define(T_KICK, 5000).
|
||||||
|
-define(T_GET_INFO, 5000).
|
||||||
|
-define(T_TAKEOVER, 15000).
|
||||||
|
|
||||||
%% @doc Start the channel manager.
|
%% @doc Start the channel manager.
|
||||||
-spec(start_link() -> startlink_ret()).
|
-spec(start_link() -> startlink_ret()).
|
||||||
start_link() ->
|
start_link() ->
|
||||||
|
@ -161,7 +166,7 @@ get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
error:badarg -> undefined
|
error:badarg -> undefined
|
||||||
end;
|
end;
|
||||||
get_chan_info(ClientId, ChanPid) ->
|
get_chan_info(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
|
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid], ?T_GET_INFO).
|
||||||
|
|
||||||
%% @doc Update infos of the channel.
|
%% @doc Update infos of the channel.
|
||||||
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
|
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
|
||||||
|
@ -186,7 +191,7 @@ get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
error:badarg -> undefined
|
error:badarg -> undefined
|
||||||
end;
|
end;
|
||||||
get_chan_stats(ClientId, ChanPid) ->
|
get_chan_stats(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
|
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO).
|
||||||
|
|
||||||
%% @doc Set channel's stats.
|
%% @doc Set channel's stats.
|
||||||
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
|
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
|
||||||
|
@ -254,7 +259,7 @@ takeover_session(ClientId) ->
|
||||||
takeover_session(ClientId, ChanPid);
|
takeover_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
||||||
?LOG(error, "More than one channel found: ~p", [ChanPids]),
|
?LOG(error, "more_than_one_channel_found: ~p", [ChanPids]),
|
||||||
lists:foreach(fun(StalePid) ->
|
lists:foreach(fun(StalePid) ->
|
||||||
catch discard_session(ClientId, StalePid)
|
catch discard_session(ClientId, StalePid)
|
||||||
end, StalePids),
|
end, StalePids),
|
||||||
|
@ -266,78 +271,111 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
{error, not_found};
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
|
%% TODO: if takeover times out, maybe kill the old?
|
||||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
||||||
{ok, ConnMod, ChanPid, Session}
|
{ok, ConnMod, ChanPid, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
takeover_session(ClientId, ChanPid) ->
|
takeover_session(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid]).
|
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
|
||||||
|
|
||||||
%% @doc Discard all the sessions identified by the ClientId.
|
%% @doc Discard all the sessions identified by the ClientId.
|
||||||
-spec(discard_session(emqx_types:clientid()) -> ok).
|
-spec(discard_session(emqx_types:clientid()) -> ok).
|
||||||
discard_session(ClientId) when is_binary(ClientId) ->
|
discard_session(ClientId) when is_binary(ClientId) ->
|
||||||
case lookup_channels(ClientId) of
|
case lookup_channels(ClientId) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
ChanPids ->
|
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
|
||||||
lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_discard_session(ClientId, Pid) ->
|
%% @private Kick a local stale session to force it step down.
|
||||||
|
%% If failed to kick (e.g. timeout) force a kill.
|
||||||
|
%% Keeping the stale pid around, or returning error or raise an exception
|
||||||
|
%% benefits nobody.
|
||||||
|
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
|
||||||
|
kick_or_kill(Action, ConnMod, Pid) ->
|
||||||
try
|
try
|
||||||
discard_session(ClientId, Pid)
|
%% this is essentailly a gen_server:call implemented in emqx_connection
|
||||||
|
%% and emqx_ws_connection.
|
||||||
|
%% the handle_call is implemented in emqx_channel
|
||||||
|
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
|
||||||
catch
|
catch
|
||||||
_ : noproc -> % emqx_ws_connection: call
|
_ : noproc -> % emqx_ws_connection: call
|
||||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
|
||||||
ok;
|
ok;
|
||||||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
?LOG(debug, "session_already_gone: ~p, action: ~p", [Pid, Action]),
|
||||||
ok;
|
ok;
|
||||||
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
|
_ : {shutdown, _} ->
|
||||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
|
||||||
ok;
|
ok;
|
||||||
_ : {{shutdown, _}, _} ->
|
_ : {{shutdown, _}, _} ->
|
||||||
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
|
?LOG(debug, "session_already_shutdown: ~p, action: ~p", [Pid, Action]),
|
||||||
ok;
|
ok;
|
||||||
_ : Error : St ->
|
_ : {timeout, {gen_server, call, _}} ->
|
||||||
?LOG(debug, "failed_to_discard_session: ~p, "
|
?LOG(warning, "session_kick_timeout: ~p, action: ~p, "
|
||||||
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
|
"stale_channel: ~p",
|
||||||
|
[Pid, Action, stale_channel_info(Pid)]),
|
||||||
|
ok = force_kill(Pid);
|
||||||
|
_ : Error ->
|
||||||
|
?LOG(error, "session_kick_exception: ~p, action: ~p, "
|
||||||
|
"reason: ~p, stacktrace: ~p, stale_channel: ~p",
|
||||||
|
[Pid, Action, Error, erlang:get_stacktrace(), stale_channel_info(Pid)]),
|
||||||
|
ok = force_kill(Pid)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
force_kill(Pid) ->
|
||||||
case get_chann_conn_mod(ClientId, ChanPid) of
|
exit(Pid, kill),
|
||||||
undefined -> ok;
|
ok.
|
||||||
ConnMod when is_atom(ConnMod) ->
|
|
||||||
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
|
stale_channel_info(Pid) ->
|
||||||
end;
|
process_info(Pid, [status, message_queue_len, current_stacktrace]).
|
||||||
|
|
||||||
discard_session(ClientId, ChanPid) ->
|
discard_session(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
|
kick_session(discard, ClientId, ChanPid).
|
||||||
|
|
||||||
|
kick_session(ClientId, ChanPid) ->
|
||||||
|
kick_session(kick, ClientId, ChanPid).
|
||||||
|
|
||||||
|
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
|
||||||
|
kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
|
case get_chann_conn_mod(ClientId, ChanPid) of
|
||||||
|
undefined ->
|
||||||
|
%% already deregistered
|
||||||
|
ok;
|
||||||
|
ConnMod when is_atom(ConnMod) ->
|
||||||
|
ok = kick_or_kill(Action, ConnMod, ChanPid)
|
||||||
|
end;
|
||||||
|
kick_session(Action, ClientId, ChanPid) ->
|
||||||
|
%% call remote node on the old APIs because we do not know if they have upgraded
|
||||||
|
%% to have kick_session/3
|
||||||
|
Function = case Action of
|
||||||
|
discard -> discard_session;
|
||||||
|
kick -> kick_session
|
||||||
|
end,
|
||||||
|
try
|
||||||
|
rpc_call(node(ChanPid), Function, [ClientId, ChanPid], ?T_KICK)
|
||||||
|
catch
|
||||||
|
Error : Reason ->
|
||||||
|
%% This should mostly be RPC failures.
|
||||||
|
%% However, if the node is still running the old version
|
||||||
|
%% code (prior to emqx app 4.3.10) some of the RPC handler
|
||||||
|
%% exceptions may get propagated to a new version node
|
||||||
|
?LOG(error, "failed_to_kick_session_on_remote_node ~p: ~p ~p ~p",
|
||||||
|
[node(ChanPid), Action, Error, Reason])
|
||||||
|
end.
|
||||||
|
|
||||||
kick_session(ClientId) ->
|
kick_session(ClientId) ->
|
||||||
case lookup_channels(ClientId) of
|
case lookup_channels(ClientId) of
|
||||||
[] -> {error, not_found};
|
[] ->
|
||||||
[ChanPid] ->
|
?LOG(warning, "kiecked_an_unknown_session ~ts", [ClientId]),
|
||||||
kick_session(ClientId, ChanPid);
|
ok;
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
[ChanPid|StalePids] = lists:reverse(ChanPids),
|
case length(ChanPids) > 1 of
|
||||||
?LOG(error, "More than one channel found: ~p", [ChanPids]),
|
true -> ?LOG(info, "more_than_one_channel_found: ~p", [ChanPids]);
|
||||||
lists:foreach(fun(StalePid) ->
|
false -> ok
|
||||||
catch discard_session(ClientId, StalePid)
|
end,
|
||||||
end, StalePids),
|
lists:foreach(fun(Pid) -> kick_session(ClientId, Pid) end, ChanPids)
|
||||||
kick_session(ClientId, ChanPid)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|
||||||
case get_chan_info(ClientId, ChanPid) of
|
|
||||||
#{conninfo := #{conn_mod := ConnMod}} ->
|
|
||||||
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
|
|
||||||
undefined ->
|
|
||||||
{error, not_found}
|
|
||||||
end;
|
|
||||||
|
|
||||||
kick_session(ClientId, ChanPid) ->
|
|
||||||
rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
|
|
||||||
|
|
||||||
%% @doc Is clean start?
|
%% @doc Is clean start?
|
||||||
% is_clean_start(#{clean_start := false}) -> false;
|
% is_clean_start(#{clean_start := false}) -> false;
|
||||||
% is_clean_start(_Attrs) -> true.
|
% is_clean_start(_Attrs) -> true.
|
||||||
|
@ -373,10 +411,16 @@ lookup_channels(local, ClientId) ->
|
||||||
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
|
[ChanPid || {_, ChanPid} <- ets:lookup(?CHAN_TAB, ClientId)].
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
rpc_call(Node, Fun, Args) ->
|
rpc_call(Node, Fun, Args, Timeout) ->
|
||||||
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
|
case rpc:call(Node, ?MODULE, Fun, Args, 2 * Timeout) of
|
||||||
{badrpc, Reason} -> error(Reason);
|
{badrpc, Reason} ->
|
||||||
Res -> Res
|
%% since eqmx app 4.3.10, the 'kick' and 'discard' calls hanndler
|
||||||
|
%% should catch all exceptions and always return 'ok'.
|
||||||
|
%% This leaves 'badrpc' only possible when there is problem
|
||||||
|
%% calling the remote node.
|
||||||
|
error({badrpc, Reason});
|
||||||
|
Res ->
|
||||||
|
Res
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
@ -409,7 +453,7 @@ handle_cast(Msg, State) ->
|
||||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
|
||||||
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
|
||||||
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
|
||||||
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun clean_down/1, Items]),
|
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),
|
||||||
{noreply, State#{chan_pmon := PMon1}};
|
{noreply, State#{chan_pmon := PMon1}};
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
@ -445,5 +489,5 @@ get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
error:badarg -> undefined
|
error:badarg -> undefined
|
||||||
end;
|
end;
|
||||||
get_chann_conn_mod(ClientId, ChanPid) ->
|
get_chann_conn_mod(ClientId, ChanPid) ->
|
||||||
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
|
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO).
|
||||||
|
|
||||||
|
|
|
@ -685,7 +685,10 @@ handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({sock_error, Reason}, State) ->
|
handle_info({sock_error, Reason}, State) ->
|
||||||
Reason =/= closed andalso ?LOG(error, "Socket error: ~p", [Reason]),
|
case Reason =/= closed andalso Reason =/= einval of
|
||||||
|
true -> ?LOG(warning, "socket_error: ~p", [Reason]);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
|
|
|
@ -67,6 +67,8 @@
|
||||||
version => ?MQTT_PROTO_V4
|
version => ?MQTT_PROTO_V4
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(MULTIPLIER_MAX, 16#200000).
|
||||||
|
|
||||||
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
-dialyzer({no_match, [serialize_utf8_string/2]}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -142,7 +144,7 @@ parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
|
||||||
parse_frame(Rest, Header, 2, Options);
|
parse_frame(Rest, Header, 2, Options);
|
||||||
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
parse_remaining_len(<<1:1, _Len:7, _Rest/binary>>, _Header, Multiplier, _Value, _Options)
|
||||||
when Multiplier > 2097152 ->
|
when Multiplier > ?MULTIPLIER_MAX ->
|
||||||
error(malformed_variable_byte_integer);
|
error(malformed_variable_byte_integer);
|
||||||
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Options) ->
|
||||||
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
parse_remaining_len(Rest, Header, Multiplier * ?HIGHBIT, Value + Len * Multiplier, Options);
|
||||||
|
@ -411,6 +413,9 @@ parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
|
||||||
|
|
||||||
parse_variable_byte_integer(Bin) ->
|
parse_variable_byte_integer(Bin) ->
|
||||||
parse_variable_byte_integer(Bin, 1, 0).
|
parse_variable_byte_integer(Bin, 1, 0).
|
||||||
|
parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value)
|
||||||
|
when Multiplier > ?MULTIPLIER_MAX ->
|
||||||
|
error(malformed_variable_byte_integer);
|
||||||
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
|
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
|
||||||
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
|
||||||
|
|
|
@ -31,6 +31,12 @@
|
||||||
conn_mod => emqx_connection,
|
conn_mod => emqx_connection,
|
||||||
receive_maximum => 100}}).
|
receive_maximum => 100}}).
|
||||||
|
|
||||||
|
-define(WAIT(PATTERN, TIMEOUT, RET),
|
||||||
|
fun() ->
|
||||||
|
receive PATTERN -> RET
|
||||||
|
after TIMEOUT -> error({timeout, ?LINE}) end
|
||||||
|
end()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% CT callbacks
|
%% CT callbacks
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -151,19 +157,98 @@ t_open_session_race_condition(_) ->
|
||||||
exit(Pid, kill), timer:sleep(100),
|
exit(Pid, kill), timer:sleep(100),
|
||||||
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
|
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
|
||||||
|
|
||||||
t_discard_session(_) ->
|
t_kick_session_discard_normal(_) ->
|
||||||
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
test_kick_session(discard, normal).
|
||||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
|
|
||||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
t_kick_session_discard_shutdown(_) ->
|
||||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
test_kick_session(discard, shutdown).
|
||||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
|
||||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
t_kick_session_discard_shutdown_with_reason(_) ->
|
||||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
test_kick_session(discard, {shutdown, discard}).
|
||||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
|
||||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
|
t_kick_session_discard_timeout(_) ->
|
||||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
test_kick_session(discard, timeout).
|
||||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
|
||||||
ok = meck:unload(emqx_connection).
|
t_kick_session_discard_noproc(_) ->
|
||||||
|
test_kick_session(discard, noproc).
|
||||||
|
|
||||||
|
t_kick_session_kick_normal(_) ->
|
||||||
|
test_kick_session(discard, normal).
|
||||||
|
|
||||||
|
t_kick_session_kick_shutdown(_) ->
|
||||||
|
test_kick_session(discard, shutdown).
|
||||||
|
|
||||||
|
t_kick_session_kick_shutdown_with_reason(_) ->
|
||||||
|
test_kick_session(discard, {shutdown, discard}).
|
||||||
|
|
||||||
|
t_kick_session_kick_timeout(_) ->
|
||||||
|
test_kick_session(discard, timeout).
|
||||||
|
|
||||||
|
t_kick_session_kick_noproc(_) ->
|
||||||
|
test_kick_session(discard, noproc).
|
||||||
|
|
||||||
|
test_kick_session(Action, Reason) ->
|
||||||
|
ClientId = rand_client_id(),
|
||||||
|
#{conninfo := ConnInfo} = ?ChanInfo,
|
||||||
|
FakeSessionFun =
|
||||||
|
fun Loop() ->
|
||||||
|
receive
|
||||||
|
{'$gen_call', From, A} when A =:= kick orelse
|
||||||
|
A =:= discard ->
|
||||||
|
case Reason of
|
||||||
|
normal ->
|
||||||
|
gen_server:reply(From, ok);
|
||||||
|
timeout ->
|
||||||
|
%% no response to the call
|
||||||
|
Loop();
|
||||||
|
_ ->
|
||||||
|
exit(Reason)
|
||||||
|
end;
|
||||||
|
Msg ->
|
||||||
|
ct:pal("(~p) fake_session_discarded ~p", [Action, Msg]),
|
||||||
|
Loop()
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{Pid1, _} = spawn_monitor(FakeSessionFun),
|
||||||
|
{Pid2, _} = spawn_monitor(FakeSessionFun),
|
||||||
|
ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
|
||||||
|
ok = emqx_cm:register_channel_(ClientId, Pid1, ConnInfo),
|
||||||
|
ok = emqx_cm:register_channel_(ClientId, Pid2, ConnInfo),
|
||||||
|
?assertEqual([Pid1, Pid2], lists:sort(emqx_cm:lookup_channels(ClientId))),
|
||||||
|
case Reason of
|
||||||
|
noproc -> exit(Pid1, kill), exit(Pid2, kill);
|
||||||
|
_ -> ok
|
||||||
|
end,
|
||||||
|
ok = case Action of
|
||||||
|
kick -> emqx_cm:kick_session(ClientId);
|
||||||
|
discard -> emqx_cm:discard_session(ClientId)
|
||||||
|
end,
|
||||||
|
case Reason =:= timeout orelse Reason =:= noproc of
|
||||||
|
true ->
|
||||||
|
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
|
||||||
|
?assertEqual(killed, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R));
|
||||||
|
false ->
|
||||||
|
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid1, R}, 2000, R)),
|
||||||
|
?assertEqual(Reason, ?WAIT({'DOWN', _, process, Pid2, R}, 2000, R))
|
||||||
|
end,
|
||||||
|
ok = flush_emqx_pool(),
|
||||||
|
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
|
||||||
|
|
||||||
|
rand_client_id() ->
|
||||||
|
list_to_binary("client-id-" ++ integer_to_list(erlang:system_time())).
|
||||||
|
|
||||||
|
%% Channel deregistration is delegated to emqx_pool as a sync tasks.
|
||||||
|
%% The emqx_pool is pool of workers, and there is no way to know
|
||||||
|
%% which worker was picked for the last deregistration task.
|
||||||
|
%% This help function creates a large enough number of async tasks
|
||||||
|
%% to sync with the pool workers.
|
||||||
|
%% The number of tasks should be large enough to ensure all workers have
|
||||||
|
%% the chance to work on at least one of the tasks.
|
||||||
|
flush_emqx_pool() ->
|
||||||
|
Self = self(),
|
||||||
|
L = lists:seq(1, 1000),
|
||||||
|
lists:foreach(fun(I) -> emqx_pool:async_submit(fun() -> Self ! {done, I} end, []) end, L),
|
||||||
|
lists:foreach(fun(I) -> receive {done, I} -> ok end end, L).
|
||||||
|
|
||||||
t_takeover_session(_) ->
|
t_takeover_session(_) ->
|
||||||
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
|
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
|
||||||
|
@ -178,21 +263,6 @@ t_takeover_session(_) ->
|
||||||
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
|
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
|
||||||
emqx_cm:unregister_channel(<<"clientid">>).
|
emqx_cm:unregister_channel(<<"clientid">>).
|
||||||
|
|
||||||
t_kick_session(_) ->
|
|
||||||
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
|
||||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
|
|
||||||
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
|
|
||||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
|
||||||
test = emqx_cm:kick_session(<<"clientid">>),
|
|
||||||
erlang:spawn(fun() ->
|
|
||||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
|
||||||
timer:sleep(1000)
|
|
||||||
end),
|
|
||||||
ct:sleep(100),
|
|
||||||
test = emqx_cm:kick_session(<<"clientid">>),
|
|
||||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
|
||||||
ok = meck:unload(emqx_connection).
|
|
||||||
|
|
||||||
t_all_channels(_) ->
|
t_all_channels(_) ->
|
||||||
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
?assertEqual(true, is_list(emqx_cm:all_channels())).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue