test: test cases for Kafka bridge REST API

This commit is contained in:
Kjell Winblad 2022-09-23 10:07:22 +02:00
parent 516d60c7da
commit adc67b165b
2 changed files with 318 additions and 5 deletions

View File

@ -92,7 +92,7 @@ param_path_operation_cluster() ->
#{
in => path,
required => true,
example => <<"start">>,
example => <<"restart">>,
desc => ?DESC("desc_param_path_operation_cluster")
}
)}.

View File

@ -13,6 +13,34 @@
-define(PRODUCER, emqx_bridge_impl_kafka).
%%------------------------------------------------------------------------------
%% Things for REST API tests
%%------------------------------------------------------------------------------
-import(
emqx_common_test_http,
[
request_api/3,
request_api/5,
get_http_data/1
]
).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include("emqx_dashboard.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:18083").
%% -define(API_VERSION, "v5").
-define(BASE_PATH, "/api/v5").
-define(APP_DASHBOARD, emqx_dashboard).
-define(APP_MANAGEMENT, emqx_management).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
@ -36,13 +64,89 @@ wait_until_kafka_is_up(Attempts) ->
end.
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(brod),
{ok, _} = application:ensure_all_started(wolff),
%% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for
%% more info.
application:unload(emqx_authz),
emqx_common_test_helpers:start_apps(
[emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard],
fun set_special_configs/1
),
application:set_env(emqx_machine, applications, [
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_ee_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_plugin
]),
{ok, _} = application:ensure_all_started(emqx_machine),
wait_until_kafka_is_up(),
%% Wait until bridges API is up
(fun WaitUntilRestApiUp() ->
case show(http_get(["bridges"])) of
{ok, 200, _Res} ->
ok;
Val ->
ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]),
timer:sleep(1000),
WaitUntilRestApiUp()
end
end)(),
Config.
end_per_suite(_) ->
end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps([
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_gateway,
emqx_statsd,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_ee_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_plugin,
emqx_conf,
emqx_bridge,
emqx_management,
emqx_dashboard,
emqx_machine
]),
mria:stop(),
Config.
set_special_configs(emqx_management) ->
Listeners = #{http => #{port => 8081}},
Config = #{
listeners => Listeners,
applications => [#{id => "admin", secret => "public"}]
},
emqx_config:put([emqx_management], Config),
ok;
set_special_configs(emqx_dashboard) ->
emqx_dashboard_api_test_helpers:set_default_config(),
ok;
set_special_configs(_) ->
ok.
%%------------------------------------------------------------------------------
%% Test cases for all combinations of SSL, no SSL and authentication types
%%------------------------------------------------------------------------------
t_publish_no_auth(_CtConfig) ->
publish_with_and_without_ssl("none").
@ -59,6 +163,160 @@ t_publish_sasl_scram512(_CtConfig) ->
t_publish_sasl_kerberos(_CtConfig) ->
publish_with_and_without_ssl(valid_sasl_kerberos_settings()).
%%------------------------------------------------------------------------------
%% Test cases for REST api
%%------------------------------------------------------------------------------
show(X) ->
% erlang:display('______________ SHOW ______________:'),
% erlang:display(X),
X.
t_kafka_bridge_rest_api_plain_text(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(false).
t_kafka_bridge_rest_api_ssl(_CtConfig) ->
kafka_bridge_rest_api_all_auth_methods(true).
kafka_bridge_rest_api_all_auth_methods(UseSSL) ->
NormalHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl();
false -> kafka_hosts_string()
end,
kafka_bridge_rest_api_helper(#{
<<"bootstrap_hosts">> => NormalHostsString,
<<"authentication">> => <<"none">>
}),
SASLHostsString =
case UseSSL of
true -> kafka_hosts_string_ssl_sasl();
false -> kafka_hosts_string_sasl()
end,
BinifyMap = fun(Map) ->
maps:from_list([
{erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)}
|| {K, V} <- maps:to_list(Map)
])
end,
SSLSettings =
case UseSSL of
true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())};
false -> #{}
end,
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_plain_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram256_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_scram512_settings())
},
SSLSettings
)
),
kafka_bridge_rest_api_helper(
maps:merge(
#{
<<"bootstrap_hosts">> => SASLHostsString,
<<"authentication">> => BinifyMap(valid_sasl_kerberos_settings())
},
SSLSettings
)
),
ok.
kafka_bridge_rest_api_helper(Config) ->
UrlEscColon = "%3A",
BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge",
BridgesParts = ["bridges"],
BridgesPartsId = ["bridges", BridgeIdUrlEnc],
OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end,
BridgesPartsOpDisable = OpUrlFun("disable"),
BridgesPartsOpEnable = OpUrlFun("enable"),
BridgesPartsOpRestart = OpUrlFun("restart"),
BridgesPartsOpStop = OpUrlFun("stop"),
%% List bridges
MyKafkaBridgeExists = fun() ->
{ok, _Code, BridgesData} = show(http_get(BridgesParts)),
Bridges = show(json(BridgesData)),
lists:any(
fun
(#{<<"name">> := <<"my_kafka_bridge">>}) -> true;
(_) -> false
end,
Bridges
)
end,
%% Delete if my_kafka_bridge exists
case MyKafkaBridgeExists() of
true ->
%% Delete the bridge my_kafka_bridge
show(
'========================================== DELETE ========================================'
),
{ok, 204, <<>>} = show(http_delete(BridgesPartsId));
false ->
ok
end,
false = MyKafkaBridgeExists(),
%% Create new Kafka bridge
CreateBodyTmp = #{
<<"type">> => <<"kafka">>,
<<"name">> => <<"my_kafka_bridge">>,
<<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config),
<<"enable">> => true,
<<"authentication">> => maps:get(<<"authentication">>, Config),
<<"producer">> => #{
<<"mqtt">> => #{
topic => <<"t/#">>
},
<<"kafka">> => #{
<<"topic">> => <<"test-topic-one-partition">>
}
}
},
CreateBody =
case maps:is_key(<<"ssl">>, Config) of
true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)};
false -> CreateBodyTmp
end,
{ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))),
%% Check that the new bridge is in the list of bridges
true = MyKafkaBridgeExists(),
%% Perform operations
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})),
{ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})),
%% Cleanup
{ok, 204, _} = show(http_delete(BridgesPartsId)),
false = MyKafkaBridgeExists(),
ok.
%%------------------------------------------------------------------------------
%% Helper functions
%%------------------------------------------------------------------------------
publish_with_and_without_ssl(AuthSettings) ->
publish_helper(#{
auth_settings => AuthSettings,
@ -212,7 +470,8 @@ valid_ssl_settings() ->
#{
"cacertfile" => <<"/var/lib/secret/ca.crt">>,
"certfile" => <<"/var/lib/secret/client.crt">>,
"keyfile" => <<"/var/lib/secret/client.key">>
"keyfile" => <<"/var/lib/secret/client.key">>,
"enable" => <<"true">>
}.
valid_sasl_plain_settings() ->
@ -243,3 +502,57 @@ kafka_hosts() ->
resolve_kafka_offset(Hosts, Topic, Partition) ->
brod:resolve_offset(Hosts, Topic, Partition, latest).
%%------------------------------------------------------------------------------
%% Internal functions rest API helpers
%%------------------------------------------------------------------------------
bin(X) -> iolist_to_binary(X).
random_num() ->
erlang:system_time(nanosecond).
http_get(Parts) ->
request_api(get, api_path(Parts), auth_header_()).
http_delete(Parts) ->
request_api(delete, api_path(Parts), auth_header_()).
http_post(Parts, Body) ->
request_api(post, api_path(Parts), [], auth_header_(), Body).
http_put(Parts, Body) ->
request_api(put, api_path(Parts), [], auth_header_(), Body).
request_dashboard(Method, Url, Auth) ->
Request = {Url, [Auth]},
do_request_dashboard(Method, Request).
request_dashboard(Method, Url, QueryParams, Auth) ->
Request = {Url ++ "?" ++ QueryParams, [Auth]},
do_request_dashboard(Method, Request).
do_request_dashboard(Method, Request) ->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when
Code >= 200 andalso Code =< 299
->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
auth_header_(<<"admin">>, <<"public">>).
auth_header_(Username, Password) ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
api_path(Parts) ->
?HOST ++ filename:join([?BASE_PATH | Parts]).
json(Data) ->
{ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]),
Jsx.