diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a353c9cf0..6bfa439d1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -92,7 +92,7 @@ param_path_operation_cluster() -> #{ in => path, required => true, - example => <<"start">>, + example => <<"restart">>, desc => ?DESC("desc_param_path_operation_cluster") } )}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 9ca87d106..19ab05cc5 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -13,6 +13,34 @@ -define(PRODUCER, emqx_bridge_impl_kafka). +%%------------------------------------------------------------------------------ +%% Things for REST API tests +%%------------------------------------------------------------------------------ + +-import( + emqx_common_test_http, + [ + request_api/3, + request_api/5, + get_http_data/1 + ] +). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include("emqx_dashboard.hrl"). + +-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). + +-define(HOST, "http://127.0.0.1:18083"). + +%% -define(API_VERSION, "v5"). + +-define(BASE_PATH, "/api/v5"). + +-define(APP_DASHBOARD, emqx_dashboard). +-define(APP_MANAGEMENT, emqx_management). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -36,13 +64,89 @@ wait_until_kafka_is_up(Attempts) -> end. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - {ok, _} = application:ensure_all_started(wolff), + %% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for + %% more info. + application:unload(emqx_authz), + emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard], + fun set_special_configs/1 + ), + application:set_env(emqx_machine, applications, [ + emqx_prometheus, + emqx_modules, + emqx_dashboard, + emqx_gateway, + emqx_statsd, + emqx_resource, + emqx_rule_engine, + emqx_bridge, + emqx_ee_bridge, + emqx_plugin_libs, + emqx_management, + emqx_retainer, + emqx_exhook, + emqx_authn, + emqx_authz, + emqx_plugin + ]), + {ok, _} = application:ensure_all_started(emqx_machine), wait_until_kafka_is_up(), + %% Wait until bridges API is up + (fun WaitUntilRestApiUp() -> + case show(http_get(["bridges"])) of + {ok, 200, _Res} -> + ok; + Val -> + ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]), + timer:sleep(1000), + WaitUntilRestApiUp() + end + end)(), Config. -end_per_suite(_) -> +end_per_suite(Config) -> + emqx_common_test_helpers:stop_apps([ + emqx_prometheus, + emqx_modules, + emqx_dashboard, + emqx_gateway, + emqx_statsd, + emqx_resource, + emqx_rule_engine, + emqx_bridge, + emqx_ee_bridge, + emqx_plugin_libs, + emqx_management, + emqx_retainer, + emqx_exhook, + emqx_authn, + emqx_authz, + emqx_plugin, + emqx_conf, + emqx_bridge, + emqx_management, + emqx_dashboard, + emqx_machine + ]), + mria:stop(), + Config. + +set_special_configs(emqx_management) -> + Listeners = #{http => #{port => 8081}}, + Config = #{ + listeners => Listeners, + applications => [#{id => "admin", secret => "public"}] + }, + emqx_config:put([emqx_management], Config), + ok; +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(), + ok; +set_special_configs(_) -> ok. +%%------------------------------------------------------------------------------ +%% Test cases for all combinations of SSL, no SSL and authentication types +%%------------------------------------------------------------------------------ t_publish_no_auth(_CtConfig) -> publish_with_and_without_ssl("none"). @@ -59,6 +163,160 @@ t_publish_sasl_scram512(_CtConfig) -> t_publish_sasl_kerberos(_CtConfig) -> publish_with_and_without_ssl(valid_sasl_kerberos_settings()). +%%------------------------------------------------------------------------------ +%% Test cases for REST api +%%------------------------------------------------------------------------------ + +show(X) -> + % erlang:display('______________ SHOW ______________:'), + % erlang:display(X), + X. + +t_kafka_bridge_rest_api_plain_text(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(false). + +t_kafka_bridge_rest_api_ssl(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(true). + +kafka_bridge_rest_api_all_auth_methods(UseSSL) -> + NormalHostsString = + case UseSSL of + true -> kafka_hosts_string_ssl(); + false -> kafka_hosts_string() + end, + kafka_bridge_rest_api_helper(#{ + <<"bootstrap_hosts">> => NormalHostsString, + <<"authentication">> => <<"none">> + }), + SASLHostsString = + case UseSSL of + true -> kafka_hosts_string_ssl_sasl(); + false -> kafka_hosts_string_sasl() + end, + BinifyMap = fun(Map) -> + maps:from_list([ + {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)} + || {K, V} <- maps:to_list(Map) + ]) + end, + SSLSettings = + case UseSSL of + true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; + false -> #{} + end, + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) + }, + SSLSettings + ) + ), + ok. + +kafka_bridge_rest_api_helper(Config) -> + UrlEscColon = "%3A", + BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge", + BridgesParts = ["bridges"], + BridgesPartsId = ["bridges", BridgeIdUrlEnc], + OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, + BridgesPartsOpDisable = OpUrlFun("disable"), + BridgesPartsOpEnable = OpUrlFun("enable"), + BridgesPartsOpRestart = OpUrlFun("restart"), + BridgesPartsOpStop = OpUrlFun("stop"), + %% List bridges + MyKafkaBridgeExists = fun() -> + {ok, _Code, BridgesData} = show(http_get(BridgesParts)), + Bridges = show(json(BridgesData)), + lists:any( + fun + (#{<<"name">> := <<"my_kafka_bridge">>}) -> true; + (_) -> false + end, + Bridges + ) + end, + %% Delete if my_kafka_bridge exists + case MyKafkaBridgeExists() of + true -> + %% Delete the bridge my_kafka_bridge + show( + '========================================== DELETE ========================================' + ), + {ok, 204, <<>>} = show(http_delete(BridgesPartsId)); + false -> + ok + end, + false = MyKafkaBridgeExists(), + %% Create new Kafka bridge + CreateBodyTmp = #{ + <<"type">> => <<"kafka">>, + <<"name">> => <<"my_kafka_bridge">>, + <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config), + <<"enable">> => true, + <<"authentication">> => maps:get(<<"authentication">>, Config), + <<"producer">> => #{ + <<"mqtt">> => #{ + topic => <<"t/#">> + }, + <<"kafka">> => #{ + <<"topic">> => <<"test-topic-one-partition">> + } + } + }, + CreateBody = + case maps:is_key(<<"ssl">>, Config) of + true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)}; + false -> CreateBodyTmp + end, + {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), + %% Check that the new bridge is in the list of bridges + true = MyKafkaBridgeExists(), + %% Perform operations + {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), + %% Cleanup + {ok, 204, _} = show(http_delete(BridgesPartsId)), + false = MyKafkaBridgeExists(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper functions +%%------------------------------------------------------------------------------ + publish_with_and_without_ssl(AuthSettings) -> publish_helper(#{ auth_settings => AuthSettings, @@ -212,7 +470,8 @@ valid_ssl_settings() -> #{ "cacertfile" => <<"/var/lib/secret/ca.crt">>, "certfile" => <<"/var/lib/secret/client.crt">>, - "keyfile" => <<"/var/lib/secret/client.key">> + "keyfile" => <<"/var/lib/secret/client.key">>, + "enable" => <<"true">> }. valid_sasl_plain_settings() -> @@ -243,3 +502,57 @@ kafka_hosts() -> resolve_kafka_offset(Hosts, Topic, Partition) -> brod:resolve_offset(Hosts, Topic, Partition, latest). + +%%------------------------------------------------------------------------------ +%% Internal functions rest API helpers +%%------------------------------------------------------------------------------ + +bin(X) -> iolist_to_binary(X). + +random_num() -> + erlang:system_time(nanosecond). + +http_get(Parts) -> + request_api(get, api_path(Parts), auth_header_()). + +http_delete(Parts) -> + request_api(delete, api_path(Parts), auth_header_()). + +http_post(Parts, Body) -> + request_api(post, api_path(Parts), [], auth_header_(), Body). + +http_put(Parts, Body) -> + request_api(put, api_path(Parts), [], auth_header_(), Body). + +request_dashboard(Method, Url, Auth) -> + Request = {Url, [Auth]}, + do_request_dashboard(Method, Request). +request_dashboard(Method, Url, QueryParams, Auth) -> + Request = {Url ++ "?" ++ QueryParams, [Auth]}, + do_request_dashboard(Method, Request). +do_request_dashboard(Method, Request) -> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when + Code >= 200 andalso Code =< 299 + -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +auth_header_() -> + auth_header_(<<"admin">>, <<"public">>). + +auth_header_(Username, Password) -> + {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), + {"Authorization", "Bearer " ++ binary_to_list(Token)}. + +api_path(Parts) -> + ?HOST ++ filename:join([?BASE_PATH | Parts]). + +json(Data) -> + {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), + Jsx.