feat(shared_sub): per group strategies and local strategy

Adds backward compatible per group strategy for shared subscriptions
Adds local shared subscription strategy
This commit is contained in:
Georgy Sychev 2022-03-22 18:06:01 +04:00
parent 16dc0d6555
commit 0c74227995
5 changed files with 250 additions and 40 deletions

View File

@ -2264,14 +2264,27 @@ broker.session_locking_strategy = quorum
## Dispatch strategy for shared subscription
##
## Value: Enum
## - hash_clientid
## - hash # same as hash_clientid
## - hash_topic
## - local
## - random
## - round_robin
## - sticky
## - hash # same as hash_clientid
## - hash_clientid
## - hash_topic
broker.shared_subscription_strategy = random
## Per group dispatch strategy for shared subscription
##
## Value: Enum
## - hash_clientid
## - hash # same as hash_clientid
## - hash_topic
## - local
## - random
## - round_robin
## - sticky
broker.sample_group.shared_subscription_strategy = local
## Enable/disable shared dispatch acknowledgement for QoS1 and QoS2 messages
## This should allow messages to be dispatched to a different subscriber in
## the group in case the picked (based on shared_subscription_strategy) one # is offline

View File

@ -2311,7 +2311,7 @@ end}.
{datatype, {enum, [local,leader,quorum,all]}}
]}.
%% @doc Shared Subscription Dispatch Strategy.
%% @doc Default shared Subscription Dispatch Strategy.
{mapping, "broker.shared_subscription_strategy", "emqx.shared_subscription_strategy", [
{default, round_robin},
{datatype,
@ -2320,11 +2320,37 @@ end}.
round_robin, %% round robin alive subscribers one message after another
sticky, %% pick a random subscriber and stick to it
hash, %% hash client ID to a group member
local, %% send to some locally available subscriber
hash_clientid,
hash_topic
]}}
]}.
%% @doc Per group Shared Subscription Dispatch Strategy
{mapping, "broker.$name.shared_subscription_strategy", "emqx.shared_subscription_strategy_per_group", [
{default, round_robin},
{datatype,
{enum,
[random, %% randomly pick a subscriber
round_robin, %% round robin alive subscribers one message after another
sticky, %% pick a random subscriber and stick to it
hash, %% hash client ID to a group member
local, %% send to some locally available subscriber
hash_clientid,
hash_topic
]}}
]}.
{translation, "emqx.shared_subscription_strategy_per_group", fun(Conf) ->
Conf0 = cuttlefish_variable:filter_by_prefix("broker", Conf),
Groups = lists:filtermap(fun({["broker", Group, "shared_subscription_strategy"], Strategy}) ->
{true, {Group, Strategy}};
(_) ->
false
end, Conf0),
maps:from_list(Groups)
end}.
%% @doc Enable or disable shared dispatch acknowledgement for QoS1 and QoS2 messages
{mapping, "broker.shared_dispatch_ack_enabled", "emqx.shared_dispatch_ack_enabled",
[ {default, false},

View File

@ -4,6 +4,7 @@
[{"4.3.14",
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.13",
@ -434,6 +435,7 @@
[{"4.3.14",
[{load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.13",

View File

@ -47,7 +47,12 @@
]).
%% for testing
-export([subscribers/2, ack_enabled/0]).
-ifdef(TEST).
-export([ subscribers/2
, ack_enabled/0
, strategy/1
]).
-endif.
%% gen_server callbacks
-export([ init/1
@ -63,6 +68,7 @@
-type strategy() :: random
| round_robin
| sticky
| local
| hash %% same as hash_clientid, backward compatible
| hash_clientid
| hash_topic.
@ -121,7 +127,7 @@ dispatch(Group, Topic, Delivery) ->
dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
#message{from = ClientId, topic = SourceTopic} = Msg,
case pick(strategy(), ClientId, SourceTopic, Group, Topic, FailedSubs) of
case pick(strategy(Group), ClientId, SourceTopic, Group, Topic, FailedSubs) of
false ->
{error, no_subscribers};
{Type, SubPid} ->
@ -133,9 +139,16 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
end
end.
-spec(strategy() -> strategy()).
strategy() ->
emqx:get_env(shared_subscription_strategy, random).
-spec(strategy(emqx_topic:group()) -> strategy()).
strategy(Group) ->
case emqx:get_env(shared_subscription_strategy_per_group, #{}) of
#{Group := Strategy} ->
Strategy;
_ ->
emqx:get_env(shared_subscription_strategy, random)
end.
-spec(ack_enabled() -> boolean()).
ack_enabled() ->
@ -267,6 +280,13 @@ do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
end.
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, _SourceTopic, [Sub]) -> Sub;
pick_subscriber(Group, Topic, local, ClientId, SourceTopic, Subs) ->
case lists:filter(fun(Pid) -> erlang:node(Pid) =:= node() end, Subs) of
[_ | _] = LocalSubs ->
pick_subscriber(Group, Topic, random, ClientId, SourceTopic, LocalSubs);
[] ->
pick_subscriber(Group, Topic, random, ClientId, SourceTopic, Subs)
end;
pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, length(Subs)),
lists:nth(Nth, Subs).

View File

@ -35,6 +35,7 @@
all() -> emqx_ct:all(?SUITE).
init_per_suite(Config) ->
net_kernel:start(['master@127.0.0.1', longnames]),
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
@ -125,7 +126,7 @@ t_no_connection_nack(_) ->
emqx:publish(M#message{id = PacketId})
end,
SendF(1),
timer:sleep(200),
ct:sleep(200),
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
@ -164,19 +165,24 @@ t_no_connection_nack(_) ->
ok.
t_random(_) ->
ok = ensure_config(random, true),
test_two_messages(random).
t_round_robin(_) ->
ok = ensure_config(round_robin, true),
test_two_messages(round_robin).
t_sticky(_) ->
ok = ensure_config(sticky, true),
test_two_messages(sticky).
t_hash(_) ->
test_two_messages(hash, false).
ok = ensure_config(hash, false),
test_two_messages(hash).
t_hash_clinetid(_) ->
test_two_messages(hash_clientid, false).
ok = ensure_config(hash_clientid, false),
test_two_messages(hash_clientid).
t_hash_topic(_) ->
ok = ensure_config(hash_topic, false),
@ -242,55 +248,48 @@ t_not_so_sticky(_) ->
ok.
test_two_messages(Strategy) ->
test_two_messages(Strategy, _WithAck = true).
test_two_messages(Strategy, <<"group1">>).
test_two_messages(Strategy, WithAck) ->
ok = ensure_config(Strategy, WithAck),
test_two_messages(Strategy, Group) ->
Topic = <<"foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 0}),
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 0, Topic, <<"hello2">>),
emqtt:subscribe(ConnPid1, {<<"$share/group1/foo/bar">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/group1/foo/bar">>, 0}),
ct:sleep(100),
emqx:publish(Message1),
Me = self(),
WaitF = fun(ExpectedPayload) ->
case last_message(ExpectedPayload, [ConnPid1, ConnPid2]) of
{true, Pid} ->
Me ! {subscriber, Pid},
true;
Other ->
Other
end
end,
WaitF(<<"hello1">>),
UsedSubPid1 = receive {subscriber, P1} -> P1 end,
emqx_broker:publish(Message2),
WaitF(<<"hello2">>),
UsedSubPid2 = receive {subscriber, P2} -> P2 end,
case Strategy of
sticky -> ?assert(UsedSubPid1 =:= UsedSubPid2);
round_robin -> ?assert(UsedSubPid1 =/= UsedSubPid2);
hash -> ?assert(UsedSubPid1 =:= UsedSubPid2);
_ -> ok
end,
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
emqx:publish(Message2),
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
emqtt:stop(ConnPid1),
emqtt:stop(ConnPid2),
case Strategy of
sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2);
round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
hash -> ?assertEqual(UsedSubPid1, UsedSubPid2);
_ -> ok
end,
ok.
last_message(ExpectedPayload, Pids) ->
receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("~p ====== ~p", [Pids, Pid]),
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
{true, Pid}
after 100 ->
ct:pal("not yet"),
<<"not yet?">>
end.
@ -314,6 +313,103 @@ t_uncovered_func(_) ->
ignored = emqx_shared_sub ! ignored,
{mnesia_table_event, []} = emqx_shared_sub ! {mnesia_table_event, []}.
t_per_group_config(_) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
<<"sticky_group">> => sticky
}),
%% Each test is repeated 4 times because random strategy may technically pass the test
%% so we run 8 tests to make random pass in only 1/256 runs
test_two_messages(sticky, <<"sticky_group">>),
test_two_messages(sticky, <<"sticky_group">>),
test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(sticky, <<"sticky_group">>),
test_two_messages(sticky, <<"sticky_group">>),
test_two_messages(round_robin, <<"round_robin_group">>),
test_two_messages(round_robin, <<"round_robin_group">>).
t_local(_) ->
Node = start_slave('local_shared_sub_test', 21884),
GroupConfig = #{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
<<"sticky_group">> => sticky
},
ok = ensure_group_config(Node, GroupConfig),
ok = ensure_group_config(GroupConfig),
Topic = <<"local_foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {port, 21884}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/local_group/local_foo/bar">>, 0}),
emqtt:subscribe(ConnPid2, {<<"$share/local_group/local_foo/bar">>, 0}),
ct:sleep(100),
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
rpc:call(Node, emqx, publish, [Message2]),
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
RemoteLocalGroupStrategy = rpc:call(Node, emqx_shared_sub, strategy, [<<"local_group">>]),
emqtt:stop(ConnPid1),
emqtt:stop(ConnPid2),
stop_slave(Node),
?assertEqual(local, emqx_shared_sub:strategy(<<"local_group">>)),
?assertEqual(local, RemoteLocalGroupStrategy),
?assertNotEqual(UsedSubPid1, UsedSubPid2),
ok.
t_local_fallback(_) ->
ok = ensure_group_config(#{
<<"local_group_fallback">> => local,
<<"local_group">> => local,
<<"round_robin_group">> => round_robin,
<<"sticky_group">> => sticky
}),
Topic = <<"local_foo/bar">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
Node = start_slave('local_fallback_shared_sub_test', 11885),
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
{ok, _} = emqtt:connect(ConnPid1),
Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
emqtt:subscribe(ConnPid1, {<<"$share/local_group_fallback/local_foo/bar">>, 0}),
emqx:publish(Message1),
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1]),
rpc:call(Node, emqx, publish, [Message2]),
{true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1]),
emqtt:stop(ConnPid1),
stop_slave(Node),
?assertEqual(UsedSubPid1, UsedSubPid2),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------
@ -326,6 +422,12 @@ ensure_config(Strategy, AckEnabled) ->
application:set_env(emqx, shared_dispatch_ack_enabled, AckEnabled),
ok.
ensure_group_config(Node, Group2Strategy) ->
rpc:call(Node, application, set_env, [emqx, shared_subscription_strategy_per_group, Group2Strategy]).
ensure_group_config(Group2Strategy) ->
application:set_env(emqx, shared_subscription_strategy_per_group, Group2Strategy).
subscribed(Group, Topic, Pid) ->
lists:member(Pid, emqx_shared_sub:subscribers(Group, Topic)).
@ -343,3 +445,50 @@ recv_msgs(Count, Msgs) ->
Msgs
end.
start_slave(Name, Port) ->
{ok, Node} = ct_slave:start(list_to_atom(atom_to_list(Name) ++ "@" ++ host()),
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]),
pong = net_adm:ping(Node),
setup_node(Node, Port),
Node.
stop_slave(Node) ->
rpc:call(Node, ekka, leave, []),
ct_slave:stop(Node).
host() ->
[_, Host] = string:tokens(atom_to_list(node()), "@"), Host.
ebin_path() ->
string:join(["-pa" | lists:filter(fun is_lib/1, code:get_path())], " ").
is_lib(Path) ->
string:prefix(Path, code:lib_dir()) =:= nomatch.
setup_node(Node, Port) ->
EnvHandler =
fun(emqx) ->
application:set_env(
emqx,
listeners,
[#{listen_on => {{127,0,0,1},Port},
name => "internal",
opts => [{zone,internal}],
proto => tcp}]),
application:set_env(gen_rpc, port_discovery, manual),
ok;
(_) ->
ok
end,
[ok = rpc:call(Node, application, load, [App]) || App <- [gen_rpc, emqx]],
ok = rpc:call(Node, emqx_ct_helpers, start_apps, [[emqx], EnvHandler]),
rpc:call(Node, ekka, join, [node()]),
ok.