From 0c742279959ccd845806997dc72fb2fdcc7b7c67 Mon Sep 17 00:00:00 2001 From: Georgy Sychev Date: Tue, 22 Mar 2022 18:06:01 +0400 Subject: [PATCH 1/2] feat(shared_sub): per group strategies and local strategy Adds backward compatible per group strategy for shared subscriptions Adds local shared subscription strategy --- etc/emqx.conf | 19 ++- priv/emqx.schema | 28 ++++- src/emqx.appup.src | 2 + src/emqx_shared_sub.erl | 30 ++++- test/emqx_shared_sub_SUITE.erl | 211 ++++++++++++++++++++++++++++----- 5 files changed, 250 insertions(+), 40 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index ac2415649..b7f8e6c2b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2264,14 +2264,27 @@ broker.session_locking_strategy = quorum ## Dispatch strategy for shared subscription ## ## Value: Enum +## - hash_clientid +## - hash # same as hash_clientid +## - hash_topic +## - local ## - random ## - round_robin ## - sticky -## - hash # same as hash_clientid -## - hash_clientid -## - hash_topic broker.shared_subscription_strategy = random +## Per group dispatch strategy for shared subscription +## +## Value: Enum +## - hash_clientid +## - hash # same as hash_clientid +## - hash_topic +## - local +## - random +## - round_robin +## - sticky +broker.sample_group.shared_subscription_strategy = local + ## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages ## This should allow messages to be dispatched to a different subscriber in ## the group in case the picked (based on shared_subscription_strategy) one # is offline diff --git a/priv/emqx.schema b/priv/emqx.schema index a1cf2a97b..944541d76 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2311,7 +2311,7 @@ end}. {datatype, {enum, [local,leader,quorum,all]}} ]}. -%% @doc Shared Subscription Dispatch Strategy. +%% @doc Default shared Subscription Dispatch Strategy. {mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [ {default, round_robin}, {datatype, @@ -2320,11 +2320,37 @@ end}. round_robin, %% round robin alive subscribers one message after another sticky, %% pick a random subscriber and stick to it hash, %% hash client ID to a group member + local, %% send to some locally available subscriber hash_clientid, hash_topic ]}} ]}. +%% @doc Per group Shared Subscription Dispatch Strategy +{mapping, "broker.$name.shared_subscription_strategy", "emqx.shared_subscription_strategy_per_group", [ + {default, round_robin}, + {datatype, + {enum, + [random, %% randomly pick a subscriber + round_robin, %% round robin alive subscribers one message after another + sticky, %% pick a random subscriber and stick to it + hash, %% hash client ID to a group member + local, %% send to some locally available subscriber + hash_clientid, + hash_topic + ]}} +]}. + +{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) -> + Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf), + Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) -> + {true, {Group, Strategy}}; + (_) -> + false + end, Conf0), + maps:from_list(Groups) +end}. + %% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages {mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled", [ {default, false}, diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 3951c417c..7ddf53448 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -4,6 +4,7 @@ [{"4.3.14", [{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", @@ -434,6 +435,7 @@ [{"4.3.14", [{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 2de15eb55..aa8168480 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -47,7 +47,12 @@ ]). %% for testing --export([subscribers/2, ack_enabled/0]). +-ifdef(TEST). +-export([ subscribers/2 + , ack_enabled/0 + , strategy/1 + ]). +-endif. %% gen_server callbacks -export([ init/1 @@ -63,6 +68,7 @@ -type strategy() :: random | round_robin | sticky + | local | hash %% same as hash_clientid, backward compatible | hash_clientid | hash_topic. @@ -121,7 +127,7 @@ dispatch(Group, Topic, Delivery) -> dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> #message{from = ClientId, topic = SourceTopic} = Msg, - case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of + case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of false -> {error, no_subscribers}; {Type, SubPid} -> @@ -133,9 +139,16 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> end end. --spec(strategy() -> strategy()). -strategy() -> - emqx:get_env(shared_subscription_strategy, random). +-spec(strategy(emqx_topic:group()) -> strategy()). +strategy(Group) -> + case emqx:get_env(shared_subscription_strategy_per_group, #{}) of + #{Group := Strategy} -> + Strategy; + + _ -> + emqx:get_env(shared_subscription_strategy, random) + end. + -spec(ack_enabled() -> boolean()). ack_enabled() -> @@ -267,6 +280,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) -> end. pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub; +pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) -> + case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of + [_ | _] = LocalSubs -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs); + [] -> + pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs) + end; pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) -> Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)), lists:nth(Nth, Subs). diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 54a0de6d2..09b480c0f 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -35,6 +35,7 @@ all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> + net_kernel:start(['master@127.0.0.1', longnames]), emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -125,7 +126,7 @@ t_no_connection_nack(_) -> emqx:publish(M#message{id = PacketId}) end, SendF(1), - timer:sleep(200), + ct:sleep(200), %% This is the connection which was picked by broker to dispatch (sticky) for 1st message ?assertMatch([#{packet_id := 1}], recv_msgs(1)), @@ -164,19 +165,24 @@ t_no_connection_nack(_) -> ok. t_random(_) -> + ok = ensure_config(random, true), test_two_messages(random). t_round_robin(_) -> + ok = ensure_config(round_robin, true), test_two_messages(round_robin). t_sticky(_) -> + ok = ensure_config(sticky, true), test_two_messages(sticky). t_hash(_) -> - test_two_messages(hash, false). + ok = ensure_config(hash, false), + test_two_messages(hash). t_hash_clinetid(_) -> - test_two_messages(hash_clientid, false). + ok = ensure_config(hash_clientid, false), + test_two_messages(hash_clientid). t_hash_topic(_) -> ok = ensure_config(hash_topic, false), @@ -242,55 +248,48 @@ t_not_so_sticky(_) -> ok. test_two_messages(Strategy) -> - test_two_messages(Strategy, _WithAck = true). + test_two_messages(Strategy, <<"group1">>). -test_two_messages(Strategy, WithAck) -> - ok = ensure_config(Strategy, WithAck), +test_two_messages(Strategy, Group) -> Topic = <<"foo/bar">>, ClientId1 = <<"ClientId1">>, ClientId2 = <<"ClientId2">>, {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), - {ok, _} = emqtt:connect(ConnPid1), {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + {ok, _} = emqtt:connect(ConnPid1), {ok, _} = emqtt:connect(ConnPid2), + emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 0}), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>), - emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}), - emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}), ct:sleep(100), + emqx:publish(Message1), - Me = self(), - WaitF = fun(ExpectedPayload) -> - case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of - {true, Pid} -> - Me ! {subscriber, Pid}, - true; - Other -> - Other - end - end, - WaitF(<<"hello1">>), - UsedSubPid1 = receive {subscriber, P1} -> P1 end, - emqx_broker:publish(Message2), - WaitF(<<"hello2">>), - UsedSubPid2 = receive {subscriber, P2} -> P2 end, - case Strategy of - sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2); - round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2); - hash -> ?assert(UsedSubPid1 =:= UsedSubPid2); - _ -> ok - end, + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + emqx:publish(Message2), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + emqtt:stop(ConnPid1), emqtt:stop(ConnPid2), + + case Strategy of + sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2); + round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2); + hash -> ?assertEqual(UsedSubPid1, UsedSubPid2); + _ -> ok + end, ok. last_message(ExpectedPayload, Pids) -> receive {publish, #{client_pid := Pid, payload := ExpectedPayload}} -> - ct:pal("~p ====== ~p", [Pids, Pid]), + ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]), {true, Pid} after 100 -> + ct:pal("not yet"), <<"not yet?">> end. @@ -314,6 +313,103 @@ t_uncovered_func(_) -> ignored = emqx_shared_sub ! ignored, {mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}. +t_per_group_config(_) -> + ok = ensure_group_config(#{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + %% Each test is repeated 4 times because random strategy may technically pass the test + %% so we run 8 tests to make random pass in only 1/256 runs + + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(sticky, <<"sticky_group">>), + test_two_messages(round_robin, <<"round_robin_group">>), + test_two_messages(round_robin, <<"round_robin_group">>). + +t_local(_) -> + Node = start_slave('local_shared_sub_test', 21884), + GroupConfig = #{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }, + ok = ensure_group_config(Node, GroupConfig), + ok = ensure_group_config(GroupConfig), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {port, 21884}]), + {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]), + + {ok, _} = emqtt:connect(ConnPid1), + {ok, _} = emqtt:connect(ConnPid2), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group/local_foo/bar">>, 0}), + emqtt:subscribe(ConnPid2, {<<"$share/local_group/local_foo/bar">>, 0}), + + ct:sleep(100), + + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]), + RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]), + + emqtt:stop(ConnPid1), + emqtt:stop(ConnPid2), + stop_slave(Node), + + ?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)), + ?assertEqual(local, RemoteLocalGroupStrategy), + + ?assertNotEqual(UsedSubPid1, UsedSubPid2), + ok. + +t_local_fallback(_) -> + ok = ensure_group_config(#{ + <<"local_group_fallback">> => local, + <<"local_group">> => local, + <<"round_robin_group">> => round_robin, + <<"sticky_group">> => sticky + }), + + Topic = <<"local_foo/bar">>, + ClientId1 = <<"ClientId1">>, + ClientId2 = <<"ClientId2">>, + Node = start_slave('local_fallback_shared_sub_test', 11885), + + {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]), + {ok, _} = emqtt:connect(ConnPid1), + Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>), + Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>), + + emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}), + + emqx:publish(Message1), + {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]), + + rpc:call(Node, emqx, publish, [Message2]), + {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]), + + emqtt:stop(ConnPid1), + stop_slave(Node), + + ?assertEqual(UsedSubPid1, UsedSubPid2), + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- @@ -326,6 +422,12 @@ ensure_config(Strategy, AckEnabled) -> application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled), ok. +ensure_group_config(Node, Group2Strategy) -> + rpc:call(Node, application, set_env, [emqx, shared_subscription_strategy_per_group, Group2Strategy]). + +ensure_group_config(Group2Strategy) -> + application:set_env(emqx, shared_subscription_strategy_per_group, Group2Strategy). + subscribed(Group, Topic, Pid) -> lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)). @@ -343,3 +445,50 @@ recv_msgs(Count, Msgs) -> Msgs end. +start_slave(Name, Port) -> + {ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()), + [{kill_if_fail, true}, + {monitor_master, true}, + {init_timeout, 10000}, + {startup_timeout, 10000}, + {erl_flags, ebin_path()}]), + + pong = net_adm:ping(Node), + setup_node(Node, Port), + Node. + +stop_slave(Node) -> + rpc:call(Node, ekka, leave, []), + ct_slave:stop(Node). + +host() -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), Host. + +ebin_path() -> + string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " "). + +is_lib(Path) -> + string:prefix(Path, code:lib_dir()) =:= nomatch. + +setup_node(Node, Port) -> + EnvHandler = + fun(emqx) -> + application:set_env( + emqx, + listeners, + [#{listen_on => {{127,0,0,1},Port}, + name => "internal", + opts => [{zone,internal}], + proto => tcp}]), + application:set_env(gen_rpc, port_discovery, manual), + ok; + (_) -> + ok + end, + + [ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]], + ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]), + + rpc:call(Node, ekka, join, [node()]), + + ok. From 9694c3277d00300a247ddc5cd5333a367989100c Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 7 Apr 2022 15:17:02 +0200 Subject: [PATCH 2/2] fix(appup): re-generate emqx.appup.src --- src/emqx.appup.src | 54 ++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 7ddf53448..432daad39 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -8,7 +8,8 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -24,7 +25,8 @@ {load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -48,7 +50,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -74,7 +77,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -100,7 +104,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -130,7 +135,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -160,7 +166,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -191,7 +198,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -222,7 +230,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -439,7 +448,8 @@ {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.13", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, @@ -455,7 +465,8 @@ {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {"4.3.12", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, @@ -478,7 +489,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -503,7 +515,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.10", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -528,7 +541,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.9", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -557,7 +571,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, @@ -586,7 +601,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.7", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -616,7 +632,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]}, @@ -646,7 +663,8 @@ {load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_hooks,brutal_purge,soft_purge,[]}, + [{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}, {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_banned,brutal_purge,soft_purge,[]},