From d0df4de2a36a752f638a93c890bd84f895e3cb3d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 3 Jun 2024 16:53:38 +0200 Subject: [PATCH] test(cluster-link): add e2e replication actor GC testcase --- .../src/emqx_cluster_link_config.erl | 4 + .../src/emqx_cluster_link_extrouter.erl | 14 +- .../src/emqx_cluster_link_extrouter_gc.erl | 4 + .../src/emqx_cluster_link_mqtt.erl | 31 ++-- .../test/emqx_cluster_link_SUITE.erl | 136 ++++++++++++++---- 5 files changed, 150 insertions(+), 39 deletions(-) diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index 9d840256a..568d1a69d 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -13,7 +13,11 @@ -define(MQTT_HOST_OPTS, #{default_port => 1883}). +-ifndef(TEST). -define(DEFAULT_ACTOR_TTL, 30_000). +-else. +-define(DEFAULT_ACTOR_TTL, 3_000). +-endif. -export([ %% General diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index dc6233d25..1a69733ee 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -4,6 +4,8 @@ -module(emqx_cluster_link_extrouter). +-include_lib("snabbkaffe/include/trace.hrl"). + -export([create_tables/0]). %% Router API @@ -318,8 +320,16 @@ mnesia_actor_heartbeat(ActorID, Incarnation, TS) -> mnesia:abort({nonexistent_actor, ActorID}) end. -clean_incarnation(Rec) -> - transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]). +clean_incarnation(Rec = #actor{id = {Cluster, Actor}}) -> + case transaction(fun ?MODULE:mnesia_clean_incarnation/1, [Rec]) of + ok -> + ?tp(debug, clink_extrouter_actor_cleaned, #{ + cluster => Cluster, + actor => Actor + }); + Result -> + Result + end. mnesia_clean_incarnation(#actor{id = Actor, incarnation = Incarnation, lane = Lane}) -> case mnesia:read(?EXTROUTE_ACTOR_TAB, Actor, write) of diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl index e185c5137..89258b506 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter_gc.erl @@ -20,7 +20,11 @@ -define(SERVER, ?MODULE). +-ifndef(TEST). -define(REPEAT_GC_INTERVAL, 5_000). +-else. +-define(REPEAT_GC_INTERVAL, 1_000). +-endif. %% diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 6091b6ffc..62e021289 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -8,6 +8,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -behaviour(emqx_resource). -behaviour(ecpool_worker). @@ -123,15 +124,19 @@ on_query(_ResourceId, FwdMsg, #{pool_name := PoolName, topic := LinkTopic} = _St is_record(FwdMsg, message) -> #message{topic = Topic, qos = QoS} = FwdMsg, - handle_send_result( - ecpool:pick_and_do( - {PoolName, Topic}, - fun(ConnPid) -> - emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS) - end, - no_handover - ) - ). + PubResult = ecpool:pick_and_do( + {PoolName, Topic}, + fun(ConnPid) -> + emqtt:publish(ConnPid, LinkTopic, ?ENCODE(FwdMsg), QoS) + end, + no_handover + ), + ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{ + pool => PoolName, + message => FwdMsg, + pub_result => PubResult + }), + handle_send_result(PubResult). on_query_async( _ResourceId, FwdMsg, CallbackIn, #{pool_name := PoolName, topic := LinkTopic} = _State @@ -145,7 +150,13 @@ on_query_async( %% #delivery{} record has no valuable data for a remote link... Payload = ?ENCODE(FwdMsg), %% TODO: check override QOS requirements (if any) - emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, Callback) + PubResult = emqtt:publish_async(ConnPid, LinkTopic, Payload, QoS, Callback), + ?tp_ignore_side_effects_in_prod(clink_message_forwarded, #{ + pool => PoolName, + message => FwdMsg, + pub_result => PubResult + }), + PubResult end, no_handover ). diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl index 26b951c00..922af832f 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl @@ -7,6 +7,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -17,17 +18,10 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - SourceCluster = start_source_cluster(Config), - TargetCluster = start_target_cluster(Config), - [ - {source_cluster, SourceCluster}, - {target_cluster, TargetCluster} - | Config - ]. + Config. -end_per_suite(Config) -> - ok = emqx_cth_cluster:stop(?config(source_cluster, Config)), - ok = emqx_cth_cluster:stop(?config(target_cluster, Config)). +end_per_suite(_Config) -> + ok. init_per_testcase(TCName, Config) -> emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config). @@ -37,7 +31,7 @@ end_per_testcase(TCName, Config) -> %% -start_source_cluster(Config) -> +mk_source_cluster(BaseName, Config) -> SourceConf = "cluster {" "\n name = cl.source" @@ -51,15 +45,15 @@ start_source_cluster(Config) -> "\n ]}", SourceApps1 = [{emqx_conf, combine([conf_log(), SourceConf])}], SourceApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(41883), SourceConf])}], - emqx_cth_cluster:start( + emqx_cth_cluster:mk_nodespecs( [ - {emqx_clink_msgfwd_source1, #{apps => SourceApps1}}, - {emqx_clink_msgfwd_source2, #{apps => SourceApps2}} + {mk_nodename(BaseName, s1), #{apps => SourceApps1}}, + {mk_nodename(BaseName, s2), #{apps => SourceApps2}} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ). -start_target_cluster(Config) -> +mk_target_cluster(BaseName, Config) -> TargetConf = "cluster {" "\n name = cl.target" @@ -73,14 +67,17 @@ start_target_cluster(Config) -> "\n ]}", TargetApps1 = [{emqx_conf, combine([conf_log(), TargetConf])}], TargetApps2 = [{emqx_conf, combine([conf_log(), conf_mqtt_listener(31883), TargetConf])}], - emqx_cth_cluster:start( + emqx_cth_cluster:mk_nodespecs( [ - {emqx_clink_msgfwd_target1, #{apps => TargetApps1, base_port => 20100}}, - {emqx_clink_msgfwd_target2, #{apps => TargetApps2, base_port => 20200}} + {mk_nodename(BaseName, t1), #{apps => TargetApps1, base_port => 20100}}, + {mk_nodename(BaseName, t2), #{apps => TargetApps2, base_port => 20200}} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ). +mk_nodename(BaseName, Suffix) -> + binary_to_atom(fmt("emqx_clink_~s_~s", [BaseName, Suffix])). + conf_mqtt_listener(LPort) when is_integer(LPort) -> fmt("listeners.tcp.clink { bind = ~p }", [LPort]); conf_mqtt_listener(_) -> @@ -92,8 +89,7 @@ conf_log() -> combine([Entry | Rest]) -> lists:foldl(fun emqx_cth_suite:merge_config/2, Entry, Rest). -start_cluster_link(Config) -> - Nodes = nodes_all(Config), +start_cluster_link(Nodes, Config) -> [{ok, Apps}] = lists:usort( erpc:multicall(Nodes, emqx_cth_suite, start_apps, [ [emqx_cluster_link], @@ -115,20 +111,27 @@ nodes_all(Config) -> nodes_source(Config) ++ nodes_target(Config). nodes_source(Config) -> - ?config(source_cluster, Config). + ?config(source_nodes, Config). nodes_target(Config) -> - ?config(target_cluster, Config). + ?config(target_nodes, Config). %% t_message_forwarding('init', Config) -> - Apps = start_cluster_link(Config), + SourceNodes = emqx_cth_cluster:start(mk_source_cluster(?FUNCTION_NAME, Config)), + TargetNodes = emqx_cth_cluster:start(mk_target_cluster(?FUNCTION_NAME, Config)), + _Apps = start_cluster_link(SourceNodes ++ TargetNodes, Config), ok = snabbkaffe:start_trace(), - [{tc_apps, Apps} | Config]; + [ + {source_nodes, SourceNodes}, + {target_nodes, TargetNodes} + | Config + ]; t_message_forwarding('end', Config) -> ok = snabbkaffe:stop(), - stop_cluster_link(Config). + ok = emqx_cth_cluster:stop(?config(source_nodes, Config)), + ok = emqx_cth_cluster:stop(?config(target_nodes, Config)). t_message_forwarding(Config) -> [SourceNode1 | _] = nodes_source(Config), @@ -150,11 +153,90 @@ t_message_forwarding(Config) -> ok = emqtt:stop(SourceC1), ok = emqtt:stop(TargetC1), ok = emqtt:stop(TargetC2). + +t_target_extrouting_gc('init', Config) -> + SourceCluster = mk_source_cluster(?FUNCTION_NAME, Config), + SourceNodes = emqx_cth_cluster:start(SourceCluster), + TargetCluster = mk_target_cluster(?FUNCTION_NAME, Config), + TargetNodes = emqx_cth_cluster:start(TargetCluster), + _Apps = start_cluster_link(SourceNodes ++ TargetNodes, Config), + ok = snabbkaffe:start_trace(), + [ + {source_cluster, SourceCluster}, + {source_nodes, SourceNodes}, + {target_cluster, TargetCluster}, + {target_nodes, TargetNodes} + | Config + ]; +t_target_extrouting_gc('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_cluster:stop(?config(source_nodes, Config)). + +t_target_extrouting_gc(Config) -> + [SourceNode1 | _] = nodes_source(Config), + [TargetNode1, TargetNode2 | _] = nodes_target(Config), + SourceC1 = start_client("t_target_extrouting_gc", SourceNode1), + TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1), + TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2), + {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/#">>, qos1), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/+">>, qos1), + {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}), + {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1), + {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1), + {ok, _} = emqtt:publish(SourceC1, <<"t/3/ext">>, <<"HELLO3">>, qos1), + Pubs1 = [M || {publish, M} <- ?drainMailbox(1_000)], + {ok, _} = ?wait_async_action( + emqx_cth_cluster:stop_node(TargetNode1), + #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>} + ), + {ok, _} = emqtt:publish(SourceC1, <<"t/4/ext">>, <<"HELLO4">>, qos1), + {ok, _} = emqtt:publish(SourceC1, <<"t/5">>, <<"HELLO5">>, qos1), + Pubs2 = [M || {publish, M} <- ?drainMailbox(1_000)], + {ok, _} = ?wait_async_action( + emqx_cth_cluster:stop_node(TargetNode2), + #{?snk_kind := clink_extrouter_actor_cleaned, cluster := <<"cl.target">>} + ), ok = emqtt:stop(SourceC1), - ok = emqtt:stop(TargetC1). + %% Verify that extrouter table eventually becomes empty. + ?assertEqual( + [], + erpc:call(SourceNode1, emqx_cluster_link_extrouter, topics, []), + { + erpc:call(SourceNode1, ets, tab2list, [emqx_external_router_actor]), + erpc:call(SourceNode1, ets, tab2list, [emqx_external_router_route]) + } + ), + %% Verify all relevant messages were forwarded. + ?assertMatch( + [ + #{topic := <<"t/1">>, payload := <<"HELLO1">>, client_pid := _C1}, + #{topic := <<"t/1">>, payload := <<"HELLO1">>, client_pid := _C2}, + #{topic := <<"t/2/ext">>, payload := <<"HELLO2">>}, + #{topic := <<"t/3/ext">>, payload := <<"HELLO3">>}, + #{topic := <<"t/5">>, payload := <<"HELLO5">>} + ], + lists:sort(emqx_utils_maps:key_comparer(topic), Pubs1 ++ Pubs2) + ), + %% Verify there was no unnecessary forwarding. + Trace = snabbkaffe:collect_trace(), + ?assertMatch( + [ + #{message := #message{topic = <<"t/1">>, payload = <<"HELLO1">>}}, + #{message := #message{topic = <<"t/2/ext">>, payload = <<"HELLO2">>}}, + #{message := #message{topic = <<"t/3/ext">>, payload = <<"HELLO3">>}}, + #{message := #message{topic = <<"t/5">>, payload = <<"HELLO5">>}} + ], + ?of_kind(clink_message_forwarded, Trace), + Trace + ). %% +start_client_unlink(ClientId, Node) -> + Client = start_client(ClientId, Node), + _ = erlang:unlink(Client), + Client. + start_client(ClientId, Node) -> Port = tcp_port(Node), {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}, {port, Port}]), @@ -166,4 +248,4 @@ tcp_port(Node) -> Port. fmt(Fmt, Args) -> - io_lib:format(Fmt, Args). + emqx_utils:format(Fmt, Args).