From 11b5b7b6383bfffd00582846b0989c8296d9605a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 23 Mar 2023 00:34:10 +0300 Subject: [PATCH] test(bridge-api): also run testcases in cluster environment Excluding a couple of testcases which does not make much sense running in the cluster. Also try to reduce amount of "noise" in the testcases, making them easier to comprehend. Co-authored-by: Thales Macedo Garitezi --- apps/emqx/test/emqx_common_test_helpers.erl | 7 +- .../test/emqx_bridge_api_SUITE.erl | 1038 ++++++++++------- 2 files changed, 629 insertions(+), 416 deletions(-) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index d08812075..f445bea94 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -723,7 +723,7 @@ setup_node(Node, Opts) when is_map(Opts) -> ConfigureGenRpc = maps:get(configure_gen_rpc, Opts, true), LoadSchema = maps:get(load_schema, Opts, true), SchemaMod = maps:get(schema_mod, Opts, emqx_schema), - LoadApps = maps:get(load_apps, Opts, [gen_rpc, emqx, ekka, mria] ++ Apps), + LoadApps = maps:get(load_apps, Opts, Apps), Env = maps:get(env, Opts, []), Conf = maps:get(conf, Opts, []), ListenerPorts = maps:get(listener_ports, Opts, [ @@ -741,12 +741,13 @@ setup_node(Node, Opts) when is_map(Opts) -> StartAutocluster = maps:get(start_autocluster, Opts, false), %% Load env before doing anything to avoid overriding - lists:foreach(fun(App) -> rpc:call(Node, ?MODULE, load, [App]) end, LoadApps), + [ok = erpc:call(Node, ?MODULE, load, [App]) || App <- [gen_rpc, ekka, mria, emqx | LoadApps]], + %% Ensure a clean mnesia directory for each run to avoid %% inter-test flakiness. MnesiaDataDir = filename:join([ PrivDataDir, - node(), + Node, integer_to_list(erlang:unique_integer()), "mnesia" ]), diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 47a23e71c..fdbeb1be4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -18,11 +18,14 @@ -compile(nowarn_export_all). -compile(export_all). --import(emqx_mgmt_api_test_util, [request/3, uri/1]). +-import(emqx_mgmt_api_test_util, [uri/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). --define(CONF_DEFAULT, <<"bridges: {}">>). +-include_lib("snabbkaffe/include/test_macros.hrl"). + +-define(SUITE_APPS, [emqx_conf, emqx_authn, emqx_management, emqx_rule_engine, emqx_bridge]). + -define(BRIDGE_TYPE_HTTP, <<"webhook">>). -define(BRIDGE_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(URL(PORT, PATH), @@ -54,37 +57,123 @@ <<"method">> => <<"post">>, <<"body">> => <<"${payload}">>, <<"headers">> => #{ - <<"content-type">> => <<"application/json">> + % NOTE + % The Pascal-Case is important here. + % The reason is kinda ridiculous: `emqx_bridge_resource:create_dry_run/2` converts + % bridge config keys into atoms, and the atom 'Content-Type' exists in the ERTS + % when this happens (while the 'content-type' does not). + <<"Content-Type">> => <<"application/json">> } }). -define(HTTP_BRIDGE(URL), ?HTTP_BRIDGE(URL, ?BRIDGE_NAME)). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, single}, + {group, cluster} + ]. groups() -> - []. + SingleOnlyTests = [ + t_broken_bpapi_vsn, + t_old_bpapi_vsn, + t_bridges_probe + ], + [ + {single, [], emqx_common_test_helpers:all(?MODULE)}, + {cluster, [], emqx_common_test_helpers:all(?MODULE) -- SingleOnlyTests} + ]. suite() -> [{timetrap, {seconds, 60}}]. init_per_suite(Config) -> - _ = application:load(emqx_conf), - %% some testcases (may from other app) already get emqx_connector started - _ = application:stop(emqx_resource), - _ = application:stop(emqx_connector), - ok = emqx_mgmt_api_test_util:init_suite( - [emqx_rule_engine, emqx_bridge, emqx_authn] - ), - ok = emqx_common_test_helpers:load_config( - emqx_rule_engine_schema, - <<"rule_engine {rules {}}">> - ), - ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?CONF_DEFAULT), Config. end_per_suite(_Config) -> - emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_authn]), + ok. + +init_per_group(cluster, Config) -> + Cluster = mk_cluster_specs(Config), + ct:pal("Starting ~p", [Cluster]), + Nodes = [ + emqx_common_test_helpers:start_slave(Name, Opts) + || {Name, Opts} <- Cluster + ], + [NodePrimary | NodesRest] = Nodes, + ok = erpc:call(NodePrimary, fun() -> init_node(primary) end), + _ = [ok = erpc:call(Node, fun() -> init_node(regular) end) || Node <- NodesRest], + [{group, cluster}, {cluster_nodes, Nodes}, {api_node, NodePrimary} | Config]; +init_per_group(_, Config) -> + ok = emqx_mgmt_api_test_util:init_suite(?SUITE_APPS), + ok = load_suite_config(emqx_rule_engine), + ok = load_suite_config(emqx_bridge), + [{group, single}, {api_node, node()} | Config]. + +mk_cluster_specs(Config) -> + Specs = [ + {core, emqx_bridge_api_SUITE1, #{}}, + {core, emqx_bridge_api_SUITE2, #{}} + ], + CommonOpts = #{ + env => [{emqx, boot_modules, [broker]}], + apps => [], + % NOTE + % We need to start all those apps _after_ the cluster becomes stable, in the + % `init_node/1`. This is because usual order is broken in very subtle way: + % 1. Node starts apps including `mria` and `emqx_conf` which starts `emqx_cluster_rpc`. + % 2. The `emqx_cluster_rpc` sets up a mnesia table subscription during initialization. + % 3. In the meantime `mria` joins the cluster and notices it should restart. + % 4. Mnesia subscription becomes lost during restarts (god knows why). + % Yet we need to load them before, so that mria / mnesia will know which tables + % should be created in the cluster. + % TODO + % We probably should hide these intricacies behind the `emqx_common_test_helpers`. + load_apps => ?SUITE_APPS ++ [emqx_dashboard], + env_handler => fun load_suite_config/1, + load_schema => false, + priv_data_dir => ?config(priv_dir, Config) + }, + emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). + +init_node(Type) -> + ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, fun load_suite_config/1), + case Type of + primary -> + ok = emqx_config:put( + [dashboard, listeners], + #{http => #{enable => true, bind => 18083}} + ), + ok = emqx_dashboard:start_listeners(), + ready = emqx_dashboard_listener:regenerate_minirest_dispatch(), + emqx_common_test_http:create_default_app(); + regular -> + ok + end. + +load_suite_config(emqx_rule_engine) -> + ok = emqx_common_test_helpers:load_config( + emqx_rule_engine_schema, + <<"rule_engine { rules {} }">> + ); +load_suite_config(emqx_bridge) -> + ok = emqx_common_test_helpers:load_config( + emqx_bridge_schema, + <<"bridges {}">> + ); +load_suite_config(_) -> + ok. + +end_per_group(cluster, Config) -> + ok = lists:foreach( + fun(Node) -> + _ = erpc:call(Node, emqx_common_test_helpers, stop_apps, [?SUITE_APPS]), + emqx_common_test_helpers:stop_slave(Node) + end, + ?config(cluster_nodes, Config) + ); +end_per_group(_, _Config) -> + emqx_mgmt_api_test_util:end_suite(?SUITE_APPS), mria:clear_table(emqx_authn_mnesia), ok. @@ -98,22 +187,7 @@ init_per_testcase(t_old_bpapi_vsn, Config) -> meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 2, 1), init_per_testcase(common, Config); -init_per_testcase(StartStop, Config) when - StartStop == t_start_stop_bridges_cluster; - StartStop == t_start_stop_bridges_node --> - meck:new(emqx_bridge_resource, [passthrough]), - meck:expect( - emqx_bridge_resource, - stop, - fun - (_, <<"bridge_not_found">>) -> {error, not_found}; - (Type, Name) -> meck:passthrough([Type, Name]) - end - ), - init_per_testcase(common, Config); init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. @@ -123,17 +197,13 @@ end_per_testcase(t_broken_bpapi_vsn, Config) -> end_per_testcase(t_old_bpapi_vsn, Config) -> meck:unload([emqx_bpapi]), end_per_testcase(common, Config); -end_per_testcase(StartStop, Config) when - StartStop == t_start_stop_bridges_cluster; - StartStop == t_start_stop_bridges_node --> - meck:unload([emqx_bridge_resource]), - end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), - stop_http_server(Sock, Acceptor), - clear_resources(), + Node = ?config(api_node, Config), + ok = emqx_common_test_helpers:call_janitor(), + ok = stop_http_server(Sock, Acceptor), + ok = erpc:call(Node, fun clear_resources/0), ok. clear_resources() -> @@ -215,35 +285,36 @@ parse_http_request(ReqStr0) -> t_http_crud_apis(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), - {ok, 404, _} = request(get, uri(["bridges", "foo"]), []), - {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), []), + {ok, 404, _} = request(get, uri(["bridges", "foo"]), Config), + {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + }}, + request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, Name), + Config + ) ), - %ct:pal("---bridge: ~p", [Bridge]), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, - emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ @@ -261,27 +332,26 @@ t_http_crud_apis(Config) -> ), %% update the request-path of the bridge URL2 = ?URL(Port, "path2"), - {ok, 200, Bridge2} = request( - put, - uri(["bridges", BridgeID]), - ?HTTP_BRIDGE(URL2, Name) - ), ?assertMatch( - #{ + {ok, 200, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL2 - }, - emqx_json:decode(Bridge2, [return_maps]) + }}, + request_json( + put, + uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, Name), + Config + ) ), %% list all bridges again, assert Bridge2 is in it - {ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []), ?assertMatch( - [ + {ok, 200, [ #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, @@ -290,26 +360,25 @@ t_http_crud_apis(Config) -> <<"node_status">> := [_ | _], <<"url">> := URL2 } - ], - emqx_json:decode(Bridge2Str, [return_maps]) + ]}, + request_json(get, uri(["bridges"]), Config) ), %% get the bridge by id - {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch( - #{ + {ok, 200, #{ <<"type">> := ?BRIDGE_TYPE_HTTP, <<"name">> := Name, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL2 - }, - emqx_json:decode(Bridge3Str, [return_maps]) + }}, + request_json(get, uri(["bridges", BridgeID]), Config) ), %% send an message to emqx again, check the path has been changed - emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{path := <<"/path2">>}} -> @@ -323,68 +392,64 @@ t_http_crud_apis(Config) -> ), %% Test bad updates - {ok, 400, PutFail1} = request( + {ok, 400, PutFail1} = request_json( put, uri(["bridges", BridgeID]), - maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name)) + maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name)), + Config ), ?assertMatch( #{<<"reason">> := <<"required_field">>}, - emqx_json:decode(maps:get(<<"message">>, emqx_json:decode(PutFail1, [return_maps])), [ - return_maps - ]) + json(maps:get(<<"message">>, PutFail1)) ), - {ok, 400, PutFail2} = request( + {ok, 400, PutFail2} = request_json( put, uri(["bridges", BridgeID]), - maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name))) + maps:put(<<"curl">>, URL2, maps:remove(<<"url">>, ?HTTP_BRIDGE(URL2, Name))), + Config ), ?assertMatch( #{ <<"reason">> := <<"unknown_fields">>, <<"unknown">> := <<"curl">> }, - emqx_json:decode(maps:get(<<"message">>, emqx_json:decode(PutFail2, [return_maps])), [ - return_maps - ]) + json(maps:get(<<"message">>, PutFail2)) ), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% update a deleted bridge returns an error - {ok, 404, ErrMsg2} = request( - put, - uri(["bridges", BridgeID]), - ?HTTP_BRIDGE(URL2, Name) - ), ?assertMatch( - #{ + {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ - }, - emqx_json:decode(ErrMsg2, [return_maps]) + }}, + request_json( + put, + uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL2, Name), + Config + ) ), %% try delete bad bridge id - {ok, 404, BadId} = request(delete, uri(["bridges", "foo"]), []), ?assertMatch( - #{ + {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := <<"Invalid bridge ID", _/binary>> - }, - emqx_json:decode(BadId, [return_maps]) + }}, + request_json(delete, uri(["bridges", "foo"]), Config) ), %% Deleting a non-existing bridge should result in an error - {ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []), ?assertMatch( - #{ + {ok, 404, #{ <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ - }, - emqx_json:decode(ErrMsg3, [return_maps]) + }}, + request_json(delete, uri(["bridges", BridgeID]), Config) ), %% Create non working bridge @@ -392,37 +457,54 @@ t_http_crud_apis(Config) -> {ok, 201, BrokenBridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(BrokenURL, Name) + ?HTTP_BRIDGE(BrokenURL, Name), + fun json/1, + Config + ), + ?assertMatch( + #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">>, + <<"node_status">> := [ + #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">> + } + | _ + ], + <<"url">> := BrokenURL + }, + BrokenBridge + ), + + {ok, 200, FixedBridge} = request_json( + put, + uri(["bridges", BridgeID]), + ?HTTP_BRIDGE(URL1), + Config + ), + ?assertMatch( + #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] + } when + not is_map_key(<<"status_reason">>, FixedBridge) andalso + not is_map_key(<<"status_reason">>, FixedNodeStatus), + FixedBridge ), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := <<"disconnected">>, - <<"status_reason">> := <<"Connection refused">>, - <<"node_status">> := [ - #{<<"status">> := <<"disconnected">>, <<"status_reason">> := <<"Connection refused">>} - | _ - ], - <<"url">> := BrokenURL - } = emqx_json:decode(BrokenBridge, [return_maps]), - {ok, 200, FixedBridgeResponse} = request(put, uri(["bridges", BridgeID]), ?HTTP_BRIDGE(URL1)), - #{ - <<"status">> := <<"connected">>, - <<"node_status">> := [FixedNodeStatus = #{<<"status">> := <<"connected">>} | _] - } = FixedBridge = emqx_json:decode(FixedBridgeResponse, [return_maps]), - ?assert(not maps:is_key(<<"status_reason">>, FixedBridge)), - ?assert(not maps:is_key(<<"status_reason">>, FixedNodeStatus)), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), %% Try create bridge with bad characters as name - {ok, 400, _} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, <<"隋达"/utf8>>)), - ok. + {ok, 400, _} = request(post, uri(["bridges"]), ?HTTP_BRIDGE(URL1, <<"隋达"/utf8>>), Config), + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config). t_http_bridges_local_topic(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge @@ -433,13 +515,15 @@ t_http_bridges_local_topic(Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name1) + ?HTTP_BRIDGE(URL1, Name1), + Config ), %% and we create another one without local_topic {ok, 201, _} = request( post, uri(["bridges"]), - maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)) + maps:remove(<<"local_topic">>, ?HTTP_BRIDGE(URL1, Name2)), + Config ), BridgeID1 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name1), BridgeID2 = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name2), @@ -447,7 +531,7 @@ t_http_bridges_local_topic(Config) -> %% This is to verify we can have 2 bridges with and without local_topic fields %% at the same time. Body = <<"my msg">>, - emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ @@ -464,14 +548,13 @@ t_http_bridges_local_topic(Config) -> end ), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), []), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), []), - ok. + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID1]), Config), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID2]), Config). t_check_dependent_actions_on_delete(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge @@ -481,9 +564,10 @@ t_check_dependent_actions_on_delete(Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?HTTP_BRIDGE(URL1, Name), + Config ), - {ok, 201, Rule} = request( + {ok, 201, #{<<"id">> := RuleId}} = request_json( post, uri(["rules"]), #{ @@ -491,25 +575,23 @@ t_check_dependent_actions_on_delete(Config) -> <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> - } + }, + Config ), - #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% deleting the bridge should fail because there is a rule that depends on it {ok, 400, _} = request( - delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", [] + delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", Config ), %% delete the rule first - {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config), %% then delete the bridge is OK - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - - ok. + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_cascade_delete_actions(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge @@ -519,9 +601,10 @@ t_cascade_delete_actions(Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?HTTP_BRIDGE(URL1, Name), + Config ), - {ok, 201, Rule} = request( + {ok, 201, #{<<"id">> := RuleId}} = request_json( post, uri(["rules"]), #{ @@ -529,27 +612,27 @@ t_cascade_delete_actions(Config) -> <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> - } + }, + Config ), - #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% delete the bridge will also delete the actions from the rules {ok, 204, _} = request( - delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", [] + delete, + uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", + Config ), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), ?assertMatch( - #{ - <<"actions">> := [] - }, - emqx_json:decode(Rule1, [return_maps]) + {ok, 200, #{<<"actions">> := []}}, + request_json(get, uri(["rules", RuleId]), Config) ), - {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), Config), {ok, 201, _} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?HTTP_BRIDGE(URL1, Name), + Config ), {ok, 201, _} = request( post, @@ -559,12 +642,16 @@ t_cascade_delete_actions(Config) -> <<"enable">> => true, <<"actions">> => [BridgeID], <<"sql">> => <<"SELECT * from \"t\"">> - } + }, + Config ), - {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - ok. + {ok, 204, _} = request( + delete, + uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", + Config + ), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_broken_bpapi_vsn(Config) -> Port = ?config(port, Config), @@ -573,12 +660,13 @@ t_broken_bpapi_vsn(Config) -> {ok, 201, _Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?HTTP_BRIDGE(URL1, Name), + Config ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% still works since we redirect to 'restart' - {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), - {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + {ok, 501, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config), + {ok, 501, <<>>} = request(post, {operation, node, start, BridgeID}, Config), ok. t_old_bpapi_vsn(Config) -> @@ -588,31 +676,34 @@ t_old_bpapi_vsn(Config) -> {ok, 201, _Bridge} = request( post, uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?HTTP_BRIDGE(URL1, Name), + Config ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - {ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>), - {ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, {operation, cluster, stop, BridgeID}, Config), + {ok, 204, <<>>} = request(post, {operation, node, stop, BridgeID}, Config), %% still works since we redirect to 'restart' - {ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), - {ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), - {ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>), - {ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, {operation, cluster, start, BridgeID}, Config), + {ok, 204, <<>>} = request(post, {operation, node, start, BridgeID}, Config), + {ok, 204, <<>>} = request(post, {operation, cluster, restart, BridgeID}, Config), + {ok, 204, <<>>} = request(post, {operation, node, restart, BridgeID}, Config), ok. -t_start_stop_bridges_node(Config) -> +t_start_bridge_unknown_node(Config) -> {ok, 404, _} = request( post, uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]), - <<"">> + Config ), {ok, 404, _} = request( post, uri(["nodes", "undefined", "bridges", "webhook:foo", start]), - <<"">> - ), + Config + ). + +t_start_stop_bridges_node(Config) -> do_start_stop_bridges(node, Config). t_start_stop_bridges_cluster(Config) -> @@ -620,182 +711,250 @@ t_start_stop_bridges_cluster(Config) -> do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = atom_to_binary(Type), - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + }}, + request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, Name), + Config + ) ), - %ct:pal("the bridge ==== ~p", [Bridge]), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := <<"connected">>, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - } = emqx_json:decode(Bridge, [return_maps]), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - %% stop it - {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), - {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])), - %% start again - {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), - %% start a started bridge - {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), - {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3_1, [return_maps])), - %% restart an already started bridge - {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), - %% stop it again - {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), - %% restart a stopped bridge - {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), - {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])), - {ok, 404, _} = request(post, operation_path(Type, invalidop, BridgeID), <<"">>), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), + ExpectedStatus = + case ?config(group, Config) of + cluster when Type == node -> + <<"inconsistent">>; + _ -> + <<"stopped">> + end, + + %% stop it + {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := ExpectedStatus}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), + %% start again + {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), + %% start a started bridge + {ok, 204, <<>>} = request(post, {operation, Type, start, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), + %% restart an already started bridge + {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), + %% stop it again + {ok, 204, <<>>} = request(post, {operation, Type, stop, BridgeID}, Config), + %% restart a stopped bridge + {ok, 204, <<>>} = request(post, {operation, Type, restart, BridgeID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), + + {ok, 404, _} = request(post, {operation, Type, invalidop, BridgeID}, Config), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% Fail parse-id check - {ok, 404, _} = request(post, operation_path(Type, start, <<"wreckbook_fugazi">>), <<"">>), + {ok, 404, _} = request(post, {operation, Type, start, <<"wreckbook_fugazi">>}, Config), %% Looks ok but doesn't exist - {ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>), - - %% - {ok, 201, _Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, <<"bridge_not_found">>) - ), - {ok, 503, _} = request( - post, operation_path(Type, stop, <<"webhook:bridge_not_found">>), <<"">> - ), + {ok, 404, _} = request(post, {operation, Type, start, <<"webhook:cptn_hook">>}, Config), %% Create broken bridge {ListenPort, Sock} = listen_on_random_port(), %% Connecting to this endpoint should always timeout BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), BadName = <<"bad_", (atom_to_binary(Type))/binary>>, - {ok, 201, BadBridge1} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE(BadServer, BadName) + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE_MQTT, + <<"name">> := BadName, + <<"enable">> := true, + <<"server">> := BadServer, + <<"status">> := <<"connecting">>, + <<"node_status">> := [_ | _] + }}, + request_json( + post, + uri(["bridges"]), + ?MQTT_BRIDGE(BadServer, BadName), + Config + ) ), - #{ - <<"type">> := ?BRIDGE_TYPE_MQTT, - <<"name">> := BadName, - <<"enable">> := true, - <<"server">> := BadServer, - <<"status">> := <<"connecting">>, - <<"node_status">> := [_ | _] - } = emqx_json:decode(BadBridge1, [return_maps]), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), ?assertMatch( {ok, SC, _} when SC == 500 orelse SC == 503, - request(post, operation_path(Type, start, BadBridgeID), <<"">>) + request(post, {operation, Type, start, BadBridgeID}, Config) ), ok = gen_tcp:close(Sock), ok. +t_start_stop_inconsistent_bridge_node(Config) -> + start_stop_inconsistent_bridge(node, Config). + +t_start_stop_inconsistent_bridge_cluster(Config) -> + start_stop_inconsistent_bridge(cluster, Config). + +start_stop_inconsistent_bridge(Type, Config) -> + Port = ?config(port, Config), + URL = ?URL(Port, "abc"), + Node = ?config(api_node, Config), + + erpc:call(Node, fun() -> + meck:new(emqx_bridge_resource, [passthrough, no_link]), + meck:expect( + emqx_bridge_resource, + stop, + fun + (_, <<"bridge_not_found">>) -> {error, not_found}; + (BridgeType, Name) -> meck:passthrough([BridgeType, Name]) + end + ) + end), + + emqx_common_test_helpers:on_exit(fun() -> + erpc:call(Node, fun() -> + meck:unload([emqx_bridge_resource]) + end) + end), + + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL, <<"bridge_not_found">>), + Config + ), + {ok, 503, _} = request( + post, {operation, Type, stop, <<"webhook:bridge_not_found">>}, Config + ). + t_enable_disable_bridges(Config) -> %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Name = ?BRIDGE_NAME, Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + }}, + request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, Name), + Config + ) ), - %ct:pal("the bridge ==== ~p", [Bridge]), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := <<"connected">>, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% disable it - {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), - {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"stopped">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), %% enable again - {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), %% enable an already started bridge - {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), - {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), %% disable it again - {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), + {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), Config), %% bad param - {ok, 404, _} = request(put, enable_path(foo, BridgeID), <<"">>), - {ok, 404, _} = request(put, enable_path(true, "foo"), <<"">>), - {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), <<"">>), + {ok, 404, _} = request(put, enable_path(foo, BridgeID), Config), + {ok, 404, _} = request(put, enable_path(true, "foo"), Config), + {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), - {ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>), + {ok, 400, Res} = request(post, {operation, node, start, BridgeID}, <<>>, fun json/1, Config), ?assertEqual( - <<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>, + #{ + <<"code">> => <<"BAD_REQUEST">>, + <<"message">> => <<"Forbidden operation, bridge not enabled">> + }, Res ), - {ok, 400, Res} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 400, Res} = request(post, {operation, cluster, start, BridgeID}, <<>>, fun json/1, Config), %% enable a stopped bridge - {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), - {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])), + {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["bridges", BridgeID]), Config) + ), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config). t_reset_bridges(Config) -> %% assert there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), Name = ?BRIDGE_NAME, Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?assertMatch( + {ok, 201, #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := <<"connected">>, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + }}, + request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, Name), + Config + ) ), - %ct:pal("the bridge ==== ~p", [Bridge]), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := <<"connected">>, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), - {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), + {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), Config), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config). -t_with_redact_update(_Config) -> +t_with_redact_update(Config) -> Name = <<"redact_update">>, Type = <<"mqtt">>, Password = <<"123456">>, @@ -812,20 +971,18 @@ t_with_redact_update(_Config) -> {ok, 201, _} = request( post, uri(["bridges"]), - Template + Template, + Config ), %% update with redacted config - Conf = emqx_misc:redact(Template), + BridgeConf = emqx_misc:redact(Template), BridgeID = emqx_bridge_resource:bridge_id(Type, Name), - {ok, 200, _ResBin} = request( - put, - uri(["bridges", BridgeID]), - Conf + {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config), + ?assertEqual( + Password, + get_raw_config([bridges, Type, Name, password], Config) ), - RawConf = emqx:get_raw_config([bridges, Type, Name]), - Value = maps:get(<<"password">>, RawConf), - ?assertEqual(Password, Value), ok. t_bridges_probe(Config) -> @@ -835,59 +992,62 @@ t_bridges_probe(Config) -> {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), - ?HTTP_BRIDGE(URL) + ?HTTP_BRIDGE(URL), + Config ), %% second time with same name is ok since no real bridge created {ok, 204, <<>>} = request( post, uri(["bridges_probe"]), - ?HTTP_BRIDGE(URL) + ?HTTP_BRIDGE(URL), + Config ), - {ok, 400, NxDomain} = request( - post, - uri(["bridges_probe"]), - ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>) - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := _ - }, - emqx_json:decode(NxDomain, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?HTTP_BRIDGE(<<"http://203.0.113.3:1234/foo">>), + Config + ) ), {ok, 204, _} = request( post, uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"127.0.0.1:1883">>) + ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), + Config ), - {ok, 400, ConnRefused} = request( - post, - uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"127.0.0.1:2883">>) - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Connection refused">> - }, - emqx_json:decode(ConnRefused, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:2883">>), + Config + ) ), - {ok, 400, CouldNotResolveHost} = request( - post, - uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"nohost:2883">>) - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Could not resolve host">> - }, - emqx_json:decode(CouldNotResolveHost, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"nohost:2883">>), + Config + ) ), AuthnConfig = #{ @@ -896,118 +1056,123 @@ t_bridges_probe(Config) -> <<"user_id_type">> => <<"username">> }, Chain = 'mqtt:global', - emqx:update_config( + {ok, _} = update_config( [authentication], - {create_authenticator, Chain, AuthnConfig} + {create_authenticator, Chain, AuthnConfig}, + Config ), User = #{user_id => <<"u">>, password => <<"p">>}, AuthenticatorID = <<"password_based:built_in_database">>, - {ok, _} = emqx_authentication:add_user( + {ok, _} = add_user_auth( Chain, AuthenticatorID, - User + User, + Config ), - {ok, 400, Unauthorized} = request( - post, - uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>} - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Unauthorized client">> - }, - emqx_json:decode(Unauthorized, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{<<"proto_ver">> => <<"v4">>}, + Config + ) ), - {ok, 400, Malformed} = request( - post, - uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{ - <<"proto_ver">> => <<"v4">>, <<"password">> => <<"mySecret">>, <<"username">> => <<"u">> - } - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Bad username or password">> - }, - emqx_json:decode(Malformed, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:1883">>)#{ + <<"proto_ver">> => <<"v4">>, + <<"password">> => <<"mySecret">>, + <<"username">> => <<"u">> + }, + Config + ) ), - {ok, 400, NotAuthorized} = request( - post, - uri(["bridges_probe"]), - ?MQTT_BRIDGE(<<"127.0.0.1:1883">>) - ), ?assertMatch( - #{ + {ok, 400, #{ <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Not authorized">> - }, - emqx_json:decode(NotAuthorized, [return_maps]) + }}, + request_json( + post, + uri(["bridges_probe"]), + ?MQTT_BRIDGE(<<"127.0.0.1:1883">>), + Config + ) ), - {ok, 400, BadReq} = request( - post, - uri(["bridges_probe"]), - ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>) + ?assertMatch( + {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, + request_json( + post, + uri(["bridges_probe"]), + ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>), + Config + ) ), - ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, emqx_json:decode(BadReq, [return_maps])), ok. t_metrics(Config) -> Port = ?config(port, Config), %% assert we there's no bridges at first - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 200, []} = request_json(get, uri(["bridges"]), Config), %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), Name = ?BRIDGE_NAME, - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?HTTP_BRIDGE(URL1, Name) + ?assertMatch( + {ok, 201, + Bridge = #{ + <<"type">> := ?BRIDGE_TYPE_HTTP, + <<"name">> := Name, + <<"enable">> := true, + <<"status">> := _, + <<"node_status">> := [_ | _], + <<"url">> := URL1 + }} when + %% assert that the bridge return doesn't contain metrics anymore + not is_map_key(<<"metrics">>, Bridge) andalso + not is_map_key(<<"node_metrics">>, Bridge), + request_json( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, Name), + Config + ) ), - %ct:pal("---bridge: ~p", [Bridge]), - Decoded = emqx_json:decode(Bridge, [return_maps]), - #{ - <<"type">> := ?BRIDGE_TYPE_HTTP, - <<"name">> := Name, - <<"enable">> := true, - <<"status">> := _, - <<"node_status">> := [_ | _], - <<"url">> := URL1 - } = Decoded, - - %% assert that the bridge return doesn't contain metrics anymore - ?assertNot(maps:is_key(<<"metrics">>, Decoded)), - ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), - BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name), %% check for empty bridge metrics - {ok, 200, Bridge1Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), ?assertMatch( - #{ + {ok, 200, #{ <<"metrics">> := #{<<"success">> := 0}, <<"node_metrics">> := [_ | _] - }, - emqx_json:decode(Bridge1Str, [return_maps]) + }}, + request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), %% check that the bridge doesn't contain metrics anymore - {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), - Decoded2 = emqx_json:decode(Bridge2Str, [return_maps]), - ?assertNot(maps:is_key(<<"metrics">>, Decoded2)), - ?assertNot(maps:is_key(<<"node_metrics">>, Decoded2)), + {ok, 200, Bridge} = request_json(get, uri(["bridges", BridgeID]), Config), + ?assertNot(maps:is_key(<<"metrics">>, Bridge)), + ?assertNot(maps:is_key(<<"node_metrics">>, Bridge)), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, - emqx:publish(emqx_message:make(<<"emqx_webhook/1">>, Body)), + _ = publish_message(<<"emqx_webhook/1">>, Body, Config), ?assert( receive {http_server, received, #{ @@ -1025,21 +1190,20 @@ t_metrics(Config) -> ), %% check for non-empty bridge metrics - {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID, "metrics"]), []), ?assertMatch( - #{ + {ok, 200, #{ <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [_ | _] - }, - emqx_json:decode(Bridge3Str, [return_maps]) + }}, + request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) ), %% check that metrics isn't returned when listing all bridges - {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), + {ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config), ?assert( lists:all( fun(E) -> not maps:is_key(<<"metrics">>, E) end, - emqx_json:decode(BridgesStr, [return_maps]) + Bridges ) ), ok. @@ -1058,28 +1222,76 @@ t_inconsistent_webhook_request_timeouts(Config) -> <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} } ), - {ok, 201, RawResponse} = request( - post, - uri(["bridges"]), - BadBridgeParams - ), - %% note: same value on both fields ?assertMatch( - #{ + {ok, 201, #{ + %% note: same value on both fields <<"request_timeout">> := <<"2s">>, <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} - }, - emqx_json:decode(RawResponse, [return_maps]) + }}, + request_json( + post, + uri(["bridges"]), + BadBridgeParams, + Config + ) ), ok. -operation_path(node, Oper, BridgeID) -> - uri(["nodes", node(), "bridges", BridgeID, Oper]); -operation_path(cluster, Oper, BridgeID) -> +%% + +request(Method, URL, Config) -> + request(Method, URL, [], Config). + +request(Method, {operation, Type, Op, BridgeID}, Body, Config) -> + URL = operation_path(Type, Op, BridgeID, Config), + request(Method, URL, Body, Config); +request(Method, URL, Body, Config) -> + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, URL, [], auth_header(Config), Body, Opts). + +request(Method, URL, Body, Decoder, Config) -> + case request(Method, URL, Body, Config) of + {ok, Code, Response} -> + {ok, Code, Decoder(Response)}; + Otherwise -> + Otherwise + end. + +request_json(Method, URLLike, Config) -> + request(Method, URLLike, [], fun json/1, Config). + +request_json(Method, URLLike, Body, Config) -> + request(Method, URLLike, Body, fun json/1, Config). + +auth_header(Config) -> + erpc:call(?config(api_node, Config), emqx_common_test_http, default_auth_header, []). + +operation_path(node, Oper, BridgeID, Config) -> + uri(["nodes", ?config(api_node, Config), "bridges", BridgeID, Oper]); +operation_path(cluster, Oper, BridgeID, _Config) -> uri(["bridges", BridgeID, Oper]). enable_path(Enable, BridgeID) -> uri(["bridges", BridgeID, "enable", Enable]). +publish_message(Topic, Body, Config) -> + Node = ?config(api_node, Config), + erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). + +update_config(Path, Value, Config) -> + Node = ?config(api_node, Config), + erpc:call(Node, emqx, update_config, [Path, Value]). + +get_raw_config(Path, Config) -> + Node = ?config(api_node, Config), + erpc:call(Node, emqx, get_raw_config, [Path]). + +add_user_auth(Chain, AuthenticatorID, User, Config) -> + Node = ?config(api_node, Config), + erpc:call(Node, emqx_authentication, add_user, [Chain, AuthenticatorID, User]). + str(S) when is_list(S) -> S; str(S) when is_binary(S) -> binary_to_list(S). + +json(B) when is_binary(B) -> + emqx_json:decode(B, [return_maps]).