fix(cluster-link): deduplicate routes down to dest cluster

This commit is contained in:
Andrew Mayorov 2024-06-03 16:51:01 +02:00 committed by Serge Tupchii
parent ede35df24a
commit c4840b30d2
2 changed files with 15 additions and 4 deletions

View File

@ -119,7 +119,7 @@ create_tables() ->
match_routes(Topic) ->
Matches = emqx_topic_index:matches(Topic, ?EXTROUTE_TAB, [unique]),
%% `unique` opt is not enough, since we keep the original Topic as a part of RouteID
lists:usort([match_to_route(M) || M <- Matches]).
lists:ukeysort(#route.dest, [match_to_route(M) || M <- Matches]).
lookup_routes(Topic) ->
Pat = #extroute{entry = emqx_topic_index:make_key(Topic, '$1'), _ = '_'},

View File

@ -132,13 +132,24 @@ t_message_forwarding('end', Config) ->
t_message_forwarding(Config) ->
[SourceNode1 | _] = nodes_source(Config),
[TargetNode1 | _] = nodes_target(Config),
[TargetNode1, TargetNode2 | _] = nodes_target(Config),
SourceC1 = start_client("t_message_forwarding", SourceNode1),
TargetC1 = start_client("t_message_forwarding", TargetNode1),
TargetC1 = start_client("t_message_forwarding1", TargetNode1),
TargetC2 = start_client("t_message_forwarding2", TargetNode2),
{ok, _, _} = emqtt:subscribe(TargetC1, <<"t/+">>, qos1),
{ok, _, _} = emqtt:subscribe(TargetC2, <<"t/#">>, qos1),
{ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}),
{ok, _} = emqtt:publish(SourceC1, <<"t/42">>, <<"hello">>, qos1),
?assertReceive({publish, #{topic := <<"t/42">>, payload := <<"hello">>}}),
?assertReceive(
{publish, #{topic := <<"t/42">>, payload := <<"hello">>, client_pid := TargetC1}}
),
?assertReceive(
{publish, #{topic := <<"t/42">>, payload := <<"hello">>, client_pid := TargetC2}}
),
?assertNotReceive({publish, _Message = #{}}),
ok = emqtt:stop(SourceC1),
ok = emqtt:stop(TargetC1),
ok = emqtt:stop(TargetC2).
ok = emqtt:stop(SourceC1),
ok = emqtt:stop(TargetC1).