diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index cb984fcf6..d72e43963 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -55,6 +55,8 @@ only_once_tests() -> [ t_bridge_rule_action_source, t_cluster_group, + t_node_joins_existing_cluster, + t_cluster_node_down, t_multiple_topic_mappings ]. @@ -924,12 +926,16 @@ action_response(Selected, Envs, Args) -> }), ok. -wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout) -> - do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, #{}). +wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout) -> + do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, #{}). -do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc0) -> - case map_size(Acc0) =:= NPartitions of +do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, Acc0) -> + AllPartitionsCovered = map_size(Acc0) =:= NPartitions, + PresentNodes = lists:usort([N || {_Partition, {N, _MemberId}} <- maps:to_list(Acc0)]), + AllNodesCovered = PresentNodes =:= lists:usort(Nodes), + case AllPartitionsCovered andalso AllNodesCovered of true -> + ct:pal("group balanced: ~p", [Acc0]), {ok, Acc0}; false -> receive @@ -942,7 +948,7 @@ do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc0) -> topic_assignments => TopicAssignments }, Acc = reconstruct_assignments_from_events(KafkaTopic, [Event], Acc0), - do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Timeout, Acc) + do_wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, Timeout, Acc) after Timeout -> {timeout, Acc0} end @@ -974,6 +980,123 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) -> Assignments ). 
%% Installs, on `Node', a meck spy around
%% `brod_group_subscriber_v2:assignments_received/4'.  The calling
%% process is captured so that the spy can report back to it.
setup_group_subscriber_spy(Node) ->
    Observer = self(),
    ok = erpc:call(Node, fun() -> install_group_subscriber_spy(Observer) end).

%% Runs on the spied node.  For each received assignment the spy emits
%% a `kafka_assignment' trace point, sends a `{kafka_assignment, ...}'
%% message to `Observer' and then delegates to the original
%% implementation through `meck:passthrough/1'.
install_group_subscriber_spy(Observer) ->
    MeckOpts = [passthrough, no_link, no_history, non_strict],
    ok = meck:new(brod_group_subscriber_v2, MeckOpts),
    ok = meck:expect(
        brod_group_subscriber_v2,
        assignments_received,
        fun(Pid, MemberId, GenerationId, TopicAssignments) ->
            OriginalArgs = [Pid, MemberId, GenerationId, TopicAssignments],
            ?tp(
                kafka_assignment,
                #{
                    node => node(),
                    pid => Pid,
                    member_id => MemberId,
                    generation_id => GenerationId,
                    topic_assignments => TopicAssignments
                }
            ),
            Observer !
                {kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
            meck:passthrough(OriginalArgs)
        end
    ),
    ok.

%% Polls `Node' (up to 50 attempts, 100 between attempts as passed to
%% `?retry') until `emqx_config_handler' is a registered process there.
%% The handler is restarted while the node joins the cluster, so
%% callers must wait before touching the configuration.
wait_for_cluster_rpc(Node) ->
    ?retry(
        _Interval = 100,
        _MaxAttempts = 50,
        true = erlang:is_pid(erpc:call(Node, erlang, whereis, [emqx_config_handler]))
    ).

