%%-------------------------------------------------------------------- %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_bridge_api_SUITE). -compile(nowarn_export_all). -compile(export_all). -import(emqx_mgmt_api_test_util, [uri/1]). -import(emqx_common_test_helpers, [on_exit/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/test_macros.hrl"). -define(BRIDGE_TYPE_HTTP, <<"webhook">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(URL(PORT, PATH), list_to_binary( io_lib:format( "http://localhost:~s/~s", [integer_to_list(PORT), PATH] ) ) ). -define(BRIDGE(NAME, TYPE), #{ <<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, <<"name">> => NAME }). -define(BRIDGE_TYPE_MQTT, <<"mqtt">>). -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{ <<"server">> => SERVER, <<"username">> => <<"user1">>, <<"password">> => <<"">>, <<"proto_ver">> => <<"v5">>, <<"egress">> => #{ <<"remote">> => #{ <<"topic">> => <<"emqx/${topic}">>, <<"qos">> => <<"${qos}">>, <<"retain">> => false } } }). -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{ <<"url">> => URL, <<"local_topic">> => <<"emqx_webhook/#">>, <<"method">> => <<"post">>, <<"body">> => <<"${payload}">>, <<"headers">> => #{ % NOTE % The Pascal-Case is important here. % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS % when this happens (while the 'content-type' does not). <<"Content-Type">> => <<"application/json">> } }). -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). -define(APPSPECS, [ emqx, emqx_conf, emqx_auth, emqx_auth_mnesia, emqx_management, emqx_connector, emqx_bridge_http, {emqx_bridge, "actions {}\n bridges {}"}, {emqx_rule_engine, "rule_engine { rules {} }"} ]). -define(APPSPEC_DASHBOARD, {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} ). all() -> [ {group, single}, {group, cluster_later_join}, {group, cluster} ]. groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ t_broken_bpapi_vsn, t_old_bpapi_vsn, t_bridges_probe ], ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics], [ {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} ]. suite() -> [{timetrap, {seconds, 120}}]. init_per_suite(Config) -> Config. end_per_suite(_Config) -> ok. init_per_group(cluster = Name, Config) -> Nodes = [NodePrimary | _] = mk_cluster(Name, Config), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(cluster_later_join = Name, Config) -> Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); init_per_group(_Name, Config) -> WorkDir = emqx_cth_suite:work_dir(Config), Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). init_api(Config) -> APINode = ?config(node, Config), {ok, App} = erpc:call(APINode, emqx_common_test_http, create_default_app, []), [{api, App} | Config]. mk_cluster(Name, Config) -> mk_cluster(Name, Config, #{}). mk_cluster(Name, Config, Opts) -> Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], Node2Apps = ?APPSPECS, emqx_cth_cluster:start( [ {emqx_bridge_api_SUITE1, Opts#{role => core, apps => Node1Apps}}, {emqx_bridge_api_SUITE2, Opts#{role => core, apps => Node2Apps}} ], #{work_dir => emqx_cth_suite:work_dir(Name, Config)} ). end_per_group(Group, Config) when Group =:= cluster; Group =:= cluster_later_join -> ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. init_per_testcase(t_broken_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 2, -1), meck:new(emqx_bridge_api, [passthrough]), meck:expect(emqx_bridge_api, supported_versions, 1, []), init_per_testcase(common, Config); init_per_testcase(t_old_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 2, 1), init_per_testcase(common, Config); init_per_testcase(_, Config) -> {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. end_per_testcase(t_broken_bpapi_vsn, Config) -> meck:unload(), end_per_testcase(common, Config); end_per_testcase(t_old_bpapi_vsn, Config) -> meck:unload(), end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), Node = ?config(node, Config), ok = emqx_common_test_helpers:call_janitor(), ok = stop_http_server(Sock, Acceptor), ok = erpc:call(Node, fun clear_resources/0), ok. clear_resources() -> emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), lists:foreach( fun(#{type := Type, name := Name}) -> ok = emqx_bridge:remove(Type, Name) end, emqx_bridge:list() ). %%------------------------------------------------------------------------------ %% HTTP server for testing %%------------------------------------------------------------------------------ start_http_server(HandleFun) -> process_flag(trap_exit, true), Parent = self(), {Port, Sock} = listen_on_random_port(), Acceptor = spawn_link(fun() -> accept_loop(Sock, HandleFun, Parent) end), timer:sleep(100), {Port, Sock, Acceptor}. stop_http_server(Sock, Acceptor) -> exit(Acceptor, kill), gen_tcp:close(Sock). listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], case gen_tcp:listen(0, SockOpts) of {ok, Sock} -> {ok, Port} = inet:port(Sock), {Port, Sock}; {error, Reason} when Reason /= eaddrinuse -> {error, Reason} end. accept_loop(Sock, HandleFun, Parent) -> process_flag(trap_exit, true), {ok, Conn} = gen_tcp:accept(Sock), Handler = spawn_link(fun() -> HandleFun(Conn, Parent) end), gen_tcp:controlling_process(Conn, Handler), accept_loop(Sock, HandleFun, Parent). make_response(CodeStr, Str) -> B = iolist_to_binary(Str), iolist_to_binary( io_lib:fwrite( "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", [CodeStr, size(B), B] ) ). handle_fun_200_ok(Conn, Parent) -> case gen_tcp:recv(Conn, 0) of {ok, ReqStr} -> ct:pal("the http handler got request: ~p", [ReqStr]), Req = parse_http_request(ReqStr), Parent ! {http_server, received, Req}, gen_tcp:send(Conn, make_response("200 OK", "Request OK")), handle_fun_200_ok(Conn, Parent); {error, Reason} -> ct:pal("the http handler recv error: ~p", [Reason]), timer:sleep(100), gen_tcp:close(Conn) end. parse_http_request(ReqStr0) -> [Method, ReqStr1] = string:split(ReqStr0, " ", leading), [Path, ReqStr2] = string:split(ReqStr1, " ", leading), [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), #{method => Method, path => Path, body => Body}. %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_http_crud_apis(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), {ok, 404, _} = request(get, uri(["bridges", "foo"]), Config), {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL1 }}, request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ method := <<"POST">>, path := <<"/path1">>, body := Body }} -> true; Msg -> ct:pal("error: http got unexpected request: ~p", [Msg]), false after 100 -> false end ), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), ?assertMatch( {ok, 200, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL2 }}, request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL2, Name), Config ) ), %% list all bridges again, assert Bridge2 is in it ?assertMatch( {ok, 200, [ #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL2 } ]}, request_json(get, uri(["bridges"]), Config) ), %% get the bridge by id ?assertMatch( {ok, 200, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL2 }}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% send an message to emqx again, check the path has been changed _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{path := <<"/path2">>}} -> true; Msg2 -> ct:pal("error: http got unexpected request: ~p", [Msg2]), false after 100 -> false end ), %% Test bad updates %% ================ %% Add bridge with a name that is too long %% We only support bridge names up to 255 characters LongName = list_to_binary(lists:duplicate(256, $a)), NameTooLongRequestResult = request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, LongName), Config ), ?assertMatch( {ok, 400, _}, NameTooLongRequestResult ), {ok, 400, #{<<"message">> := NameTooLongMessage}} = NameTooLongRequestResult, %% Use regex to check that the message contains the name Match = re:run(NameTooLongMessage, LongName), ?assertMatch({match, _}, Match), %% Add bridge without the URL field {ok, 400, PutFail1} = request_json( put, uri(["bridges", BridgeID]), maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name)), Config ), ?assertMatch( #{<<"reason">> := <<"required_field">>}, json(maps:get(<<"message">>, PutFail1)) ), {ok, 400, PutFail2} = request_json( put, uri(["bridges", BridgeID]), maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name))), Config ), ?assertMatch( #{<<"reason">> := <<"required_field">>}, json(maps:get(<<"message">>, PutFail2)) ), {ok, 400, _} = request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(<<"localhost:1234/foo">>, Name), Config ), {ok, 400, PutFail3} = request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(<<"htpp://localhost:12341234/foo">>, Name), Config ), ?assertMatch( #{<<"kind">> := <<"validation_error">>}, json(maps:get(<<"message">>, PutFail3)) ), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% update a deleted bridge returns an error ?assertMatch( {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ }}, request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL2, Name), Config ) ), %% try delete bad bridge id ?assertMatch( {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := <<"Invalid bridge ID", _/binary>> }}, request_json(delete, uri(["bridges", "foo"]), Config) ), %% Deleting a non-existing bridge should result in an error ?assertMatch( {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ }}, request_json(delete, uri(["bridges", BridgeID]), Config) ), %% Create non working bridge BrokenURL = ?URL(Port + 1, "foo"), {ok, 201, BrokenBridge} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(BrokenURL, Name), fun json/1, Config ), ?assertMatch( #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">>, <<"node_status">> := [ #{ <<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">> } | _ ], <<"url">> := BrokenURL }, BrokenBridge ), {ok, 200, FixedBridge} = request_json( put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL1), Config ), ?assertMatch( #{ <<"status">> := <<"connected">>, <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] } when not is_map_key(<<"status_reason">>, FixedBridge) andalso not is_map_key(<<"status_reason">>, FixedNodeStatus), FixedBridge ), %% Try create bridge with bad characters as name {ok, 400, _} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, <<"隋达"/utf8>>), Config), %% Missing scheme in URL {ok, 400, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(<<"localhost:1234/foo">>, <<"missing_url_scheme">>), Config ), %% Invalid port {ok, 400, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(<<"http://localhost:12341234/foo">>, <<"invalid_port">>), Config ), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config). t_http_bridges_local_topic(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name1 = <<"t_http_bridges_with_local_topic1">>, Name2 = <<"t_http_bridges_without_local_topic1">>, %% create one http bridge with local_topic {ok, 201, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name1), Config ), %% and we create another one without local_topic {ok, 201, _} = request( post, uri(["bridges"]), maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)), Config ), BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1), BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2), %% Send an message to emqx and the message should be forwarded to the HTTP server. %% This is to verify we can have 2 bridges with and without local_topic fields %% at the same time. Body = <<"my msg">>, _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ method := <<"POST">>, path := <<"/path1">>, body := Body }} -> true; Msg -> ct:pal("error: http got unexpected request: ~p", [Msg]), false after 100 -> false end ), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), Config), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), Config). t_check_dependent_actions_on_delete(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = <<"t_http_crud_apis">>, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 201, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ), {ok, 201, #{<<"id">> := RuleId}} = request_json( post, uri(["rules"]), #{ <<"name">> => <<"t_http_crud_apis">>, <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> }, Config ), %% deleting the bridge should fail because there is a rule that depends on it {ok, 400, Body} = request( delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", Config ), ?assertMatch(#{<<"rules">> := [_ | _]}, emqx_utils_json:decode(Body, [return_maps])), %% delete the rule first {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config), %% then delete the bridge is OK {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_cascade_delete_actions(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = <<"t_http_crud_apis">>, BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 201, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ), {ok, 201, #{<<"id">> := RuleId}} = request_json( post, uri(["rules"]), #{ <<"name">> => <<"t_http_crud_apis">>, <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> }, Config ), %% delete the bridge will also delete the actions from the rules {ok, 204, _} = request( delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", Config ), {ok, 200, []} = request_json(get, uri(["bridges"]), Config), ?assertMatch( {ok, 200, #{<<"actions">> := []}}, request_json(get, uri(["rules", RuleId]), Config) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config), {ok, 201, _} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ), {ok, 201, _} = request( post, uri(["rules"]), #{ <<"name">> => <<"t_http_crud_apis">>, <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> }, Config ), {ok, 204, _} = request( delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", Config ), {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_broken_bpapi_vsn(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = <<"t_bad_bpapi_vsn">>, {ok, 201, _Bridge} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% still works since we redirect to 'restart' {ok, 501, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config), {ok, 501, <<>>} = request(post, {operation, node, start, BridgeID}, Config), ok. t_old_bpapi_vsn(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = <<"t_bad_bpapi_vsn">>, {ok, 201, _Bridge} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 204, <<>>} = request(post, {operation, cluster, stop, BridgeID}, Config), {ok, 204, <<>>} = request(post, {operation, node, stop, BridgeID}, Config), %% still works since we redirect to 'restart' {ok, 204, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config), {ok, 204, <<>>} = request(post, {operation, node, start, BridgeID}, Config), {ok, 204, <<>>} = request(post, {operation, cluster, restart, BridgeID}, Config), {ok, 204, <<>>} = request(post, {operation, node, restart, BridgeID}, Config), ok. t_start_bridge_unknown_node(Config) -> {ok, 404, _} = request( post, uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]), Config ), {ok, 404, _} = request( post, uri(["nodes", "undefined", "bridges", "webhook:foo", start]), Config ). t_start_stop_bridges_node(Config) -> do_start_stop_bridges(node, Config). t_start_stop_bridges_cluster(Config) -> do_start_stop_bridges(cluster, Config). do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = atom_to_binary(Type), ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 }}, request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ExpectedStatus = case ?config(group, Config) of cluster when Type == node -> <<"inconsistent">>; _ -> <<"stopped">> end, %% stop it {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), ?assertMatch( {ok, 200, #{<<"status">> := ExpectedStatus}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% start again {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% start a started bridge {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% restart an already started bridge {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% stop it again {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), %% restart a stopped bridge {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% Fail parse-id check {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config), %% Looks ok but doesn't exist {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config), %% Create broken bridge {ListenPort, Sock} = listen_on_random_port(), %% Connecting to this endpoint should always timeout BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), BadName = <<"bad_", (atom_to_binary(Type))/binary>>, ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_MQTT, <<"name">> := BadName, <<"enable">> := true, <<"server">> := BadServer, <<"status">> := <<"connecting">>, <<"node_status">> := [_ | _] }}, request_json( post, uri(["bridges"]), ?MQTT_BRIDGE(BadServer, BadName), Config ) ), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), ?assertMatch( %% request from product: return 400 on such errors {ok, SC, _} when SC == 500 orelse SC == 400, request(post, {operation, Type, start, BadBridgeID}, Config) ), ok = gen_tcp:close(Sock), ok. t_start_stop_inconsistent_bridge_node(Config) -> start_stop_inconsistent_bridge(node, Config). t_start_stop_inconsistent_bridge_cluster(Config) -> start_stop_inconsistent_bridge(cluster, Config). start_stop_inconsistent_bridge(Type, Config) -> Port = ?config(port, Config), URL = ?URL(Port, "abc"), Node = ?config(node, Config), erpc:call(Node, fun() -> meck:new(emqx_bridge_resource, [passthrough, no_link]), meck:expect( emqx_bridge_resource, stop, fun (_, <<"bridge_not_found">>) -> {error, not_found}; (BridgeType, Name) -> meck:passthrough([BridgeType, Name]) end ) end), on_exit(fun() -> erpc:call(Node, fun() -> meck:unload([emqx_bridge_resource]) end) end), {ok, 201, _Bridge} = request( post, uri(["bridges"]), ?HTTP_BRIDGE(URL, <<"bridge_not_found">>), Config ), {ok, 503, _} = request( post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config ). t_enable_disable_bridges(Config) -> %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Name = ?BRIDGE_NAME, Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 }}, request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% disable it {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"stopped">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% enable again {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% enable an already started bridge {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% disable it again {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), %% bad param {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config), {ok, 404, _} = request(put, enable_path(true, "foo"), Config), {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config), ?assertEqual( #{ <<"code">> => <<"BAD_REQUEST">>, <<"message">> => <<"Forbidden operation, bridge not enabled">> }, Res ), {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config), %% enable a stopped bridge {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), ?assertMatch( {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["bridges", BridgeID]), Config) ), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_reset_bridges(Config) -> %% assert there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Name = ?BRIDGE_NAME, Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 }}, request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), Config), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_with_redact_update(Config) -> ok = snabbkaffe:start_trace(), on_exit(fun() -> ok = snabbkaffe:stop() end), Name = <<"redact_update">>, Type = <<"mqtt">>, Password = <<"123456">>, Template = #{ <<"type">> => Type, <<"name">> => Name, <<"server">> => <<"127.0.0.1:1883">>, <<"username">> => <<"test">>, <<"password">> => Password, <<"ingress">> => #{<<"remote">> => #{<<"topic">> => <<"t/#">>}} }, {ok, 201, _} = request( post, uri(["bridges"]), Template, Config ), %% update with redacted config BridgeConf = emqx_utils:redact(Template), BridgeID = emqx_bridge_resource:bridge_id(Type, Name), {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config), %% bridge is migrated after creation ConfigRootKey = connectors, ?assertEqual( Password, get_raw_config([ConfigRootKey, Type, Name, password], Config) ), %% probe with new password; should not be considered redacted {_, {ok, #{params := UsedParams}}} = ?wait_async_action( request( post, uri(["bridges_probe"]), Template#{<<"password">> := <<"newpassword">>}, Config ), #{?snk_kind := bridge_v1_api_dry_run}, 1_000 ), UsedPassword0 = maps:get(<<"password">>, UsedParams), %% the password field schema makes %% `emqx_dashboard_swagger:filter_check_request_and_translate_body' wrap the password. %% hack: this fails with `badfun' in CI only, due to cover compile, if not evaluated %% in the original node... PrimaryNode = ?config(node, Config), erpc:call(PrimaryNode, fun() -> ?assertEqual(<<"newpassword">>, UsedPassword0()) end), ok = snabbkaffe:stop(), ok. t_bridges_probe(Config) -> Port = ?config(port, Config), URL = ?URL(Port, "some_path"), {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(URL), Config ), %% second time with same name is ok since no real bridge created {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(URL), Config ), %% with descriptions is ok. {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), (?HTTP_BRIDGE(URL))#{<<"description">> => <<"Test Description">>}, Config ), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := _ }}, request_json( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>), Config ) ), %% Missing scheme in URL ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := _ }}, request_json( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(<<"203.0.113.3:1234/foo">>), Config ) ), %% Invalid port ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := _ }}, request_json( post, uri(["bridges_probe"]), ?HTTP_BRIDGE(<<"http://203.0.113.3:12341234/foo">>), Config ) ), {ok, 204, _} = request( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), Config ), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Connection refused">> }}, request_json( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:2883">>), Config ) ), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Could not resolve host">> }}, request_json( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"nohost:2883">>), Config ) ), AuthnConfig = #{ <<"mechanism">> => <<"password_based">>, <<"backend">> => <<"built_in_database">>, <<"user_id_type">> => <<"username">> }, Chain = 'mqtt:global', {ok, _} = update_config( [authentication], {create_authenticator, Chain, AuthnConfig}, Config ), User = #{user_id => <<"u">>, password => <<"p">>}, AuthenticatorID = <<"password_based:built_in_database">>, {ok, _} = add_user_auth( Chain, AuthenticatorID, User, Config ), on_exit(fun() -> delete_user_auth(Chain, AuthenticatorID, User, Config) end), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Unauthorized client">> }}, request_json( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>}, Config ) ), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Bad username or password">> }}, request_json( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{ <<"proto_ver">> => <<"v4">>, <<"password">> => <<"mySecret">>, <<"username">> => <<"u">> }, Config ) ), ?assertMatch( {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Not authorized">> }}, request_json( post, uri(["bridges_probe"]), ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), Config ) ), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, request_json( post, uri(["bridges_probe"]), ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>), Config ) ), ok. t_metrics(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, ?assertMatch( {ok, 201, Bridge = #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL1 }} when %% assert that the bridge return doesn't contain metrics anymore not is_map_key(<<"metrics">>, Bridge) andalso not is_map_key(<<"node_metrics">>, Bridge), request_json( post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, Name), Config ) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% check for empty bridge metrics ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := 0}, <<"node_metrics">> := [_ | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), %% check that the bridge doesn't contain metrics anymore {ok, 200, Bridge} = request_json(get, uri(["bridges", BridgeID]), Config), ?assertNot(maps:is_key(<<"metrics">>, Bridge)), ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ method := <<"POST">>, path := <<"/path1">>, body := Body }} -> true; Msg -> ct:pal("error: http got unexpected request: ~p", [Msg]), false after 100 -> false end ), %% check for non-empty bridge metrics ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [_ | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), %% check for absence of metrics when listing all bridges {ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config), ?assertNotMatch( [ #{ <<"metrics">> := #{}, <<"node_metrics">> := [_ | _] } ], Bridges ), ok. %% request_timeout in bridge root should match request_ttl in %% resource_opts. t_inconsistent_webhook_request_timeouts(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, BadBridgeParams = emqx_utils_maps:deep_merge( ?HTTP_BRIDGE(URL1, Name), #{ <<"request_timeout">> => <<"1s">>, <<"resource_opts">> => #{<<"request_ttl">> => <<"2s">>} } ), %% root request_timeout is deprecated for bridge. {ok, 201, #{ <<"resource_opts">> := ResourceOpts } = Response} = request_json( post, uri(["bridges"]), BadBridgeParams, Config ), ?assertNot(maps:is_key(<<"request_timeout">>, Response)), ?assertMatch(#{<<"request_ttl">> := <<"2s">>}, ResourceOpts), validate_resource_request_ttl(proplists:get_value(group, Config), 2000, Name), ok. t_cluster_later_join_metrics(Config) -> Port = ?config(port, Config), [PrimaryNode, OtherNode | _] = ?config(cluster_nodes, Config), URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, BridgeParams = ?HTTP_BRIDGE(URL1, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ?check_trace( #{timetrap => 15_000}, begin %% Create a bridge on only one of the nodes. ?assertMatch({ok, 201, _}, request_json(post, uri(["bridges"]), BridgeParams, Config)), %% Pre-condition. ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [_ | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), ct:print("node joining cluster"), %% Now join the other node join with the api node. ok = erpc:call(OtherNode, ekka, join, [PrimaryNode]), %% Hack / workaround for the fact that `emqx_machine_boot' doesn't restart the %% applications, in particular `emqx_conf' doesn't restart and synchronize the %% transaction id. It's also unclear at the moment why the equivalent test in %% `emqx_bridge_v2_api_SUITE' doesn't need this hack. ok = erpc:call(OtherNode, application, stop, [emqx_conf]), ok = erpc:call(OtherNode, application, start, [emqx_conf]), ct:print("node joined cluster"), %% assert: wait for the bridge to be ready on the other node. {_, {ok, _}} = ?wait_async_action( {emqx_cluster_rpc, OtherNode} ! wake_up, #{?snk_kind := cluster_rpc_caught_up, ?snk_meta := #{node := OtherNode}}, 10_000 ), %% Check metrics; shouldn't crash even if the bridge is not %% ready on the node that just joined the cluster. ?assertMatch( {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [#{<<"metrics">> := #{}}, #{<<"metrics">> := #{}} | _] }}, request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), ok end, [] ), ok. t_create_with_bad_name(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "path1"), Name = <<"test_哈哈">>, BadBridgeParams = emqx_utils_maps:deep_merge( ?HTTP_BRIDGE(URL1, Name), #{ <<"ssl">> => #{ <<"enable">> => true, <<"certfile">> => cert_file("certfile") } } ), {ok, 400, #{ <<"code">> := <<"BAD_REQUEST">>, <<"message">> := Msg0 }} = request_json( post, uri(["bridges"]), BadBridgeParams, Config ), Msg = emqx_utils_json:decode(Msg0, [return_maps]), ?assertMatch( #{ <<"kind">> := <<"validation_error">>, <<"reason">> := <<"Invalid name format.", _/binary>> }, Msg ), ok. validate_resource_request_ttl(single, Timeout, Name) -> SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000}, _BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), ?check_trace( begin {ok, Res} = ?wait_async_action( do_send_message(?BRIDGE_TYPE_HTTP, Name, SentData), #{?snk_kind := async_query}, 1000 ), ?assertMatch({ok, #{id := _ResId, query_opts := #{timeout := Timeout}}}, Res) end, fun(Trace0) -> Trace = ?of_kind(async_query, Trace0), ?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace), ok end ); validate_resource_request_ttl(_Cluster, _Timeout, _Name) -> ignore. do_send_message(BridgeV1Type, Name, Message) -> Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeV1Type), emqx_bridge_v2:send_message(Type, Name, Message, #{}). %% request(Method, URL, Config) -> request(Method, URL, [], Config). request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> URL = operation_path(Type, Op, BridgeID, Config), request(Method, URL, Body, Config); request(Method, URL, Body, Config) -> AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)), Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). request(Method, URL, Body, Decoder, Config) -> case request(Method, URL, Body, Config) of {ok, Code, Response} -> {ok, Code, Decoder(Response)}; Otherwise -> Otherwise end. request_json(Method, URLLike, Config) -> request(Method, URLLike, [], fun json/1, Config). request_json(Method, URLLike, Body, Config) -> request(Method, URLLike, Body, fun json/1, Config). operation_path(node, Oper, BridgeID, Config) -> uri(["nodes", ?config(node, Config), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID, _Config) -> uri(["bridges", BridgeID, Oper]). enable_path(Enable, BridgeID) -> uri(["bridges", BridgeID, "enable", Enable]). publish_message(Topic, Body, Config) -> Node = ?config(node, Config), erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). update_config(Path, Value, Config) -> Node = ?config(node, Config), erpc:call(Node, emqx, update_config, [Path, Value]). get_raw_config(Path, Config) -> Node = ?config(node, Config), erpc:call(Node, emqx, get_raw_config, [Path]). add_user_auth(Chain, AuthenticatorID, User, Config) -> Node = ?config(node, Config), erpc:call(Node, emqx_authn_chains, add_user, [Chain, AuthenticatorID, User]). delete_user_auth(Chain, AuthenticatorID, User, Config) -> Node = ?config(node, Config), erpc:call(Node, emqx_authn_chains, delete_user, [Chain, AuthenticatorID, User]). str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). json(B) when is_binary(B) -> emqx_utils_json:decode(B, [return_maps]). data_file(Name) -> Dir = code:lib_dir(emqx_bridge, test), {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])), Bin. cert_file(Name) -> data_file(filename:join(["certs", Name])).