%% emqx/apps/emqx_ft/test/emqx_ft_api_SUITE.erl
%%
%% 327 lines
%% 9.5 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_api_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
%% Common Test entry point: the suite consists of the `single' group
%% followed by the `cluster' group.
all() ->
    [{group, GroupName} || GroupName <- [single, cluster]].
%% Group definitions. Both groups carry no group properties and use whatever
%% emqx_common_test_helpers:all/1 returns for this module as their test cases.
groups() ->
    [
        {GroupName, [], emqx_common_test_helpers:all(?MODULE)}
     || GroupName <- [single, cluster]
    ].
%% Suite setup.
%% Initialises the suite through emqx_mgmt_api_test_util:init_suite/2 for the
%% emqx_conf and emqx_ft applications (with the env handler built from
%% Config), then sets the [rpc, port_discovery] config entry to `manual'.
%% Returns Config unchanged; any unexpected result crashes the setup.
init_per_suite(Config) ->
    Apps = [emqx_conf, emqx_ft],
    EnvHandler = emqx_ft_test_helpers:env_handler(Config),
    ok = emqx_mgmt_api_test_util:init_suite(Apps, EnvHandler),
    {ok, _} = emqx:update_config([rpc, port_discovery], manual),
    Config.
%% Suite teardown: ends the suite for emqx_ft and emqx_conf via
%% emqx_mgmt_api_test_util:end_suite/1 and returns `ok'.
end_per_suite(_Config) ->
    Apps = [emqx_ft, emqx_conf],
    %% The match both asserts success and yields the `ok' return value.
    ok = emqx_mgmt_api_test_util:end_suite(Apps).
%% Group setup.
%% For the `cluster' group, additional nodes are started from the specs
%% produced by mk_cluster_specs/1, each with `join_to' set to the current
%% node; the started nodes are stored in Config under `cluster_nodes'.
%% Every group records its own name under the `group' key.
init_per_group(cluster = Group, Config) ->
    NodeSpecs = mk_cluster_specs(Config),
    ct:pal("Starting ~p", [NodeSpecs]),
    Started = [
        emqx_common_test_helpers:start_slave(NodeName, NodeOpts#{join_to => node()})
     || {NodeName, NodeOpts} <- NodeSpecs
    ],
    [{group, Group}, {cluster_nodes, Started} | Config];
init_per_group(Group, Config) ->
    [{group, Group} | Config].
%% Group teardown.
%% For the `cluster' group, every node stored under `cluster_nodes' is passed
%% to emqx_ft_test_helpers:stop_additional_node/1. Other groups need no cleanup.
end_per_group(cluster, Config) ->
    Nodes = ?config(cluster_nodes, Config),
    ok = lists:foreach(fun emqx_ft_test_helpers:stop_additional_node/1, Nodes);
end_per_group(_Group, _Config) ->
    ok.
%% Builds the node specs for the `cluster' group by handing two `core' specs
%% (TCP listener ports 2883 and 3883) plus the shared options to
%% emqx_common_test_helpers:emqx_cluster/2. The shared options set the emqx
%% boot modules, request the emqx_ft app, set `enabled' to false for the
%% default ssl/ws/wss listeners, and install the env handler built from Config.
mk_cluster_specs(Config) ->
    DisabledListeners = [
        {[listeners, Proto, default, enabled], false}
     || Proto <- [ssl, ws, wss]
    ],
    SharedOpts = [
        {env, [{emqx, boot_modules, [broker, listeners]}]},
        {apps, [emqx_ft]},
        {conf, DisabledListeners},
        {env_handler, emqx_ft_test_helpers:env_handler(Config)}
    ],
    NodeSpecs = [
        {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
        {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}}
    ],
    emqx_common_test_helpers:emqx_cluster(NodeSpecs, SharedOpts).
%% Records the running test case name in Config under the `tc' key
%% (client_id/1 reads it back).
init_per_testcase(TestCase, Config) ->
    [{tc, TestCase}] ++ Config.
%% Per-testcase cleanup.
%% t_ft_disabled sets [file_transfer, enable] to false, so the setting is put
%% back to true afterwards. No other test case needs cleanup.
end_per_testcase(t_ft_disabled, _Config) ->
    emqx_config:put([file_transfer, enable], true);
end_per_testcase(_OtherCase, _Config) ->
    ok.
