From c4840b30d2098c637330543a2a82cfddf360bc52 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 3 Jun 2024 16:51:01 +0200 Subject: [PATCH] fix(cluster-link): deduplicate routes down to dest cluster --- .../src/emqx_cluster_link_extrouter.erl | 2 +- .../test/emqx_cluster_link_SUITE.erl | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index e058cb816..dc6233d25 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -119,7 +119,7 @@ create_tables() -> match_routes(Topic) -> Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]), %% `unique` opt is not enough, since we keep the original Topic as a part of RouteID - lists:usort([match_to_route(M) || M <- Matches]). + lists:ukeysort(#route.dest, [match_to_route(M) || M <- Matches]). lookup_routes(Topic) -> Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'}, diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl index d957eb580..26b951c00 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl @@ -132,13 +132,24 @@ t_message_forwarding('end', Config) -> t_message_forwarding(Config) -> [SourceNode1 | _] = nodes_source(Config), - [TargetNode1 | _] = nodes_target(Config), + [TargetNode1, TargetNode2 | _] = nodes_target(Config), SourceC1 = start_client("t_message_forwarding", SourceNode1), - TargetC1 = start_client("t_message_forwarding", TargetNode1), + TargetC1 = start_client("t_message_forwarding1", TargetNode1), + TargetC2 = start_client("t_message_forwarding2", TargetNode2), {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/+">>, qos1), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/#">>, qos1), {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}), {ok, _} = emqtt:publish(SourceC1, <<"t/42">>, <<"hello">>, qos1), - ?assertReceive({publish, #{topic := <<"t/42">>, payload := <<"hello">>}}), + ?assertReceive( + {publish, #{topic := <<"t/42">>, payload := <<"hello">>, client_pid := TargetC1}} + ), + ?assertReceive( + {publish, #{topic := <<"t/42">>, payload := <<"hello">>, client_pid := TargetC2}} + ), + ?assertNotReceive({publish, _Message = #{}}), + ok = emqtt:stop(SourceC1), + ok = emqtt:stop(TargetC1), + ok = emqtt:stop(TargetC2). ok = emqtt:stop(SourceC1), ok = emqtt:stop(TargetC1).