From 8f304d3456688cd2b7d73703898b859ea178d42b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 12 Jan 2024 16:24:52 -0300 Subject: [PATCH] test(bridge_v2_api): refactor suite to use CT matrix --- apps/emqx/test/emqx_common_test_helpers.erl | 29 +- .../test/emqx_bridge_v2_api_SUITE.erl | 1230 +++++++++-------- 2 files changed, 669 insertions(+), 590 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index d9c9470eb..9438d227e 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -1389,29 +1389,40 @@ matrix_to_groups(Module, Cases) -> Cases ). -add_case_matrix(Module, Case, Acc0) -> - {RootGroup, Matrix} = Module:Case(matrix), +add_case_matrix(Module, TestCase, Acc0) -> + {MaybeRootGroup, Matrix} = + case Module:TestCase(matrix) of + {RootGroup0, Matrix0} -> + {RootGroup0, Matrix0}; + Matrix0 -> + {undefined, Matrix0} + end, lists:foldr( fun(Row, Acc) -> - add_group([RootGroup | Row], Acc, Case) + case MaybeRootGroup of + undefined -> + add_group(Row, Acc, TestCase); + RootGroup -> + add_group([RootGroup | Row], Acc, TestCase) + end end, Acc0, Matrix ). -add_group([], Acc, Case) -> - case lists:member(Case, Acc) of +add_group([], Acc, TestCase) -> + case lists:member(TestCase, Acc) of true -> Acc; false -> - [Case | Acc] + [TestCase | Acc] end; -add_group([Name | More], Acc, Cases) -> +add_group([Name | More], Acc, TestCases) -> case lists:keyfind(Name, 1, Acc) of false -> - [{Name, [], add_group(More, [], Cases)} | Acc]; + [{Name, [], add_group(More, [], TestCases)} | Acc]; {Name, [], SubGroup} -> - New = {Name, [], add_group(More, SubGroup, Cases)}, + New = {Name, [], add_group(More, SubGroup, TestCases)}, lists:keystore(Name, 1, Acc, New) end. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 0c34610ea..0ce8c620e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -24,9 +24,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). --define(ROOT, "actions"). +-define(ACTIONS_ROOT, "actions"). --define(CONNECTOR_NAME, <<"my_connector">>). +-define(ACTION_CONNECTOR_NAME, <<"my_connector">>). -define(RESOURCE(NAME, TYPE), #{ <<"enable">> => true, @@ -35,10 +35,10 @@ <<"name">> => NAME }). --define(CONNECTOR_TYPE_STR, "kafka_producer"). --define(CONNECTOR_TYPE, <>). +-define(ACTION_CONNECTOR_TYPE_STR, "kafka_producer"). +-define(ACTION_CONNECTOR_TYPE, <>). -define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>). --define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?RESOURCE(Name, ?CONNECTOR_TYPE)#{ +-define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?RESOURCE(Name, ?ACTION_CONNECTOR_TYPE)#{ <<"authentication">> => <<"none">>, <<"bootstrap_hosts">> => BootstrapHosts, <<"connect_timeout">> => <<"5s">>, @@ -53,14 +53,14 @@ } }). --define(CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). --define(CONNECTOR, ?CONNECTOR(?CONNECTOR_NAME)). +-define(ACTIONS_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). +-define(ACTIONS_CONNECTOR, ?ACTIONS_CONNECTOR(?ACTION_CONNECTOR_NAME)). -define(MQTT_LOCAL_TOPIC, <<"mqtt/local/topic">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). --define(BRIDGE_TYPE_STR, "kafka_producer"). --define(BRIDGE_TYPE, <>). --define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{ +-define(ACTION_TYPE_STR, "kafka_producer"). +-define(ACTION_TYPE, <>). +-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?ACTION_TYPE)#{ <<"connector">> => Connector, <<"kafka">> => #{ <<"buffer">> => #{ @@ -99,12 +99,12 @@ <<"health_check_interval">> => <<"32s">> } }). --define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). +-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?ACTION_CONNECTOR_NAME)). -define(KAFKA_BRIDGE_UPDATE(Name, Connector), maps:without([<<"name">>, <<"type">>], ?KAFKA_BRIDGE(Name, Connector)) ). --define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?CONNECTOR_NAME)). +-define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?ACTION_CONNECTOR_NAME)). -define(APPSPECS, [ emqx_conf, @@ -120,34 +120,27 @@ {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ). +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + -if(?EMQX_RELEASE_EDITION == ee). %% For now we got only kafka implementing `bridge_v2` and that is enterprise only. all() -> - [ - {group, single}, - {group, cluster_later_join}, - {group, cluster} - ]. + All0 = emqx_common_test_helpers:all(?MODULE), + All = All0 -- matrix_cases(), + Groups = lists:map(fun({G, _, _}) -> {group, G} end, groups()), + Groups ++ All. -else. all() -> []. -endif. +matrix_cases() -> + emqx_common_test_helpers:all(?MODULE). + groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), - SingleOnlyTests = [ - t_bridges_probe, - t_broken_bridge_config, - t_fix_broken_bridge_config - ], - ClusterLaterJoinOnlyTCs = [ - t_cluster_later_join_metrics - ], - [ - {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, - {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, - {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} - ]. + emqx_common_test_helpers:matrix_to_groups(?MODULE, matrix_cases()). suite() -> [{timetrap, {seconds, 60}}]. @@ -164,10 +157,12 @@ init_per_group(cluster = Name, Config) -> init_per_group(cluster_later_join = Name, Config) -> Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); -init_per_group(Name, Config) -> - WorkDir = filename:join(?config(priv_dir, Config), Name), +init_per_group(single = Group, Config) -> + WorkDir = filename:join(?config(priv_dir, Config), Group), Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), - init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). + init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]); +init_per_group(actions, Config) -> + [{bridge_kind, action} | Config]. init_api(Config) -> Node = ?config(node, Config), @@ -193,8 +188,10 @@ end_per_group(Group, Config) when Group =:= cluster_later_join -> ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); -end_per_group(_, Config) -> +end_per_group(single, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), + ok; +end_per_group(_Group, _Config) -> ok. init_per_testcase(t_action_types, Config) -> @@ -212,7 +209,7 @@ init_per_testcase(_TestCase, Config) -> Nodes -> [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] end, - {ok, 201, _} = request(post, uri(["connectors"]), ?CONNECTOR, Config), + {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config), Config. end_per_testcase(_TestCase, Config) -> @@ -227,6 +224,10 @@ end_per_testcase(_TestCase, Config) -> ok = emqx_common_test_helpers:call_janitor(), ok. +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + -define(CONNECTOR_IMPL, emqx_bridge_v2_dummy_connector). init_mocks() -> case emqx_release:edition() of @@ -243,7 +244,7 @@ init_mocks() -> ?CONNECTOR_IMPL, on_start, fun - (<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) -> + (<<"connector:", ?ACTION_CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) -> {ok, bad_connector_state}; (_I, _C) -> {ok, connector_state} @@ -280,442 +281,6 @@ clear_resources() -> emqx_connector:list() ). -%%------------------------------------------------------------------------------ -%% Testcases -%%------------------------------------------------------------------------------ - -%% We have to pretend testing a kafka bridge since at this point that's the -%% only one that's implemented. - -t_bridges_lifecycle(Config) -> - %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), - - {ok, 404, _} = request(get, uri([?ROOT, "foo"]), Config), - {ok, 404, _} = request(get, uri([?ROOT, "kafka_producer:foo"]), Config), - - %% need a var for patterns below - BridgeName = ?BRIDGE_NAME, - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := BridgeName, - <<"enable">> := true, - <<"status">> := <<"connected">>, - <<"node_status">> := [_ | _], - <<"connector">> := ?CONNECTOR_NAME, - <<"parameters">> := #{}, - <<"local_topic">> := _, - <<"resource_opts">> := _ - }}, - request_json( - post, - uri([?ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME), - Config - ) - ), - - %% list all bridges, assert bridge is in it - ?assertMatch( - {ok, 200, [ - #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := BridgeName, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _] - } - ]}, - request_json(get, uri([?ROOT]), Config) - ), - - %% list all bridges, assert bridge is in it - ?assertMatch( - {ok, 200, [ - #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := BridgeName, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _] - } - ]}, - request_json(get, uri([?ROOT]), Config) - ), - - %% get the bridge by id - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), - ?assertMatch( - {ok, 200, #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := BridgeName, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _] - }}, - request_json(get, uri([?ROOT, BridgeID]), Config) - ), - - ?assertMatch( - {ok, 400, #{ - <<"code">> := <<"BAD_REQUEST">>, - <<"message">> := _ - }}, - request_json(post, uri([?ROOT, BridgeID, "brababbel"]), Config) - ), - - %% update bridge config - {ok, 201, _} = request(post, uri(["connectors"]), ?CONNECTOR(<<"foobla">>), Config), - ?assertMatch( - {ok, 200, #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := BridgeName, - <<"connector">> := <<"foobla">>, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _] - }}, - request_json( - put, - uri([?ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla">>), - Config - ) - ), - - %% update bridge with unknown connector name - {ok, 400, #{ - <<"code">> := <<"BAD_REQUEST">>, - <<"message">> := Message1 - }} = - request_json( - put, - uri([?ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"does_not_exist">>), - Config - ), - ?assertMatch( - #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, - emqx_utils_json:decode(Message1) - ), - - %% update bridge with connector of wrong type - {ok, 201, _} = - request( - post, - uri(["connectors"]), - (?CONNECTOR(<<"foobla2">>))#{ - <<"type">> => <<"azure_event_hub_producer">>, - <<"authentication">> => #{ - <<"username">> => <<"emqxuser">>, - <<"password">> => <<"topSecret">>, - <<"mechanism">> => <<"plain">> - }, - <<"ssl">> => #{ - <<"enable">> => true, - <<"server_name_indication">> => <<"auto">>, - <<"verify">> => <<"verify_none">>, - <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] - } - }, - Config - ), - {ok, 400, #{ - <<"code">> := <<"BAD_REQUEST">>, - <<"message">> := Message2 - }} = - request_json( - put, - uri([?ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla2">>), - Config - ), - ?assertMatch( - #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, - emqx_utils_json:decode(Message2) - ), - - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), - - %% try create with unknown connector name - {ok, 400, #{ - <<"code">> := <<"BAD_REQUEST">>, - <<"message">> := Message3 - }} = - request_json( - post, - uri([?ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"does_not_exist">>), - Config - ), - ?assertMatch( - #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, - emqx_utils_json:decode(Message3) - ), - - %% try create bridge with connector of wrong type - {ok, 400, #{ - <<"code">> := <<"BAD_REQUEST">>, - <<"message">> := Message4 - }} = - request_json( - post, - uri([?ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"foobla2">>), - Config - ), - ?assertMatch( - #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, - emqx_utils_json:decode(Message4) - ), - - %% make sure nothing has been created above - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), - - %% update a deleted bridge returns an error - ?assertMatch( - {ok, 404, #{ - <<"code">> := <<"NOT_FOUND">>, - <<"message">> := _ - }}, - request_json( - put, - uri([?ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME), - Config - ) - ), - - %% deleting a non-existing bridge should result in an error - ?assertMatch( - {ok, 404, #{ - <<"code">> := <<"NOT_FOUND">>, - <<"message">> := _ - }}, - request_json(delete, uri([?ROOT, BridgeID]), Config) - ), - - %% try delete unknown bridge id - ?assertMatch( - {ok, 404, #{ - <<"code">> := <<"NOT_FOUND">>, - <<"message">> := <<"Invalid bridge ID", _/binary>> - }}, - request_json(delete, uri([?ROOT, "foo"]), Config) - ), - - %% Try create bridge with bad characters as name - {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"隋达"/utf8>>), Config), - {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), - ok. - -t_broken_bridge_config(Config) -> - emqx_cth_suite:stop_apps([emqx_bridge]), - BridgeName = ?BRIDGE_NAME, - StartOps = - #{ - config => - "actions {\n" - " " - ?BRIDGE_TYPE_STR - " {\n" - " " ++ binary_to_list(BridgeName) ++ - " {\n" - " connector = does_not_exist\n" - " enable = true\n" - " kafka {\n" - " topic = test-topic-one-partition\n" - " }\n" - " local_topic = \"mqtt/local/topic\"\n" - " resource_opts {health_check_interval = 32s}\n" - " }\n" - " }\n" - "}\n" - "\n", - schema_mod => emqx_bridge_v2_schema - }, - emqx_cth_suite:start_app(emqx_bridge, StartOps), - - ?assertMatch( - {ok, 200, [ - #{ - <<"name">> := BridgeName, - <<"type">> := ?BRIDGE_TYPE, - <<"connector">> := <<"does_not_exist">>, - <<"status">> := <<"disconnected">>, - <<"error">> := <<"Not installed">> - } - ]}, - request_json(get, uri([?ROOT]), Config) - ), - - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), - ?assertEqual( - {ok, 204, <<>>}, - request(delete, uri([?ROOT, BridgeID]), Config) - ), - - ?assertEqual( - {ok, 200, []}, - request_json(get, uri([?ROOT]), Config) - ), - - ok. - -t_fix_broken_bridge_config(Config) -> - emqx_cth_suite:stop_apps([emqx_bridge]), - BridgeName = ?BRIDGE_NAME, - StartOps = - #{ - config => - "actions {\n" - " " - ?BRIDGE_TYPE_STR - " {\n" - " " ++ binary_to_list(BridgeName) ++ - " {\n" - " connector = does_not_exist\n" - " enable = true\n" - " kafka {\n" - " topic = test-topic-one-partition\n" - " }\n" - " local_topic = \"mqtt/local/topic\"\n" - " resource_opts {health_check_interval = 32s}\n" - " }\n" - " }\n" - "}\n" - "\n", - schema_mod => emqx_bridge_v2_schema - }, - emqx_cth_suite:start_app(emqx_bridge, StartOps), - - ?assertMatch( - {ok, 200, [ - #{ - <<"name">> := BridgeName, - <<"type">> := ?BRIDGE_TYPE, - <<"connector">> := <<"does_not_exist">>, - <<"status">> := <<"disconnected">>, - <<"error">> := <<"Not installed">> - } - ]}, - request_json(get, uri([?ROOT]), Config) - ), - - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), - request_json( - put, - uri([?ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME), - Config - ), - - ?assertMatch( - {ok, 200, #{ - <<"connector">> := ?CONNECTOR_NAME, - <<"status">> := <<"connected">> - }}, - request_json(get, uri([?ROOT, BridgeID]), Config) - ), - - ok. - -t_start_bridge_unknown_node(Config) -> - {ok, 404, _} = - request( - post, - uri(["nodes", "thisbetterbenotanatomyet", ?ROOT, "kafka_producer:foo", start]), - Config - ), - {ok, 404, _} = - request( - post, - uri(["nodes", "undefined", ?ROOT, "kafka_producer:foo", start]), - Config - ). - -t_start_bridge_node(Config) -> - do_start_bridge(node, Config). - -t_start_bridge_cluster(Config) -> - do_start_bridge(cluster, Config). - -do_start_bridge(TestType, Config) -> - %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), - - Name = atom_to_binary(TestType), - ?assertMatch( - {ok, 201, #{ - <<"type">> := ?BRIDGE_TYPE, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := <<"connected">>, - <<"node_status">> := [_ | _] - }}, - request_json( - post, - uri([?ROOT]), - ?KAFKA_BRIDGE(Name), - Config - ) - ), - - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), - - %% start again - {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), - ?assertMatch( - {ok, 200, #{<<"status">> := <<"connected">>}}, - request_json(get, uri([?ROOT, BridgeID]), Config) - ), - %% start a started bridge - {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), - ?assertMatch( - {ok, 200, #{<<"status">> := <<"connected">>}}, - request_json(get, uri([?ROOT, BridgeID]), Config) - ), - - {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), - - %% Make start bridge fail - expect_on_all_nodes( - ?CONNECTOR_IMPL, - on_add_channel, - fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end, - Config - ), - - connector_operation(Config, ?BRIDGE_TYPE, ?CONNECTOR_NAME, stop), - connector_operation(Config, ?BRIDGE_TYPE, ?CONNECTOR_NAME, start), - - {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), - - %% Make start bridge succeed - - expect_on_all_nodes( - ?CONNECTOR_IMPL, - on_add_channel, - fun(_, _, _ResId, _Channel) -> {ok, connector_state} end, - Config - ), - - %% try to start again - {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), - - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), - - %% Fail parse-id check - {ok, 404, _} = request(post, {operation, TestType, start, <<"wreckbook_fugazi">>}, Config), - %% Looks ok but doesn't exist - {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), - ok. - expect_on_all_nodes(Mod, Function, Fun, Config) -> case ?config(cluster_nodes, Config) of undefined -> @@ -751,6 +316,548 @@ connector_operation(Config, ConnectorType, ConnectorName, OperationName) -> ok = emqx_connector_resource:OperationName(ConnectorType, ConnectorName) end. +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +request(Method, URL, Config) -> + request(Method, URL, [], Config). + +request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> + URL = operation_path(Type, Op, BridgeID, Config), + request(Method, URL, Body, Config); +request(Method, URL, Body, Config) -> + AuthHeader = emqx_common_test_http:auth_header(?config(api_key, Config)), + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). + +request(Method, URL, Body, Decoder, Config) -> + case request(Method, URL, Body, Config) of + {ok, Code, Response} -> + case Decoder(Response) of + {error, _} = Error -> Error; + Decoded -> {ok, Code, Decoded} + end; + Otherwise -> + Otherwise + end. + +request_json(Method, URLLike, Config) -> + request(Method, URLLike, [], fun json/1, Config). + +request_json(Method, URLLike, Body, Config) -> + request(Method, URLLike, Body, fun json/1, Config). + +operation_path(node, Oper, BridgeID, Config) -> + uri(["nodes", ?config(node, Config), ?ACTIONS_ROOT, BridgeID, Oper]); +operation_path(cluster, Oper, BridgeID, _Config) -> + uri([?ACTIONS_ROOT, BridgeID, Oper]). + +enable_path(Enable, BridgeID) -> + uri([?ACTIONS_ROOT, BridgeID, "enable", Enable]). + +publish_message(Topic, Body, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). + +update_config(Path, Value, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, update_config, [Path, Value]). + +get_raw_config(Path, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, get_raw_config, [Path]). + +add_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). + +delete_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S). + +json(B) when is_binary(B) -> + case emqx_utils_json:safe_decode(B, [return_maps]) of + {ok, Term} -> + Term; + {error, Reason} = Error -> + ct:pal("Failed to decode json: ~p~n~p", [Reason, B]), + Error + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% We have to pretend testing a kafka bridge since at this point that's the +%% only one that's implemented. + +t_bridges_lifecycle(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; +t_bridges_lifecycle(Config) -> + %% assert we there's no bridges at first + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + + {ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "foo"]), Config), + {ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "kafka_producer:foo"]), Config), + + %% need a var for patterns below + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := ?ACTION_CONNECTOR_NAME, + <<"parameters">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri([?ACTIONS_ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% list all bridges, assert bridge is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri([?ACTIONS_ROOT]), Config) + ), + + %% list all bridges, assert bridge is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri([?ACTIONS_ROOT]), Config) + ), + + %% get the bridge by id + BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME), + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := _ + }}, + request_json(post, uri([?ACTIONS_ROOT, BridgeID, "brababbel"]), Config) + ), + + %% update bridge config + {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR(<<"foobla">>), Config), + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := BridgeName, + <<"connector">> := <<"foobla">>, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json( + put, + uri([?ACTIONS_ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla">>), + Config + ) + ), + + %% update bridge with unknown connector name + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := Message1 + }} = + request_json( + put, + uri([?ACTIONS_ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"does_not_exist">>), + Config + ), + ?assertMatch( + #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, + emqx_utils_json:decode(Message1) + ), + + %% update bridge with connector of wrong type + {ok, 201, _} = + request( + post, + uri(["connectors"]), + (?ACTIONS_CONNECTOR(<<"foobla2">>))#{ + <<"type">> => <<"azure_event_hub_producer">>, + <<"authentication">> => #{ + <<"username">> => <<"emqxuser">>, + <<"password">> => <<"topSecret">>, + <<"mechanism">> => <<"plain">> + }, + <<"ssl">> => #{ + <<"enable">> => true, + <<"server_name_indication">> => <<"auto">>, + <<"verify">> => <<"verify_none">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }, + Config + ), + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := Message2 + }} = + request_json( + put, + uri([?ACTIONS_ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla2">>), + Config + ), + ?assertMatch( + #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, + emqx_utils_json:decode(Message2) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + + %% try create with unknown connector name + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := Message3 + }} = + request_json( + post, + uri([?ACTIONS_ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"does_not_exist">>), + Config + ), + ?assertMatch( + #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, + emqx_utils_json:decode(Message3) + ), + + %% try create bridge with connector of wrong type + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := Message4 + }} = + request_json( + post, + uri([?ACTIONS_ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"foobla2">>), + Config + ), + ?assertMatch( + #{<<"reason">> := <<"connector_not_found_or_wrong_type">>}, + emqx_utils_json:decode(Message4) + ), + + %% make sure nothing has been created above + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + + %% update a deleted bridge returns an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json( + put, + uri([?ACTIONS_ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME), + Config + ) + ), + + %% deleting a non-existing bridge should result in an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json(delete, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + + %% try delete unknown bridge id + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid bridge ID", _/binary>> + }}, + request_json(delete, uri([?ACTIONS_ROOT, "foo"]), Config) + ), + + %% Try create bridge with bad characters as name + {ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"隋达"/utf8>>), Config), + {ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), + ok. + +t_broken_bridge_config(matrix) -> + [ + [single, actions] + ]; +t_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?ACTION_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?ACTION_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Not installed">> + } + ]}, + request_json(get, uri([?ACTIONS_ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME), + ?assertEqual( + {ok, 204, <<>>}, + request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + + ?assertEqual( + {ok, 200, []}, + request_json(get, uri([?ACTIONS_ROOT]), Config) + ), + + ok. + +t_fix_broken_bridge_config(matrix) -> + [ + [single, actions] + ]; +t_fix_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?ACTION_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?ACTION_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Not installed">> + } + ]}, + request_json(get, uri([?ACTIONS_ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME), + request_json( + put, + uri([?ACTIONS_ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?ACTION_CONNECTOR_NAME), + Config + ), + + ?assertMatch( + {ok, 200, #{ + <<"connector">> := ?ACTION_CONNECTOR_NAME, + <<"status">> := <<"connected">> + }}, + request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + + ok. + +t_start_bridge_unknown_node(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; +t_start_bridge_unknown_node(Config) -> + {ok, 404, _} = + request( + post, + uri(["nodes", "thisbetterbenotanatomyet", ?ACTIONS_ROOT, "kafka_producer:foo", start]), + Config + ), + {ok, 404, _} = + request( + post, + uri(["nodes", "undefined", ?ACTIONS_ROOT, "kafka_producer:foo", start]), + Config + ). + +t_start_bridge_node(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; +t_start_bridge_node(Config) -> + do_start_bridge(node, Config). + +t_start_bridge_cluster(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; +t_start_bridge_cluster(Config) -> + do_start_bridge(cluster, Config). + +do_start_bridge(TestType, Config) -> + %% assert we there's no bridges at first + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + + Name = atom_to_binary(TestType), + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?ACTION_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri([?ACTIONS_ROOT]), + ?KAFKA_BRIDGE(Name), + Config + ) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, Name), + + %% start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + %% start a started bridge + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config) + ), + + {ok, 400, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), + + %% Make start bridge fail + expect_on_all_nodes( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {error, <<"my_error">>} end, + Config + ), + + connector_operation(Config, ?ACTION_TYPE, ?ACTION_CONNECTOR_NAME, stop), + connector_operation(Config, ?ACTION_TYPE, ?ACTION_CONNECTOR_NAME, start), + + {ok, 400, _} = request(post, {operation, TestType, start, BridgeID}, Config), + + %% Make start bridge succeed + + expect_on_all_nodes( + ?CONNECTOR_IMPL, + on_add_channel, + fun(_, _, _ResId, _Channel) -> {ok, connector_state} end, + Config + ), + + %% try to start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + + %% Fail parse-id check + {ok, 404, _} = request(post, {operation, TestType, start, <<"wreckbook_fugazi">>}, Config), + %% Looks ok but doesn't exist + {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), + ok. + %% t_start_stop_inconsistent_bridge_node(Config) -> %% start_stop_inconsistent_bridge(node, Config). @@ -861,6 +968,10 @@ connector_operation(Config, ConnectorType, ConnectorName, OperationName) -> %% {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), %% {ok, 200, []} = request_json(get, uri([?ROOT]), Config). +t_bridges_probe(matrix) -> + [ + [single, actions] + ]; t_bridges_probe(Config) -> {ok, 204, <<>>} = request( post, @@ -905,15 +1016,20 @@ t_bridges_probe(Config) -> ), ok. +t_cascade_delete_actions(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_cascade_delete_actions(Config) -> %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), %% then we add a a bridge, using POST %% POST /actions/ will create a bridge - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME), {ok, 201, _} = request( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(?BRIDGE_NAME), Config ), @@ -931,10 +1047,10 @@ t_cascade_delete_actions(Config) -> %% delete the bridge will also delete the actions from the rules {ok, 204, _} = request( delete, - uri([?ROOT, BridgeID]) ++ "?also_delete_dep_actions=true", + uri([?ACTIONS_ROOT, BridgeID]) ++ "?also_delete_dep_actions=true", Config ), - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), ?assertMatch( {ok, 200, #{<<"actions">> := []}}, request_json(get, uri(["rules", RuleId]), Config) @@ -943,7 +1059,7 @@ t_cascade_delete_actions(Config) -> {ok, 201, _} = request( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(?BRIDGE_NAME), Config ), @@ -960,19 +1076,24 @@ t_cascade_delete_actions(Config) -> ), {ok, 400, Body} = request( delete, - uri([?ROOT, BridgeID]), + uri([?ACTIONS_ROOT, BridgeID]), Config ), ?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])), - {ok, 200, [_]} = request_json(get, uri([?ROOT]), Config), + {ok, 200, [_]} = request_json(get, uri([?ACTIONS_ROOT]), Config), %% Cleanup {ok, 204, _} = request( delete, - uri([?ROOT, BridgeID]) ++ "?also_delete_dep_actions=true", + uri([?ACTIONS_ROOT, BridgeID]) ++ "?also_delete_dep_actions=true", Config ), - {ok, 200, []} = request_json(get, uri([?ROOT]), Config). + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config). +t_action_types(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_action_types(Config) -> Res = request_json(get, uri(["action_types"]), Config), ?assertMatch({ok, 200, _}, Res), @@ -981,11 +1102,16 @@ t_action_types(Config) -> ?assert(lists:all(fun is_binary/1, Types), #{types => Types}), ok. +t_bad_name(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_bad_name(Config) -> Name = <<"_bad_name">>, Res = request_json( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(Name), Config ), @@ -1001,31 +1127,36 @@ t_bad_name(Config) -> ), ok. +t_metrics(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_metrics(Config) -> - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), ActionName = ?BRIDGE_NAME, ?assertMatch( {ok, 201, _}, request_json( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(?BRIDGE_NAME), Config ) ), - ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName), + ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ActionName), ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"matched">> := 0}, <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 0}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ), - {ok, 200, Bridge} = request_json(get, uri([?ROOT, ActionID]), Config), + {ok, 200, Bridge} = request_json(get, uri([?ACTIONS_ROOT, ActionID]), Config), ?assertNot(maps:is_key(<<"metrics">>, Bridge)), ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)), @@ -1041,12 +1172,12 @@ t_metrics(Config) -> <<"metrics">> := #{<<"matched">> := 1}, <<"node_metrics">> := [#{<<"metrics">> := #{<<"matched">> := 1}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ) ), %% check for absence of metrics when listing all bridges - {ok, 200, Bridges} = request_json(get, uri([?ROOT]), Config), + {ok, 200, Bridges} = request_json(get, uri([?ACTIONS_ROOT]), Config), ?assertNotMatch( [ #{ @@ -1058,21 +1189,26 @@ t_metrics(Config) -> ), ok. +t_reset_metrics(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_reset_metrics(Config) -> %% assert there's no bridges at first - {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), ActionName = ?BRIDGE_NAME, ?assertMatch( {ok, 201, _}, request_json( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(?BRIDGE_NAME), Config ) ), - ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName), + ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ActionName), Body = <<"my msg">>, _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config), @@ -1084,11 +1220,11 @@ t_reset_metrics(Config) -> <<"metrics">> := #{<<"matched">> := 1}, <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ) ), - {ok, 204, <<>>} = request(put, uri([?ROOT, ActionID, "metrics", "reset"]), Config), + {ok, 204, <<>>} = request(put, uri([?ACTIONS_ROOT, ActionID, "metrics", "reset"]), Config), ?retry( _Sleep0 = 200, @@ -1098,28 +1234,34 @@ t_reset_metrics(Config) -> <<"metrics">> := #{<<"matched">> := 0}, <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ) ), ok. +t_cluster_later_join_metrics(matrix) -> + [ + [cluster_later_join, actions] + ]; t_cluster_later_join_metrics(Config) -> [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config), Name = ?BRIDGE_NAME, ActionParams = ?KAFKA_BRIDGE(Name), - ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, Name), ?check_trace( begin %% Create a bridge on only one of the nodes. - ?assertMatch({ok, 201, _}, request_json(post, uri([?ROOT]), ActionParams, Config)), + ?assertMatch( + {ok, 201, _}, request_json(post, uri([?ACTIONS_ROOT]), ActionParams, Config) + ), %% Pre-condition. ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ), %% Now join the other node join with the api node. ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), @@ -1130,7 +1272,7 @@ t_cluster_later_join_metrics(Config) -> <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] }}, - request_json(get, uri([?ROOT, ActionID, "metrics"]), Config) + request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) ), ok end, @@ -1138,94 +1280,20 @@ t_cluster_later_join_metrics(Config) -> ), ok. +t_raw_config_response_defaults(matrix) -> + [ + [single, actions], + [cluster, actions] + ]; t_raw_config_response_defaults(Config) -> Params = maps:remove(<<"enable">>, ?KAFKA_BRIDGE(?BRIDGE_NAME)), ?assertMatch( {ok, 201, #{<<"enable">> := true}}, request_json( post, - uri([?ROOT]), + uri([?ACTIONS_ROOT]), Params, Config ) ), ok. - -%%% helpers -listen_on_random_port() -> - SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], - case gen_tcp:listen(0, SockOpts) of - {ok, Sock} -> - {ok, Port} = inet:port(Sock), - {Port, Sock}; - {error, Reason} when Reason /= eaddrinuse -> - {error, Reason} - end. - -request(Method, URL, Config) -> - request(Method, URL, [], Config). - -request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> - URL = operation_path(Type, Op, BridgeID, Config), - request(Method, URL, Body, Config); -request(Method, URL, Body, Config) -> - AuthHeader = emqx_common_test_http:auth_header(?config(api_key, Config)), - Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, - emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). - -request(Method, URL, Body, Decoder, Config) -> - case request(Method, URL, Body, Config) of - {ok, Code, Response} -> - case Decoder(Response) of - {error, _} = Error -> Error; - Decoded -> {ok, Code, Decoded} - end; - Otherwise -> - Otherwise - end. - -request_json(Method, URLLike, Config) -> - request(Method, URLLike, [], fun json/1, Config). - -request_json(Method, URLLike, Body, Config) -> - request(Method, URLLike, Body, fun json/1, Config). - -operation_path(node, Oper, BridgeID, Config) -> - uri(["nodes", ?config(node, Config), ?ROOT, BridgeID, Oper]); -operation_path(cluster, Oper, BridgeID, _Config) -> - uri([?ROOT, BridgeID, Oper]). - -enable_path(Enable, BridgeID) -> - uri([?ROOT, BridgeID, "enable", Enable]). - -publish_message(Topic, Body, Config) -> - Node = ?config(node, Config), - erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). - -update_config(Path, Value, Config) -> - Node = ?config(node, Config), - erpc:call(Node, emqx, update_config, [Path, Value]). - -get_raw_config(Path, Config) -> - Node = ?config(node, Config), - erpc:call(Node, emqx, get_raw_config, [Path]). - -add_user_auth(Chain, AuthenticatorID, User, Config) -> - Node = ?config(node, Config), - erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). - -delete_user_auth(Chain, AuthenticatorID, User, Config) -> - Node = ?config(node, Config), - erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]). - -str(S) when is_list(S) -> S; -str(S) when is_binary(S) -> binary_to_list(S). - -json(B) when is_binary(B) -> - case emqx_utils_json:safe_decode(B, [return_maps]) of - {ok, Term} -> - Term; - {error, Reason} = Error -> - ct:pal("Failed to decode json: ~p~n~p", [Reason, B]), - Error - end.