diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index 3b6357d7d..aeb44d529 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -350,111 +350,6 @@ t_round_robin_per_group_even_distribution_two_groups(_) -> ), ok. -t_round_robin_per_group_two_nodes_publish_to_same_node(_) -> - ensure_config(round_robin_per_group), - Node = start_slave('rr_p_g_t_n', 31337), - ensure_node_config(Node, round_robin_per_group), - - %% connect two subscribers on each node - Topic = <<"foo/bar">>, - {ok, Subscriber0} = emqtt:start_link([{clientid, <<"C0">>}]), - {ok, Subscriber1} = emqtt:start_link([{clientid, <<"C1">>}]), - {ok, Subscriber2} = emqtt:start_link([{clientid, <<"C2">>}, {port, 31337}]), - {ok, Subscriber3} = emqtt:start_link([{clientid, <<"C3">>}, {port, 31337}]), - SubscriberPids = [Subscriber0, Subscriber1, Subscriber2, Subscriber3], - lists:foreach(fun(P) -> emqtt:connect(P) end, SubscriberPids), - - %% node 1 subscribers - emqtt:subscribe(Subscriber0, {<<"$share/group1/", Topic/binary>>, 0}), - emqtt:subscribe(Subscriber1, {<<"$share/group1/", Topic/binary>>, 0}), - %% node 2 subscribers - emqtt:subscribe(Subscriber2, {<<"$share/group1/", Topic/binary>>, 0}), - emqtt:subscribe(Subscriber3, {<<"$share/group1/", Topic/binary>>, 0}), - - publish_fire_and_forget(10, Topic), - - AllMessages = recv_msgs(10), - MessagesBySubscriber = lists:foldl( - fun(#{client_pid := Subscriber, payload := Payload}, Acc) -> - maps:update_with(Subscriber, fun(T) -> [Payload | T] end, [Payload], Acc) - end, - maps:new(), - AllMessages - ), - lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, SubscriberPids), - stop_slave(Node), - - ?assertEqual( - #{ - Subscriber0 => [<<"0">>, <<"4">>, <<"8">>], - Subscriber1 => [<<"1">>, <<"5">>, <<"9">>], - Subscriber2 => [<<"2">>, <<"6">>], - Subscriber3 => [<<"3">>, <<"7">>] - }, - MessagesBySubscriber - ). - -t_round_robin_per_group_two_nodes_alternating_publish(_) -> - ensure_config(round_robin_per_group), - Node = start_slave('rr_p_g_t_n_2', 41338), - ensure_node_config(Node, round_robin_per_group), - - %% connect two subscribers on each node - Topic = <<"foo/bar">>, - {ok, Subscriber0} = emqtt:start_link([{clientid, <<"C0">>}]), - {ok, Subscriber1} = emqtt:start_link([{clientid, <<"C1">>}]), - {ok, Subscriber2} = emqtt:start_link([{clientid, <<"C2">>}, {port, 41338}]), - {ok, Subscriber3} = emqtt:start_link([{clientid, <<"C3">>}, {port, 41338}]), - SubscriberPids = [Subscriber0, Subscriber1, Subscriber2, Subscriber3], - lists:foreach(fun(P) -> emqtt:connect(P) end, SubscriberPids), - - %% node 1 subscribers - emqtt:subscribe(Subscriber0, {<<"$share/group1/", Topic/binary>>, 0}), - emqtt:subscribe(Subscriber1, {<<"$share/group1/", Topic/binary>>, 0}), - %% node 2 subscribers - emqtt:subscribe(Subscriber2, {<<"$share/group1/", Topic/binary>>, 0}), - emqtt:subscribe(Subscriber3, {<<"$share/group1/", Topic/binary>>, 0}), - - %% alternate publish messages between the nodes - lists:foreach( - fun(I) -> - Message = erlang:integer_to_binary(I), - {ok, PublisherPid} = - case I rem 2 of - 0 -> emqtt:start_link(); - 1 -> emqtt:start_link([{port, 41338}]) - end, - {ok, _} = emqtt:connect(PublisherPid), - emqtt:publish(PublisherPid, Topic, Message), - emqtt:stop(PublisherPid), - ct:sleep(50) - end, - lists:seq(0, 9) - ), - - AllMessages = recv_msgs(10), - MessagesBySubscriber = lists:foldl( - fun(#{client_pid := Subscriber, payload := Payload}, Acc) -> - maps:update_with(Subscriber, fun(T) -> [Payload | T] end, [Payload], Acc) - end, - maps:new(), - AllMessages - ), - lists:foreach(fun(Pid) -> emqtt:stop(Pid) end, SubscriberPids), - stop_slave(Node), - - %% this result show that when clustered round_robin_per_group behaves like the normal round_robin - %% strategy meaning that subscribers receive two consecutive messages which is not ideal - ?assertEqual( - #{ - Subscriber0 => [<<"0">>, <<"1">>, <<"8">>, <<"9">>], - Subscriber1 => [<<"2">>, <<"3">>], - Subscriber2 => [<<"4">>, <<"5">>], - Subscriber3 => [<<"6">>, <<"7">>] - }, - MessagesBySubscriber - ). - t_sticky(_) -> ok = ensure_config(sticky, true), test_two_messages(sticky).