Merge pull request #5816 from emqx/dev/e4.2.8
Auto-pull-request-on-2021-09-27
This commit is contained in:
commit
26b426b7ad
|
@ -30,11 +30,13 @@
|
|||
%% MQTT Protocol Version and Names
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(MQTT_SN_PROTO_V1, 1).
|
||||
-define(MQTT_PROTO_V3, 3).
|
||||
-define(MQTT_PROTO_V4, 4).
|
||||
-define(MQTT_PROTO_V5, 5).
|
||||
|
||||
-define(PROTOCOL_NAMES, [
|
||||
{?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in
|
||||
{?MQTT_PROTO_V3, <<"MQIsdp">>},
|
||||
{?MQTT_PROTO_V4, <<"MQTT">>},
|
||||
{?MQTT_PROTO_V5, <<"MQTT">>}]).
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
|
||||
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
|
||||
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.8"}}},
|
||||
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}},
|
||||
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
|
||||
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
|
||||
]}.
|
||||
|
|
|
@ -19,7 +19,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.2.1", [
|
||||
{add_module, emqx_congestion},
|
||||
|
@ -37,7 +43,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.[23]">>, [
|
||||
{add_module, emqx_congestion},
|
||||
|
@ -52,7 +64,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.4">>, [
|
||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||
|
@ -65,7 +83,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.5">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||
|
@ -77,7 +101,24 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.[6-7]">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.6">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
||||
|
@ -102,7 +143,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{"4.2.1", [
|
||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||
|
@ -120,7 +167,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.[23]">>, [
|
||||
{load_module, emqx_shared_sub, brutal_purge, soft_purge, []},
|
||||
|
@ -135,7 +188,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.4">>, [
|
||||
{load_module, emqx_frame, brutal_purge, soft_purge, []},
|
||||
|
@ -148,7 +207,13 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.5">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||
|
@ -160,7 +225,24 @@
|
|||
{load_module, emqx_router_sup, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_broker, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]},
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []}
|
||||
{load_module, emqx_router, soft_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.[6-7]">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_connection, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_ws_connection, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_cm, brutal_purge, soft_purge, []},
|
||||
{load_module, emqx_access_rule, brutal_purge, soft_purge, []},
|
||||
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_mqueue,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.2.6">>, [
|
||||
{load_module, emqx_channel, brutal_purge, soft_purge, []}
|
||||
|
|
|
@ -28,7 +28,8 @@
|
|||
-type(who() :: all | binary() |
|
||||
{client, binary()} |
|
||||
{user, binary()} |
|
||||
{ipaddr, esockd_cidr:cidr_string()}).
|
||||
{ipaddr, esockd_cidr:cidr_string()} |
|
||||
{ipaddrs, list(esockd_cidr:cidr_string())}).
|
||||
|
||||
-type(access() :: subscribe | publish | pubsub).
|
||||
|
||||
|
@ -52,6 +53,8 @@ compile(who, all) ->
|
|||
all;
|
||||
compile(who, {ipaddr, CIDR}) ->
|
||||
{ipaddr, esockd_cidr:parse(CIDR, true)};
|
||||
compile(who, {ipaddrs, CIDRs}) ->
|
||||
{ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)};
|
||||
compile(who, {client, all}) ->
|
||||
{client, all};
|
||||
compile(who, {client, ClientId}) ->
|
||||
|
@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) ->
|
|||
true;
|
||||
match_who(#{peerhost := undefined}, {ipaddr, _Tup}) ->
|
||||
false;
|
||||
match_who(#{peerhost := undefined}, {ipaddrs, _}) ->
|
||||
false;
|
||||
match_who(#{peerhost := IP}, {ipaddr, CIDR}) ->
|
||||
esockd_cidr:match(IP, CIDR);
|
||||
match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) ->
|
||||
lists:any(fun(CIDR) ->
|
||||
esockd_cidr:match(IP, CIDR)
|
||||
end, CIDRs);
|
||||
match_who(ClientInfo, {'and', Conds}) when is_list(Conds) ->
|
||||
lists:foldl(fun(Who, Allow) ->
|
||||
match_who(ClientInfo, Who) andalso Allow
|
||||
|
|
|
@ -56,20 +56,20 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) ->
|
|||
init(_) ->
|
||||
{ok, []}.
|
||||
|
||||
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
|
||||
handle_event({set_alarm, {system_memory_high_watermark, []}}, State) ->
|
||||
emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}),
|
||||
{ok, State};
|
||||
|
||||
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
|
||||
emqx_alarm:activate(high_process_memory_usage, #{pid => Pid,
|
||||
handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
|
||||
emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)),
|
||||
high_watermark => emqx_os_mon:get_procmem_high_watermark()}),
|
||||
{ok, State};
|
||||
|
||||
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
|
||||
handle_event({clear_alarm, system_memory_high_watermark}, State) ->
|
||||
emqx_alarm:deactivate(high_system_memory_usage),
|
||||
{ok, State};
|
||||
|
||||
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
||||
handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
||||
emqx_alarm:deactivate(high_process_memory_usage),
|
||||
{ok, State};
|
||||
|
||||
|
@ -85,4 +85,4 @@ handle_call(_Query, State) ->
|
|||
terminate(swap, _State) ->
|
||||
{emqx_alarm_handler, []};
|
||||
terminate(_, _) ->
|
||||
ok.
|
||||
ok.
|
||||
|
|
|
@ -1523,6 +1523,7 @@ clear_keepalive(Channel = #channel{timers = Timers}) ->
|
|||
emqx_misc:cancel_timer(TRef),
|
||||
Channel#channel{timers = maps:without([alive_timer], Timers)}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Maybe Resume Session
|
||||
|
||||
|
|
|
@ -88,6 +88,8 @@
|
|||
%% Batch drain
|
||||
-define(BATCH_SIZE, 100000).
|
||||
|
||||
-define(T_TAKEOVER, 15000).
|
||||
|
||||
%% Server name
|
||||
-define(CM, ?MODULE).
|
||||
|
||||
|
@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
|||
case takeover_session(ClientId) of
|
||||
{ok, ConnMod, ChanPid, Session} ->
|
||||
ok = emqx_session:resume(ClientInfo, Session),
|
||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
|
||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
||||
register_channel_(ClientId, Self, ConnInfo),
|
||||
{ok, #{session => Session,
|
||||
present => true,
|
||||
|
@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
|||
undefined ->
|
||||
{error, not_found};
|
||||
ConnMod when is_atom(ConnMod) ->
|
||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
|
||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
||||
{ok, ConnMod, ChanPid, Session}
|
||||
end;
|
||||
|
||||
|
@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) ->
|
|||
case lookup_channels(ClientId) of
|
||||
[] -> ok;
|
||||
ChanPids ->
|
||||
lists:foreach(
|
||||
fun(ChanPid) ->
|
||||
try
|
||||
discard_session(ClientId, ChanPid)
|
||||
catch
|
||||
_:{noproc,_}:_Stk -> ok;
|
||||
_:{{shutdown,_},_}:_Stk -> ok;
|
||||
_:Error:_Stk ->
|
||||
?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error])
|
||||
end
|
||||
end, ChanPids)
|
||||
lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids)
|
||||
end.
|
||||
|
||||
do_discard_session(ClientId, Pid) ->
|
||||
try
|
||||
discard_session(ClientId, Pid)
|
||||
catch
|
||||
_ : noproc -> % emqx_ws_connection: call
|
||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
||||
ok;
|
||||
_ : {noproc, _} -> % emqx_connection: gen_server:call
|
||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
||||
ok;
|
||||
_ : {'EXIT', {noproc, _}} -> % rpc_call/3
|
||||
?LOG(debug, "session_already_gone: ~p", [Pid]),
|
||||
ok;
|
||||
_ : {{shutdown, _}, _} ->
|
||||
?LOG(debug, "session_already_shutdown: ~p", [Pid]),
|
||||
ok;
|
||||
_ : Error : St ->
|
||||
?LOG(debug, "failed_to_discard_session: ~p, "
|
||||
"error: ~p, stacktrace: ~0p", [Pid, Error, St])
|
||||
end.
|
||||
|
||||
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||
case get_chann_conn_mod(ClientId, ChanPid) of
|
||||
undefined -> ok;
|
||||
ConnMod when is_atom(ConnMod) ->
|
||||
ConnMod:call(ChanPid, discard)
|
||||
ConnMod:call(ChanPid, discard, ?T_TAKEOVER)
|
||||
end;
|
||||
|
||||
discard_session(ClientId, ChanPid) ->
|
||||
|
@ -317,7 +330,7 @@ kick_session(ClientId) ->
|
|||
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||
case get_chan_info(ClientId, ChanPid) of
|
||||
#{conninfo := #{conn_mod := ConnMod}} ->
|
||||
ConnMod:call(ChanPid, kick);
|
||||
ConnMod:call(ChanPid, kick, ?T_TAKEOVER);
|
||||
undefined ->
|
||||
{error, not_found}
|
||||
end;
|
||||
|
@ -361,7 +374,7 @@ lookup_channels(local, ClientId) ->
|
|||
|
||||
%% @private
|
||||
rpc_call(Node, Fun, Args) ->
|
||||
case rpc:call(Node, ?MODULE, Fun, Args) of
|
||||
case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of
|
||||
{badrpc, Reason} -> error(Reason);
|
||||
Res -> Res
|
||||
end.
|
||||
|
|
|
@ -38,7 +38,7 @@
|
|||
, stats/1
|
||||
]).
|
||||
|
||||
-export([call/2]).
|
||||
-export([call/2, call/3]).
|
||||
|
||||
%% Callback
|
||||
-export([init/4]).
|
||||
|
@ -168,7 +168,9 @@ stats(#state{transport = Transport,
|
|||
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
|
||||
|
||||
call(Pid, Req) ->
|
||||
gen_server:call(Pid, Req, infinity).
|
||||
call(Pid, Req, infinity).
|
||||
call(Pid, Req, Timeout) ->
|
||||
gen_server:call(Pid, Req, Timeout).
|
||||
|
||||
stop(Pid) ->
|
||||
gen_server:stop(Pid).
|
||||
|
|
|
@ -45,6 +45,8 @@
|
|||
, index_of/2
|
||||
]).
|
||||
|
||||
-define(OOM_FACTOR, 1.25).
|
||||
|
||||
%% @doc Merge options
|
||||
-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
|
||||
merge_opts(Defaults, Options) ->
|
||||
|
@ -185,8 +187,8 @@ do_check_oom([{Val, Max, Reason}|Rest]) ->
|
|||
|
||||
tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
|
||||
%% If set to zero, the limit is disabled.
|
||||
erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
|
||||
kill => false,
|
||||
erlang:process_flag(max_heap_size, #{size => must_kill_heap_size(MaxHeapSize),
|
||||
kill => true,
|
||||
error_logger => true
|
||||
});
|
||||
tune_heap_size(undefined) -> ok.
|
||||
|
@ -233,3 +235,19 @@ index_of(E, I, [E|_]) ->
|
|||
index_of(E, I, [_|L]) ->
|
||||
index_of(E, I+1, L).
|
||||
|
||||
must_kill_heap_size(Size) ->
|
||||
%% We set the max allowed heap size by `erlang:process_flag(max_heap_size, #{size => Size})`,
|
||||
%% where the `Size` cannot be set to an integer lager than `(1 bsl 59) - 1` on a 64-bit system,
|
||||
%% or `(1 bsl 27) - 1` on a 32-bit system.
|
||||
MaxAllowedSize = case erlang:system_info(wordsize) of
|
||||
8 -> % arch_64
|
||||
(1 bsl 59) - 1;
|
||||
4 -> % arch_32
|
||||
(1 bsl 27) - 1
|
||||
end,
|
||||
%% We multiply the size with factor ?OOM_FACTOR, to give the
|
||||
%% process a chance to suicide by `check_oom/1`
|
||||
case ceil(Size * ?OOM_FACTOR) of
|
||||
Size0 when Size0 >= MaxAllowedSize -> MaxAllowedSize;
|
||||
Size0 -> Size0
|
||||
end.
|
||||
|
|
|
@ -67,6 +67,9 @@
|
|||
, dropped/1
|
||||
]).
|
||||
|
||||
-export([ live_upgrade/1
|
||||
]).
|
||||
|
||||
-export_type([mqueue/0, options/0]).
|
||||
|
||||
-type(topic() :: emqx_topic:topic()).
|
||||
|
@ -91,6 +94,11 @@
|
|||
-define(MAX_LEN_INFINITY, 0).
|
||||
-define(INFO_KEYS, [store_qos0, max_len, len, dropped]).
|
||||
|
||||
-record(shift_opts, {
|
||||
multiplier :: non_neg_integer(),
|
||||
base :: integer()
|
||||
}).
|
||||
|
||||
-record(mqueue, {
|
||||
store_qos0 = false :: boolean(),
|
||||
max_len = ?MAX_LEN_INFINITY :: count(),
|
||||
|
@ -98,11 +106,16 @@
|
|||
dropped = 0 :: count(),
|
||||
p_table = ?NO_PRIORITY_TABLE :: p_table(),
|
||||
default_p = ?LOWEST_PRIORITY :: priority(),
|
||||
q = ?PQUEUE:new() :: pq()
|
||||
q = ?PQUEUE:new() :: pq(),
|
||||
shift_opts :: #shift_opts{},
|
||||
last_p :: non_neg_integer() | undefined,
|
||||
counter :: non_neg_integer() | undefined
|
||||
}).
|
||||
|
||||
-type(mqueue() :: #mqueue{}).
|
||||
|
||||
-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}).
|
||||
|
||||
-spec(init(options()) -> mqueue()).
|
||||
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
||||
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
|
||||
|
@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
|
|||
#mqueue{max_len = MaxLen,
|
||||
store_qos0 = QoS_0,
|
||||
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
|
||||
default_p = get_priority_opt(Opts)
|
||||
default_p = get_priority_opt(Opts),
|
||||
shift_opts = get_shift_opt(Opts)
|
||||
}.
|
||||
|
||||
-spec(info(mqueue()) -> emqx_types:infos()).
|
||||
|
@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) ->
|
|||
info(len, #mqueue{len = Len}) ->
|
||||
Len;
|
||||
info(dropped, #mqueue{dropped = Dropped}) ->
|
||||
Dropped.
|
||||
Dropped;
|
||||
info(Info, ?OLD(MQ)) ->
|
||||
info(Info, live_upgrade(MQ)).
|
||||
|
||||
is_empty(#mqueue{len = Len}) -> Len =:= 0.
|
||||
is_empty(#mqueue{len = Len}) -> Len =:= 0;
|
||||
is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)).
|
||||
|
||||
len(#mqueue{len = Len}) -> Len.
|
||||
len(#mqueue{len = Len}) -> Len;
|
||||
len(?OLD(MQ)) -> len(live_upgrade(MQ)).
|
||||
|
||||
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
|
||||
max_len(#mqueue{max_len = MaxLen}) -> MaxLen;
|
||||
max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)).
|
||||
|
||||
%% @doc Return number of dropped messages.
|
||||
-spec(dropped(mqueue()) -> count()).
|
||||
dropped(#mqueue{dropped = Dropped}) -> Dropped.
|
||||
dropped(#mqueue{dropped = Dropped}) -> Dropped;
|
||||
dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)).
|
||||
|
||||
%% @doc Stats of the mqueue
|
||||
-spec(stats(mqueue()) -> [stat()]).
|
||||
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
|
||||
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
|
||||
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}];
|
||||
stats(?OLD(MQ)) ->
|
||||
stats(live_upgrade(MQ)).
|
||||
|
||||
%% @doc Enqueue a message.
|
||||
-spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}).
|
||||
|
@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
|
|||
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
|
||||
false ->
|
||||
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
|
||||
end.
|
||||
end;
|
||||
in(Msg, ?OLD(MQ)) ->
|
||||
in(Msg, live_upgrade(MQ)).
|
||||
|
||||
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
|
||||
out(MQ = #mqueue{len = 0, q = Q}) ->
|
||||
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
|
||||
{empty, MQ};
|
||||
out(MQ = #mqueue{q = Q, len = Len}) ->
|
||||
out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) ->
|
||||
{{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length
|
||||
MQ1 = MQ#mqueue{
|
||||
q = Q1,
|
||||
len = Len - 1,
|
||||
last_p = Prio,
|
||||
counter = init_counter(Prio, ShiftOpts)
|
||||
},
|
||||
{{value, Val}, MQ1};
|
||||
out(MQ = #mqueue{q = Q, counter = 0}) ->
|
||||
MQ1 = MQ#mqueue{
|
||||
q = ?PQUEUE:shift(Q),
|
||||
last_p = undefined
|
||||
},
|
||||
out(MQ1);
|
||||
out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) ->
|
||||
{R, Q1} = ?PQUEUE:out(Q),
|
||||
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
|
||||
{R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}};
|
||||
out(?OLD(MQ)) ->
|
||||
out(live_upgrade(MQ)).
|
||||
|
||||
get_opt(Key, Opts, Default) ->
|
||||
case maps:get(Key, Opts, Default) of
|
||||
|
@ -194,3 +235,46 @@ get_priority_opt(Opts) ->
|
|||
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
|
||||
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
|
||||
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).
|
||||
|
||||
init_counter(?HIGHEST_PRIORITY, Opts) ->
|
||||
Infinity = 1000000,
|
||||
init_counter(Infinity, Opts);
|
||||
init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) ->
|
||||
(Prio + Base) * Mult.
|
||||
|
||||
get_shift_opt(Opts) ->
|
||||
Mult = maps:get(shift_multiplier, Opts, 10),
|
||||
Min = case Opts of
|
||||
#{p_table := PTab} ->
|
||||
case maps:size(PTab) of
|
||||
0 -> 0;
|
||||
_ -> lists:min(maps:values(PTab))
|
||||
end;
|
||||
_ ->
|
||||
?LOWEST_PRIORITY
|
||||
end,
|
||||
Base = case Min < 0 of
|
||||
true -> -Min;
|
||||
false -> 0
|
||||
end,
|
||||
#shift_opts{
|
||||
multiplier = Mult,
|
||||
base = Base
|
||||
}.
|
||||
|
||||
live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) ->
|
||||
ShiftOpts = case is_map(PTable) of
|
||||
true -> get_shift_opt(#{p_table => PTable});
|
||||
false -> get_shift_opt(#{})
|
||||
end,
|
||||
#mqueue{ store_qos0 = StoreQos0
|
||||
, max_len = MaxLen
|
||||
, dropped = Dropped
|
||||
, p_table = PTable
|
||||
, default_p = DefaultP
|
||||
, len = Len
|
||||
, q = Q
|
||||
, shift_opts = ShiftOpts
|
||||
, last_p = undefined
|
||||
, counter = undefined
|
||||
}.
|
||||
|
|
|
@ -55,6 +55,7 @@
|
|||
, filter/2
|
||||
, fold/3
|
||||
, highest/1
|
||||
, shift/1
|
||||
]).
|
||||
|
||||
-export_type([q/0]).
|
||||
|
@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) ->
|
|||
end,
|
||||
{R, NewQ}.
|
||||
|
||||
-spec(shift(pqueue()) -> pqueue()).
|
||||
shift(Q = {queue, _, _, _}) ->
|
||||
Q;
|
||||
shift({pqueue, []}) ->
|
||||
{pqueue, []}; %% Shouldn't happen?
|
||||
shift({pqueue, [Hd|Rest]}) ->
|
||||
{pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities.
|
||||
|
||||
-spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}).
|
||||
out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0);
|
||||
out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)).
|
||||
|
@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}.
|
|||
|
||||
maybe_negate_priority(infinity) -> infinity;
|
||||
maybe_negate_priority(P) -> -P.
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@
|
|||
, stats/1
|
||||
]).
|
||||
|
||||
-export([call/2]).
|
||||
-export([call/2, call/3]).
|
||||
|
||||
%% WebSocket callbacks
|
||||
-export([ init/2
|
||||
|
@ -151,7 +151,10 @@ stats(#state{channel = Channel}) ->
|
|||
|
||||
%% kick|discard|takeover
|
||||
-spec(call(pid(), Req :: term()) -> Reply :: term()).
|
||||
call(WsPid, Req) when is_pid(WsPid) ->
|
||||
call(WsPid, Req) ->
|
||||
call(WsPid, Req, 5000).
|
||||
|
||||
call(WsPid, Req, Timeout) when is_pid(WsPid) ->
|
||||
Mref = erlang:monitor(process, WsPid),
|
||||
WsPid ! {call, {self(), Mref}, Req},
|
||||
receive
|
||||
|
@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
|
|||
Reply;
|
||||
{'DOWN', Mref, _, _, Reason} ->
|
||||
exit(Reason)
|
||||
after 5000 ->
|
||||
after Timeout ->
|
||||
erlang:demonitor(Mref, [flush]),
|
||||
exit(timeout)
|
||||
end.
|
||||
|
|
|
@ -77,7 +77,7 @@ t_get_set_chan_stats(_) ->
|
|||
|
||||
t_open_session(_) ->
|
||||
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
|
||||
|
||||
ClientInfo = #{zone => external,
|
||||
clientid => <<"clientid">>,
|
||||
|
@ -153,14 +153,14 @@ t_open_session_race_condition(_) ->
|
|||
|
||||
t_discard_session(_) ->
|
||||
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end),
|
||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end),
|
||||
ok = emqx_cm:discard_session(<<"clientid">>),
|
||||
ok = emqx_cm:unregister_channel(<<"clientid">>),
|
||||
ok = meck:unload(emqx_connection).
|
||||
|
@ -180,7 +180,7 @@ t_takeover_session(_) ->
|
|||
|
||||
t_kick_session(_) ->
|
||||
ok = meck:new(emqx_connection, [passthrough, no_history]),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
|
||||
ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end),
|
||||
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
|
||||
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
|
||||
test = emqx_cm:kick_session(<<"clientid">>),
|
||||
|
|
|
@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) ->
|
|||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
|
||||
|
||||
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
|
||||
timer:sleep(IdleTimeout),
|
||||
timer:sleep(IdleTimeout+500),
|
||||
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
|
||||
ok = emqtt:disconnect(Client).
|
||||
|
||||
|
|
Loading…
Reference in New Issue