From ee8e4690860200e86ab5b2baf6ac2582e904d325 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 11 Oct 2023 17:25:45 +0200 Subject: [PATCH] test: port all remaining tests --- .../test/emqx_connector_api_SUITE.erl | 914 +++++++----------- 1 file changed, 356 insertions(+), 558 deletions(-) diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index e4ead6d68..cb2bd906b 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -32,8 +32,10 @@ <<"name">> => NAME }). --define(CONNECTOR_TYPE_KAFKA, <<"kafka">>). --define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?CONNECTOR(Name, ?CONNECTOR_TYPE_KAFKA)#{ +-define(CONNECTOR_TYPE_STR, "kafka"). +-define(CONNECTOR_TYPE, <>). +-define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>). +-define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?CONNECTOR(Name, ?CONNECTOR_TYPE)#{ <<"authentication">> => <<"none">>, <<"bootstrap_hosts">> => BootstrapHosts, <<"connect_timeout">> => <<"5s">>, @@ -47,6 +49,7 @@ <<"tcp_keepalive">> => <<"none">> } }). +-define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). %% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ @@ -104,16 +107,15 @@ all() -> [ - %, - {group, single} + {group, single}, %{group, cluster_later_join}, - %{group, cluster} + {group, cluster} ]. groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_connector_lifecycle + t_connectors_probe ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -136,18 +138,18 @@ end_per_suite(_Config) -> init_per_group(cluster = Name, Config) -> Nodes = [NodePrimary | _] = mk_cluster(Name, Config), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); -init_per_group(cluster_later_join = Name, Config) -> - Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), - init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +%% init_per_group(cluster_later_join = Name, Config) -> +%% Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), +%% init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(Name, Config) -> WorkDir = filename:join(?config(priv_dir, Config), Name), Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). init_api(Config) -> - APINode = ?config(node, Config), - {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), - [{api, App} | Config]. + Node = ?config(node, Config), + {ok, ApiKey} = erpc:call(Node, emqx_common_test_http, create_default_app, []), + [{api_key, ApiKey} | Config]. mk_cluster(Name, Config) -> mk_cluster(Name, Config, #{}). @@ -172,21 +174,69 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. -init_per_testcase(TestCase, Config) -> - ?MODULE:TestCase({init, Config}). +init_per_testcase(_TestCase, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + init_mocks(); + Nodes -> + [erpc:call(Node, ?MODULE, inject_mocks, []) || Node <- Nodes] + end, + Config. -end_per_testcase(TestCase, Config) -> +end_per_testcase(_TestCase, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + meck:unload(); + Nodes -> + [erpc:call(Node, meck, unload, []) || Node <- Nodes] + end, Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), ok = erpc:call(Node, fun clear_resources/0), - ?MODULE:TestCase({'end', Config}). + ok. + +inject_mocks() -> + _Pid = spawn(fun() -> + _Mocks = ?MODULE:init_mocks(), + receive + stop_mocks -> + meck:unload() + end + end). + +-define(CONNECTOR_IMPL, dummy_connector_impl). +init_mocks() -> + meck:new(emqx_connector_ee_schema, [passthrough]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + meck:new(?CONNECTOR_IMPL, [non_strict]), + meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), + meck:expect( + ?CONNECTOR_IMPL, + on_start, + fun + (<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) -> + {ok, bad_connector_state}; + (_I, _C) -> + {ok, connector_state} + end + ), + meck:expect(?CONNECTOR_IMPL, on_stop, 2, ok), + meck:expect( + ?CONNECTOR_IMPL, + on_get_status, + fun + (_, bad_connector_state) -> connecting; + (_, _) -> connected + end + ), + [?CONNECTOR_IMPL, emqx_connector_ee_schema]. clear_resources() -> lists:foreach( fun(#{type := Type, name := Name}) -> - {ok, _} = emqx_bridge:remove(Type, Name) + {ok, _} = emqx_connector:remove(Type, Name) end, - emqx_bridge:list() + emqx_connector:list() ). %%------------------------------------------------------------------------------ @@ -196,43 +246,28 @@ clear_resources() -> %% We have to pretend testing a kafka connector since at this point that's the %% only one that's implemented. --define(CONNECTOR_IMPL, dummy_connector_impl). -t_connector_lifecycle({init, Config}) -> - meck:new(emqx_connector_ee_schema, [passthrough]), - meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), - meck:new(?CONNECTOR_IMPL, [non_strict]), - meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), - meck:expect(?CONNECTOR_IMPL, on_start, 2, {ok, connector_state}), - meck:expect(?CONNECTOR_IMPL, on_stop, 2, ok), - meck:expect(?CONNECTOR_IMPL, on_get_status, 2, connected), - [{mocked_mods, [?CONNECTOR_IMPL, emqx_connector_ee_schema]} | Config]; -t_connector_lifecycle({'end', Config}) -> - MockedMods = ?config(mocked_mods, Config), - meck:unload(MockedMods), - Config; -t_connector_lifecycle(Config) -> +t_connectors_lifecycle(Config) -> %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["connectors"]), Config), {ok, 404, _} = request(get, uri(["connectors", "foo"]), Config), {ok, 404, _} = request(get, uri(["connectors", "kafka:foo"]), Config), - BootstrapHosts = <<"localhost:9092">>, - % needed for patterns below + %% need a var for patterns below ConnectorName = ?CONNECTOR_NAME, ?assertMatch( {ok, 201, #{ - <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, <<"enable">> := true, - <<"bootstrap_hosts">> := BootstrapHosts, + <<"bootstrap_hosts">> := _, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _] }}, request_json( post, uri(["connectors"]), - ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME), Config ) ), @@ -241,7 +276,7 @@ t_connector_lifecycle(Config) -> ?assertMatch( {ok, 200, [ #{ - <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, <<"enable">> := true, <<"status">> := _, @@ -251,14 +286,13 @@ t_connector_lifecycle(Config) -> request_json(get, uri(["connectors"]), Config) ), - ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE_KAFKA, ?CONNECTOR_NAME), - %% send an message to emqx and the message should be forwarded to the HTTP server + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME), - %% [TODO] update the request-path of the connector ?assertMatch( {ok, 200, #{ - <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, + <<"bootstrap_hosts">> := <<"foobla:1234">>, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _] @@ -266,7 +300,7 @@ t_connector_lifecycle(Config) -> request_json( put, uri(["connectors", ConnectorID]), - ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME, <<"foobla:1234">>), Config ) ), @@ -275,7 +309,7 @@ t_connector_lifecycle(Config) -> ?assertMatch( {ok, 200, [ #{ - <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, <<"enable">> := true, <<"status">> := _, @@ -288,7 +322,7 @@ t_connector_lifecycle(Config) -> %% get the connector by id ?assertMatch( {ok, 200, #{ - <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, <<"enable">> := true, <<"status">> := _, @@ -297,62 +331,13 @@ t_connector_lifecycle(Config) -> request_json(get, uri(["connectors", ConnectorID]), Config) ), - %% Test bad updates - %% ================ - - %% Add connector with a name that is too long - %% We only support connector names up to 255 characters - %% LongName = list_to_binary(lists:duplicate(256, $a)), - %% NameTooLongRequestResult = request_json( - %% post, - %% uri(["connectors"]), - %% ?HTTP_CONNECTOR(URL1, LongName), - %% Config - %% ), - %% ?assertMatch( - %% {ok, 400, _}, - %% NameTooLongRequestResult - %% ), - %% {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult, - %% %% Use regex to check that the message contains the name - %% Match = re:run(NameTooLongMessage, LongName), - %% ?assertMatch({match, _}, Match), - %% %% Add connector without the URL field - %% {ok, 400, PutFail1} = request_json( - %% put, - %% uri(["connectors", ConnectorID]), - %% maps:remove(<<"url">>, ?HTTP_CONNECTOR(URL2, Name)), - %% Config - %% ), - %% ?assertMatch( - %% #{<<"reason">> := <<"required_field">>}, - %% json(maps:get(<<"message">>, PutFail1)) - %% ), - %% {ok, 400, PutFail2} = request_json( - %% put, - %% uri(["connectors", ConnectorID]), - %% maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_CONNECTOR(URL2, Name))), - %% Config - %% ), - %% ?assertMatch( - %% #{ - %% <<"reason">> := <<"unknown_fields">>, - %% <<"unknown">> := <<"curl">> - %% }, - %% json(maps:get(<<"message">>, PutFail2)) - %% ), - %% {ok, 400, _} = request_json( - %% put, - %% uri(["connectors", ConnectorID]), - %% ?HTTP_CONNECTOR(<<"localhost:1234/foo">>, Name), - %% Config - %% ), - %% {ok, 400, _} = request_json( - %% put, - %% uri(["connectors", ConnectorID]), - %% ?HTTP_CONNECTOR(<<"htpp://localhost:12341234/foo">>, Name), - %% Config - %% ), + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid operation", _/binary>> + }}, + request_json(post, uri(["connectors", ConnectorID, "brababbel"]), Config) + ), %% delete the connector {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), @@ -367,20 +352,11 @@ t_connector_lifecycle(Config) -> request_json( put, uri(["connectors", ConnectorID]), - ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME), Config ) ), - %% try delete bad connector id - ?assertMatch( - {ok, 404, #{ - <<"code">> := <<"NOT_FOUND">>, - <<"message">> := <<"Invalid connector ID", _/binary>> - }}, - request_json(delete, uri(["connectors", "foo"]), Config) - ), - %% Deleting a non-existing connector should result in an error ?assertMatch( {ok, 404, #{ @@ -390,198 +366,142 @@ t_connector_lifecycle(Config) -> request_json(delete, uri(["connectors", ConnectorID]), Config) ), - %% Create non working connector - %% BrokenURL = ?URL(Port + 1, "/foo"), - %% {ok, 201, BrokenConnector} = request( - %% post, - %% uri(["connectors"]), - %% ?HTTP_CONNECTOR(BrokenURL, Name), - %% fun json/1, - %% Config - %% ), - %% ?assertMatch( - %% #{ - %% <<"type">> := ?CONNECTOR_TYPE_HTTP, - %% <<"name">> := Name, - %% <<"enable">> := true, - %% <<"status">> := <<"disconnected">>, - %% <<"status_reason">> := <<"Connection refused">>, - %% <<"node_status">> := [ - %% #{ - %% <<"status">> := <<"disconnected">>, - %% <<"status_reason">> := <<"Connection refused">> - %% } - %% | _ - %% ], - %% <<"url">> := BrokenURL - %% }, - %% BrokenConnector - %% ), + %% try delete unknown connector id + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid connector ID", _/binary>> + }}, + request_json(delete, uri(["connectors", "foo"]), Config) + ), - %% {ok, 200, FixedConnector} = request_json( - %% put, - %% uri(["connectors", ConnectorID]), - %% ?HTTP_CONNECTOR(URL1), - %% Config - %% ), - %% ?assertMatch( - %% #{ - %% <<"status">> := <<"connected">>, - %% <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] - %% } when - %% not is_map_key(<<"status_reason">>, FixedConnector) andalso - %% not is_map_key(<<"status_reason">>, FixedNodeStatus), - %% FixedConnector - %% ), - - %% %% Try create connector with bad characters as name - %% {ok, 400, _} = request(post, uri(["connectors"]), ?HTTP_CONNECTOR(URL1, <<"隋达"/utf8>>), Config), - - %% %% Missing scheme in URL - %% {ok, 400, _} = request( - %% post, - %% uri(["connectors"]), - %% ?HTTP_CONNECTOR(<<"localhost:1234/foo">>, <<"missing_url_scheme">>), - %% Config - %% ), - - %% %% Invalid port - %% {ok, 400, _} = request( - %% post, - %% uri(["connectors"]), - %% ?HTTP_CONNECTOR(<<"http://localhost:12341234/foo">>, <<"invalid_port">>), - %% Config - %% ), - - %% {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config) + %% Try create connector with bad characters as name + {ok, 400, _} = request(post, uri(["connectors"]), ?KAFKA_CONNECTOR(<<"隋达"/utf8>>), Config), ok. -%% t_start_bridge_unknown_node(Config) -> -%% {ok, 404, _} = -%% request( -%% post, -%% uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]), -%% Config -%% ), -%% {ok, 404, _} = -%% request( -%% post, -%% uri(["nodes", "undefined", "bridges", "webhook:foo", start]), -%% Config -%% ). +t_start_connector_unknown_node(Config) -> + {ok, 404, _} = + request( + post, + uri(["nodes", "thisbetterbenotanatomyet", "connectors", "kafka:foo", start]), + Config + ), + {ok, 404, _} = + request( + post, + uri(["nodes", "undefined", "connectors", "kafka:foo", start]), + Config + ). -%% t_start_stop_bridges_node(Config) -> -%% do_start_stop_bridges(node, Config). +t_start_stop_connectors_node(Config) -> + do_start_stop_connectors(node, Config). -%% t_start_stop_bridges_cluster(Config) -> -%% do_start_stop_bridges(cluster, Config). +t_start_stop_connectors_cluster(Config) -> + do_start_stop_connectors(cluster, Config). -%% do_start_stop_bridges(Type, Config) -> -%% %% assert we there's no bridges at first -%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), +do_start_stop_connectors(TestType, Config) -> + %% assert we there's no connectors at first + {ok, 200, []} = request_json(get, uri(["connectors"]), Config), -%% Port = ?config(port, Config), -%% URL1 = ?URL(Port, "abc"), -%% Name = atom_to_binary(Type), -%% ?assertMatch( -%% {ok, 201, #{ -%% <<"type">> := ?BRIDGE_TYPE_HTTP, -%% <<"name">> := Name, -%% <<"enable">> := true, -%% <<"status">> := <<"connected">>, -%% <<"node_status">> := [_ | _], -%% <<"url">> := URL1 -%% }}, -%% request_json( -%% post, -%% uri(["bridges"]), -%% ?HTTP_BRIDGE(URL1, Name), -%% Config -%% ) -%% ), + Name = atom_to_binary(TestType), + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), -%% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), -%% ExpectedStatus = -%% case ?config(group, Config) of -%% cluster when Type == node -> -%% <<"inconsistent">>; -%% _ -> -%% <<"stopped">> -%% end, + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + ExpectedStatus = + case ?config(group, Config) of + cluster when TestType == node -> + <<"inconsistent">>; + _ -> + <<"stopped">> + end, -%% %% stop it -%% {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := ExpectedStatus}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% start again -%% {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% start a started bridge -%% {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% restart an already started bridge -%% {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% stop it again -%% {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), -%% %% restart a stopped bridge -%% {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), + %% stop it + {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := ExpectedStatus}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% start a started connector + {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% restart an already started connector + {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% stop it again + {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config), + %% restart a stopped connector + {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), -%% {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config), + {ok, 404, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), -%% %% delete the bridge -%% {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), -%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), + {ok, 200, []} = request_json(get, uri(["connectors"]), Config), -%% %% Fail parse-id check -%% {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config), -%% %% Looks ok but doesn't exist -%% {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config), + %% Fail parse-id check + {ok, 404, _} = request(post, {operation, TestType, start, <<"wreckbook_fugazi">>}, Config), + %% Looks ok but doesn't exist + {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), -%% %% Create broken bridge -%% {ListenPort, Sock} = listen_on_random_port(), -%% %% Connecting to this endpoint should always timeout -%% BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), -%% BadName = <<"bad_", (atom_to_binary(Type))/binary>>, -%% ?assertMatch( -%% {ok, 201, #{ -%% <<"type">> := ?BRIDGE_TYPE_MQTT, -%% <<"name">> := BadName, -%% <<"enable">> := true, -%% <<"server">> := BadServer, -%% <<"status">> := <<"connecting">>, -%% <<"node_status">> := [_ | _] -%% }}, -%% request_json( -%% post, -%% uri(["bridges"]), -%% ?MQTT_BRIDGE(BadServer, BadName), -%% Config -%% ) -%% ), -%% BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), -%% ?assertMatch( -%% %% request from product: return 400 on such errors -%% {ok, SC, _} when SC == 500 orelse SC == 400, -%% request(post, {operation, Type, start, BadBridgeID}, Config) -%% ), -%% ok = gen_tcp:close(Sock), -%% ok. + %% Create broken connector + {ListenPort, Sock} = listen_on_random_port(), + %% Connecting to this endpoint should always timeout + BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), + BadName = <<"bad_", (atom_to_binary(TestType))/binary>>, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := BadName, + <<"enable">> := true, + <<"bootstrap_hosts">> := BadServer, + <<"status">> := <<"connecting">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(BadName, BadServer), + Config + ) + ), + BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName), + ?assertMatch( + %% request from product: return 400 on such errors + {ok, SC, _} when SC == 500 orelse SC == 400, + request(post, {operation, TestType, start, BadConnectorID}, Config) + ), + ok = gen_tcp:close(Sock), + ok. %% t_start_stop_inconsistent_bridge_node(Config) -> %% start_stop_inconsistent_bridge(node, Config). @@ -622,277 +542,146 @@ t_connector_lifecycle(Config) -> %% post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config %% ). -%% t_enable_disable_bridges(Config) -> -%% %% assert we there's no bridges at first -%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), +t_enable_disable_connectors(Config) -> + %% assert we there's no connectors at first + {ok, 200, []} = request_json(get, uri(["connectors"]), Config), -%% Name = ?BRIDGE_NAME, -%% Port = ?config(port, Config), -%% URL1 = ?URL(Port, "abc"), -%% ?assertMatch( -%% {ok, 201, #{ -%% <<"type">> := ?BRIDGE_TYPE_HTTP, -%% <<"name">> := Name, -%% <<"enable">> := true, -%% <<"status">> := <<"connected">>, -%% <<"node_status">> := [_ | _], -%% <<"url">> := URL1 -%% }}, -%% request_json( -%% post, -%% uri(["bridges"]), -%% ?HTTP_BRIDGE(URL1, Name), -%% Config -%% ) -%% ), -%% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), -%% %% disable it -%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"stopped">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% enable again -%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% enable an already started bridge -%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% disable it again -%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), + Name = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(Name), + Config + ) + ), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + %% disable it + {ok, 204, <<>>} = request(put, enable_path(false, ConnectorID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"stopped">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% enable again + {ok, 204, <<>>} = request(put, enable_path(true, ConnectorID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% enable an already started connector + {ok, 204, <<>>} = request(put, enable_path(true, ConnectorID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% disable it again + {ok, 204, <<>>} = request(put, enable_path(false, ConnectorID), Config), -%% %% bad param -%% {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config), -%% {ok, 404, _} = request(put, enable_path(true, "foo"), Config), -%% {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), + %% bad param + {ok, 404, _} = request(put, enable_path(foo, ConnectorID), Config), + {ok, 404, _} = request(put, enable_path(true, "foo"), Config), + {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), -%% {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config), -%% ?assertEqual( -%% #{ -%% <<"code">> => <<"BAD_REQUEST">>, -%% <<"message">> => <<"Forbidden operation, bridge not enabled">> -%% }, -%% Res -%% ), -%% {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config), + {ok, 400, Res} = request(post, {operation, node, start, ConnectorID}, <<>>, fun json/1, Config), + ?assertEqual( + #{ + <<"code">> => <<"BAD_REQUEST">>, + <<"message">> => <<"Forbidden operation, connector not enabled">> + }, + Res + ), + {ok, 400, Res} = request( + post, {operation, cluster, start, ConnectorID}, <<>>, fun json/1, Config + ), -%% %% enable a stopped bridge -%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), -%% ?assertMatch( -%% {ok, 200, #{<<"status">> := <<"connected">>}}, -%% request_json(get, uri(["bridges", BridgeID]), Config) -%% ), -%% %% delete the bridge -%% {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), -%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config). + %% enable a stopped connector + {ok, 204, <<>>} = request(put, enable_path(true, ConnectorID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), + {ok, 200, []} = request_json(get, uri(["connectors"]), Config). -%% t_with_redact_update(Config) -> -%% Name = <<"redact_update">>, -%% Type = <<"mqtt">>, -%% Password = <<"123456">>, -%% Template = #{ -%% <<"type">> => Type, -%% <<"name">> => Name, -%% <<"server">> => <<"127.0.0.1:1883">>, -%% <<"username">> => <<"test">>, -%% <<"password">> => Password, -%% <<"ingress">> => -%% #{<<"remote">> => #{<<"topic">> => <<"t/#">>}} -%% }, +t_with_redact_update(Config) -> + Name = <<"redact_update">>, + Password = <<"123456">>, + Template = ?KAFKA_CONNECTOR(Name)#{ + <<"authentication">> => #{ + <<"mechanism">> => <<"plain">>, + <<"username">> => <<"test">>, + <<"password">> => Password + } + }, -%% {ok, 201, _} = request( -%% post, -%% uri(["bridges"]), -%% Template, -%% Config -%% ), + {ok, 201, _} = request( + post, + uri(["connectors"]), + Template, + Config + ), -%% %% update with redacted config -%% BridgeConf = emqx_utils:redact(Template), -%% BridgeID = emqx_bridge_resource:bridge_id(Type, Name), -%% {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config), -%% ?assertEqual( -%% Password, -%% get_raw_config([bridges, Type, Name, password], Config) -%% ), -%% ok. + %% update with redacted config + ConnectorConf = emqx_utils:redact(Template), + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + {ok, 200, _} = request(put, uri(["connectors", ConnectorID]), ConnectorConf, Config), + ?assertEqual( + Password, + get_raw_config([connectors, ?CONNECTOR_TYPE, Name, authentication, password], Config) + ), + ok. -%% t_bridges_probe(Config) -> -%% Port = ?config(port, Config), -%% URL = ?URL(Port, "some_path"), +t_connectors_probe(Config) -> + {ok, 204, <<>>} = request( + post, + uri(["connectors_probe"]), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME), + Config + ), -%% {ok, 204, <<>>} = request( -%% post, -%% uri(["bridges_probe"]), -%% ?HTTP_BRIDGE(URL), -%% Config -%% ), + %% second time with same name is ok since no real connector created + {ok, 204, <<>>} = request( + post, + uri(["connectors_probe"]), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME), + Config + ), -%% %% second time with same name is ok since no real bridge created -%% {ok, 204, <<>>} = request( -%% post, -%% uri(["bridges_probe"]), -%% ?HTTP_BRIDGE(URL), -%% Config -%% ), + meck:expect(?CONNECTOR_IMPL, on_start, 2, {error, on_start_error}), -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := _ -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>), -%% Config -%% ) -%% ), + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := _ + }}, + request_json( + post, + uri(["connectors_probe"]), + ?KAFKA_CONNECTOR(<<"broken_connector">>, <<"brokenhost:1234">>), + Config + ) + ), -%% %% Missing scheme in URL -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := _ -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?HTTP_BRIDGE(<<"203.0.113.3:1234/foo">>), -%% Config -%% ) -%% ), + meck:expect(?CONNECTOR_IMPL, on_start, 2, {ok, connector_state}), -%% %% Invalid port -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := _ -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?HTTP_BRIDGE(<<"http://203.0.113.3:12341234/foo">>), -%% Config -%% ) -%% ), - -%% {ok, 204, _} = request( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), -%% Config -%% ), - -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := <<"Connection refused">> -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"127.0.0.1:2883">>), -%% Config -%% ) -%% ), - -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := <<"Could not resolve host">> -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"nohost:2883">>), -%% Config -%% ) -%% ), - -%% AuthnConfig = #{ -%% <<"mechanism">> => <<"password_based">>, -%% <<"backend">> => <<"built_in_database">>, -%% <<"user_id_type">> => <<"username">> -%% }, -%% Chain = 'mqtt:global', -%% {ok, _} = update_config( -%% [authentication], -%% {create_authenticator, Chain, AuthnConfig}, -%% Config -%% ), -%% User = #{user_id => <<"u">>, password => <<"p">>}, -%% AuthenticatorID = <<"password_based:built_in_database">>, -%% {ok, _} = add_user_auth( -%% Chain, -%% AuthenticatorID, -%% User, -%% Config -%% ), - -%% emqx_common_test_helpers:on_exit(fun() -> -%% delete_user_auth(Chain, AuthenticatorID, User, Config) -%% end), - -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := <<"Unauthorized client">> -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>}, -%% Config -%% ) -%% ), - -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := <<"Bad username or password">> -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{ -%% <<"proto_ver">> => <<"v4">>, -%% <<"password">> => <<"mySecret">>, -%% <<"username">> => <<"u">> -%% }, -%% Config -%% ) -%% ), - -%% ?assertMatch( -%% {ok, 400, #{ -%% <<"code">> := <<"TEST_FAILED">>, -%% <<"message">> := <<"Not authorized">> -%% }}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), -%% Config -%% ) -%% ), - -%% ?assertMatch( -%% {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, -%% request_json( -%% post, -%% uri(["bridges_probe"]), -%% ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>), -%% Config -%% ) -%% ), -%% ok. + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json( + post, + uri(["connectors_probe"]), + ?CONNECTOR(<<"broken_connector">>, <<"unknown_type">>), + Config + ) + ), + ok. %%% helpers listen_on_random_port() -> @@ -912,14 +701,17 @@ request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> URL = operation_path(Type, Op, BridgeID, Config), request(Method, URL, Body, Config); request(Method, URL, Body, Config) -> - AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)), + AuthHeader = emqx_common_test_http:auth_header(?config(api_key, Config)), Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). request(Method, URL, Body, Decoder, Config) -> case request(Method, URL, Body, Config) of {ok, Code, Response} -> - {ok, Code, Decoder(Response)}; + case Decoder(Response) of + {error, _} = Error -> Error; + Decoded -> {ok, Code, Decoded} + end; Otherwise -> Otherwise end. @@ -930,13 +722,13 @@ request_json(Method, URLLike, Config) -> request_json(Method, URLLike, Body, Config) -> request(Method, URLLike, Body, fun json/1, Config). -operation_path(node, Oper, BridgeID, Config) -> - uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]); -operation_path(cluster, Oper, BridgeID, _Config) -> - uri(["bridges", BridgeID, Oper]). +operation_path(node, Oper, ConnectorID, Config) -> + uri(["nodes", ?config(node, Config), "connectors", ConnectorID, Oper]); +operation_path(cluster, Oper, ConnectorID, _Config) -> + uri(["connectors", ConnectorID, Oper]). -enable_path(Enable, BridgeID) -> - uri(["bridges", BridgeID, "enable", Enable]). +enable_path(Enable, ConnectorID) -> + uri(["connectors", ConnectorID, "enable", Enable]). publish_message(Topic, Body, Config) -> Node = ?config(node, Config), @@ -962,4 +754,10 @@ str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). json(B) when is_binary(B) -> - emqx_utils_json:decode(B, [return_maps]). + case emqx_utils_json:safe_decode(B, [return_maps]) of + {ok, Term} -> + Term; + {error, Reason} = Error -> + ct:pal("Failed to decode json: ~p~n~p", [Reason, B]), + Error + end.