diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl new file mode 100644 index 000000000..69f573f75 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -0,0 +1,735 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_api_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-define(ROOT, "bridges_v2"). + +-define(CONNECTOR_NAME, <<"my_connector">>). + +-define(RESOURCE(NAME, TYPE), #{ + <<"enable">> => true, + %<<"ssl">> => #{<<"enable">> => false}, + <<"type">> => TYPE, + <<"name">> => NAME +}). + +-define(CONNECTOR_TYPE_STR, "kafka"). +-define(CONNECTOR_TYPE, <>). +-define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>). +-define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?RESOURCE(Name, ?CONNECTOR_TYPE)#{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => BootstrapHosts, + <<"connect_timeout">> => <<"5s">>, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"nodelay">> => true, + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + } +}). + +-define(CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). +-define(CONNECTOR, ?CONNECTOR(?CONNECTOR_NAME)). + +-define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). +-define(BRIDGE_TYPE_STR, "kafka"). +-define(BRIDGE_TYPE, <>). +-define(KAFKA_BRIDGE(Name, Connector), ?RESOURCE(Name, ?BRIDGE_TYPE)#{ + <<"connector">> => Connector, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"hybrid">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_ext_headers">> => [ + #{ + <<"kafka_ext_header_key">> => <<"clientid">>, + <<"kafka_ext_header_value">> => <<"${clientid}">> + }, + #{ + <<"kafka_ext_header_key">> => <<"topic">>, + <<"kafka_ext_header_value">> => <<"${topic}">> + } + ], + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"kafka_headers">> => <<"${pub_props}">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"required_acks">> => <<"all_isr">>, + <<"topic">> => <<"kafka-topic">> + }, + <<"local_topic">> => <<"mqtt/local/topic">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"32s">> + } +}). +-define(KAFKA_BRIDGE(Name), ?KAFKA_BRIDGE(Name, ?CONNECTOR_NAME)). + +%% -define(BRIDGE_TYPE_MQTT, <<"mqtt">>). +%% -define(MQTT_BRIDGE(SERVER, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_MQTT)#{ +%% <<"server">> => SERVER, +%% <<"username">> => <<"user1">>, +%% <<"password">> => <<"">>, +%% <<"proto_ver">> => <<"v5">>, +%% <<"egress">> => #{ +%% <<"remote">> => #{ +%% <<"topic">> => <<"emqx/${topic}">>, +%% <<"qos">> => <<"${qos}">>, +%% <<"retain">> => false +%% } +%% } +%% }). +%% -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). + +%% -define(BRIDGE_TYPE_HTTP, <<"kafka">>). +%% -define(HTTP_BRIDGE(URL, NAME), ?BRIDGE(NAME, ?BRIDGE_TYPE_HTTP)#{ +%% <<"url">> => URL, +%% <<"local_topic">> => <<"emqx_webhook/#">>, +%% <<"method">> => <<"post">>, +%% <<"body">> => <<"${payload}">>, +%% <<"headers">> => #{ +%% % NOTE +%% % The Pascal-Case is important here. +%% % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts +%% % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS +%% % when this happens (while the 'content-type' does not). +%% <<"Content-Type">> => <<"application/json">> +%% } +%% }). +%% -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). + +%% -define(URL(PORT, PATH), +%% list_to_binary( +%% io_lib:format( +%% "http://localhost:~s/~s", +%% [integer_to_list(PORT), PATH] +%% ) +%% ) +%% ). + +-define(APPSPECS, [ + emqx_conf, + emqx, + emqx_authn, + emqx_management, + {emqx_bridge, "bridges_v2 {}"} +]). + +-define(APPSPEC_DASHBOARD, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} +). + +all() -> + [ + {group, single}, + %{group, cluster_later_join}, + {group, cluster} + ]. + +groups() -> + AllTCs = emqx_common_test_helpers:all(?MODULE), + SingleOnlyTests = [ + t_bridges_probe + ], + ClusterLaterJoinOnlyTCs = [ + % t_cluster_later_join_metrics + ], + [ + {single, [], AllTCs -- ClusterLaterJoinOnlyTCs}, + {cluster_later_join, [], ClusterLaterJoinOnlyTCs}, + {cluster, [], (AllTCs -- SingleOnlyTests) -- ClusterLaterJoinOnlyTCs} + ]. + +suite() -> + [{timetrap, {seconds, 60}}]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(cluster = Name, Config) -> + Nodes = [NodePrimary | _] = mk_cluster(Name, Config), + init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +%% init_per_group(cluster_later_join = Name, Config) -> +%% Nodes = [NodePrimary | _] = mk_cluster(Name, Config, #{join_to => undefined}), +%% init_api([{group, Name}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]); +init_per_group(Name, Config) -> + WorkDir = filename:join(?config(priv_dir, Config), Name), + Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), + init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]). + +init_api(Config) -> + Node = ?config(node, Config), + {ok, ApiKey} = erpc:call(Node, emqx_common_test_http, create_default_app, []), + [{api_key, ApiKey} | Config]. + +mk_cluster(Name, Config) -> + mk_cluster(Name, Config, #{}). + +mk_cluster(Name, Config, Opts) -> + Node1Apps = ?APPSPECS ++ [?APPSPEC_DASHBOARD], + Node2Apps = ?APPSPECS, + emqx_cth_cluster:start( + [ + {emqx_bridge_api_SUITE_1, Opts#{role => core, apps => Node1Apps}}, + {emqx_bridge_api_SUITE_2, Opts#{role => core, apps => Node2Apps}} + ], + #{work_dir => filename:join(?config(priv_dir, Config), Name)} + ). + +end_per_group(Group, Config) when + Group =:= cluster; + Group =:= cluster_later_join +-> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)); +end_per_group(_, Config) -> + emqx_cth_suite:stop(?config(group_apps, Config)), + ok. + +init_per_testcase(_TestCase, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + init_mocks(); + Nodes -> + [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + end, + {ok, 201, _} = request(post, uri(["connectors"]), ?CONNECTOR, Config), + Config. + +end_per_testcase(_TestCase, Config) -> + Node = ?config(node, Config), + ok = erpc:call(Node, fun clear_resources/0), + case ?config(cluster_nodes, Config) of + undefined -> + meck:unload(); + ClusterNodes -> + [erpc:call(ClusterNode, meck, unload, []) || ClusterNode <- ClusterNodes] + end, + ok = emqx_common_test_helpers:call_janitor(), + ok. + +-define(CONNECTOR_IMPL, dummy_connector_impl). +init_mocks() -> + meck:new(emqx_connector_ee_schema, [passthrough, no_link]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), + meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), + meck:expect( + ?CONNECTOR_IMPL, + on_start, + fun + (<<"connector:", ?CONNECTOR_TYPE_STR, ":bad_", _/binary>>, _C) -> + {ok, bad_connector_state}; + (_I, _C) -> + {ok, connector_state} + end + ), + meck:expect(?CONNECTOR_IMPL, on_stop, 2, ok), + meck:expect( + ?CONNECTOR_IMPL, + on_get_status, + fun + (_, bad_connector_state) -> connecting; + (_, _) -> connected + end + ), + meck:expect(?CONNECTOR_IMPL, on_add_channel, 4, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_remove_channel, 3, {ok, connector_state}), + meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), + [?CONNECTOR_IMPL, emqx_connector_ee_schema]. + +clear_resources() -> + lists:foreach( + fun(#{type := Type, name := Name}) -> + {ok, _} = emqx_bridge_v2:remove(Type, Name) + end, + emqx_bridge_v2:list() + ), + lists:foreach( + fun(#{type := Type, name := Name}) -> + {ok, _} = emqx_connector:remove(Type, Name) + end, + emqx_connector:list() + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% We have to pretend testing a kafka bridge since at this point that's the +%% only one that's implemented. + +t_bridges_lifecycle(Config) -> + %% assert we there's no bridges at first + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + {ok, 404, _} = request(get, uri([?ROOT, "foo"]), Config), + {ok, 404, _} = request(get, uri([?ROOT, "kafka:foo"]), Config), + + %% need a var for patterns below + BridgeName = ?BRIDGE_NAME, + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"connector">> := ?CONNECTOR_NAME, + <<"kafka">> := #{}, + <<"local_topic">> := _, + <<"resource_opts">> := _ + }}, + request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% list all bridges, assert bridge is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + %% list all bridges, assert bridge is in it + ?assertMatch( + {ok, 200, [ + #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + %% get the bridge by id + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json(get, uri([?ROOT, BridgeID]), Config) + ), + + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid operation", _/binary>> + }}, + request_json(post, uri([?ROOT, BridgeID, "brababbel"]), Config) + ), + + %% update bridge config + {ok, 201, _} = request(post, uri(["connectors"]), ?CONNECTOR(<<"foobla">>), Config), + ?assertMatch( + {ok, 200, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := BridgeName, + <<"connector">> := <<"foobla">>, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _] + }}, + request_json( + put, + uri([?ROOT, BridgeID]), + ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"foobla">>), + Config + ) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + %% update a deleted bridge returns an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json( + put, + uri([?ROOT, BridgeID]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ) + ), + + %% Deleting a non-existing bridge should result in an error + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := _ + }}, + request_json(delete, uri([?ROOT, BridgeID]), Config) + ), + + %% try delete unknown bridge id + ?assertMatch( + {ok, 404, #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid bridge ID", _/binary>> + }}, + request_json(delete, uri([?ROOT, "foo"]), Config) + ), + + %% Try create bridge with bad characters as name + {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"隋达"/utf8>>), Config), + ok. + +t_start_bridge_unknown_node(Config) -> + {ok, 404, _} = + request( + post, + uri(["nodes", "thisbetterbenotanatomyet", ?ROOT, "kafka:foo", start]), + Config + ), + {ok, 404, _} = + request( + post, + uri(["nodes", "undefined", ?ROOT, "kafka:foo", start]), + Config + ). + +t_start_bridge_node(Config) -> + do_start_bridge(node, Config). + +t_start_bridge_cluster(Config) -> + do_start_bridge(cluster, Config). + +do_start_bridge(TestType, Config) -> + %% assert we there's no bridges at first + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + Name = atom_to_binary(TestType), + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri([?ROOT]), + ?KAFKA_BRIDGE(Name), + Config + ) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + + %% start again + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri([?ROOT, BridgeID]), Config) + ), + %% start a started bridge + {ok, 204, <<>>} = request(post, {operation, TestType, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri([?ROOT, BridgeID]), Config) + ), + + {ok, 404, _} = request(post, {operation, TestType, invalidop, BridgeID}, Config), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), + {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + + %% Fail parse-id check + {ok, 404, _} = request(post, {operation, TestType, start, <<"wreckbook_fugazi">>}, Config), + %% Looks ok but doesn't exist + {ok, 404, _} = request(post, {operation, TestType, start, <<"webhook:cptn_hook">>}, Config), + ok. + +%% t_start_stop_inconsistent_bridge_node(Config) -> +%% start_stop_inconsistent_bridge(node, Config). + +%% t_start_stop_inconsistent_bridge_cluster(Config) -> +%% start_stop_inconsistent_bridge(cluster, Config). + +%% start_stop_inconsistent_bridge(Type, Config) -> +%% Node = ?config(node, Config), + +%% erpc:call(Node, fun() -> +%% meck:new(emqx_bridge_resource, [passthrough, no_link]), +%% meck:expect( +%% emqx_bridge_resource, +%% stop, +%% fun +%% (_, <<"bridge_not_found">>) -> {error, not_found}; +%% (BridgeType, Name) -> meck:passthrough([BridgeType, Name]) +%% end +%% ) +%% end), + +%% emqx_common_test_helpers:on_exit(fun() -> +%% erpc:call(Node, fun() -> +%% meck:unload([emqx_bridge_resource]) +%% end) +%% end), + +%% {ok, 201, _Bridge} = request( +%% post, +%% uri([?ROOT]), +%% ?KAFKA_BRIDGE(<<"bridge_not_found">>), +%% Config +%% ), +%% {ok, 503, _} = request( +%% post, {operation, Type, stop, <<"kafka:bridge_not_found">>}, Config +%% ). + +%% [TODO] This is a mess, need to clarify what the actual behavior needs to be +%% like. +%% t_enable_disable_bridges(Config) -> +%% %% assert we there's no bridges at first +%% {ok, 200, []} = request_json(get, uri([?ROOT]), Config), + +%% Name = ?BRIDGE_NAME, +%% ?assertMatch( +%% {ok, 201, #{ +%% <<"type">> := ?BRIDGE_TYPE, +%% <<"name">> := Name, +%% <<"enable">> := true, +%% <<"status">> := <<"connected">>, +%% <<"node_status">> := [_ | _] +%% }}, +%% request_json( +%% post, +%% uri([?ROOT]), +%% ?KAFKA_BRIDGE(Name), +%% Config +%% ) +%% ), +%% BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), +%% %% disable it +%% meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connecting), +%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"stopped">>}}, +%% request_json(get, uri([?ROOT, BridgeID]), Config) +%% ), +%% %% enable again +%% meck:expect(?CONNECTOR_IMPL, on_get_channel_status, 3, connected), +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri([?ROOT, BridgeID]), Config) +%% ), +%% %% enable an already started bridge +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri([?ROOT, BridgeID]), Config) +%% ), +%% %% disable it again +%% {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), + +%% %% bad param +%% {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config), +%% {ok, 404, _} = request(put, enable_path(true, "foo"), Config), +%% {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), + +%% {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config), +%% ?assertEqual( +%% #{ +%% <<"code">> => <<"BAD_REQUEST">>, +%% <<"message">> => <<"Forbidden operation, bridge not enabled">> +%% }, +%% Res +%% ), +%% {ok, 400, Res} = request( +%% post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config +%% ), + +%% %% enable a stopped bridge +%% {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), +%% ?assertMatch( +%% {ok, 200, #{<<"status">> := <<"connected">>}}, +%% request_json(get, uri([?ROOT, BridgeID]), Config) +%% ), +%% %% delete the bridge +%% {ok, 204, <<>>} = request(delete, uri([?ROOT, BridgeID]), Config), +%% {ok, 200, []} = request_json(get, uri([?ROOT]), Config). + +t_bridges_probe(Config) -> + {ok, 204, <<>>} = request( + post, + uri(["bridges_v2_probe"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ), + + %% second time with same name is ok since no real bridge created + {ok, 204, <<>>} = request( + post, + uri(["bridges_v2_probe"]), + ?KAFKA_BRIDGE(?BRIDGE_NAME), + Config + ), + + meck:expect(?CONNECTOR_IMPL, on_start, 2, {error, on_start_error}), + + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"TEST_FAILED">>, + <<"message">> := _ + }}, + request_json( + post, + uri(["bridges_v2_probe"]), + ?KAFKA_BRIDGE(<<"broken_bridge">>, <<"brokenhost:1234">>), + Config + ) + ), + + meck:expect(?CONNECTOR_IMPL, on_start, 2, {ok, bridge_state}), + + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json( + post, + uri(["bridges_v2_probe"]), + ?RESOURCE(<<"broken_bridge">>, <<"unknown_type">>), + Config + ) + ), + ok. + +%%% helpers +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +request(Method, URL, Config) -> + request(Method, URL, [], Config). + +request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> + URL = operation_path(Type, Op, BridgeID, Config), + request(Method, URL, Body, Config); +request(Method, URL, Body, Config) -> + AuthHeader = emqx_common_test_http:auth_header(?config(api_key, Config)), + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). + +request(Method, URL, Body, Decoder, Config) -> + case request(Method, URL, Body, Config) of + {ok, Code, Response} -> + case Decoder(Response) of + {error, _} = Error -> Error; + Decoded -> {ok, Code, Decoded} + end; + Otherwise -> + Otherwise + end. + +request_json(Method, URLLike, Config) -> + request(Method, URLLike, [], fun json/1, Config). + +request_json(Method, URLLike, Body, Config) -> + request(Method, URLLike, Body, fun json/1, Config). + +operation_path(node, Oper, BridgeID, Config) -> + uri(["nodes", ?config(node, Config), ?ROOT, BridgeID, Oper]); +operation_path(cluster, Oper, BridgeID, _Config) -> + uri([?ROOT, BridgeID, Oper]). + +enable_path(Enable, BridgeID) -> + uri([?ROOT, BridgeID, "enable", Enable]). + +publish_message(Topic, Body, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). + +update_config(Path, Value, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, update_config, [Path, Value]). + +get_raw_config(Path, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx, get_raw_config, [Path]). + +add_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). + +delete_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(node, Config), + erpc:call(Node, emqx_authentication, delete_user, [Chain, AuthenticatorID, User]). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S). + +json(B) when is_binary(B) -> + case emqx_utils_json:safe_decode(B, [return_maps]) of + {ok, Term} -> + Term; + {error, Reason} = Error -> + ct:pal("Failed to decode json: ~p~n~p", [Reason, B]), + Error + end.