From 7f3e23eed9a6de7182373fd092d082547dd63494 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 10 Oct 2023 20:51:55 +0200 Subject: [PATCH] test: initial add of /connectors api suite --- .../test/emqx_connector_api_SUITE.erl | 950 ++++++++++++++++++ 1 file changed, 950 insertions(+) create mode 100644 apps/emqx_connector/test/emqx_connector_api_SUITE.erl diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl new file mode 100644 index 000000000..02cee7fa4 --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -0,0 +1,950 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). +-define(CONNECTOR(NAME, TYPE), #{ + <<"enable">> => true, + %<<"ssl">> => #{<<"enable">> => false}, + <<"type">> => TYPE, + <<"name">> => NAME +}). + +-define(CONNECTOR_TYPE_KAFKA, <<"kafka">>). +-define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?CONNECTOR(Name, ?CONNECTOR_TYPE_KAFKA)#{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => BootstrapHosts, + <<"connect_timeout">> => <<"5s">>, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"nodelay">> => true, + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + } +}). + +%% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). +%% -define(MQTT_CONNECTOR(SERVER, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_MQTT)#{ +%% <<"server">> => SERVER, +%% <<"username">> => <<"user1">>, +%% <<"password">> => <<"">>, +%% <<"proto_ver">> => <<"v5">>, +%% <<"egress">> => #{ +%% <<"remote">> => #{ +%% <<"topic">> => <<"emqx/${topic}">>, +%% <<"qos">> => <<"${qos}">>, +%% <<"retain">> => false +%% } +%% } +%% }). +%% -define(MQTT_CONNECTOR(SERVER), ?MQTT_CONNECTOR(SERVER, <<"mqtt_egress_test_connector">>)). + +%% -define(CONNECTOR_TYPE_HTTP, <<"kafka">>). +%% -define(HTTP_CONNECTOR(URL, NAME), ?CONNECTOR(NAME, ?CONNECTOR_TYPE_HTTP)#{ +%% <<"url">> => URL, +%% <<"local_topic">> => <<"emqx_webhook/#">>, +%% <<"method">> => <<"post">>, +%% <<"body">> => <<"${payload}">>, +%% <<"headers">> => #{ +%% % NOTE +%% % The Pascal-Case is important here. +%% % The reason is kinda ridiculous: `emqx_connector_resource:create_dry_run/2` converts +%% % connector config keys into atoms, and the atom 'Content-Type' exists in the ERTS +%% % when this happens (while the 'content-type' does not). +%% <<"Content-Type">> => <<"application/json">> +%% } +%% }). +%% -define(HTTP_CONNECTOR(URL), ?HTTP_CONNECTOR(URL, ?CONNECTOR_NAME)). + +%% -define(URL(PORT, PATH), +%% list_to_binary( +%% io_lib:format( +%% "http://localhost:~s/~s", +%% [integer_to_list(PORT), PATH] +%% ) +%% ) +%% ). + +-define(APPSPECS, [ + emqx_conf, + emqx, + emqx_authn, + emqx_management, + {emqx_connector, "connectors {}"}, + emqx_bridge_kafka +]). + +-define(APPSPEC_DASHBOARD, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} +). + +all() -> + [ + %, + {group, single} + %{group, cluster_later_join}, + %{group, cluster} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + SingleOnlyTests = [ + t_http_crud_apis + %t_bridges_probe + ], + ClusterLaterJoinOnlyTCs = [ + % t_cluster_later_join_metrics + ], + [ + {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, + {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, + {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(cluster = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(cluster_later_join = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(Name, Config) -> + WorkDir = filename:join(?config(priv_dir, Config), Name), + Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), + init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). + +init_api(Config) -> + APINode = ?config(node, Config), + {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), + [{api, App} | Config]. + +mk_cluster(Name, Config) -> + mk_cluster(Name, Config, #{}). + +mk_cluster(Name, Config, Opts) -> + Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], + Node2Apps = ?APPSPECS, + emqx_cth_cluster:start( + [ + {emqx_bridge_api_SUITE_1, Opts#{role => core, apps => Node1Apps}}, + {emqx_bridge_api_SUITE_2, Opts#{role => core, apps => Node2Apps}} + ], + #{work_dir => filename:join(?config(priv_dir, Config), Name)} + ). + +end_per_group(Group, Config) when + Group =:= cluster; + Group =:= cluster_later_join +-> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); +end_per_group(_, Config) -> + emqx_cth_suite:stop(?config(group_apps, Config)), + ok. + +end_per_testcase(_, Config) -> + Node = ?config(node, Config), + ok = emqx_common_test_helpers:call_janitor(), + ok = erpc:call(Node, fun clear_resources/0), + ok. + +clear_resources() -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + {ok, _} = emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% We have to pretend testing a kafka connector since at this point that's the +%% only one that's implemented. + +t_http_crud_apis(Config) -> + %% assert we there's no bridges at first + {ok, 200, []} = request_json(get, uri(["connectors"]), Config), + + {ok, 404, _} = request(get, uri(["connectors", "foo"]), Config), + {ok, 404, _} = request(get, uri(["connectors", "kafka:foo"]), Config), + + BootstrapHosts = <<"localhost:9092">>, + % needed for patterns below + ConnectorName = ?CONNECTOR_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"name">> := ConnectorName, + <<"enable">> := true, + <<"bootstrap_hosts">> := BootstrapHosts, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["connectors"]), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + Config + ) + ), + + %% list all connectors, assert Connector is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"name">> := ConnectorName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri(["connectors"]), Config) + ), + + ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE_KAFKA, ?CONNECTOR_NAME), + %% send an message to emqx and the message should be forwarded to the HTTP server + + %% [TODO] update the request-path of the connector + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"name">> := ConnectorName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json( + put, + uri(["connectors", ConnectorID]), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + Config + ) + ), + + %% list all connectors, assert Connector is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"name">> := ConnectorName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri(["connectors"]), Config) + ), + + %% get the connector by id + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?CONNECTOR_TYPE_KAFKA, + <<"name">> := ConnectorName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + + %% Test bad updates + %% ================ + + %% Add connector with a name that is too long + %% We only support connector names up to 255 characters + %% LongName = list_to_binary(lists:duplicate(256, $a)), + %% NameTooLongRequestResult = request_json( + %% post, + %% uri(["connectors"]), + %% ?HTTP_CONNECTOR(URL1, LongName), + %% Config + %% ), + %% ?assertMatch( + %% {ok, 400, _}, + %% NameTooLongRequestResult + %% ), + %% {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult, + %% %% Use regex to check that the message contains the name + %% Match = re:run(NameTooLongMessage, LongName), + %% ?assertMatch({match, _}, Match), + %% %% Add connector without the URL field + %% {ok, 400, PutFail1} = request_json( + %% put, + %% uri(["connectors", ConnectorID]), + %% maps:remove(<<"url">>, ?HTTP_CONNECTOR(URL2, Name)), + %% Config + %% ), + %% ?assertMatch( + %% #{<<"reason">> := <<"required_field">>}, + %% json(maps:get(<<"message">>, PutFail1)) + %% ), + %% {ok, 400, PutFail2} = request_json( + %% put, + %% uri(["connectors", ConnectorID]), + %% maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_CONNECTOR(URL2, Name))), + %% Config + %% ), + %% ?assertMatch( + %% #{ + %% <<"reason">> := <<"unknown_fields">>, + %% <<"unknown">> := <<"curl">> + %% }, + %% json(maps:get(<<"message">>, PutFail2)) + %% ), + %% {ok, 400, _} = request_json( + %% put, + %% uri(["connectors", ConnectorID]), + %% ?HTTP_CONNECTOR(<<"localhost:1234/foo">>, Name), + %% Config + %% ), + %% {ok, 400, _} = request_json( + %% put, + %% uri(["connectors", ConnectorID]), + %% ?HTTP_CONNECTOR(<<"htpp://localhost:12341234/foo">>, Name), + %% Config + %% ), + + %% delete the connector + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), + {ok, 200, []} = request_json(get, uri(["connectors"]), Config), + + %% update a deleted connector returns an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json( + put, + uri(["connectors", ConnectorID]), + ?KAFKA_CONNECTOR(?CONNECTOR_NAME, BootstrapHosts), + Config + ) + ), + + %% try delete bad connector id + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid connector ID", _/binary>> + }}, + request_json(delete, uri(["connectors", "foo"]), Config) + ), + + %% Deleting a non-existing connector should result in an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json(delete, uri(["connectors", ConnectorID]), Config) + ), + + %% Create non working connector + %% BrokenURL = ?URL(Port + 1, "/foo"), + %% {ok, 201, BrokenConnector} = request( + %% post, + %% uri(["connectors"]), + %% ?HTTP_CONNECTOR(BrokenURL, Name), + %% fun json/1, + %% Config + %% ), + %% ?assertMatch( + %% #{ + %% <<"type">> := ?CONNECTOR_TYPE_HTTP, + %% <<"name">> := Name, + %% <<"enable">> := true, + %% <<"status">> := <<"disconnected">>, + %% <<"status_reason">> := <<"Connection refused">>, + %% <<"node_status">> := [ + %% #{ + %% <<"status">> := <<"disconnected">>, + %% <<"status_reason">> := <<"Connection refused">> + %% } + %% | _ + %% ], + %% <<"url">> := BrokenURL + %% }, + %% BrokenConnector + %% ), + + %% {ok, 200, FixedConnector} = request_json( + %% put, + %% uri(["connectors", ConnectorID]), + %% ?HTTP_CONNECTOR(URL1), + %% Config + %% ), + %% ?assertMatch( + %% #{ + %% <<"status">> := <<"connected">>, + %% <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] + %% } when + %% not is_map_key(<<"status_reason">>, FixedConnector) andalso + %% not is_map_key(<<"status_reason">>, FixedNodeStatus), + %% FixedConnector + %% ), + + %% %% Try create connector with bad characters as name + %% {ok, 400, _} = request(post, uri(["connectors"]), ?HTTP_CONNECTOR(URL1, <<"隋达"/utf8>>), Config), + + %% %% Missing scheme in URL + %% {ok, 400, _} = request( + %% post, + %% uri(["connectors"]), + %% ?HTTP_CONNECTOR(<<"localhost:1234/foo">>, <<"missing_url_scheme">>), + %% Config + %% ), + + %% %% Invalid port + %% {ok, 400, _} = request( + %% post, + %% uri(["connectors"]), + %% ?HTTP_CONNECTOR(<<"http://localhost:12341234/foo">>, <<"invalid_port">>), + %% Config + %% ), + + %% {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config) + ok. + +%% t_start_bridge_unknown_node(Config) -> +%% {ok, 404, _} = +%% request( +%% post, +%% uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]), +%% Config +%% ), +%% {ok, 404, _} = +%% request( +%% post, +%% uri(["nodes", "undefined", "bridges", "webhook:foo", start]), +%% Config +%% ). + +%% t_start_stop_bridges_node(Config) -> +%% do_start_stop_bridges(node, Config). + +%% t_start_stop_bridges_cluster(Config) -> +%% do_start_stop_bridges(cluster, Config). + +%% do_start_stop_bridges(Type, Config) -> +%% %% assert we there's no bridges at first +%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), + +%% Port = ?config(port, Config), +%% URL1 = ?URL(Port, "abc"), +%% Name = atom_to_binary(Type), +%% ?assertMatch( +%% {ok, 201, #{ +%% <<"type">> := ?BRIDGE_TYPE_HTTP, +%% <<"name">> := Name, +%% <<"enable">> := true, +%% <<"status">> := <<"connected">>, +%% <<"node_status">> := [_ | _], +%% <<"url">> := URL1 +%% }}, +%% request_json( +%% post, +%% uri(["bridges"]), +%% ?HTTP_BRIDGE(URL1, Name), +%% Config +%% ) +%% ), + +%% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), +%% ExpectedStatus = +%% case ?config(group, Config) of +%% cluster when Type == node -> +%% <<"inconsistent">>; +%% _ -> +%% <<"stopped">> +%% end, + +%% %% stop it +%% {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := ExpectedStatus}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% start again +%% {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% start a started bridge +%% {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% restart an already started bridge +%% {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% stop it again +%% {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), +%% %% restart a stopped bridge +%% {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), + +%% {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config), + +%% %% delete the bridge +%% {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), +%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), + +%% %% Fail parse-id check +%% {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config), +%% %% Looks ok but doesn't exist +%% {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config), + +%% %% Create broken bridge +%% {ListenPort, Sock} = listen_on_random_port(), +%% %% Connecting to this endpoint should always timeout +%% BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), +%% BadName = <<"bad_", (atom_to_binary(Type))/binary>>, +%% ?assertMatch( +%% {ok, 201, #{ +%% <<"type">> := ?BRIDGE_TYPE_MQTT, +%% <<"name">> := BadName, +%% <<"enable">> := true, +%% <<"server">> := BadServer, +%% <<"status">> := <<"connecting">>, +%% <<"node_status">> := [_ | _] +%% }}, +%% request_json( +%% post, +%% uri(["bridges"]), +%% ?MQTT_BRIDGE(BadServer, BadName), +%% Config +%% ) +%% ), +%% BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), +%% ?assertMatch( +%% %% request from product: return 400 on such errors +%% {ok, SC, _} when SC == 500 orelse SC == 400, +%% request(post, {operation, Type, start, BadBridgeID}, Config) +%% ), +%% ok = gen_tcp:close(Sock), +%% ok. + +%% t_start_stop_inconsistent_bridge_node(Config) -> +%% start_stop_inconsistent_bridge(node, Config). + +%% t_start_stop_inconsistent_bridge_cluster(Config) -> +%% start_stop_inconsistent_bridge(cluster, Config). + +%% start_stop_inconsistent_bridge(Type, Config) -> +%% Port = ?config(port, Config), +%% URL = ?URL(Port, "abc"), +%% Node = ?config(node, Config), + +%% erpc:call(Node, fun() -> +%% meck:new(emqx_bridge_resource, [passthrough, no_link]), +%% meck:expect( +%% emqx_bridge_resource, +%% stop, +%% fun +%% (_, <<"bridge_not_found">>) -> {error, not_found}; +%% (BridgeType, Name) -> meck:passthrough([BridgeType, Name]) +%% end +%% ) +%% end), + +%% emqx_common_test_helpers:on_exit(fun() -> +%% erpc:call(Node, fun() -> +%% meck:unload([emqx_bridge_resource]) +%% end) +%% end), + +%% {ok, 201, _Bridge} = request( +%% post, +%% uri(["bridges"]), +%% ?HTTP_BRIDGE(URL, <<"bridge_not_found">>), +%% Config +%% ), +%% {ok, 503, _} = request( +%% post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config +%% ). + +%% t_enable_disable_bridges(Config) -> +%% %% assert we there's no bridges at first +%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config), + +%% Name = ?BRIDGE_NAME, +%% Port = ?config(port, Config), +%% URL1 = ?URL(Port, "abc"), +%% ?assertMatch( +%% {ok, 201, #{ +%% <<"type">> := ?BRIDGE_TYPE_HTTP, +%% <<"name">> := Name, +%% <<"enable">> := true, +%% <<"status">> := <<"connected">>, +%% <<"node_status">> := [_ | _], +%% <<"url">> := URL1 +%% }}, +%% request_json( +%% post, +%% uri(["bridges"]), +%% ?HTTP_BRIDGE(URL1, Name), +%% Config +%% ) +%% ), +%% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), +%% %% disable it +%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"stopped">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% enable again +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% enable an already started bridge +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% disable it again +%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), + +%% %% bad param +%% {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config), +%% {ok, 404, _} = request(put, enable_path(true, "foo"), Config), +%% {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), + +%% {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config), +%% ?assertEqual( +%% #{ +%% <<"code">> => <<"BAD_REQUEST">>, +%% <<"message">> => <<"Forbidden operation, bridge not enabled">> +%% }, +%% Res +%% ), +%% {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config), + +%% %% enable a stopped bridge +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri(["bridges", BridgeID]), Config) +%% ), +%% %% delete the bridge +%% {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), +%% {ok, 200, []} = request_json(get, uri(["bridges"]), Config). + +%% t_with_redact_update(Config) -> +%% Name = <<"redact_update">>, +%% Type = <<"mqtt">>, +%% Password = <<"123456">>, +%% Template = #{ +%% <<"type">> => Type, +%% <<"name">> => Name, +%% <<"server">> => <<"127.0.0.1:1883">>, +%% <<"username">> => <<"test">>, +%% <<"password">> => Password, +%% <<"ingress">> => +%% #{<<"remote">> => #{<<"topic">> => <<"t/#">>}} +%% }, + +%% {ok, 201, _} = request( +%% post, +%% uri(["bridges"]), +%% Template, +%% Config +%% ), + +%% %% update with redacted config +%% BridgeConf = emqx_utils:redact(Template), +%% BridgeID = emqx_bridge_resource:bridge_id(Type, Name), +%% {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config), +%% ?assertEqual( +%% Password, +%% get_raw_config([bridges, Type, Name, password], Config) +%% ), +%% ok. + +%% t_bridges_probe(Config) -> +%% Port = ?config(port, Config), +%% URL = ?URL(Port, "some_path"), + +%% {ok, 204, <<>>} = request( +%% post, +%% uri(["bridges_probe"]), +%% ?HTTP_BRIDGE(URL), +%% Config +%% ), + +%% %% second time with same name is ok since no real bridge created +%% {ok, 204, <<>>} = request( +%% post, +%% uri(["bridges_probe"]), +%% ?HTTP_BRIDGE(URL), +%% Config +%% ), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := _ +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>), +%% Config +%% ) +%% ), + +%% %% Missing scheme in URL +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := _ +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?HTTP_BRIDGE(<<"203.0.113.3:1234/foo">>), +%% Config +%% ) +%% ), + +%% %% Invalid port +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := _ +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?HTTP_BRIDGE(<<"http://203.0.113.3:12341234/foo">>), +%% Config +%% ) +%% ), + +%% {ok, 204, _} = request( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), +%% Config +%% ), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := <<"Connection refused">> +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"127.0.0.1:2883">>), +%% Config +%% ) +%% ), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := <<"Could not resolve host">> +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"nohost:2883">>), +%% Config +%% ) +%% ), + +%% AuthnConfig = #{ +%% <<"mechanism">> => <<"password_based">>, +%% <<"backend">> => <<"built_in_database">>, +%% <<"user_id_type">> => <<"username">> +%% }, +%% Chain = 'mqtt:global', +%% {ok, _} = update_config( +%% [authentication], +%% {create_authenticator, Chain, AuthnConfig}, +%% Config +%% ), +%% User = #{user_id => <<"u">>, password => <<"p">>}, +%% AuthenticatorID = <<"password_based:built_in_database">>, +%% {ok, _} = add_user_auth( +%% Chain, +%% AuthenticatorID, +%% User, +%% Config +%% ), + +%% emqx_common_test_helpers:on_exit(fun() -> +%% delete_user_auth(Chain, AuthenticatorID, User, Config) +%% end), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := <<"Unauthorized client">> +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>}, +%% Config +%% ) +%% ), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := <<"Bad username or password">> +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{ +%% <<"proto_ver">> => <<"v4">>, +%% <<"password">> => <<"mySecret">>, +%% <<"username">> => <<"u">> +%% }, +%% Config +%% ) +%% ), + +%% ?assertMatch( +%% {ok, 400, #{ +%% <<"code">> := <<"TEST_FAILED">>, +%% <<"message">> := <<"Not authorized">> +%% }}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), +%% Config +%% ) +%% ), + +%% ?assertMatch( +%% {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, +%% request_json( +%% post, +%% uri(["bridges_probe"]), +%% ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>), +%% Config +%% ) +%% ), +%% ok. + +%%% helpers +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +request(Method, URL, Config) -> + request(Method, URL, [], Config). + +request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> + URL = operation_path(Type, Op, BridgeID, Config), + request(Method, URL, Body, Config); +request(Method, URL, Body, Config) -> + AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)), + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). + +request(Method, URL, Body, Decoder, Config) -> + case request(Method, URL, Body, Config) of + {ok, Code, Response} -> + {ok, Code, Decoder(Response)}; + Otherwise -> + Otherwise + end. + +request_json(Method, URLLike, Config) -> + request(Method, URLLike, [], fun json/1, Config). + +request_json(Method, URLLike, Body, Config) -> + request(Method, URLLike, Body, fun json/1, Config). + +operation_path(node, Oper, BridgeID, Config) -> + uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]); +operation_path(cluster, Oper, BridgeID, _Config) -> + uri(["bridges", BridgeID, Oper]). + +enable_path(Enable, BridgeID) -> + uri(["bridges", BridgeID, "enable", Enable]). + +publish_message(Topic, Body, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). + +update_config(Path, Value, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, update_config, [Path, Value]). + +get_raw_config(Path, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, get_raw_config, [Path]). + +add_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). + +delete_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S). + +json(B) when is_binary(B) -> + emqx_utils_json:decode(B, [return_maps]).