301 lines
11 KiB
Erlang
301 lines
11 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_api_SUITE).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-define(CONF_DEFAULT, <<"bridges: {}">>).
|
|
-define(BRIDGE_TYPE, <<"http">>).
|
|
-define(BRIDGE_NAME, <<"test_bridge">>).
|
|
-define(BRIDGE_ID, <<"http:test_bridge">>).
|
|
-define(URL(PORT, PATH), list_to_binary(
|
|
io_lib:format("http://localhost:~s/~s",
|
|
[integer_to_list(PORT), PATH]))).
|
|
-define(HTTP_BRIDGE(URL),
|
|
#{
|
|
<<"url">> => URL,
|
|
<<"local_topic">> => <<"emqx_http/#">>,
|
|
<<"method">> => <<"post">>,
|
|
<<"ssl">> => #{<<"enable">> => false},
|
|
<<"body">> => <<"${payload}">>,
|
|
<<"headers">> => #{
|
|
<<"content-type">> => <<"application/json">>
|
|
}
|
|
|
|
}).
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
groups() ->
|
|
[].
|
|
|
|
suite() ->
|
|
[{timetrap,{seconds,30}}].
|
|
|
|
init_per_suite(Config) ->
|
|
ok = emqx_config:put([emqx_dashboard], #{
|
|
default_username => <<"admin">>,
|
|
default_password => <<"public">>,
|
|
listeners => [#{
|
|
protocol => http,
|
|
port => 18083
|
|
}]
|
|
}),
|
|
_ = application:load(emqx_conf),
|
|
%% some testcases (may from other app) already get emqx_connector started
|
|
_ = application:stop(emqx_resource),
|
|
_ = application:stop(emqx_connector),
|
|
ok = emqx_common_test_helpers:start_apps([emqx_bridge, emqx_dashboard]),
|
|
ok = emqx_config:init_load(emqx_bridge_schema, ?CONF_DEFAULT),
|
|
Config.
|
|
|
|
end_per_suite(_Config) ->
|
|
emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_dashboard]),
|
|
ok.
|
|
|
|
init_per_testcase(_, Config) ->
|
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
|
Config.
|
|
end_per_testcase(_, _Config) ->
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% HTTP server for testing
|
|
%%------------------------------------------------------------------------------
|
|
start_http_server(HandleFun) ->
|
|
Parent = self(),
|
|
spawn_link(fun() ->
|
|
{Port, Sock} = listen_on_random_port(),
|
|
Parent ! {port, Port},
|
|
loop(Sock, HandleFun)
|
|
end),
|
|
receive
|
|
{port, Port} -> Port
|
|
after
|
|
2000 -> error({timeout, start_http_server})
|
|
end.
|
|
|
|
listen_on_random_port() ->
|
|
Min = 1024, Max = 65000,
|
|
Port = rand:uniform(Max - Min) + Min,
|
|
case gen_tcp:listen(Port, [{active, false}, {reuseaddr, true}]) of
|
|
{ok, Sock} -> {Port, Sock};
|
|
{error, eaddrinuse} -> listen_on_random_port()
|
|
end.
|
|
|
|
loop(Sock, HandleFun) ->
|
|
{ok, Conn} = gen_tcp:accept(Sock),
|
|
Handler = spawn(fun () -> HandleFun(Conn) end),
|
|
gen_tcp:controlling_process(Conn, Handler),
|
|
loop(Sock, HandleFun).
|
|
|
|
make_response(CodeStr, Str) ->
|
|
B = iolist_to_binary(Str),
|
|
iolist_to_binary(
|
|
io_lib:fwrite(
|
|
"HTTP/1.0 ~s\nContent-Type: text/html\nContent-Length: ~p\n\n~s",
|
|
[CodeStr, size(B), B])).
|
|
|
|
handle_fun_200_ok(Conn) ->
|
|
case gen_tcp:recv(Conn, 0) of
|
|
{ok, Request} ->
|
|
gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
|
|
self() ! {http_server, received, Request},
|
|
handle_fun_200_ok(Conn);
|
|
{error, closed} ->
|
|
gen_tcp:close(Conn)
|
|
end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_http_crud_apis(_) ->
|
|
Port = start_http_server(fun handle_fun_200_ok/1),
|
|
%% assert we there's no bridges at first
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
%% then we add a http bridge, using POST
|
|
%% POST /bridges/ will create a bridge
|
|
URL1 = ?URL(Port, "path1"),
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
?HTTP_BRIDGE(URL1)#{
|
|
<<"type">> => ?BRIDGE_TYPE,
|
|
<<"name">> => ?BRIDGE_NAME
|
|
}),
|
|
|
|
%ct:pal("---bridge: ~p", [Bridge]),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"type">> := ?BRIDGE_TYPE
|
|
, <<"name">> := ?BRIDGE_NAME
|
|
, <<"status">> := _
|
|
, <<"node_status">> := [_|_]
|
|
, <<"metrics">> := _
|
|
, <<"node_metrics">> := [_|_]
|
|
, <<"url">> := URL1
|
|
}, jsx:decode(Bridge)),
|
|
|
|
%% create a again returns an error
|
|
{ok, 400, RetMsg} = request(post, uri(["bridges"]),
|
|
?HTTP_BRIDGE(URL1)#{
|
|
<<"type">> => ?BRIDGE_TYPE,
|
|
<<"name">> => ?BRIDGE_NAME
|
|
}),
|
|
?assertMatch(
|
|
#{ <<"code">> := _
|
|
, <<"message">> := <<"bridge already exists">>
|
|
}, jsx:decode(RetMsg)),
|
|
|
|
%% update the request-path of the bridge
|
|
URL2 = ?URL(Port, "path2"),
|
|
{ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
|
|
?HTTP_BRIDGE(URL2)),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"type">> := ?BRIDGE_TYPE
|
|
, <<"name">> := ?BRIDGE_NAME
|
|
, <<"status">> := _
|
|
, <<"node_status">> := [_|_]
|
|
, <<"metrics">> := _
|
|
, <<"node_metrics">> := [_|_]
|
|
, <<"url">> := URL2
|
|
}, jsx:decode(Bridge2)),
|
|
|
|
%% list all bridges again, assert Bridge2 is in it
|
|
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
|
|
?assertMatch([#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"type">> := ?BRIDGE_TYPE
|
|
, <<"name">> := ?BRIDGE_NAME
|
|
, <<"status">> := _
|
|
, <<"node_status">> := [_|_]
|
|
, <<"metrics">> := _
|
|
, <<"node_metrics">> := [_|_]
|
|
, <<"url">> := URL2
|
|
}], jsx:decode(Bridge2Str)),
|
|
|
|
%% get the bridge by id
|
|
{ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"type">> := ?BRIDGE_TYPE
|
|
, <<"name">> := ?BRIDGE_NAME
|
|
, <<"status">> := _
|
|
, <<"node_status">> := [_|_]
|
|
, <<"metrics">> := _
|
|
, <<"node_metrics">> := [_|_]
|
|
, <<"url">> := URL2
|
|
}, jsx:decode(Bridge3Str)),
|
|
|
|
%% delete the bridge
|
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
|
|
|
%% update a deleted bridge returns an error
|
|
{ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]),
|
|
?HTTP_BRIDGE(URL2)),
|
|
?assertMatch(
|
|
#{ <<"code">> := _
|
|
, <<"message">> := <<"bridge not found">>
|
|
}, jsx:decode(ErrMsg2)),
|
|
ok.
|
|
|
|
t_start_stop_bridges(_) ->
|
|
Port = start_http_server(fun handle_fun_200_ok/1),
|
|
URL1 = ?URL(Port, "abc"),
|
|
{ok, 201, Bridge} = request(post, uri(["bridges"]),
|
|
?HTTP_BRIDGE(URL1)#{
|
|
<<"type">> => ?BRIDGE_TYPE,
|
|
<<"name">> => ?BRIDGE_NAME
|
|
}),
|
|
%ct:pal("the bridge ==== ~p", [Bridge]),
|
|
?assertMatch(
|
|
#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"type">> := ?BRIDGE_TYPE
|
|
, <<"name">> := ?BRIDGE_NAME
|
|
, <<"status">> := _
|
|
, <<"node_status">> := [_|_]
|
|
, <<"metrics">> := _
|
|
, <<"node_metrics">> := [_|_]
|
|
, <<"url">> := URL1
|
|
}, jsx:decode(Bridge)),
|
|
%% stop it
|
|
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
|
|
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"status">> := <<"disconnected">>
|
|
}, jsx:decode(Bridge2)),
|
|
%% start again
|
|
{ok, 200, <<>>} = request(post, operation_path(start), <<"">>),
|
|
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"status">> := <<"connected">>
|
|
}, jsx:decode(Bridge3)),
|
|
%% restart an already started bridge
|
|
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
|
|
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"status">> := <<"connected">>
|
|
}, jsx:decode(Bridge3)),
|
|
%% stop it again
|
|
{ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
|
|
%% restart a stopped bridge
|
|
{ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
|
|
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
|
|
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
|
|
, <<"status">> := <<"connected">>
|
|
}, jsx:decode(Bridge4)),
|
|
%% delete the bridge
|
|
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
|
|
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% HTTP Request
|
|
%%--------------------------------------------------------------------
|
|
-define(HOST, "http://127.0.0.1:18083/").
|
|
-define(API_VERSION, "v5").
|
|
-define(BASE_PATH, "api").
|
|
|
|
request(Method, Url, Body) ->
|
|
Request = case Body of
|
|
[] -> {Url, [auth_header_()]};
|
|
_ -> {Url, [auth_header_()], "application/json", jsx:encode(Body)}
|
|
end,
|
|
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
|
|
case httpc:request(Method, Request, [], [{body_format, binary}]) of
|
|
{error, socket_closed_remotely} ->
|
|
{error, socket_closed_remotely};
|
|
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return} } ->
|
|
{ok, Code, Return};
|
|
{ok, {Reason, _, _}} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
uri() -> uri([]).
|
|
uri(Parts) when is_list(Parts) ->
|
|
NParts = [E || E <- Parts],
|
|
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
|
|
|
|
auth_header_() ->
|
|
Username = <<"admin">>,
|
|
Password = <<"public">>,
|
|
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
|
|
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
|
|
|
|
operation_path(Oper) ->
|
|
uri(["bridges", ?BRIDGE_ID, "operation", Oper]).
|