%% Rebinds the default tcp/ssl/ws/wss listeners of `Node' to 127.0.0.1
%% on the ports that `emqx_common_test_helpers:listener_port/2' yields
%% for `NodeOpts', writing both the checked and the raw configuration,
%% and then starts the listeners.
setup_and_start_listeners(Node, NodeOpts) ->
    erpc:call(
        Node,
        fun() ->
            BindToLoopback = fun(ListenerType) ->
                ConfPath = [listeners, ListenerType, default, bind],
                Port = emqx_common_test_helpers:listener_port(NodeOpts, ListenerType),
                ok = emqx_config:put(ConfPath, {{127, 0, 0, 1}, Port}),
                RawBind = <<"127.0.0.1:", (integer_to_binary(Port))/binary>>,
                ok = emqx_config:put_raw(ConfPath, RawBind),
                ok
            end,
            lists:foreach(BindToLoopback, [tcp, ssl, ws, wss]),
            ok = emqx_listeners:start(),
            ok
        end
    ).

%% Asks `emqx_common_test_helpers:emqx_cluster/2' for a two-core-node
%% cluster description, logs it and returns it.  Starting the nodes is
%% left to the caller.
cluster(Config) ->
    %% Only the `emqx' application gets extra environment: its boot
    %% modules are limited to the broker and the router.
    EnvHandler = fun
        (emqx) ->
            application:set_env(emqx, boot_modules, [broker, router]),
            ok;
        (_OtherApp) ->
            ok
    end,
    NodeOpts = [
        {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]},
        {listener_ports, []},
        {peer_mod, slave},
        {priv_data_dir, ?config(priv_dir, Config)},
        {load_schema, true},
        {start_autocluster, true},
        {schema_mod, emqx_ee_conf_schema},
        {env_handler, EnvHandler}
    ],
    ClusterSpec = emqx_common_test_helpers:emqx_cluster([core, core], NodeOpts),
    ct:pal("cluster: ~p", [ClusterSpec]),
    ClusterSpec.
+ +start_async_publisher(Config, KafkaTopic) -> + TId = ets:new(kafka_payloads, [public, ordered_set]), + Loop = fun Go() -> + receive + stop -> ok + after 0 -> + Payload = emqx_guid:to_hexstr(emqx_guid:gen()), + publish(Config, KafkaTopic, [#{key => Payload, value => Payload}]), + ets:insert(TId, {Payload}), + timer:sleep(400), + Go() + end + end, + Pid = spawn_link(Loop), + {TId, Pid}. + +stop_async_publisher(Pid) -> + MRef = monitor(process, Pid), + Pid ! stop, + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after 1_000 -> + ct:fail("publisher didn't die") + end, + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1500,36 +1623,17 @@ t_bridge_rule_action_source(Config) -> ), ok. +%% checks that an existing cluster can be configured with a kafka +%% consumer bridge and that the consumers will distribute over the two +%% nodes. t_cluster_group(Config) -> - ct:timetrap({seconds, 180}), - TestPid = self(), + ct:timetrap({seconds, 150}), NPartitions = ?config(num_partitions, Config), KafkaTopic = ?config(kafka_topic, Config), KafkaName = ?config(kafka_name, Config), ResourceId = resource_id(Config), BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName), - PrivDataDir = ?config(priv_dir, Config), - Cluster = emqx_common_test_helpers:emqx_cluster( - [core, core], - [ - {apps, [emqx_conf, emqx_bridge, emqx_rule_engine]}, - {listener_ports, []}, - {peer_mod, slave}, - {priv_data_dir, PrivDataDir}, - {load_schema, true}, - {schema_mod, emqx_ee_conf_schema}, - {env_handler, fun - (emqx) -> - application:set_env(emqx, boot_modules, []), - ok; - (emqx_conf) -> - ok; - (_) -> - ok - end} - ] - ), - ct:pal("cluster: ~p", [Cluster]), + Cluster = cluster(Config), ?check_trace( begin Nodes = @@ -1540,47 +1644,19 @@ t_cluster_group(Config) -> on_exit(fun() -> lists:foreach( fun(N) -> + ct:pal("stopping ~p", [N]), ok = 
emqx_common_test_helpers:stop_slave(N) end, Nodes ) end), - lists:foreach( - fun(N) -> - erpc:call(N, fun() -> - ok = meck:new(brod_group_subscriber_v2, [ - passthrough, no_link, no_history, non_strict - ]), - ok = meck:expect( - brod_group_subscriber_v2, - assignments_received, - fun(Pid, MemberId, GenerationId, TopicAssignments) -> - TestPid ! - {kafka_assignment, node(), - {Pid, MemberId, GenerationId, TopicAssignments}}, - ?tp( - kafka_assignment, - #{ - node => node(), - pid => Pid, - member_id => MemberId, - generation_id => GenerationId, - topic_assignments => TopicAssignments - } - ), - meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments]) - end - ), - ok - end) - end, - Nodes - ), + lists:foreach(fun setup_group_subscriber_spy/1, Nodes), {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}), length(Nodes), 15_000 ), + wait_for_cluster_rpc(N2), erpc:call(N2, fun() -> {ok, _} = create_bridge(Config) end), {ok, _} = snabbkaffe:receive_events(SRef0), lists:foreach( @@ -1598,8 +1674,7 @@ t_cluster_group(Config) -> %% sleep so that the two nodes have time to distribute the %% subscribers, rather than just one node containing all %% of them. - ct:sleep(10_000), - {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, 30_000), + {ok, _} = wait_until_group_is_balanced(KafkaTopic, NPartitions, Nodes, 30_000), lists:foreach( fun(N) -> ?assertEqual( @@ -1630,3 +1705,207 @@ t_cluster_group(Config) -> end ), ok. + +%% test that the kafka consumer group rebalances correctly if a bridge +%% already exists when a new EMQX node joins the cluster. 
t_node_joins_existing_cluster(Config) ->
    ct:timetrap({seconds, 150}),
    TopicMapping = ?config(topic_mapping, Config),
    %% The test expects exactly one topic mapping.
    [MQTTTopic] = [Topic || #{mqtt_topic := Topic} <- TopicMapping],
    NPartitions = ?config(num_partitions, Config),
    KafkaTopic = ?config(kafka_topic, Config),
    KafkaName = ?config(kafka_name, Config),
    ResourceId = resource_id(Config),
    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
    Cluster = cluster(Config),
    ?check_trace(
        begin
            [{FirstName, FirstOpts}, {SecondName, SecondOpts} | _] = Cluster,

            %% Phase 1: a single node owns the bridge and the whole
            %% consumer group.
            FirstNode = emqx_common_test_helpers:start_slave(FirstName, FirstOpts),
            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(FirstNode) end),
            setup_group_subscriber_spy(FirstNode),
            {{ok, _}, {ok, _}} =
                ?wait_async_action(
                    erpc:call(FirstNode, fun() -> {ok, _} = create_bridge(Config) end),
                    #{?snk_kind := kafka_consumer_subscriber_started},
                    15_000
                ),
            ?assertMatch({ok, _}, erpc:call(FirstNode, emqx_bridge, lookup, [BridgeId])),
            {ok, _} = wait_until_group_is_balanced(
                KafkaTopic, NPartitions, [FirstNode], 30_000
            ),
            ?assertEqual(
                {ok, connected},
                erpc:call(FirstNode, emqx_resource_manager, health_check, [ResourceId])
            ),

            %% Phase 2: subscribe an MQTT client on the first node, then
            %% start the second node and have it join the cluster.
            setup_and_start_listeners(FirstNode, FirstOpts),
            FirstTCPPort = emqx_common_test_helpers:listener_port(FirstOpts, tcp),
            _Client = start_mqtt_subscriber(FirstTCPPort, MQTTTopic),

            %% Subscribe to the trace event before the node starts, so
            %% the subscriber started by the joining node is not missed.
            {ok, StartedSub} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                1,
                30_000
            ),
            SecondNode = emqx_common_test_helpers:start_slave(SecondName, SecondOpts),
            on_exit(fun() -> ok = emqx_common_test_helpers:stop_slave(SecondNode) end),
            setup_group_subscriber_spy(SecondNode),
            ClusterNodes = [FirstNode, SecondNode],
            wait_for_cluster_rpc(SecondNode),

            {ok, _} = snabbkaffe:receive_events(StartedSub),
            ?assertMatch({ok, _}, erpc:call(SecondNode, emqx_bridge, lookup, [BridgeId])),
            %% Wait for the consumers to be spread over both nodes.
            {ok, _} = wait_until_group_is_balanced(
                KafkaTopic, NPartitions, ClusterNodes, 30_000
            ),

            %% Phase 3: once the second node knows the MQTT route,
            %% publish one Kafka message per partition so the check
            %% stage can verify that both nodes handled messages.
            ?retry(
                _Interval = 100,
                _MaxAttempts = 50,
                true = erpc:call(SecondNode, emqx_router, has_routes, [MQTTTopic])
            ),
            {ok, HandledSub} =
                snabbkaffe:subscribe(
                    ?match_event(#{
                        ?snk_kind := kafka_consumer_handle_message,
                        ?snk_span := {complete, _}
                    }),
                    NPartitions,
                    10_000
                ),
            PublishNth = fun(I) ->
                Suffix = integer_to_binary(I),
                publish(Config, KafkaTopic, [
                    #{key => <<"k", Suffix/binary>>, value => <<"v", Suffix/binary>>}
                ])
            end,
            lists:foreach(PublishNth, lists:seq(1, NPartitions)),
            {ok, _} = snabbkaffe:receive_events(HandledSub),

            #{nodes => ClusterNodes}
        end,
        fun(Res, Trace) ->
            #{nodes := ExpectedNodes} = Res,
            AssignmentEvents = ?of_kind(kafka_assignment, Trace),
            Assignments = reconstruct_assignments_from_events(KafkaTopic, AssignmentEvents),
            %% Every partition is assigned and both nodes own some.
            ?assertEqual(lists:usort(ExpectedNodes), nodes_with_assignments(Assignments)),
            ?assertEqual(NPartitions, map_size(Assignments)),
            Published = receive_published(#{n => NPartitions, timeout => 3_000}),
            ct:pal("published:\n ~p", [Published]),
            %% Nodes that started handling at least one Kafka message.
            HandlingNodes =
                [
                    Node
                 || #{
                        ?snk_kind := kafka_consumer_handle_message,
                        ?snk_span := start,
                        ?snk_meta := #{node := Node}
                    } <- Trace
                ],
            ?assertEqual(lists:usort(ExpectedNodes), lists:usort(HandlingNodes)),
            ok
        end
    ),
    ok.

%% Checks that the consumers get rebalanced after an EMQX node goes
%% down.
t_cluster_node_down(Config) ->
    ct:timetrap({seconds, 150}),
    TopicMapping = ?config(topic_mapping, Config),
    %% The test expects exactly one topic mapping.
    [MQTTTopic] = [Topic || #{mqtt_topic := Topic} <- TopicMapping],
    NPartitions = ?config(num_partitions, Config),
    KafkaTopic = ?config(kafka_topic, Config),
    KafkaName = ?config(kafka_name, Config),
    BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, KafkaName),
    Cluster = cluster(Config),
    ?check_trace(
        begin
            {_SurvivorName, SurvivorOpts} = lists:nth(2, Cluster),
            StartNode = fun({Name, Opts}) ->
                emqx_common_test_helpers:start_slave(Name, Opts)
            end,
            %% The first node is stopped later on; the second one
            %% survives.
            ClusterNodes = [DoomedNode, SurvivorNode | _] = lists:map(StartNode, Cluster),
            on_exit(fun() ->
                lists:foreach(
                    fun(Node) ->
                        ct:pal("stopping ~p", [Node]),
                        ok = emqx_common_test_helpers:stop_slave(Node)
                    end,
                    ClusterNodes
                )
            end),
            lists:foreach(fun setup_group_subscriber_spy/1, ClusterNodes),
            %% One `subscriber_started' event is expected per node.
            {ok, StartedSub} = snabbkaffe:subscribe(
                ?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
                length(ClusterNodes),
                15_000
            ),
            wait_for_cluster_rpc(SurvivorNode),
            erpc:call(SurvivorNode, fun() -> {ok, _} = create_bridge(Config) end),
            {ok, _} = snabbkaffe:receive_events(StartedSub),
            AssertBridgeExists = fun(Node) ->
                ?assertMatch(
                    {ok, _},
                    erpc:call(Node, emqx_bridge, lookup, [BridgeId]),
                    #{node => Node}
                )
            end,
            lists:foreach(AssertBridgeExists, ClusterNodes),
            {ok, _} = wait_until_group_is_balanced(
                KafkaTopic, NPartitions, ClusterNodes, 30_000
            ),

            %% With the group balanced over both nodes, attach an MQTT
            %% subscriber to the surviving node and keep publishing to
            %% Kafka in the background while the other node is stopped.
            setup_and_start_listeners(SurvivorNode, SurvivorOpts),
            SurvivorTCPPort = emqx_common_test_helpers:listener_port(SurvivorOpts, tcp),
            _Client = start_mqtt_subscriber(SurvivorTCPPort, MQTTTopic),
            {PayloadsTab, PublisherPid} = start_async_publisher(Config, KafkaTopic),

            ct:pal("stopping node ~p", [DoomedNode]),
            ok = emqx_common_test_helpers:stop_slave(DoomedNode),

            %% Wait for the remaining node to take over every
            %% partition.
            {ok, _} = wait_until_group_is_balanced(
                KafkaTopic, NPartitions, [SurvivorNode], 60_000
            ),

            ok = stop_async_publisher(PublisherPid),

            #{nodes => ClusterNodes, payloads_tid => PayloadsTab}
        end,
        fun(Res, Trace) ->
            #{nodes := StartedNodes, payloads_tid := PayloadsTab} = Res,
            [_StoppedNode, RemainingNode | _] = StartedNodes,
            AssignmentEvents = ?of_kind(kafka_assignment, Trace),
            Assignments = reconstruct_assignments_from_events(KafkaTopic, AssignmentEvents),
            %% The surviving node ends up owning all the partitions.
            ?assertEqual([RemainingNode], nodes_with_assignments(Assignments)),
            ?assertEqual(NPartitions, map_size(Assignments)),
            %% Wait for as many MQTT messages as payloads were recorded
            %% by the background publisher.
            NumPublished = ets:info(PayloadsTab, size),
            Published = receive_published(#{n => NumPublished, timeout => 3_000}),
            ct:pal("published:\n ~p", [Published]),
            ok
        end
    ),
    ok.

%% Starts an MQTT v5 client against local port `Port', registers its
%% shutdown with `on_exit/1', connects it and subscribes it to `Topic'
%% with QoS 2.  Returns the client pid.
start_mqtt_subscriber(Port, Topic) ->
    {ok, Client} = emqtt:start_link([{port, Port}, {proto_ver, v5}]),
    on_exit(fun() -> catch emqtt:stop(Client) end),
    {ok, _} = emqtt:connect(Client),
    {ok, _, [2]} = emqtt:subscribe(Client, Topic, 2),
    Client.

%% Returns the sorted, de-duplicated list of nodes that appear as owner
%% in a map whose values are `{Node, MemberId}' pairs.
nodes_with_assignments(Assignments) ->
    lists:usort([Node || {Node, _MemberId} <- maps:values(Assignments)]).