%%--------------------------------------------------------------------
%% Tests
%%--------------------------------------------------------------------
%% Uploads one file through the last node of the cluster and checks that it
%% shows up in the full listing and in the per-transfer listing, and that an
%% unknown file id yields 404 FILES_NOT_FOUND.
t_list_files(Config) ->
    ClientId = client_id(Config),
    FileId = <<"f1">>,
    UploadNode = lists:last(cluster(Config)),
    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, UploadNode),

    %% Full listing: exactly one entry must belong to this client.
    {ok, 200, #{<<"files">> := AllFiles}} =
        request_json(get, uri(["file_transfer", "files"])),
    OwnFiles = lists:filter(
        fun
            (#{<<"clientid">> := CId}) -> CId == ClientId;
            (_Other) -> false
        end,
        AllFiles
    ),
    ?assertMatch(
        [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
        OwnFiles
    ),

    %% Listing restricted to this client id / file id.
    TransferUrl = uri(["file_transfer", "files", ClientId, FileId]),
    {ok, 200, #{<<"files">> := TransferFiles}} = request_json(get, TransferUrl),
    ?assertMatch(
        [#{<<"clientid">> := ClientId, <<"fileid">> := <<"f1">>}],
        TransferFiles
    ),

    %% A file id that was never uploaded.
    MissingUrl = uri(["file_transfer", "files", ClientId, <<"no-such-file">>]),
    ?assertMatch(
        {ok, 404, #{<<"code">> := <<"FILES_NOT_FOUND">>}},
        request_json(get, MissingUrl)
    ).
%% Uploads one file and exercises the download endpoint: the expected error
%% codes for a missing `node' parameter, an unknown node, an unknown fileref
%% and invalid filerefs, then a successful download through the `uri'
%% reported by the listing API.
t_download_transfer(Config) ->
    ClientId = client_id(Config),
    FileId = <<"f1">>,
    UploadNode = lists:last(cluster(Config)),
    ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, UploadNode),

    %% Download endpoint URL with the given query parameters appended.
    DownloadUrl = fun(Params) ->
        uri(["file_transfer", "file"]) ++ query(Params)
    end,

    %% No `node' parameter.
    ?assertMatch(
        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
        request_json(get, DownloadUrl(#{fileref => FileId}))
    ),

    %% `node' set to nonode@nohost.
    ?assertMatch(
        {ok, 503, _},
        request(get, DownloadUrl(#{fileref => FileId, node => <<"nonode@nohost">>}))
    ),

    %% A fileref that does not exist on this node.
    ?assertMatch(
        {ok, 404, _},
        request(get, DownloadUrl(#{fileref => <<"unknown_file">>, node => node()}))
    ),

    %% Empty fileref.
    ?assertMatch(
        {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
        request_json(get, DownloadUrl(#{fileref => <<>>, node => node()}))
    ),

    %% Absolute path as fileref.
    ?assertMatch(
        {ok, 404, #{<<"message">> := <<"Invalid query parameter", _/bytes>>}},
        request_json(get, DownloadUrl(#{fileref => <<"/etc/passwd">>, node => node()}))
    ),

    %% Fetch the file through the `uri' advertised for the uploaded transfer.
    {ok, 200, #{<<"files">> := [File]}} =
        request_json(get, uri(["file_transfer", "files", ClientId, FileId])),
    AdvertisedUrl = host() ++ maps:get(<<"uri">>, File),
    {ok, 200, Response} = request(get, AdvertisedUrl),
    ?assertEqual(<<"data">>, Response).
%% Uploads NFiles files spread over the cluster nodes (see pick/2) and checks
%% paging of the listing API: `limit' and the returned `cursor', rejection of
%% invalid `limit' / `following' values, and that walking the pages with
%% several page sizes yields the same list as one large request.
t_list_files_paging(Config) ->
    ClientId = client_id(Config),
    NFiles = 20,
    Nodes = cluster(Config),
    ok = lists:foreach(
        fun(N) ->
            ok = emqx_ft_test_helpers:upload_file(
                ClientId,
                mk_file_id("file:", N),
                mk_file_name(N),
                <<"data">>,
                pick(N, Nodes)
            )
        end,
        lists:seq(1, NFiles)
    ),

    %% GET on the listing endpoint with the given query parameters.
    ListFiles = fun(Params) ->
        request_json(get, uri(["file_transfer", "files"]) ++ query(Params))
    end,

    %% A small limit yields exactly that many files plus a cursor.
    ?assertMatch(
        {ok, 200, #{<<"files">> := [_, _, _], <<"cursor">> := _}},
        ListFiles(#{limit => 3})
    ),

    %% A limit of 100 returns everything, without a cursor.
    {ok, 200, #{<<"files">> := AllFiles}} = ListFiles(#{limit => 100}),
    ?assert(length(AllFiles) >= NFiles),
    ?assertNotMatch(
        {ok, 200, #{<<"cursor">> := _}},
        ListFiles(#{limit => 100})
    ),

    %% Invalid paging parameters are rejected.
    ?assertMatch(
        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
        ListFiles(#{limit => 0})
    ),
    ?assertMatch(
        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
        ListFiles(#{following => <<>>})
    ),
    ?assertMatch(
        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
        ListFiles(#{following => <<"{\"\":}">>})
    ),
    ?assertMatch(
        {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}},
        ListFiles(#{following => <<"whatsthat!?">>})
    ),

    %% Follow cursors until a page arrives without one. Pages are collected
    %% newest-first and concatenated in request order at the end.
    PageThrough = fun Walk(Params, PagesRev) ->
        case ListFiles(Params) of
            {ok, 200, #{<<"files">> := Page, <<"cursor">> := Cursor}} ->
                Walk(Params#{following => Cursor}, [Page | PagesRev]);
            {ok, 200, #{<<"files">> := Page}} ->
                lists:append(lists:reverse([Page | PagesRev]))
        end
    end,
    ?assertEqual(AllFiles, PageThrough(#{limit => 1}, [])),
    ?assertEqual(AllFiles, PageThrough(#{limit => 8}, [])),
    ?assertEqual(AllFiles, PageThrough(#{limit => NFiles}, [])).
%% Checks the API before and after [file_transfer, enable] is set to false:
%% beforehand the listing answers 200 and a download without `node' answers
%% 400; afterwards both endpoints answer 503.
%% end_per_testcase/2 puts the setting back to true.
t_ft_disabled(_Config) ->
    ListFiles = fun() ->
        request_json(get, uri(["file_transfer", "files"]))
    end,
    GetFile = fun(Params) ->
        request_json(get, uri(["file_transfer", "file"]) ++ query(Params))
    end,

    %% Before the setting is changed.
    ?assertMatch({ok, 200, _}, ListFiles()),
    ?assertMatch({ok, 400, _}, GetFile(#{fileref => <<"f1">>})),

    ok = emqx_config:put([file_transfer, enable], false),

    %% After the setting is changed.
    ?assertMatch({ok, 503, _}, ListFiles()),
    ?assertMatch({ok, 503, _}, GetFile(#{fileref => <<"f1">>, node => node()})).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
%% Nodes taking part in the test: the current node first, followed by the
%% nodes stored under `cluster_nodes' in Config (none if the key is absent).
cluster(Config) ->
    Peers = proplists:get_value(cluster_nodes, Config, []),
    [node() | Peers].
%% Client id for the running test: "<group>.<testcase>" as a binary, built
%% from the `group' and `tc' entries of Config.
client_id(Config) ->
    Group = ?config(group, Config),
    TestCase = ?config(tc, Config),
    Formatted = io_lib:format("~s.~s", [Group, TestCase]),
    iolist_to_binary(Formatted).
%% File id binary: Prefix (any iodata) followed by the decimal form of N.
mk_file_id(Prefix, N) ->
    Number = integer_to_binary(N),
    iolist_to_binary([Prefix, Number]).
%% File name string: "file." followed by the decimal form of N.
mk_file_name(N) ->
    Digits = integer_to_list(N),
    lists:append("file.", Digits).
%% Performs Method on Url through emqx_mgmt_api_test_util:request/3, passing
%% an empty list as the third argument. The result is returned as is.
request(Method, Url) ->
emqx_mgmt_api_test_util:request(Method, Url, []).
%% Like request/2, but an `{ok, Code, Body}' result has its body decoded with
%% json/1. Any other result is passed through untouched.
request_json(Method, Url) ->
    case request(Method, Url) of
        {ok, Code, RawBody} -> {ok, Code, json(RawBody)};
        Other -> Other
    end.
%% Decodes a binary body with emqx_utils_json:decode/2 using the
%% `return_maps' option. There is no clause for non-binary input.
json(Body) when is_binary(Body) ->
emqx_utils_json:decode(Body, [return_maps]).
%% Renders a map of query parameters as a "?k1=v1&k2=v2" string. Keys and
%% values go through uri_encode/1; pairs appear in maps:to_list/1 order.
%% An empty map yields "?".
query(Params) ->
    Pairs = [
        uri_encode(Key) ++ "=" ++ uri_encode(Value)
     || {Key, Value} <- maps:to_list(Params)
    ],
    [$? | lists:append(lists:join("&", Pairs))].
%% Converts T to a string with to_list/1 and passes the result to
%% emqx_http_lib:uri_encode/1.
uri_encode(T) ->
emqx_http_lib:uri_encode(to_list(T)).
%% Converts an atom, integer, binary or list to a list (string).
%% Lists are returned unchanged; any other term raises function_clause.
to_list(Str) when is_list(Str) ->
    Str;
to_list(Bin) when is_binary(Bin) ->
    binary_to_list(Bin);
to_list(Int) when is_integer(Int) ->
    integer_to_list(Int);
to_list(Atom) when is_atom(Atom) ->
    atom_to_list(Atom).
%% Picks an element of List by index N modulo the list length, so that
%% consecutive values of N cycle through the list.
pick(N, List) ->
    Offset = N rem length(List),
    lists:nth(Offset + 1, List).