From 903a9e57a8bceaade3cb38aef0194730a5383274 Mon Sep 17 00:00:00 2001 From: Turtle Date: Fri, 24 Sep 2021 12:51:28 +0800 Subject: [PATCH 1/9] feat(mqueue): Interleave messages with different priorities --- src/emqx.appup.src | 48 +++++++++++++++----- src/emqx_mqueue.erl | 106 +++++++++++++++++++++++++++++++++++++++----- src/emqx_pqueue.erl | 10 ++++- 3 files changed, 142 insertions(+), 22 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index a10c1d209..442ffe200 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -19,7 +19,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {add_module, emqx_congestion}, @@ -37,7 +39,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {add_module, emqx_congestion}, @@ -52,7 +56,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -65,7 +71,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -77,7 +85,13 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, - {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]} + {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + ]}, + {<<"4.2.[6-7]">>, [ + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ], @@ -99,7 +113,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -117,7 +133,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -132,7 +150,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -145,7 +165,9 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -157,7 +179,13 @@ {load_module, emqx_router_sup, soft_purge, soft_purge, []}, {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, - {load_module, emqx_router, soft_purge, soft_purge, []} + {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + ]}, + {<<"4.2.[6-7]">>, [ + {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ] diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index f77a1d98f..f888f69a3 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -67,6 +67,9 @@ , dropped/1 ]). +-export([ live_upgrade/1 + ]). + -export_type([mqueue/0, options/0]). -type(topic() :: emqx_topic:topic()). @@ -91,6 +94,11 @@ -define(MAX_LEN_INFINITY, 0). -define(INFO_KEYS, [store_qos0, max_len, len, dropped]). +-record(shift_opts, { + multiplier :: non_neg_integer(), + base :: integer() + }). + -record(mqueue, { store_qos0 = false :: boolean(), max_len = ?MAX_LEN_INFINITY :: count(), @@ -98,11 +106,16 @@ dropped = 0 :: count(), p_table = ?NO_PRIORITY_TABLE :: p_table(), default_p = ?LOWEST_PRIORITY :: priority(), - q = ?PQUEUE:new() :: pq() + q = ?PQUEUE:new() :: pq(), + shift_opts :: #shift_opts{}, + last_p :: non_neg_integer() | undefined, + counter :: non_neg_integer() | undefined }). -type(mqueue() :: #mqueue{}). +-define(OLD(Q), Q = {mqueue, _, _, _, _, _, _, _}). + -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of @@ -112,7 +125,8 @@ init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> #mqueue{max_len = MaxLen, store_qos0 = QoS_0, p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), - default_p = get_priority_opt(Opts) + default_p = get_priority_opt(Opts), + shift_opts = get_shift_opt(Opts) }. -spec(info(mqueue()) -> emqx_types:infos()). @@ -127,22 +141,30 @@ info(max_len, #mqueue{max_len = MaxLen}) -> info(len, #mqueue{len = Len}) -> Len; info(dropped, #mqueue{dropped = Dropped}) -> - Dropped. + Dropped; +info(Info, ?OLD(MQ)) -> + info(Info, live_upgrade(MQ)). -is_empty(#mqueue{len = Len}) -> Len =:= 0. +is_empty(#mqueue{len = Len}) -> Len =:= 0; +is_empty(?OLD(MQ)) -> is_empty(live_upgrade(MQ)). -len(#mqueue{len = Len}) -> Len. +len(#mqueue{len = Len}) -> Len; +len(?OLD(MQ)) -> len(live_upgrade(MQ)). -max_len(#mqueue{max_len = MaxLen}) -> MaxLen. +max_len(#mqueue{max_len = MaxLen}) -> MaxLen; +max_len(?OLD(MQ)) -> max_len(live_upgrade(MQ)). %% @doc Return number of dropped messages. -spec(dropped(mqueue()) -> count()). -dropped(#mqueue{dropped = Dropped}) -> Dropped. +dropped(#mqueue{dropped = Dropped}) -> Dropped; +dropped(?OLD(MQ)) -> dropped(live_upgrade(MQ)). %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> - [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]; +stats(?OLD(MQ)) -> + stats(live_upgrade(MQ)). %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> {maybe(message()), mqueue()}). @@ -165,15 +187,34 @@ in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} - end. + end; +in(Msg, ?OLD(MQ)) -> + in(Msg, live_upgrade(MQ)). -spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). out(MQ = #mqueue{len = 0, q = Q}) -> 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{q = Q, len = Len}) -> +out(MQ = #mqueue{q = Q, len = Len, last_p = undefined, shift_opts = ShiftOpts}) -> + {{value, Val, Prio}, Q1} = ?PQUEUE:out_p(Q), %% Shouldn't fail, since we've checked the length + MQ1 = MQ#mqueue{ + q = Q1, + len = Len - 1, + last_p = Prio, + counter = init_counter(Prio, ShiftOpts) + }, + {{value, Val}, MQ1}; +out(MQ = #mqueue{q = Q, counter = 0}) -> + MQ1 = MQ#mqueue{ + q = ?PQUEUE:shift(Q), + last_p = undefined + }, + out(MQ1); +out(MQ = #mqueue{q = Q, len = Len, counter = Cnt}) -> {R, Q1} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q1, len = Len - 1}}. + {R, MQ#mqueue{q = Q1, len = Len - 1, counter = Cnt - 1}}; +out(?OLD(MQ)) -> + out(live_upgrade(MQ)). get_opt(Key, Opts, Default) -> case maps:get(Key, Opts, Default) of @@ -194,3 +235,46 @@ get_priority_opt(Opts) -> %% while the highest 'infinity' is a [{infinity, queue:queue()}] get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). + +init_counter(?HIGHEST_PRIORITY, Opts) -> + Infinity = 1000000, + init_counter(Infinity, Opts); +init_counter(Prio, #shift_opts{multiplier = Mult, base = Base}) -> + (Prio + Base) * Mult. + +get_shift_opt(Opts) -> + Mult = maps:get(shift_multiplier, Opts, 10), + Min = case Opts of + #{p_table := PTab} -> + case maps:size(PTab) of + 0 -> 0; + _ -> lists:min(maps:values(PTab)) + end; + _ -> + ?LOWEST_PRIORITY + end, + Base = case Min < 0 of + true -> -Min; + false -> 0 + end, + #shift_opts{ + multiplier = Mult, + base = Base + }. + +live_upgrade({mqueue, StoreQos0, MaxLen, Len, Dropped, PTable, DefaultP, Q}) -> + ShiftOpts = case is_map(PTable) of + true -> get_shift_opt(#{p_table => PTable}); + false -> get_shift_opt(#{}) + end, + #mqueue{ store_qos0 = StoreQos0 + , max_len = MaxLen + , dropped = Dropped + , p_table = PTable + , default_p = DefaultP + , len = Len + , q = Q + , shift_opts = ShiftOpts + , last_p = undefined + , counter = undefined + }. diff --git a/src/emqx_pqueue.erl b/src/emqx_pqueue.erl index 85c89866d..5dd81af0b 100644 --- a/src/emqx_pqueue.erl +++ b/src/emqx_pqueue.erl @@ -55,6 +55,7 @@ , filter/2 , fold/3 , highest/1 + , shift/1 ]). -export_type([q/0]). @@ -170,6 +171,14 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +-spec(shift(pqueue()) -> pqueue()). +shift(Q = {queue, _, _, _}) -> + Q; +shift({pqueue, []}) -> + {pqueue, []}; %% Shouldn't happen? +shift({pqueue, [Hd|Rest]}) -> + {pqueue, Rest ++ [Hd]}. %% Let's hope there are not many priorities. + -spec(out_p(pqueue()) -> {empty | {value, any(), priority()}, pqueue()}). out_p({queue, _, _, _} = Q) -> add_p(out(Q), 0); out_p({pqueue, [{P, _} | _]} = Q) -> add_p(out(Q), maybe_negate_priority(P)). @@ -266,4 +275,3 @@ r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. - From c0ca7f8beab997d89fb4708c8ffc2b55370540ca Mon Sep 17 00:00:00 2001 From: Turtle Date: Sun, 26 Sep 2021 18:21:34 +0800 Subject: [PATCH 2/9] fix(force_shutdown): cannot suicide if the process hangs up --- src/emqx.appup.src | 48 ++++++++++++++++++++++++++++---------- src/emqx_alarm_handler.erl | 12 +++++----- src/emqx_misc.erl | 22 +++++++++++++++-- 3 files changed, 62 insertions(+), 20 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 442ffe200..0c3ab3da8 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -21,7 +21,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {add_module, emqx_congestion}, @@ -41,7 +43,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {add_module, emqx_congestion}, @@ -58,7 +62,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -73,7 +79,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -87,11 +95,15 @@ {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ], @@ -115,7 +127,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {"4.2.1", [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -135,7 +149,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[23]">>, [ {load_module, emqx_shared_sub, brutal_purge, soft_purge, []}, @@ -152,7 +168,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.4">>, [ {load_module, emqx_frame, brutal_purge, soft_purge, []}, @@ -167,7 +185,9 @@ {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.5">>, [ {load_module, emqx_channel, brutal_purge, soft_purge, []}, @@ -181,11 +201,15 @@ {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, - {load_module,emqx_mqueue,brutal_purge,soft_purge,[]} + {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, + {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, + {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<".*">>, []} ] diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 7f66f6eb1..cd52e586f 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -56,20 +56,20 @@ init({_Args, {alarm_handler, _ExistingAlarms}}) -> init(_) -> {ok, []}. -handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> +handle_event({set_alarm, {system_memory_high_watermark, []}}, State) -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => emqx_os_mon:get_sysmem_high_watermark()}), {ok, State}; -handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> - emqx_alarm:activate(high_process_memory_usage, #{pid => Pid, +handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> + emqx_alarm:activate(high_process_memory_usage, #{pid => list_to_binary(pid_to_list(Pid)), high_watermark => emqx_os_mon:get_procmem_high_watermark()}), {ok, State}; -handle_event({clear_alarm, system_memory_high_watermark}, State) -> +handle_event({clear_alarm, system_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_system_memory_usage), {ok, State}; -handle_event({clear_alarm, process_memory_high_watermark}, State) -> +handle_event({clear_alarm, process_memory_high_watermark}, State) -> emqx_alarm:deactivate(high_process_memory_usage), {ok, State}; @@ -85,4 +85,4 @@ handle_call(_Query, State) -> terminate(swap, _State) -> {emqx_alarm_handler, []}; terminate(_, _) -> - ok. \ No newline at end of file + ok. diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 88e3c91a6..55e8b28f9 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -45,6 +45,8 @@ , index_of/2 ]). +-define(OOM_FACTOR, 1.25). + %% @doc Merge options -spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()). merge_opts(Defaults, Options) -> @@ -185,8 +187,8 @@ do_check_oom([{Val, Max, Reason}|Rest]) -> tune_heap_size(#{max_heap_size := MaxHeapSize}) -> %% If set to zero, the limit is disabled. - erlang:process_flag(max_heap_size, #{size => MaxHeapSize, - kill => false, + erlang:process_flag(max_heap_size, #{size => must_kill_heap_size(MaxHeapSize), + kill => true, error_logger => true }); tune_heap_size(undefined) -> ok. @@ -233,3 +235,19 @@ index_of(E, I, [E|_]) -> index_of(E, I, [_|L]) -> index_of(E, I+1, L). +must_kill_heap_size(Size) -> + %% We set the max allowed heap size by `erlang:process_flag(max_heap_size, #{size => Size})`, + %% where the `Size` cannot be set to an integer lager than `(1 bsl 59) - 1` on a 64-bit system, + %% or `(1 bsl 27) - 1` on a 32-bit system. + MaxAllowedSize = case erlang:system_info(wordsize) of + 8 -> % arch_64 + (1 bsl 59) - 1; + 4 -> % arch_32 + (1 bsl 27) - 1 + end, + %% We multiply the size with factor ?OOM_FACTOR, to give the + %% process a chance to suicide by `check_oom/1` + case ceil(Size * ?OOM_FACTOR) of + Size0 when Size0 >= MaxAllowedSize -> MaxAllowedSize; + Size0 -> Size0 + end. From fb65d1c5816244b93b33251cf0ec92c710d6f3df Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Sep 2021 22:29:35 +0800 Subject: [PATCH 3/9] chore: add mqtt-sn protocol define --- include/emqx_mqtt.hrl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 078baf301..261245e60 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -30,11 +30,13 @@ %% MQTT Protocol Version and Names %%-------------------------------------------------------------------- +-define(MQTT_SN_PROTO_V1, 1). -define(MQTT_PROTO_V3, 3). -define(MQTT_PROTO_V4, 4). -define(MQTT_PROTO_V5, 5). -define(PROTOCOL_NAMES, [ + {?MQTT_SN_PROTO_V1, <<"MQTT-SN">>}, %% XXX:Compatible with emqx-sn plug-in {?MQTT_PROTO_V3, <<"MQIsdp">>}, {?MQTT_PROTO_V4, <<"MQTT">>}, {?MQTT_PROTO_V5, <<"MQTT">>}]). From aa5d274464c5e93d71cce58771916fe49a8c9791 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Sep 2021 22:32:17 +0800 Subject: [PATCH 4/9] feat: acl.conf support ipaddrs --- src/emqx_access_rule.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/emqx_access_rule.erl b/src/emqx_access_rule.erl index f0c6bcea9..b0e1aeb0b 100644 --- a/src/emqx_access_rule.erl +++ b/src/emqx_access_rule.erl @@ -28,7 +28,8 @@ -type(who() :: all | binary() | {client, binary()} | {user, binary()} | - {ipaddr, esockd_cidr:cidr_string()}). + {ipaddr, esockd_cidr:cidr_string()} | + {ipaddrs, list(esockd_cidr:cidr_string())}). -type(access() :: subscribe | publish | pubsub). @@ -52,6 +53,8 @@ compile(who, all) -> all; compile(who, {ipaddr, CIDR}) -> {ipaddr, esockd_cidr:parse(CIDR, true)}; +compile(who, {ipaddrs, CIDRs}) -> + {ipaddrs, lists:map(fun(CIDR) -> esockd_cidr:parse(CIDR, true) end, CIDRs)}; compile(who, {client, all}) -> {client, all}; compile(who, {client, ClientId}) -> @@ -108,8 +111,14 @@ match_who(#{username := Username}, {user, Username}) -> true; match_who(#{peerhost := undefined}, {ipaddr, _Tup}) -> false; +match_who(#{peerhost := undefined}, {ipaddrs, _}) -> + false; match_who(#{peerhost := IP}, {ipaddr, CIDR}) -> esockd_cidr:match(IP, CIDR); +match_who(#{peerhost := IP}, {ipaddrs, CIDRs}) -> + lists:any(fun(CIDR) -> + esockd_cidr:match(IP, CIDR) + end, CIDRs); match_who(ClientInfo, {'and', Conds}) when is_list(Conds) -> lists:foldl(fun(Who, Allow) -> match_who(ClientInfo, Who) andalso Allow From 7b177a79297b3723196c703324379f58a9ccec98 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Sep 2021 22:41:53 +0800 Subject: [PATCH 5/9] fix: more safe session call --- src/emqx_cm.erl | 45 ++++++++++++++++++++++++-------------- src/emqx_connection.erl | 6 +++-- src/emqx_ws_connection.erl | 9 +++++--- test/emqx_cm_SUITE.erl | 8 +++---- 4 files changed, 43 insertions(+), 25 deletions(-) diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 200a5f08e..36a280714 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -88,6 +88,8 @@ %% Batch drain -define(BATCH_SIZE, 100000). +-define(T_TAKEOVER, 15000). + %% Server name -define(CM, ?MODULE). @@ -222,7 +224,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, @@ -264,7 +266,7 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> undefined -> {error, not_found}; ConnMod when is_atom(ConnMod) -> - Session = ConnMod:call(ChanPid, {takeover, 'begin'}), + Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), {ok, ConnMod, ChanPid, Session} end; @@ -277,24 +279,35 @@ discard_session(ClientId) when is_binary(ClientId) -> case lookup_channels(ClientId) of [] -> ok; ChanPids -> - lists:foreach( - fun(ChanPid) -> - try - discard_session(ClientId, ChanPid) - catch - _:{noproc,_}:_Stk -> ok; - _:{{shutdown,_},_}:_Stk -> ok; - _:Error:_Stk -> - ?LOG(error, "Failed to discard ~0p: ~0p", [ChanPid, Error]) - end - end, ChanPids) + lists:foreach(fun(Pid) -> do_discard_session(ClientId, Pid) end, ChanPids) + end. + +do_discard_session(ClientId, Pid) -> + try + discard_session(ClientId, Pid) + catch + _ : noproc -> % emqx_ws_connection: call + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {noproc, _} -> % emqx_connection: gen_server:call + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {'EXIT', {noproc, _}} -> % rpc_call/3 + ?LOG(debug, "session_already_gone: ~p", [Pid]), + ok; + _ : {{shutdown, _}, _} -> + ?LOG(debug, "session_already_shutdown: ~p", [Pid]), + ok; + _ : Error : St -> + ?LOG(debug, "failed_to_discard_session: ~p, " + "error: ~p, stacktrace: ~0p", [Pid, Error, St]) end. discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> ok; ConnMod when is_atom(ConnMod) -> - ConnMod:call(ChanPid, discard) + ConnMod:call(ChanPid, discard, ?T_TAKEOVER) end; discard_session(ClientId, ChanPid) -> @@ -317,7 +330,7 @@ kick_session(ClientId) -> kick_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chan_info(ClientId, ChanPid) of #{conninfo := #{conn_mod := ConnMod}} -> - ConnMod:call(ChanPid, kick); + ConnMod:call(ChanPid, kick, ?T_TAKEOVER); undefined -> {error, not_found} end; @@ -361,7 +374,7 @@ lookup_channels(local, ClientId) -> %% @private rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of + case rpc:call(Node, ?MODULE, Fun, Args, 2 * ?T_TAKEOVER) of {badrpc, Reason} -> error(Reason); Res -> Res end. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 720b7ec98..69337adf2 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -38,7 +38,7 @@ , stats/1 ]). --export([call/2]). +-export([call/2, call/3]). %% Callback -export([init/4]). @@ -168,7 +168,9 @@ stats(#state{transport = Transport, lists:append([SockStats, ConnStats, ChanStats, ProcStats]). call(Pid, Req) -> - gen_server:call(Pid, Req, infinity). + call(Pid, Req, infinity). +call(Pid, Req, Timeout) -> + gen_server:call(Pid, Req, Timeout). stop(Pid) -> gen_server:stop(Pid). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index aacc4c4f2..ac975850e 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -34,7 +34,7 @@ , stats/1 ]). --export([call/2]). +-export([call/2, call/3]). %% WebSocket callbacks -export([ init/2 @@ -151,7 +151,10 @@ stats(#state{channel = Channel}) -> %% kick|discard|takeover -spec(call(pid(), Req :: term()) -> Reply :: term()). -call(WsPid, Req) when is_pid(WsPid) -> +call(WsPid, Req) -> + call(WsPid, Req, 5000). + +call(WsPid, Req, Timeout) when is_pid(WsPid) -> Mref = erlang:monitor(process, WsPid), WsPid ! {call, {self(), Mref}, Req}, receive @@ -160,7 +163,7 @@ call(WsPid, Req) when is_pid(WsPid) -> Reply; {'DOWN', Mref, _, _, Reason} -> exit(Reason) - after 5000 -> + after Timeout -> erlang:demonitor(Mref, [flush]), exit(timeout) end. diff --git a/test/emqx_cm_SUITE.erl b/test/emqx_cm_SUITE.erl index 6a4b84d01..b32cdaf23 100644 --- a/test/emqx_cm_SUITE.erl +++ b/test/emqx_cm_SUITE.erl @@ -77,7 +77,7 @@ t_get_set_chan_stats(_) -> t_open_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), ClientInfo = #{zone => external, clientid => <<"clientid">>, @@ -153,14 +153,14 @@ t_open_session_race_condition(_) -> t_discard_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> ok end), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), ok = emqx_cm:discard_session(<<"clientid">>), - ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> error(testing) end), ok = emqx_cm:discard_session(<<"clientid">>), ok = emqx_cm:unregister_channel(<<"clientid">>), ok = meck:unload(emqx_connection). @@ -180,7 +180,7 @@ t_takeover_session(_) -> t_kick_session(_) -> ok = meck:new(emqx_connection, [passthrough, no_history]), - ok = meck:expect(emqx_connection, call, fun(_, _) -> test end), + ok = meck:expect(emqx_connection, call, fun(_, _, _) -> test end), {error, not_found} = emqx_cm:kick_session(<<"clientid">>), ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []), test = emqx_cm:kick_session(<<"clientid">>), From 5f2adb42ed229c93e161567abea1681d0bb27944 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 26 Sep 2021 18:04:16 +0800 Subject: [PATCH 6/9] chore: export keepalive funcs for mqtt-sn --- src/emqx_channel.erl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 679d2c72a..e0789382a 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -49,7 +49,10 @@ ]). %% Export for emqx_sn --export([do_deliver/2]). +-export([ do_deliver/2 + , ensure_keepalive/2 + , clear_keepalive/1 + ]). %% Exports for CT -export([set_field/3]). @@ -1512,6 +1515,15 @@ ensure_keepalive_timer(Interval, Channel = #channel{clientinfo = #{zone := Zone} Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). +clear_keepalive(Channel = #channel{timers = Timers}) -> + case maps:get(alive_timer, Timers, undefined) of + undefined -> + Channel; + TRef -> + emqx_misc:cancel_timer(TRef), + Channel#channel{timers = maps:without([alive_timer], Timers)} + end. + %%-------------------------------------------------------------------- %% Maybe Resume Session From b9527cfe9d3a90a52cb6563f2fa7cfbca6d05278 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 26 Sep 2021 18:40:29 +0800 Subject: [PATCH 7/9] test: increase waiting time to avoid test failure --- test/mqtt_protocol_v5_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index 68655621a..7a6d08ea6 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -278,7 +278,7 @@ t_connect_emit_stats_timeout(_) -> [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))), - timer:sleep(IdleTimeout), + timer:sleep(IdleTimeout+500), ?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))), ok = emqtt:disconnect(Client). From 34375c6cc6d0ac3d51e1f6079ef9a4659b922b0b Mon Sep 17 00:00:00 2001 From: JianBo He Date: Sun, 26 Sep 2021 19:40:49 +0800 Subject: [PATCH 8/9] chore(appup): update appup.src --- src/emqx.appup.src | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 0c3ab3da8..a60b33bfd 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -20,6 +20,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -42,6 +44,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -61,6 +65,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -78,6 +84,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -94,12 +102,19 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, []}, {load_module, emqx_router, soft_purge, soft_purge, [emqx_trie]}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -126,6 +141,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -148,6 +165,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -167,6 +186,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -184,6 +205,8 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, @@ -200,12 +223,19 @@ {load_module, emqx_broker, soft_purge, soft_purge, []}, {load_module, emqx_trie, soft_purge, soft_purge, [emqx_router]}, {load_module, emqx_router, soft_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]} ]}, {<<"4.2.[6-7]">>, [ + {load_module, emqx_channel, brutal_purge, soft_purge, []}, + {load_module, emqx_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_ws_connection, brutal_purge, soft_purge, []}, + {load_module, emqx_cm, brutal_purge, soft_purge, []}, + {load_module, emqx_access_rule, brutal_purge, soft_purge, []}, {load_module,emqx_pqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_mqueue,brutal_purge,soft_purge,[]}, {load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]}, From e86e1e0430de1bda71eaaeff8c13e289ae55de53 Mon Sep 17 00:00:00 2001 From: Turtle Date: Mon, 27 Sep 2021 08:38:12 +0800 Subject: [PATCH 9/9] fix(ekka): kill the process if don't release lock --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 7e7014123..2370a9b87 100644 --- a/rebar.config +++ b/rebar.config @@ -7,7 +7,7 @@ {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.5"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.6"}}}, + {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.9"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} ]}.