emqx/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl

997 lines
33 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_api_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%% Empty bridges configuration; loaded for emqx_bridge_schema in init_per_suite/1.
-define(CONF_DEFAULT, <<"bridges: {}">>).
%% Bridge type used by the HTTP (webhook) test cases.
-define(BRIDGE_TYPE, <<"webhook">>).
%% Bridge name derived from the name of the function the macro is expanded in.
-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))).
%% Builds the binary URL <<"http://localhost:PORT/PATH">>. PORT must be an
%% integer (it goes through integer_to_list/1); PATH is formatted with ~s.
-define(URL(PORT, PATH),
list_to_binary(
io_lib:format(
"http://localhost:~s/~s",
[integer_to_list(PORT), PATH]
)
)
).
%% Minimal bridge request body shared by all bridge types: SSL disabled plus
%% the given type and name.
-define(BRIDGE(NAME, TYPE), #{
<<"ssl">> => #{<<"enable">> => false},
<<"type">> => TYPE,
<<"name">> => NAME
}).
%% Bridge type used by the MQTT test cases.
-define(BRIDGE_TYPE_MQTT, <<"mqtt">>).
%% MQTT bridge request body pointing at SERVER, with a fixed user name, an
%% empty password and protocol version v5.
-define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{
<<"server">> => SERVER,
<<"username">> => <<"user1">>,
<<"password">> => <<"">>,
<<"proto_ver">> => <<"v5">>
}).
%% Same as ?MQTT_BRIDGE/2 with the default name <<"mqtt_egress_test_bridge">>.
-define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)).
%% HTTP bridge request body: POSTs the message payload as the request body to
%% URL for messages matching the local topic filter emqx_webhook/#.
-define(HTTP_BRIDGE(URL, TYPE, NAME), ?BRIDGE(NAME, TYPE)#{
<<"url">> => URL,
<<"local_topic">> => <<"emqx_webhook/#">>,
<<"method">> => <<"post">>,
<<"body">> => <<"${payload}">>,
<<"headers">> => #{
<<"content-type">> => <<"application/json">>
}
}).
%% Common Test callback: the list of test cases, as computed by
%% emqx_common_test_helpers:all/1 for this module.
all() ->
    emqx_common_test_helpers:all(?MODULE).
%% Common Test callback: this suite defines no test case groups.
groups() ->
    [].
%% Common Test callback: suite-wide settings. Sets a 60 second timetrap.
suite() ->
    [{timetrap, {seconds, 60}}].
%% Suite setup: starts the applications under test through
%% emqx_mgmt_api_test_util:init_suite/1 and loads empty rule engine and
%% bridge configurations. Returns Config unchanged.
init_per_suite(Config) ->
    _ = application:load(emqx_conf),
    %% Test cases from other suites/apps may have left these running;
    %% stop them and ignore the result.
    _ = application:stop(emqx_resource),
    _ = application:stop(emqx_connector),
    Apps = [emqx_rule_engine, emqx_bridge, emqx_authn],
    ok = emqx_mgmt_api_test_util:init_suite(Apps),
    RuleEngineConf = <<"rule_engine {rules {}}">>,
    ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, RuleEngineConf),
    ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT),
    Config.
%% Suite teardown: stops the applications started in init_per_suite/1 and
%% clears the emqx_authn_mnesia table.
end_per_suite(_Config) ->
    Apps = [emqx_rule_engine, emqx_bridge, emqx_authn],
    emqx_mgmt_api_test_util:end_suite(Apps),
    mria:clear_table(emqx_authn_mnesia),
    ok.
%% Per-testcase setup.
%%
%% The two BPAPI test cases first mock emqx_bpapi:supported_version/1,2
%% (returning -1 for the "broken" case and 1 for the "old" case) and then
%% fall through to the common setup. The atom `common` is only a placeholder
%% that selects the catch-all clause.
%%
%% The common setup starts emqx_cluster_rpc and a local HTTP server that
%% answers every request with 200, and prepends `port`, `sock` and
%% `acceptor` to Config for use by the test case and end_per_testcase/2.
init_per_testcase(t_broken_bpapi_vsn, Config) ->
    meck:new(emqx_bpapi, [passthrough]),
    meck:expect(emqx_bpapi, supported_version, 1, -1),
    meck:expect(emqx_bpapi, supported_version, 2, -1),
    init_per_testcase(common, Config);
