diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link.erl b/apps/emqx_cluster_link/src/emqx_cluster_link.erl index 19211cb56..d91b33c3a 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link.erl @@ -205,7 +205,7 @@ actor_init( {error, <<"bad_remote_cluster_link_name">>} end; #{enable := false} -> - {error, <<"clster_link_disabled">>} + {error, <<"cluster_link_disabled">>} end. actor_init_ack(#{actor := Actor}, Res, MsgIn) -> diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl index fdcbd91c7..b7d165419 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_router_syncer.erl @@ -16,6 +16,13 @@ push_persistent_route/4 ]). +%% debug/test helpers +-export([ + status/1, + where/1, + where/2 +]). + -export([ start_link_actor/4, start_link_syncer/4 @@ -46,8 +53,8 @@ -define(CLIENT_NAME(Cluster), ?NAME(Cluster, client)). -define(SYNCER_NAME(Cluster), ?NAME(Cluster, syncer)). -define(SYNCER_REF(Cluster), {via, gproc, ?SYNCER_NAME(Cluster)}). --define(ACTOR_REF(Cluster), {via, gproc, ?NAME(Cluster, actor)}). -define(ACTOR_NAME(Cluster), ?NAME(Cluster, actor)). +-define(ACTOR_REF(Cluster), {via, gproc, ?ACTOR_NAME(Cluster)}). -define(MAX_BATCH_SIZE, 4000). -define(MIN_SYNC_INTERVAL, 10). @@ -85,6 +92,22 @@ end ). +-record(st, { + target :: binary(), + actor :: binary(), + incarnation :: non_neg_integer(), + client :: undefined | pid(), + bootstrapped :: boolean(), + reconnect_timer :: undefined | reference(), + heartbeat_timer :: undefined | reference(), + actor_init_req_id :: undefined | binary(), + actor_init_timer :: undefined | reference(), + remote_actor_info :: undefined | map(), + status :: connecting | connected | disconnected, + error :: undefined | term(), + link_conf :: map() +}). + push(TargetCluster, OpName, Topic, ID) -> do_push(?SYNCER_NAME(TargetCluster), OpName, Topic, ID). @@ -99,6 +122,24 @@ do_push(SyncerName, OpName, Topic, ID) -> dropped end. +%% Debug/test helpers +where(Cluster) -> + where(actor, Cluster). + +where(actor, Cluster) -> + gproc:where(?ACTOR_NAME(Cluster)); +where(ps_actor, Cluster) -> + gproc:where(?PS_ACTOR_NAME(Cluster)). + +status(Cluster) -> + case where(actor, Cluster) of + Pid when is_pid(Pid) -> + #st{error = Err, status = Status} = sys:get_state(Pid), + #{error => Err, status => Status}; + undefined -> + undefined + end. + %% Supervisor: %% 1. Actor + MQTT Client %% 2. Syncer @@ -290,24 +331,6 @@ syncer_spec(ChildID, Actor, Incarnation, SyncerRef, ClientName) -> type => worker }. -%% - --record(st, { - target :: binary(), - actor :: binary(), - incarnation :: non_neg_integer(), - client :: undefined | pid(), - bootstrapped :: boolean(), - reconnect_timer :: undefined | reference(), - heartbeat_timer :: undefined | reference(), - actor_init_req_id :: undefined | binary(), - actor_init_timer :: undefined | reference(), - remote_actor_info :: undefined | map(), - status :: connecting | connected | disconnected, - error :: undefined | term(), - link_conf :: map() -}). - mk_state(#{upstream := TargetCluster} = LinkConf, Actor, Incarnation) -> #st{ target = TargetCluster, @@ -361,6 +384,12 @@ handle_info( remote_link_proto_ver => maps:get(proto_ver, AckInfoMap, undefined) }), _ = maybe_alarm(Reason, St1), + ?tp( + debug, + clink_handshake_error, + #{actor => {St1#st.actor, St1#st.incarnation}, reason => Reason} + ), + %% TODO: retry after a timeout? {noreply, St1#st{error = Reason, status = disconnected}} end; handle_info({publish, #{}}, St) -> @@ -376,7 +405,7 @@ handle_info({timeout, TRef, actor_reinit}, St = #st{actor_init_timer = TRef}) -> Reason = init_timeout, _ = maybe_alarm(Reason, St), {noreply, - init_remote_actor(St#st{reconnect_timer = undefined, status = disconnected, error = Reason})}; + init_remote_actor(St#st{actor_init_timer = undefined, status = disconnected, error = Reason})}; handle_info({timeout, TRef, _Heartbeat}, St = #st{heartbeat_timer = TRef}) -> {noreply, process_heartbeat(St#st{heartbeat_timer = undefined})}; %% Stale timeout. @@ -386,7 +415,8 @@ handle_info(Info, St) -> ?SLOG(warning, #{msg => "unexpected_info", info => Info}), {noreply, St}. -terminate(_Reason, _State) -> +terminate(_Reason, State) -> + _ = maybe_deactivate_alarm(State), ok. process_connect(St = #st{target = TargetCluster, actor = Actor, link_conf = Conf}) -> @@ -507,6 +537,11 @@ run_bootstrap(St = #st{target = TargetCluster, link_conf = #{topics := Topics}}) run_bootstrap(Bootstrap, St) -> case emqx_cluster_link_router_bootstrap:next_batch(Bootstrap) of done -> + ?tp( + debug, + clink_route_bootstrap_complete, + #{actor => {St#st.actor, St#st.incarnation}, cluster => St#st.target} + ), process_bootstrapped(St); {Batch, NBootstrap} -> %% TODO: Better error handling. diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl index 922af832f..e38cd3999 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_SUITE.erl @@ -15,7 +15,17 @@ %% all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, shared_subs}, + {group, non_shared_subs} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + [ + {shared_subs, AllTCs}, + {non_shared_subs, AllTCs} + ]. init_per_suite(Config) -> Config. @@ -23,6 +33,14 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. +init_per_group(shared_subs, Config) -> + [{is_shared_sub, true} | Config]; +init_per_group(non_shared_subs, Config) -> + [{is_shared_sub, false} | Config]. + +end_per_group(_Group, _Config) -> + ok. + init_per_testcase(TCName, Config) -> emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config). @@ -136,11 +154,14 @@ t_message_forwarding('end', Config) -> t_message_forwarding(Config) -> [SourceNode1 | _] = nodes_source(Config), [TargetNode1, TargetNode2 | _] = nodes_target(Config), + SourceC1 = start_client("t_message_forwarding", SourceNode1), TargetC1 = start_client("t_message_forwarding1", TargetNode1), TargetC2 = start_client("t_message_forwarding2", TargetNode2), - {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/+">>, qos1), - {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/#">>, qos1), + IsShared = ?config(is_shared_sub, Config), + + {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/+">>), qos1), + {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, <<"t/#">>), qos1), {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}), {ok, _} = emqtt:publish(SourceC1, <<"t/42">>, <<"hello">>, qos1), ?assertReceive( @@ -178,8 +199,10 @@ t_target_extrouting_gc(Config) -> SourceC1 = start_client("t_target_extrouting_gc", SourceNode1), TargetC1 = start_client_unlink("t_target_extrouting_gc1", TargetNode1), TargetC2 = start_client_unlink("t_target_extrouting_gc2", TargetNode2), - {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/#">>, qos1), - {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/+">>, qos1), + IsShared = ?config(is_shared_sub, Config), + + {ok, _, _} = emqtt:subscribe(TargetC1, maybe_shared_topic(IsShared, <<"t/#">>), qos1), + {ok, _, _} = emqtt:subscribe(TargetC2, maybe_shared_topic(IsShared, <<"t/+">>), qos1), {ok, _} = ?block_until(#{?snk_kind := clink_route_sync_complete}), {ok, _} = emqtt:publish(SourceC1, <<"t/1">>, <<"HELLO1">>, qos1), {ok, _} = emqtt:publish(SourceC1, <<"t/2/ext">>, <<"HELLO2">>, qos1), @@ -232,6 +255,11 @@ t_target_extrouting_gc(Config) -> %% +maybe_shared_topic(true = _IsShared, Topic) -> + <<"$share/test-group/", Topic/binary>>; +maybe_shared_topic(false = _IsShared, Topic) -> + Topic. + start_client_unlink(ClientId, Node) -> Client = start_client(ClientId, Node), _ = erlang:unlink(Client), diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl new file mode 100644 index 000000000..c5ec8da6c --- /dev/null +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -0,0 +1,132 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])). +-define(CONF_PATH, [cluster, links]). + +-define(CACERT, << + "-----BEGIN CERTIFICATE-----\n" + "MIIDUTCCAjmgAwIBAgIJAPPYCjTmxdt/MA0GCSqGSIb3DQEBCwUAMD8xCzAJBgNV\n" + "BAYTAkNOMREwDwYDVQQIDAhoYW5nemhvdTEMMAoGA1UECgwDRU1RMQ8wDQYDVQQD\n" + "DAZSb290Q0EwHhcNMjAwNTA4MDgwNjUyWhcNMzAwNTA2MDgwNjUyWjA/MQswCQYD\n" + "VQQGEwJDTjERMA8GA1UECAwIaGFuZ3pob3UxDDAKBgNVBAoMA0VNUTEPMA0GA1UE\n" + "AwwGUm9vdENBMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzcgVLex1\n" + "EZ9ON64EX8v+wcSjzOZpiEOsAOuSXOEN3wb8FKUxCdsGrsJYB7a5VM/Jot25Mod2\n" + "juS3OBMg6r85k2TWjdxUoUs+HiUB/pP/ARaaW6VntpAEokpij/przWMPgJnBF3Ur\n" + "MjtbLayH9hGmpQrI5c2vmHQ2reRZnSFbY+2b8SXZ+3lZZgz9+BaQYWdQWfaUWEHZ\n" + "uDaNiViVO0OT8DRjCuiDp3yYDj3iLWbTA/gDL6Tf5XuHuEwcOQUrd+h0hyIphO8D\n" + "tsrsHZ14j4AWYLk1CPA6pq1HIUvEl2rANx2lVUNv+nt64K/Mr3RnVQd9s8bK+TXQ\n" + "KGHd2Lv/PALYuwIDAQABo1AwTjAdBgNVHQ4EFgQUGBmW+iDzxctWAWxmhgdlE8Pj\n" + "EbQwHwYDVR0jBBgwFoAUGBmW+iDzxctWAWxmhgdlE8PjEbQwDAYDVR0TBAUwAwEB\n" + "/zANBgkqhkiG9w0BAQsFAAOCAQEAGbhRUjpIred4cFAFJ7bbYD9hKu/yzWPWkMRa\n" + "ErlCKHmuYsYk+5d16JQhJaFy6MGXfLgo3KV2itl0d+OWNH0U9ULXcglTxy6+njo5\n" + "CFqdUBPwN1jxhzo9yteDMKF4+AHIxbvCAJa17qcwUKR5MKNvv09C6pvQDJLzid7y\n" + "E2dkgSuggik3oa0427KvctFf8uhOV94RvEDyqvT5+pgNYZ2Yfga9pD/jjpoHEUlo\n" + "88IGU8/wJCx3Ds2yc8+oBg/ynxG8f/HmCC1ET6EHHoe2jlo8FpU/SgGtghS1YL30\n" + "IWxNsPrUP+XsZpBJy/mvOhE5QXo6Y35zDqqj8tI7AGmAWu22jg==\n" + "-----END CERTIFICATE-----" +>>). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% This is called by emqx_machine in EMQX release + emqx_otel_app:configure_otel_deps(), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}, + emqx_cluster_link + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + Auth = auth_header(), + [{suite_apps, Apps}, {auth, Auth} | Config]. + +end_per_suite(Config) -> + emqx_cth_suite:stop(?config(suite_apps, Config)), + emqx_config:delete_override_conf_files(), + ok. + +auth_header() -> + {ok, API} = emqx_common_test_http:create_default_app(), + emqx_common_test_http:auth_header(API). + +init_per_testcase(_TC, Config) -> + {ok, _} = emqx_cluster_link_config:update([]), + Config. + +end_per_testcase(_TC, _Config) -> + ok. + +t_put_get_valid(Config) -> + Auth = ?config(auth, Config), + Path = ?API_PATH, + {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ?assertMatch([], emqx_utils_json:decode(Resp)), + + Link1 = #{ + <<"pool_size">> => 1, + <<"server">> => <<"emqxcl_2.nohost:31883">>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"name">> => <<"emqcl_1">> + }, + Link2 = #{ + <<"pool_size">> => 1, + <<"server">> => <<"emqxcl_2.nohost:41883">>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"name">> => <<"emqcl_2">> + }, + ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])), + + {ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)), + + DisabledLink1 = Link1#{<<"enable">> => false}, + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2]) + ), + + {ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)), + + SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT}, + SSLLink1 = Link1#{<<"ssl">> => SSL}, + ?assertMatch( + {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1]) + ), + {ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + + ?assertMatch( + [Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}], + emqx_utils_json:decode(Resp3) + ). + +t_put_invalid(Config) -> + Auth = ?config(auth, Config), + Path = ?API_PATH, + Link = #{ + <<"pool_size">> => 1, + <<"server">> => <<"emqxcl_2.nohost:31883">>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"name">> => <<"emqcl_1">> + }, + ?assertMatch( + {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link, Link]) + ), + ?assertMatch( + {error, {_, 400, _}}, + emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [maps:remove(<<"name">>, Link)]) + ). diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl new file mode 100644 index 000000000..e09f12ce4 --- /dev/null +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_config_SUITE.erl @@ -0,0 +1,647 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_config_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(TCName, Config) -> + emqx_common_test_helpers:init_per_testcase(?MODULE, TCName, Config). + +end_per_testcase(TCName, Config) -> + emqx_common_test_helpers:end_per_testcase(?MODULE, TCName, Config). + +mk_clusters(NameA, NameB, PortA, PortB, ConfA, ConfB, Config) -> + AppsA = [{emqx_conf, ConfA}, emqx_cluster_link], + AppsA1 = [ + {emqx_conf, combine([ConfA, conf_mqtt_listener(PortA)])}, + emqx_cluster_link + ], + AppsB = [{emqx_conf, ConfB}, emqx_cluster_link], + AppsB1 = [ + {emqx_conf, combine([ConfB, conf_mqtt_listener(PortB)])}, + emqx_cluster_link + ], + + NodesA = emqx_cth_cluster:mk_nodespecs( + [ + {mk_nodename(NameA, 1), #{apps => AppsA}}, + {mk_nodename(NameA, 2), #{apps => AppsA}}, + {mk_nodename(NameA, 3), #{apps => AppsA1, role => replicant}} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + NodesB = emqx_cth_cluster:mk_nodespecs( + [ + {mk_nodename(NameB, 1), #{apps => AppsB, base_port => 20100}}, + {mk_nodename(NameB, 2), #{apps => AppsB1, base_port => 20200}} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {NodesA, NodesB}. + +t_config_update('init', Config) -> + NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), + NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + LPortA = 31883, + LPortB = 41883, + ConfA = combine([conf_cluster(NameA), conf_log()]), + ConfB = combine([conf_cluster(NameB), conf_log()]), + {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config), + ClusterA = emqx_cth_cluster:start(NodesA), + ClusterB = emqx_cth_cluster:start(NodesB), + ok = snabbkaffe:start_trace(), + [ + {cluster_a, ClusterA}, + {cluster_b, ClusterB}, + {lport_a, LPortA}, + {lport_b, LPortB}, + {name_a, NameA}, + {name_b, NameB} + | Config + ]; +t_config_update('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_cluster:stop(?config(cluster_a, Config)), + ok = emqx_cth_cluster:stop(?config(cluster_b, Config)). + +t_config_update(Config) -> + [NodeA1, _, _] = ?config(cluster_a, Config), + [NodeB1, _] = ?config(cluster_b, Config), + LPortA = ?config(lport_a, Config), + LPortB = ?config(lport_b, Config), + NameA = ?config(name_a, Config), + NameB = ?config(name_b, Config), + + ClientA = start_client("t_config_a", NodeA1), + ClientB = start_client("t_config_b", NodeB1), + + {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1), + {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1), + + %% add link + LinkConfA = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameB + }, + LinkConfB = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameA + }, + + {ok, SubRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), + %% 5 nodes = 5 actors (durable storage is dsabled) + 5, + 30_000 + ), + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])), + ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), + + ?assertMatch( + {ok, [ + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete} + ]}, + snabbkaffe:receive_events(SubRef) + ), + + {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1), + {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1), + + ?assertReceive( + {publish, #{ + topic := <<"t/test-topic">>, payload := <<"hello-from-a">>, client_pid := ClientB + }}, + 7000 + ), + ?assertReceive( + {publish, #{ + topic := <<"t/test/1/1">>, payload := <<"hello-from-b">>, client_pid := ClientA + }}, + 7000 + ), + %% no more messages expected + ?assertNotReceive({publish, _Message = #{}}), + + {ok, SubRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), + %% 3 nodes in cluster a + 3, + 30_000 + ), + + %% update link + LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]}, + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])), + + ?assertMatch( + {ok, [ + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete}, + #{?snk_kind := clink_route_bootstrap_complete} + ]}, + snabbkaffe:receive_events(SubRef1) + ), + + %% wait for route sync on ClientA node + {{ok, _, _}, {ok, _}} = ?wait_async_action( + emqtt:subscribe(ClientA, <<"t/new/1">>, qos1), + #{?snk_kind := clink_route_sync_complete, ?snk_meta := #{node := NodeA1}}, + 10_000 + ), + + %% not expected to be received anymore + {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"not-expected-hello-from-b">>, qos1), + {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"hello-from-b-1">>, qos1), + ?assertReceive( + {publish, #{topic := <<"t/new/1">>, payload := <<"hello-from-b-1">>, client_pid := ClientA}}, + 7000 + ), + ?assertNotReceive({publish, _Message = #{}}), + + %% disable link + LinkConfA2 = LinkConfA1#{<<"enable">> => false}, + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA2]])), + %% must be already blocked by the receiving cluster even if externak routing state is not + %% updated yet + {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"not-expected-hello-from-b-1">>, qos1), + + LinkConfB1 = LinkConfB#{<<"enable">> => false}, + ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB1]])), + {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"not-expected-hello-from-a">>, qos1), + + ?assertNotReceive({publish, _Message = #{}}, 3000), + + %% delete links + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), + ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[]])), + + ok = emqtt:stop(ClientA), + ok = emqtt:stop(ClientB). + +t_config_validations('init', Config) -> + NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), + NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + LPortA = 31883, + LPortB = 41883, + ConfA = combine([conf_cluster(NameA), conf_log()]), + ConfB = combine([conf_cluster(NameB), conf_log()]), + %% Single node clusters are enough for a basic validation test + {[NodeA, _, _], [NodeB, _]} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config), + ClusterA = emqx_cth_cluster:start([NodeA]), + ClusterB = emqx_cth_cluster:start([NodeB]), + ok = snabbkaffe:start_trace(), + [ + {cluster_a, ClusterA}, + {cluster_b, ClusterB}, + {lport_a, LPortA}, + {lport_b, LPortB}, + {name_a, NameA}, + {name_b, NameB} + | Config + ]; +t_config_validations('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_cluster:stop(?config(cluster_a, Config)), + ok = emqx_cth_cluster:stop(?config(cluster_b, Config)). + +t_config_validations(Config) -> + [NodeA] = ?config(cluster_a, Config), + LPortB = ?config(lport_b, Config), + + NameB = ?config(name_b, Config), + + LinkConfA = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameB + }, + DuplicatedLinks = [LinkConfA, LinkConfA#{<<"enable">> => false, <<"pool_size">> => 2}], + ?assertMatch( + {error, #{reason := #{reason := duplicated_cluster_links, names := _}}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [DuplicatedLinks]) + ), + + InvalidTopics = [<<"t/test/#">>, <<"$LINK/cluster/test/#">>], + InvalidTopics1 = [<<"t/+/#/+">>, <<>>], + ?assertMatch( + {error, #{reason := #{reason := invalid_topics, topics := _}}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [ + [LinkConfA#{<<"topics">> => InvalidTopics}] + ]) + ), + ?assertMatch( + {error, #{reason := #{reason := invalid_topics, topics := _}}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [ + [LinkConfA#{<<"topics">> => InvalidTopics1}] + ]) + ), + ?assertMatch( + {error, #{reason := required_field}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [ + [maps:remove(<<"upstream">>, LinkConfA)] + ]) + ), + ?assertMatch( + {error, #{reason := required_field}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [[maps:remove(<<"server">>, LinkConfA)]]) + ), + ?assertMatch( + {error, #{reason := required_field}}, + erpc:call(NodeA, emqx_cluster_link_config, update, [[maps:remove(<<"topics">>, LinkConfA)]]) + ), + + %% Some valid changes to cover different update scenarios (msg resource changed, actor changed, both changed) + ?assertMatch( + {ok, _}, + erpc:call(NodeA, emqx_cluster_link_config, update, [[LinkConfA]]) + ), + LinkConfUnknown = LinkConfA#{ + <<"upstream">> => <<"no-cluster">>, <<"server">> => <<"no-cluster.emqx:31883">> + }, + ?assertMatch( + {ok, _}, + erpc:call(NodeA, emqx_cluster_link_config, update, [ + [LinkConfA#{<<"pool_size">> => 5}, LinkConfUnknown] + ]) + ), + + ?assertMatch( + {ok, _}, + erpc:call(NodeA, emqx_cluster_link_config, update, [ + [LinkConfA, LinkConfUnknown#{<<"topics">> => []}] + ]) + ), + + ?assertMatch( + {ok, _}, + erpc:call( + NodeA, + emqx_cluster_link_config, + update, + [ + [ + LinkConfA#{ + <<"clientid">> => <<"new-client">>, + <<"username">> => <<"user">> + }, + LinkConfUnknown#{ + <<"clientid">> => <<"new-client">>, + <<"username">> => <<"user">> + } + ] + ] + ) + ). + +t_config_update_ds('init', Config) -> + NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), + NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + LPortA = 31883, + LPortB = 41883, + ConfA = combine([conf_cluster(NameA), conf_log(), conf_ds()]), + ConfB = combine([conf_cluster(NameB), conf_log(), conf_ds()]), + {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config), + ClusterA = emqx_cth_cluster:start(NodesA), + ClusterB = emqx_cth_cluster:start(NodesB), + ok = snabbkaffe:start_trace(), + [ + {cluster_a, ClusterA}, + {cluster_b, ClusterB}, + {lport_a, LPortA}, + {lport_b, LPortB}, + {name_a, NameA}, + {name_b, NameB} + | Config + ]; +t_config_update_ds('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_cluster:stop(?config(cluster_a, Config)), + ok = emqx_cth_cluster:stop(?config(cluster_b, Config)). + +t_config_update_ds(Config) -> + [NodeA1, _, _] = ?config(cluster_a, Config), + [NodeB1, _] = ?config(cluster_b, Config), + LPortA = ?config(lport_a, Config), + LPortB = ?config(lport_b, Config), + NameA = ?config(name_a, Config), + NameB = ?config(name_b, Config), + + ClientA = start_client("t_config_a", NodeA1, false), + ClientB = start_client("t_config_b", NodeB1, false), + {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1), + {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1), + + LinkConfA = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameB + }, + LinkConfB = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameA + }, + + {ok, SubRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), + %% 5 nodes = 9 actors (durable storage is enabled, + %% 1 replicant node is not doing ds bootstrap) + 9, + 30_000 + ), + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]])), + ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), + + ?assertMatch( + [#{ps_actor_incarnation := 0}], erpc:call(NodeA1, emqx, get_config, [[cluster, links]]) + ), + ?assertMatch( + [#{ps_actor_incarnation := 0}], erpc:call(NodeB1, emqx, get_config, [[cluster, links]]) + ), + + {ok, Events} = snabbkaffe:receive_events(SubRef), + ?assertEqual(9, length(Events)), + + {ok, _} = emqtt:publish(ClientA, <<"t/test-topic">>, <<"hello-from-a">>, qos1), + {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"hello-from-b">>, qos1), + + ?assertReceive( + {publish, #{ + topic := <<"t/test-topic">>, payload := <<"hello-from-a">>, client_pid := ClientB + }}, + 30_000 + ), + ?assertReceive( + {publish, #{ + topic := <<"t/test/1/1">>, payload := <<"hello-from-b">>, client_pid := ClientA + }}, + 30_000 + ), + %% no more messages expected + ?assertNotReceive({publish, _Message = #{}}), + {ok, SubRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := clink_route_bootstrap_complete}), + %% 3 nodes (1 replicant) in cluster a (5 actors including ds) + 5, + 30_000 + ), + + %% update link + + LinkConfA1 = LinkConfA#{<<"pool_size">> => 2, <<"topics">> => [<<"t/new/+">>]}, + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA1]])), + + {ok, Events1} = snabbkaffe:receive_events(SubRef1), + ?assertEqual(5, length(Events1)), + + %% wait for route sync on ClientA node + {{ok, _, _}, {ok, _}} = ?wait_async_action( + emqtt:subscribe(ClientA, <<"t/new/1">>, qos1), + #{ + ?snk_kind := clink_route_sync_complete, + ?snk_meta := #{node := NodeA1}, + actor := {<<"ps-routes-v1">>, 1} + }, + 10_000 + ), + %% not expected to be received anymore + {ok, _} = emqtt:publish(ClientB, <<"t/test/1/1">>, <<"not-expected-hello-from-b">>, qos1), + {ok, _} = emqtt:publish(ClientB, <<"t/new/1">>, <<"hello-from-b-1">>, qos1), + ?assertReceive( + {publish, #{topic := <<"t/new/1">>, payload := <<"hello-from-b-1">>, client_pid := ClientA}}, + 30_000 + ), + ?assertNotReceive({publish, _Message = #{}}), + + ?assertMatch( + [#{ps_actor_incarnation := 1}], erpc:call(NodeA1, emqx, get_config, [[cluster, links]]) + ), + ?assertMatch( + [#{ps_actor_incarnation := 1}], erpc:call(NodeA1, emqx, get_config, [[cluster, links]]) + ), + + ok = emqtt:stop(ClientA), + ok = emqtt:stop(ClientB). + +t_misconfigured_links('init', Config) -> + NameA = fmt("~s_~s", [?FUNCTION_NAME, "a"]), + NameB = fmt("~s_~s", [?FUNCTION_NAME, "b"]), + LPortA = 31883, + LPortB = 41883, + ConfA = combine([conf_cluster(NameA), conf_log()]), + ConfB = combine([conf_cluster(NameB), conf_log()]), + {NodesA, NodesB} = mk_clusters(NameA, NameB, LPortA, LPortB, ConfA, ConfB, Config), + ClusterA = emqx_cth_cluster:start(NodesA), + ClusterB = emqx_cth_cluster:start(NodesB), + ok = snabbkaffe:start_trace(), + [ + {cluster_a, ClusterA}, + {cluster_b, ClusterB}, + {lport_a, LPortA}, + {lport_b, LPortB}, + {name_a, NameA}, + {name_b, NameB} + | Config + ]; +t_misconfigured_links('end', Config) -> + ok = snabbkaffe:stop(), + ok = emqx_cth_cluster:stop(?config(cluster_a, Config)), + ok = emqx_cth_cluster:stop(?config(cluster_b, Config)). + +t_misconfigured_links(Config) -> + [NodeA1, _, _] = ?config(cluster_a, Config), + [NodeB1, _] = ?config(cluster_b, Config), + LPortA = ?config(lport_a, Config), + LPortB = ?config(lport_b, Config), + NameA = ?config(name_a, Config), + NameB = ?config(name_b, Config), + + ClientA = start_client("t_config_a", NodeA1), + ClientB = start_client("t_config_b", NodeB1), + + {ok, _, _} = emqtt:subscribe(ClientA, <<"t/test/1/+">>, qos1), + {ok, _, _} = emqtt:subscribe(ClientB, <<"t/test-topic">>, qos1), + + LinkConfA = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortB))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => <<"bad-b-name">> + }, + LinkConfB = #{ + <<"enable">> => true, + <<"pool_size">> => 1, + <<"server">> => <<"localhost:", (integer_to_binary(LPortA))/binary>>, + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], + <<"upstream">> => NameA + }, + + ?assertMatch({ok, _}, erpc:call(NodeB1, emqx_cluster_link_config, update, [[LinkConfB]])), + + {{ok, _}, {ok, _}} = ?wait_async_action( + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA]]), + #{ + ?snk_kind := clink_handshake_error, + reason := <<"bad_remote_cluster_link_name">>, + ?snk_meta := #{node := NodeA1} + }, + 10_000 + ), + timer:sleep(10), + ?assertMatch( + #{error := <<"bad_remote_cluster_link_name">>}, + erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [<<"bad-b-name">>]) + ), + + {{ok, _}, {ok, _}} = ?wait_async_action( + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + #{ + ?snk_kind := clink_route_bootstrap_complete, + ?snk_meta := #{node := NodeA1} + }, + 10_000 + ), + ?assertMatch( + #{status := connected, error := undefined}, + erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [NameB]) + ), + ?assertEqual( + undefined, erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [<<"bad-b-name">>]) + ), + + ?assertMatch( + {ok, _}, + erpc:call( + NodeB1, + emqx_cluster_link_config, + update, + [ + [ + LinkConfB#{<<"enable">> => false}, + %% An extra dummy link to keep B hook/external_broker registered and be able to + %% respond with "link disabled error" for the first disabled link + LinkConfB#{<<"upstream">> => <<"bad-a-name">>} + ] + ] + ) + ), + + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), + {{ok, _}, {ok, _}} = ?wait_async_action( + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + #{ + ?snk_kind := clink_handshake_error, + reason := <<"cluster_link_disabled">>, + ?snk_meta := #{node := NodeA1} + }, + 10_000 + ), + timer:sleep(10), + ?assertMatch( + #{error := <<"cluster_link_disabled">>}, + erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [NameB]) + ), + + ?assertMatch( + {ok, _}, + erpc:call(NodeB1, emqx_cluster_link_config, update, [ + [LinkConfB#{<<"upstream">> => <<"bad-a-name">>}] + ]) + ), + ?assertMatch({ok, _}, erpc:call(NodeA1, emqx_cluster_link_config, update, [[]])), + + {{ok, _}, {ok, _}} = ?wait_async_action( + erpc:call(NodeA1, emqx_cluster_link_config, update, [[LinkConfA#{<<"upstream">> => NameB}]]), + #{ + ?snk_kind := clink_handshake_error, + reason := <<"unknown_cluster">>, + ?snk_meta := #{node := NodeA1} + }, + 10_000 + ), + timer:sleep(10), + ?assertMatch( + #{error := <<"unknown_cluster">>}, + erpc:call(NodeA1, emqx_cluster_link_router_syncer, status, [NameB]) + ), + + ok = emqtt:stop(ClientA), + ok = emqtt:stop(ClientB). + +start_client(ClientId, Node) -> + start_client(ClientId, Node, true). + +start_client(ClientId, Node, CleanStart) -> + Port = tcp_port(Node), + {ok, Client} = emqtt:start_link( + [ + {proto_ver, v5}, + {clientid, ClientId}, + {port, Port}, + {clean_start, CleanStart} + | [{properties, #{'Session-Expiry-Interval' => 300}} || CleanStart =:= false] + ] + ), + {ok, _} = emqtt:connect(Client), + Client. + +tcp_port(Node) -> + {_Host, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), + Port. + +combine([Entry | Rest]) -> + lists:foldl(fun emqx_cth_suite:merge_config/2, Entry, Rest). + +conf_mqtt_listener(LPort) when is_integer(LPort) -> + fmt("listeners.tcp.clink { bind = ~p }", [LPort]); +conf_mqtt_listener(_) -> + "". + +conf_cluster(ClusterName) -> + fmt("cluster.name = ~s", [ClusterName]). + +conf_log() -> + "log.file { enable = true, level = debug, path = node.log, supervisor_reports = progress }". + +conf_ds() -> + "durable_sessions.enable = true". + +fmt(Fmt, Args) -> + emqx_utils:format(Fmt, Args). + +mk_nodename(BaseName, Idx) -> + binary_to_atom(fmt("emqx_clink_~s_~b", [BaseName, Idx])). diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl index bb281ce4c..5bd63862a 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_extrouter_SUITE.erl @@ -123,7 +123,8 @@ t_actor_gc(_Config) -> [<<"global/#">>, <<"topic/#">>, <<"topic/42/+">>], topics_sorted() ), - _AS13 = apply_operation(heartbeat, AS12, 50_000), + _AS13 = apply_operation(heartbeat, AS12, 60_000), + ?assertEqual( 1, emqx_cluster_link_extrouter:actor_gc(env(60_000))