init_per_testcase(t_old_bpapi_vsn, Config) ->
    meck:new(emqx_bpapi, [passthrough]),
    meck:expect(emqx_bpapi, supported_version, 1, 1),
    meck:expect(emqx_bpapi, supported_version, 2, 1),
    init_per_testcase(common, Config);
init_per_testcase(_, Config) ->
    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
    {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
    [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
%% Per-testcase teardown. The BPAPI test cases unload the emqx_bpapi mock
%% first; every test case then stops the local HTTP server and removes all
%% bridges that are still configured.
end_per_testcase(TestCase, Config) when
    TestCase =:= t_broken_bpapi_vsn; TestCase =:= t_old_bpapi_vsn
->
    meck:unload([emqx_bpapi]),
    end_per_testcase(common, Config);
end_per_testcase(_, Config) ->
    ListenSock = ?config(sock, Config),
    AcceptorPid = ?config(acceptor, Config),
    stop_http_server(ListenSock, AcceptorPid),
    clear_resources(),
    ok.
%% Removes every bridge returned by emqx_bridge:list/0; crashes if any
%% removal does not return {ok, _}.
clear_resources() ->
    RemoveBridge = fun(#{type := BridgeType, name := BridgeName}) ->
        {ok, _} = emqx_bridge:remove(BridgeType, BridgeName)
    end,
    lists:foreach(RemoveBridge, emqx_bridge:list()).
%%------------------------------------------------------------------------------
%% HTTP server for testing
%%------------------------------------------------------------------------------
%% Starts a minimal HTTP server on a random local port.
%%
%% HandleFun(Conn, Parent) is run in its own linked process for every
%% accepted connection, with Parent being the calling process. The caller
%% starts trapping exits as a side effect.
%% Returns {Port, ListenSocket, AcceptorPid}.
start_http_server(HandleFun) ->
    process_flag(trap_exit, true),
    TestPid = self(),
    {Port, ListenSock} = listen_on_random_port(),
    RunAcceptor = fun() -> accept_loop(ListenSock, HandleFun, TestPid) end,
    AcceptorPid = spawn_link(RunAcceptor),
    timer:sleep(100),
    {Port, ListenSock, AcceptorPid}.
%% Stops the server started by start_http_server/1: kills the acceptor
%% process and closes the listening socket.
stop_http_server(Sock, Acceptor) ->
    exit(Acceptor, kill),
    gen_tcp:close(Sock).
%% Opens a passive, binary-mode listening socket on a port chosen by the OS
%% (port 0) and returns {Port, ListenSocket}.
%%
%% Raises error({listen_failed, Reason}) if the socket cannot be opened.
%% Callers destructure the result as {Port, Sock}, so returning an
%% {error, Reason} tuple would be silently bound as a bogus port/socket
%% pair; failing here keeps the real reason visible.
listen_on_random_port() ->
    SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
    case gen_tcp:listen(0, SockOpts) of
        {ok, Sock} ->
            {ok, Port} = inet:port(Sock),
            {Port, Sock};
        {error, Reason} ->
            error({listen_failed, Reason})
    end.
%% Accept loop of the test HTTP server. For every accepted connection a
%% linked handler process running HandleFun(Conn, Parent) is spawned and made
%% the controlling process of the connection. Loops forever; it is stopped by
%% killing the process (see stop_http_server/2).
accept_loop(Sock, HandleFun, Parent) ->
    process_flag(trap_exit, true),
    {ok, Conn} = gen_tcp:accept(Sock),
    Serve = fun() -> HandleFun(Conn, Parent) end,
    HandlerPid = spawn_link(Serve),
    gen_tcp:controlling_process(Conn, HandlerPid),
    accept_loop(Sock, HandleFun, Parent).
%% Builds a complete HTTP/1.0 response as a binary.
%%
%% CodeStr is the status line tail (e.g. "200 OK"); Str is the body as an
%% iolist. Content-Length is the body size in bytes.
make_response(CodeStr, Str) ->
    Body = iolist_to_binary(Str),
    iolist_to_binary(
        io_lib:fwrite(
            "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
            %% byte_size/1 instead of the generic size/1: Body is always a binary
            [CodeStr, byte_size(Body), Body]
        )
    ).
%% Connection handler that answers every request with "200 OK".
%%
%% Each chunk received on Conn is parsed with parse_http_request/1 and
%% reported to Parent as {http_server, received, Req} before the response is
%% sent. On a receive error the connection is closed after a short delay.
handle_fun_200_ok(Conn, Parent) ->
    case gen_tcp:recv(Conn, 0) of
        {error, Reason} ->
            ct:pal("the http handler recv error: ~p", [Reason]),
            timer:sleep(100),
            gen_tcp:close(Conn);
        {ok, ReqStr} ->
            ct:pal("the http handler got request: ~p", [ReqStr]),
            ParsedReq = parse_http_request(ReqStr),
            Parent ! {http_server, received, ParsedReq},
            Response = make_response("200 OK", "Request OK"),
            gen_tcp:send(Conn, Response),
            handle_fun_200_ok(Conn, Parent)
    end.
%% Splits a raw HTTP request into its method, path and body.
%%
%% Returns #{method => Method, path => Path, body => Body}. The protocol
%% version and the headers are discarded. Crashes with badmatch if the input
%% does not contain the expected separators.
parse_http_request(ReqStr0) ->
    %% string:split/2 splits at the first (leading) occurrence
    [Method, AfterMethod] = string:split(ReqStr0, " "),
    [Path, AfterPath] = string:split(AfterMethod, " "),
    [_ProtoVsn, AfterRequestLine] = string:split(AfterPath, "\r\n"),
    [_HeaderStr, Body] = string:split(AfterRequestLine, "\r\n\r\n"),
    #{method => Method, path => Path, body => Body}.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
%% Exercises the create/read/update/delete REST endpoints for a webhook
%% bridge, and checks that published messages reach the local HTTP server on
%% the configured path before and after the bridge URL is updated.
t_http_crud_apis(Config) ->
Port = ?config(port, Config),
%% assert there are no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
{ok, 404, _} = request(get, uri(["bridges", "foo"]), []),
{ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% send a message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
%% The HTTP handler reports each request to this process (see
%% handle_fun_200_ok/2). Any other message arriving first fails the assertion.
?assert(
receive
{http_server, received, #{
method := <<"POST">>,
path := <<"/path1">>,
body := Body
}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end
),
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
{ok, 200, Bridge2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL2
},
emqx_json:decode(Bridge2, [return_maps])
),
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
?assertMatch(
[
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL2
}
],
emqx_json:decode(Bridge2Str, [return_maps])
),
%% get the bridge by id
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL2
},
emqx_json:decode(Bridge3Str, [return_maps])
),
%% send a message to emqx again, check the path has been changed
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert(
receive
{http_server, received, #{path := <<"/path2">>}} ->
true;
Msg2 ->
ct:pal("error: http got unexpected request: ~p", [Msg2]),
false
after 100 ->
false
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% updating a deleted bridge returns an error
{ok, 404, ErrMsg2} = request(
put,
uri(["bridges", BridgeID]),
?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, Name)
),
?assertMatch(
#{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := _
},
emqx_json:decode(ErrMsg2, [return_maps])
),
%% try to delete a bad bridge id
{ok, 404, BadId} = request(delete, uri(["bridges", "foo"]), []),
?assertMatch(
#{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Invalid bridge ID", _/binary>>
},
emqx_json:decode(BadId, [return_maps])
),
%% Deleting a non-existing bridge should result in an error
{ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []),
?assertMatch(
#{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := _
},
emqx_json:decode(ErrMsg3, [return_maps])
),
ok.
%% Creates two webhook bridges to the same URL, one with and one without a
%% local_topic, and checks that a published message is still forwarded to the
%% local HTTP server.
t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert there are no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name1 = <<"t_http_bridges_with_local_topic1">>,
Name2 = <<"t_http_bridges_without_local_topic1">>,
%% create one http bridge with local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name1)
),
%% and we create another one without local_topic
{ok, 201, _} = request(
post,
uri(["bridges"]),
maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name2))
),
BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name1),
BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name2),
%% Send a message to emqx and the message should be forwarded to the HTTP server.
%% This is to verify we can have 2 bridges with and without local_topic fields
%% at the same time.
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert(
receive
{http_server, received, #{
method := <<"POST">>,
path := <<"/path1">>,
body := Body
}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end
),
%% delete both bridges
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), []),
ok.
%% A bridge that is used as an action by a rule cannot be deleted with
%% also_delete_dep_actions=false (400). Once the rule is deleted, deleting
%% the bridge succeeds.
t_check_dependent_actions_on_delete(Config) ->
    HttpPort = ?config(port, Config),
    %% precondition: no bridges are configured
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    %% create a webhook bridge with POST /bridges
    URL1 = ?URL(HttpPort, "path1"),
    Name = <<"t_http_crud_apis">>,
    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
    BridgeParams = ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
    {ok, 201, _} = request(post, uri(["bridges"]), BridgeParams),
    %% create a rule that uses the bridge as its only action
    RuleParams = #{
        <<"name">> => <<"t_http_crud_apis">>,
        <<"enable">> => true,
        <<"actions">> => [BridgeID],
        <<"sql">> => <<"SELECT * from \"t\"">>
    },
    {ok, 201, RuleJson} = request(post, uri(["rules"]), RuleParams),
    #{<<"id">> := RuleId} = emqx_json:decode(RuleJson, [return_maps]),
    %% the rule depends on the bridge, so deleting the bridge must fail
    NoCascadePath = uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false",
    {ok, 400, _} = request(delete, NoCascadePath, []),
    %% delete the rule first ...
    {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
    %% ... then deleting the bridge is OK
    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    ok.
%% Deleting a bridge with also_delete_dep_actions (with the value `true`, or
%% with no value at all) succeeds even though a rule uses the bridge. In the
%% first round the rule is also checked to be left with an empty action list.
%%
%% Both rules created here are deleted again, so that no rule outlives the
%% test case (end_per_testcase/2 only removes bridges).
t_cascade_delete_actions(Config) ->
    Port = ?config(port, Config),
    %% assert there are no bridges at first
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    %% then we add a webhook bridge, using POST
    %% POST /bridges/ will create a bridge
    URL1 = ?URL(Port, "path1"),
    Name = <<"t_http_crud_apis">>,
    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
    BridgeParams = ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
    RuleParams = #{
        <<"name">> => <<"t_http_crud_apis">>,
        <<"enable">> => true,
        <<"actions">> => [BridgeID],
        <<"sql">> => <<"SELECT * from \"t\"">>
    },
    {ok, 201, _} = request(post, uri(["bridges"]), BridgeParams),
    {ok, 201, Rule} = request(post, uri(["rules"]), RuleParams),
    #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]),
    %% deleting the bridge will also delete the actions from the rules
    {ok, 204, _} = request(
        delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", []
    ),
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []),
    ?assertMatch(
        #{
            <<"actions">> := []
        },
        emqx_json:decode(Rule1, [return_maps])
    ),
    {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
    %% second round: the query parameter is given without a value
    {ok, 201, _} = request(post, uri(["bridges"]), BridgeParams),
    {ok, 201, Rule2} = request(post, uri(["rules"]), RuleParams),
    #{<<"id">> := RuleId2} = emqx_json:decode(Rule2, [return_maps]),
    {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []),
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    %% clean up the second rule so that it does not leak into other test cases
    {ok, 204, <<>>} = request(delete, uri(["rules", RuleId2]), []),
    ok.
%% With emqx_bpapi:supported_version/1,2 mocked to return -1 (see
%% init_per_testcase/2), the bridge can still be created, but both the
%% cluster-wide and the per-node `start` operation are answered with 501.
t_broken_bpapi_vsn(Config) ->
    HttpPort = ?config(port, Config),
    URL1 = ?URL(HttpPort, "abc"),
    Name = <<"t_bad_bpapi_vsn">>,
    BridgeParams = ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
    {ok, 201, _Bridge} = request(post, uri(["bridges"]), BridgeParams),
    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
    %% start is rejected both cluster-wide and on the local node
    ClusterStart = operation_path(cluster, start, BridgeID),
    {ok, 501, <<>>} = request(post, ClusterStart, <<"">>),
    NodeStart = operation_path(node, start, BridgeID),
    {ok, 501, <<>>} = request(post, NodeStart, <<"">>),
    ok.
%% With emqx_bpapi:supported_version/1,2 mocked to return 1 (see
%% init_per_testcase/2), stop, start and restart all succeed with 204, both
%% cluster-wide and on the local node.
t_old_bpapi_vsn(Config) ->
    HttpPort = ?config(port, Config),
    URL1 = ?URL(HttpPort, "abc"),
    Name = <<"t_bad_bpapi_vsn">>,
    BridgeParams = ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
    {ok, 201, _Bridge} = request(post, uri(["bridges"]), BridgeParams),
    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
    %% Runs one operation cluster-wide first, then on the local node.
    RunOp = fun(Oper) ->
        {ok, 204, <<>>} = request(post, operation_path(cluster, Oper, BridgeID), <<"">>),
        {ok, 204, <<>>} = request(post, operation_path(node, Oper, BridgeID), <<"">>)
    end,
    RunOp(stop),
    %% still works since we redirect to 'restart'
    RunOp(start),
    RunOp(restart),
    ok.
%% Per-node variant of the start/stop scenario. Node names that do not refer
%% to a known node are rejected with 404 before the common scenario runs.
t_start_stop_bridges_node(Config) ->
    UnknownNodePath =
        uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]),
    {ok, 404, _} = request(post, UnknownNodePath, <<"">>),
    UndefinedNodePath =
        uri(["nodes", "undefined", "bridges", "webhook:foo", start]),
    {ok, 404, _} = request(post, UndefinedNodePath, <<"">>),
    do_start_stop_bridges(node, Config).
%% Cluster-wide variant of the start/stop scenario.
t_start_stop_bridges_cluster(Config) ->
    do_start_stop_bridges(cluster, Config).
%% Shared start/stop/restart scenario. Type is `node` or `cluster` and
%% selects the REST path built by operation_path/3; it is also used as the
%% bridge name.
%%
%% Covers: stop, start, start when already started, restart when started,
%% restart when stopped, an invalid operation, unknown bridge ids, and
%% starting an MQTT bridge whose server never answers.
do_start_stop_bridges(Type, Config) ->
%% assert there are no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"),
Name = atom_to_binary(Type),
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% stop it
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
%% start again
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% start a started bridge
{ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>),
{ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3_1, [return_maps])),
%% restart an already started bridge
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
%% Bridge3 is already bound here, so this match additionally requires the
%% response body to be identical to the one received after the first start.
%% NOTE(review): possibly unintended; confirm whether a fresh variable was meant.
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% stop it again
{ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
%% restart a stopped bridge
{ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
%% an unknown operation name is rejected
{ok, 404, _} = request(post, operation_path(Type, invalidop, BridgeID), <<"">>),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% Fail parse-id check
{ok, 404, _} = request(post, operation_path(Type, start, <<"wreckbook_fugazi">>), <<"">>),
%% Looks ok but doesn't exist
{ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>),
%% Create broken bridge
%% The socket below listens but nothing ever accepts on it.
{ListenPort, Sock} = listen_on_random_port(),
%% Connecting to this endpoint should always time out
BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
{ok, 201, BadBridge1} = request(
post,
uri(["bridges"]),
?MQTT_BRIDGE(BadServer, BadName)
),
#{
<<"type">> := ?BRIDGE_TYPE_MQTT,
<<"name">> := BadName,
<<"enable">> := true,
<<"server">> := BadServer,
<<"status">> := <<"connecting">>,
<<"node_status">> := [_ | _]
} = emqx_json:decode(BadBridge1, [return_maps]),
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
%% starting the broken bridge is expected to fail with either 500 or 503
?assertMatch(
{ok, SC, _} when SC == 500 orelse SC == 503,
request(post, operation_path(Type, start, BadBridgeID), <<"">>)
),
%% The bad bridge itself is not deleted here; end_per_testcase/2 removes
%% all remaining bridges via clear_resources/0.
ok = gen_tcp:close(Sock),
ok.
%% Exercises PUT /bridges/:id/enable/:bool: disabling and re-enabling a
%% webhook bridge, invalid parameters, and the 400 returned when a start
%% operation is requested for a disabled bridge.
t_enable_disable_bridges(Config) ->
%% assert there are no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
Name = ?BRIDGE_NAME,
Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
%ct:pal("the bridge ==== ~p", [Bridge]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = emqx_json:decode(Bridge, [return_maps]),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% disable it
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])),
%% enable again
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% enable an already started bridge
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
%% Bridge3 is already bound here, so this match additionally requires the
%% response body to be identical to the previous one.
%% NOTE(review): possibly unintended; confirm whether a fresh variable was meant.
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])),
%% disable it again
{ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>),
%% bad param
{ok, 404, _} = request(put, enable_path(foo, BridgeID), <<"">>),
{ok, 404, _} = request(put, enable_path(true, "foo"), <<"">>),
{ok, 404, _} = request(put, enable_path(true, "webhook:foo"), <<"">>),
%% operations on a disabled bridge are rejected
{ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>),
?assertEqual(
<<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>,
Res
),
%% Res is bound, so the cluster-wide call must return the very same body
{ok, 400, Res} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
%% enable a stopped bridge
{ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%% Creates a webhook bridge, resets its metrics through
%% PUT /bridges/:id/metrics/reset (expects 204) and deletes the bridge again.
t_reset_bridges(Config) ->
    %% precondition: no bridges are configured
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
    Name = ?BRIDGE_NAME,
    HttpPort = ?config(port, Config),
    URL1 = ?URL(HttpPort, "abc"),
    BridgeParams = ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
    {ok, 201, BridgeJson} = request(post, uri(["bridges"]), BridgeParams),
    #{
        <<"type">> := ?BRIDGE_TYPE,
        <<"name">> := Name,
        <<"enable">> := true,
        <<"status">> := <<"connected">>,
        <<"node_status">> := [_ | _],
        <<"url">> := URL1
    } = emqx_json:decode(BridgeJson, [return_maps]),
    BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
    %% reset the metrics
    ResetPath = uri(["bridges", BridgeID, "metrics/reset"]),
    {ok, 204, <<>>} = request(put, ResetPath, []),
    %% delete the bridge and check that none is left
    {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
    {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%% Updating a bridge with a configuration that went through
%% emqx_misc:redact/1 must leave the stored raw password unchanged.
t_with_redact_update(_Config) ->
    Name = <<"redact_update">>,
    Type = <<"mqtt">>,
    Password = <<"123456">>,
    Template = #{
        <<"type">> => Type,
        <<"name">> => Name,
        <<"server">> => <<"127.0.0.1:1883">>,
        <<"username">> => <<"test">>,
        <<"password">> => Password,
        <<"ingress">> =>
            #{<<"remote">> => #{<<"topic">> => <<"t/#">>}}
    },
    {ok, 201, _} = request(post, uri(["bridges"]), Template),
    %% update the bridge using the redacted configuration
    RedactedConf = emqx_misc:redact(Template),
    BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
    {ok, 200, _ResBin} = request(put, uri(["bridges", BridgeID]), RedactedConf),
    %% the raw configuration still holds the original password
    StoredConf = emqx:get_raw_config([bridges, Type, Name]),
    StoredPassword = maps:get(<<"password">>, StoredConf),
    ?assertEqual(Password, StoredPassword),
    ok.
%% Exercises POST /bridges_probe for webhook and MQTT bridges: successful
%% probes return 204, failing probes return 400 with code TEST_FAILED and a
%% reason-specific message, and an unknown bridge type returns BAD_REQUEST.
t_bridges_probe(Config) ->
Port = ?config(port, Config),
URL = ?URL(Port, "some_path"),
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
%% second time with same name is ok since no real bridge created
{ok, 204, <<>>} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(URL, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
%% NOTE(review): the target is an IP literal rather than a host name, so the
%% variable name NxDomain looks like a misnomer.
{ok, 400, NxDomain} = request(
post,
uri(["bridges_probe"]),
?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>, ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := _
},
emqx_json:decode(NxDomain, [return_maps])
),
%% MQTT probe against 127.0.0.1:1883 is expected to succeed
{ok, 204, _} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
),
{ok, 400, ConnRefused} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:2883">>)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Connection refused">>
},
emqx_json:decode(ConnRefused, [return_maps])
),
{ok, 400, HostNotFound} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"nohost:2883">>)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Host not found">>
},
emqx_json:decode(HostNotFound, [return_maps])
),
%% Add a password-based authenticator to the 'mqtt:global' chain with a
%% single user, so that the following probes fail authentication.
AuthnConfig = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"built_in_database">>,
<<"user_id_type">> => <<"username">>
},
Chain = 'mqtt:global',
emqx:update_config(
[authentication],
{create_authenticator, Chain, AuthnConfig}
),
User = #{user_id => <<"u">>, password => <<"p">>},
AuthenticatorID = <<"password_based:built_in_database">>,
{ok, _} = emqx_authentication:add_user(
Chain,
AuthenticatorID,
User
),
%% protocol v4, default credentials from ?MQTT_BRIDGE
{ok, 400, Unauthorized} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>}
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Unauthorized client">>
},
emqx_json:decode(Unauthorized, [return_maps])
),
%% protocol v4, existing user with a wrong password
{ok, 400, Malformed} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{
<<"proto_ver">> => <<"v4">>, <<"password">> => <<"mySecret">>, <<"username">> => <<"u">>
}
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Malformed username or password">>
},
emqx_json:decode(Malformed, [return_maps])
),
%% protocol v5 (the ?MQTT_BRIDGE default), default credentials
{ok, 400, NotAuthorized} = request(
post,
uri(["bridges_probe"]),
?MQTT_BRIDGE(<<"127.0.0.1:1883">>)
),
?assertMatch(
#{
<<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"Not authorized">>
},
emqx_json:decode(NotAuthorized, [return_maps])
),
%% unknown bridge type
{ok, 400, BadReq} = request(
post,
uri(["bridges_probe"]),
?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>)
),
?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, emqx_json:decode(BadReq, [return_maps])),
ok.
%% Checks that bridge metrics are only served by GET /bridges/:id/metrics and
%% are absent from the create, get and list responses.
t_metrics(Config) ->
Port = ?config(port, Config),
%% assert there are no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
%ct:pal("---bridge: ~p", [Bridge]),
Decoded = emqx_json:decode(Bridge, [return_maps]),
#{
<<"type">> := ?BRIDGE_TYPE,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
} = Decoded,
%% assert that the bridge return doesn't contain metrics anymore
?assertNot(maps:is_key(<<"metrics">>, Decoded)),
?assertNot(maps:is_key(<<"node_metrics">>, Decoded)),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% check for empty bridge metrics
{ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
?assertMatch(
#{
<<"metrics">> := #{<<"success">> := 0},
<<"node_metrics">> := [_ | _]
},
emqx_json:decode(Bridge1Str, [return_maps])
),
%% check that the bridge doesn't contain metrics anymore
{ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []),
Decoded2 = emqx_json:decode(Bridge2Str, [return_maps]),
?assertNot(maps:is_key(<<"metrics">>, Decoded2)),
?assertNot(maps:is_key(<<"node_metrics">>, Decoded2)),
%% send a message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)),
?assert(
receive
{http_server, received, #{
method := <<"POST">>,
path := <<"/path1">>,
body := Body
}} ->
true;
Msg ->
ct:pal("error: http got unexpected request: ~p", [Msg]),
false
after 100 ->
false
end
),
%% check for non-empty bridge metrics
%% NOTE(review): the pattern below accepts any value for "success", so it
%% only verifies that the key is present, not that the counter increased.
{ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []),
?assertMatch(
#{
<<"metrics">> := #{<<"success">> := _},
<<"node_metrics">> := [_ | _]
},
emqx_json:decode(Bridge3Str, [return_maps])
),
%% check that metrics isn't returned when listing all bridges
{ok, 200, BridgesStr} = request(get, uri(["bridges"]), []),
?assert(
lists:all(
fun(E) -> not maps:is_key(<<"metrics">>, E) end,
emqx_json:decode(BridgesStr, [return_maps])
)
),
ok.
%% request_timeout in bridge root should match request_timeout in
%% resource_opts.
%% Creates a webhook bridge whose root request_timeout (1s) differs from
%% resource_opts.request_timeout (2s) and expects the response to report 2s
%% for both fields.
t_inconsistent_webhook_request_timeouts(Config) ->
    HttpPort = ?config(port, Config),
    URL1 = ?URL(HttpPort, "path1"),
    Name = ?BRIDGE_NAME,
    ConflictingTimeouts = #{
        <<"request_timeout">> => <<"1s">>,
        <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
    },
    BadBridgeParams =
        emqx_map_lib:deep_merge(
            ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
            ConflictingTimeouts
        ),
    {ok, 201, RawResponse} = request(post, uri(["bridges"]), BadBridgeParams),
    %% note: same value on both fields
    ?assertMatch(
        #{
            <<"request_timeout">> := <<"2s">>,
            <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
        },
        emqx_json:decode(RawResponse, [return_maps])
    ),
    ok.
%% REST path of a bridge operation (e.g. start, stop, restart): either
%% cluster-wide or scoped to the local node.
operation_path(cluster, Oper, BridgeID) ->
    uri(["bridges", BridgeID, Oper]);
operation_path(node, Oper, BridgeID) ->
    uri(["nodes", node(), "bridges", BridgeID, Oper]).
%% REST path for enabling/disabling a bridge: bridges/BridgeID/enable/Enable.
enable_path(Enable, BridgeID) ->
    Segments = ["bridges", BridgeID, "enable", Enable],
    uri(Segments).
%% Converts a binary to a list string; a list is returned unchanged.
str(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
str(Str) when is_list(Str) ->
    Str.