From 55c18c0b5f2bdacb752fe242493253de7e6e4edb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Mon, 22 Aug 2022 16:05:47 +0800 Subject: [PATCH] fix(bridges): update the test cases for new config structure --- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 36 +- .../test/emqx_bridge_mqtt_SUITE.erl | 447 ++++++++++ .../src/emqx_connector_mqtt.erl | 16 +- .../src/mqtt/emqx_connector_mqtt_msg.erl | 6 +- .../src/mqtt/emqx_connector_mqtt_schema.erl | 8 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 7 +- .../test/emqx_connector_SUITE.erl | 94 -- .../test/emqx_connector_api_SUITE.erl | 812 ------------------ 8 files changed, 481 insertions(+), 945 deletions(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl delete mode 100644 apps/emqx_connector/test/emqx_connector_SUITE.erl delete mode 100644 apps/emqx_connector/test/emqx_connector_api_SUITE.erl diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index d8266f83a..02abbddcb 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -89,36 +89,29 @@ t_get_basic_usage_info_1(_Config) -> ). setup_fake_telemetry_data() -> - ConnectorConf = - #{ - <<"connectors">> => - #{ - <<"mqtt">> => #{ - <<"my_mqtt_connector">> => - #{server => "127.0.0.1:1883"}, - <<"my_mqtt_connector2">> => - #{server => "127.0.0.1:1884"} - } - } - }, MQTTConfig1 = #{ - connector => <<"mqtt:my_mqtt_connector">>, + server => "127.0.0.1:1883", enable => true, - direction => ingress, - remote_topic => <<"aws/#">>, - remote_qos => 1 + ingress => #{ + remote => #{ + topic => <<"aws/#">>, + qos => 1 + } + } }, MQTTConfig2 = #{ - connector => <<"mqtt:my_mqtt_connector2">>, + server => "127.0.0.1:1884", enable => true, - direction => ingress, - remote_topic => <<"$bridges/mqtt:some_bridge_in">>, - remote_qos => 1 + ingress => #{ + remote => #{ + topic => <<"$bridges/mqtt:some_bridge_in">>, + qos => 1 + } + } }, HTTPConfig = #{ url => <<"http://localhost:9901/messages/${topic}">>, enable => true, - direction => egress, local_topic => "emqx_webhook/#", method => post, body => <<"${payload}">>, @@ -143,7 +136,6 @@ setup_fake_telemetry_data() -> } }, Opts = #{raw_with_default => true}, - ok = emqx_common_test_helpers:load_config(emqx_connector_schema, ConnectorConf, Opts), ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, Conf, Opts), ok = snabbkaffe:start_trace(), diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl new file mode 100644 index 000000000..27c101728 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -0,0 +1,447 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). + +-include("emqx/include/emqx.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_dashboard/include/emqx_dashboard.hrl"). + +%% output functions +-export([inspect/3]). + +-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). +-define(TYPE_MQTT, <<"mqtt">>). +-define(NAME_MQTT, <<"my_mqtt_bridge">>). +-define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>). +-define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>). +-define(SERVER_CONF(Username), #{ + <<"server">> => <<"127.0.0.1:1883">>, + <<"username">> => Username, + <<"password">> => <<"">>, + <<"proto_ver">> => <<"v4">>, + <<"ssl">> => #{<<"enable">> => false} +}). + +-define(INGRESS_CONF, #{ + <<"remote">> => #{ + <<"topic">> => <<"remote_topic/#">>, + <<"qos">> => 2 + }, + <<"local">> => #{ + <<"topic">> => <<"local_topic/${topic}">>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${payload}">>, + <<"retain">> => <<"${retain}">> + } +}). + +-define(EGRESS_CONF, #{ + <<"local">> => #{ + <<"topic">> => <<"local_topic/#">> + }, + <<"remote">> => #{ + <<"topic">> => <<"remote_topic/${topic}">>, + <<"payload">> => <<"${payload}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } +}). + +-define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{ + <<"matched">> := MATCH, + <<"success">> := SUCC, + <<"failed">> := FAILED, + <<"rate">> := SPEED, + <<"rate_last5m">> := SPEED5M, + <<"rate_max">> := SPEEDMAX +}). + +inspect(Selected, _Envs, _Args) -> + persistent_term:put(?MODULE, #{inspect => Selected}). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +suite() -> + [{timetrap, {seconds, 30}}]. + +init_per_suite(Config) -> + _ = application:load(emqx_conf), + %% some testcases (may from other app) already get emqx_connector started + _ = application:stop(emqx_resource), + _ = application:stop(emqx_connector), + ok = emqx_common_test_helpers:start_apps( + [ + emqx_rule_engine, + emqx_bridge, + emqx_dashboard + ], + fun set_special_configs/1 + ), + ok = emqx_common_test_helpers:load_config( + emqx_rule_engine_schema, + <<"rule_engine {rules {}}">> + ), + ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps([ + emqx_rule_engine, + emqx_bridge, + emqx_dashboard + ]), + ok. + +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>); +set_special_configs(_) -> + ok. + +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), + Config. +end_per_testcase(_, _Config) -> + clear_resources(), + ok. + +clear_resources() -> + lists:foreach( + fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, + emqx_rule_engine:get_rules() + ), + lists:foreach( + fun(#{type := Type, name := Name}) -> + {ok, _} = emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ +t_mqtt_conn_bridge_ingress(_) -> + User1 = <<"user1">>, + %% create an MQTT bridge, using POST + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_INGRESS, + <<"ingress">> => ?INGRESS_CONF + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_INGRESS + } = jsx:decode(Bridge), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + + %% we now test if the bridge works as expected + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + timer:sleep(100), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + +t_mqtt_conn_bridge_egress(_) -> + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := ?metrics(1, 1, 0, _, _, _), + <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] + }, + jsx:decode(BridgeStr) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. + +t_ingress_mqtt_bridge_with_rules(_) -> + {ok, 201, _} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(<<"user1">>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_INGRESS, + <<"ingress">> => ?INGRESS_CONF + } + ), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + + {ok, 201, Rule} = request( + post, + uri(["rules"]), + #{ + <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, + <<"enable">> => true, + <<"actions">> => [#{<<"function">> => "emqx_bridge_mqtt_SUITE:inspect"}], + <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> + } + ), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + + RemoteTopic = <<"remote_topic/1">>, + LocalTopic = <<"local_topic/", RemoteTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + timer:sleep(100), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + %% and also the rule should be matched, with matched + 1: + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + ?assertMatch( + #{ + <<"id">> := RuleId, + <<"metrics">> := #{ + <<"matched">> := 1, + <<"passed">> := 1, + <<"failed">> := 0, + <<"failed.exception">> := 0, + <<"failed.no_result">> := 0, + <<"matched.rate">> := _, + <<"matched.rate.max">> := _, + <<"matched.rate.last5m">> := _, + <<"actions.total">> := 1, + <<"actions.success">> := 1, + <<"actions.failed">> := 0, + <<"actions.failed.out_of_service">> := 0, + <<"actions.failed.unknown">> := 0 + } + }, + jsx:decode(Rule1) + ), + %% we also check if the actions of the rule is triggered + ?assertMatch( + #{ + inspect := #{ + event := <<"$bridges/mqtt", _/binary>>, + id := MsgId, + payload := Payload, + topic := RemoteTopic, + qos := 0, + dup := false, + retain := false, + pub_props := #{}, + timestamp := _ + } + } when is_binary(MsgId), + persistent_term:get(?MODULE) + ), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []). + +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(<<"user1">>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF + } + ), + #{<<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + + {ok, 201, Rule} = request( + post, + uri(["rules"]), + #{ + <<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>, + <<"enable">> => true, + <<"actions">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + } + ), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + timer:sleep(100), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ + <<"id">> := RuleId, + <<"metrics">> := #{ + <<"matched">> := 1, + <<"passed">> := 1, + <<"failed">> := 0, + <<"failed.exception">> := 0, + <<"failed.no_result">> := 0, + <<"matched.rate">> := _, + <<"matched.rate.max">> := _, + <<"matched.rate.last5m">> := _, + <<"actions.total">> := 1, + <<"actions.success">> := 1, + <<"actions.failed">> := 0, + <<"actions.failed.out_of_service">> := 0, + <<"actions.failed.unknown">> := 0 + } + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := ?metrics(2, 2, 0, _, _, _), + <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, + jsx:decode(BridgeStr) + ), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []). + +request(Method, Url, Body) -> + request(<<"connector_admin">>, Method, Url, Body). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index b57a94b36..bdf43885a 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -153,7 +153,7 @@ on_start(InstId, Conf) -> BridgeConf = BasicConf#{ name => InstanceId, clientid => clientid(InstId), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId), + subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, case ?MODULE:create_bridge(BridgeConf) of @@ -204,18 +204,18 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) -> {error, Reason} -> {error, Reason} end. -make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 -> +make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; -make_sub_confs(undefined, _) -> +make_sub_confs(undefined, _Conf, _) -> undefined; -make_sub_confs(SubRemoteConf, InstId) -> +make_sub_confs(SubRemoteConf, Conf, InstId) -> ResId = emqx_resource_manager:manager_id_to_resource_id(InstId), - case maps:take(hookpoint, SubRemoteConf) of + case maps:find(hookpoint, Conf) of error -> - SubRemoteConf; - {HookPoint, SubConf} -> + error({no_hookpoint_provided, Conf}); + {ok, HookPoint} -> MFA = {?MODULE, on_message_received, [HookPoint, ResId]}, - SubConf#{on_message_received => MFA} + SubRemoteConf#{on_message_received => MFA} end. make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 7198521f2..fd5fc04f9 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -99,9 +99,9 @@ to_broker_msg( topic := TopicToken, payload := PayloadToken, qos := QoSToken, - retain := RetainToken, - mountpoint := Mountpoint - } + retain := RetainToken + }, + mountpoint := Mountpoint }, Props ) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index 040e5f392..d6c6b1fb7 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -42,13 +42,13 @@ fields("config") -> [ {ingress, mk( - ref(?MODULE, "ingress"), - #{default => #{}} + hoconsc:union([none, ref(?MODULE, "ingress")]), + #{default => undefined} )}, {egress, mk( - ref(?MODULE, "egress"), - #{default => #{}} + hoconsc:union([none, ref(?MODULE, "egress")]), + #{default => undefined} )} ]; fields("server_configs") -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index af5f5d3e7..3f3a4b9ce 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -250,9 +250,12 @@ pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) -> Conf#{local => pre_process_in_out_common(LC)}; pre_process_in_out(in, Conf) when is_map(Conf) -> %% have no 'local' field in the config - Conf; + undefined; pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) -> - Conf#{remote => pre_process_in_out_common(RC)}. + Conf#{remote => pre_process_in_out_common(RC)}; +pre_process_in_out(out, Conf) when is_map(Conf) -> + %% have no 'remote' field in the config + undefined. pre_process_in_out_common(Conf0) -> Conf1 = pre_process_conf(topic, Conf0), diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl deleted file mode 100644 index c4a6418c2..000000000 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ /dev/null @@ -1,94 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include("emqx/include/emqx.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - --define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). --define(MQTT_CONNECTOR(Username), #{ - <<"server">> => <<"127.0.0.1:1883">>, - <<"username">> => Username, - <<"password">> => <<"">>, - <<"proto_ver">> => <<"v4">>, - <<"ssl">> => #{<<"enable">> => false} -}). --define(CONNECTOR_TYPE, <<"mqtt">>). --define(CONNECTOR_NAME, <<"test_connector_42">>). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -groups() -> - []. - -suite() -> - []. - -init_per_suite(Config) -> - _ = application:load(emqx_conf), - %% some testcases (may from other app) already get emqx_connector started - _ = application:stop(emqx_resource), - _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps( - [ - emqx_connector, - emqx_bridge - ] - ), - ok = emqx_common_test_helpers:load_config(emqx_connector_schema, <<"connectors: {}">>), - Config. - -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([ - emqx_connector, - emqx_bridge - ]), - ok. - -init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(), - Config. -end_per_testcase(_, _Config) -> - ok. - -t_list_raw_empty(_) -> - ok = emqx_config:erase(hd(emqx_connector:config_key_path())), - Result = emqx_connector:list_raw(), - ?assertEqual([], Result). - -t_lookup_raw_error(_) -> - Result = emqx_connector:lookup_raw(<<"foo:bar">>), - ?assertEqual({error, not_found}, Result). - -t_parse_connector_id_error(_) -> - ?assertError( - {invalid_connector_id, <<"foobar">>}, emqx_connector:parse_connector_id(<<"foobar">>) - ). - -t_update_connector_does_not_exist(_) -> - Config = ?MQTT_CONNECTOR(<<"user1">>), - ?assertMatch({ok, _Config}, emqx_connector:update(?CONNECTOR_TYPE, ?CONNECTOR_NAME, Config)). - -t_delete_connector_does_not_exist(_) -> - ?assertEqual({ok, #{post_config_update => #{}}}, emqx_connector:delete(<<"foo:bar">>)). - -t_connector_id_using_list(_) -> - <<"foo:bar">> = emqx_connector:connector_id("foo", "bar"). diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl deleted file mode 100644 index 8c2405fde..000000000 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ /dev/null @@ -1,812 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_api_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). - --include("emqx/include/emqx.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). --include("emqx_dashboard/include/emqx_dashboard.hrl"). - -%% output functions --export([inspect/3]). - --define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). --define(CONNECTR_TYPE, <<"mqtt">>). --define(CONNECTR_NAME, <<"test_connector">>). --define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). --define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). --define(MQTT_CONNECTOR(Username), #{ - <<"server">> => <<"127.0.0.1:1883">>, - <<"username">> => Username, - <<"password">> => <<"">>, - <<"proto_ver">> => <<"v4">>, - <<"ssl">> => #{<<"enable">> => false} -}). --define(MQTT_CONNECTOR2(Server), ?MQTT_CONNECTOR(<<"user1">>)#{<<"server">> => Server}). - --define(MQTT_BRIDGE_INGRESS(ID), #{ - <<"connector">> => ID, - <<"direction">> => <<"ingress">>, - <<"remote_topic">> => <<"remote_topic/#">>, - <<"remote_qos">> => 2, - <<"local_topic">> => <<"local_topic/${topic}">>, - <<"local_qos">> => <<"${qos}">>, - <<"payload">> => <<"${payload}">>, - <<"retain">> => <<"${retain}">> -}). - --define(MQTT_BRIDGE_EGRESS(ID), #{ - <<"connector">> => ID, - <<"direction">> => <<"egress">>, - <<"local_topic">> => <<"local_topic/#">>, - <<"remote_topic">> => <<"remote_topic/${topic}">>, - <<"payload">> => <<"${payload}">>, - <<"remote_qos">> => <<"${qos}">>, - <<"retain">> => <<"${retain}">> -}). - --define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{ - <<"matched">> := MATCH, - <<"success">> := SUCC, - <<"failed">> := FAILED, - <<"rate">> := SPEED, - <<"rate_last5m">> := SPEED5M, - <<"rate_max">> := SPEEDMAX -}). - -inspect(Selected, _Envs, _Args) -> - persistent_term:put(?MODULE, #{inspect => Selected}). - -all() -> - emqx_common_test_helpers:all(?MODULE). - -groups() -> - []. - -suite() -> - [{timetrap, {seconds, 30}}]. - -init_per_suite(Config) -> - _ = application:load(emqx_conf), - %% some testcases (may from other app) already get emqx_connector started - _ = application:stop(emqx_resource), - _ = application:stop(emqx_connector), - ok = emqx_common_test_helpers:start_apps( - [ - emqx_rule_engine, - emqx_connector, - emqx_bridge, - emqx_dashboard - ], - fun set_special_configs/1 - ), - ok = emqx_common_test_helpers:load_config(emqx_connector_schema, <<"connectors: {}">>), - ok = emqx_common_test_helpers:load_config( - emqx_rule_engine_schema, - <<"rule_engine {rules {}}">> - ), - ok = emqx_common_test_helpers:load_config(emqx_bridge_schema, ?BRIDGE_CONF_DEFAULT), - Config. - -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([ - emqx_rule_engine, - emqx_connector, - emqx_bridge, - emqx_dashboard - ]), - ok. - -set_special_configs(emqx_dashboard) -> - emqx_dashboard_api_test_helpers:set_default_config(<<"connector_admin">>); -set_special_configs(_) -> - ok. - -init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), - Config. -end_per_testcase(_, _Config) -> - clear_resources(), - ok. - -clear_resources() -> - lists:foreach( - fun(#{id := Id}) -> - ok = emqx_rule_engine:delete_rule(Id) - end, - emqx_rule_engine:get_rules() - ), - lists:foreach( - fun(#{type := Type, name := Name}) -> - {ok, _} = emqx_bridge:remove(Type, Name) - end, - emqx_bridge:list() - ), - lists:foreach( - fun(#{<<"type">> := Type, <<"name">> := Name}) -> - {ok, _} = emqx_connector:delete(Type, Name) - end, - emqx_connector:list_raw() - ). - -%%------------------------------------------------------------------------------ -%% Testcases -%%------------------------------------------------------------------------------ - -t_mqtt_crud_apis(_) -> - %% assert we there's no connectors at first - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - - %% then we add a mqtt connector, using POST - %% POST /connectors/ will create a connector - User1 = <<"user1">>, - {ok, 400, << - "{\"code\":\"BAD_REQUEST\",\"message\"" - ":\"missing some required fields: [name, type]\"}" - >>} = - request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(User1)#{<<"type">> => ?CONNECTR_TYPE} - ), - {ok, 201, Connector} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(User1)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?CONNECTR_NAME, - <<"server">> := <<"127.0.0.1:1883">>, - <<"username">> := User1, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - } = jsx:decode(Connector), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% update the request-path of the connector - User2 = <<"user2">>, - {ok, 200, Connector2} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR(User2) - ), - ?assertMatch( - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?CONNECTR_NAME, - <<"server">> := <<"127.0.0.1:1883">>, - <<"username">> := User2, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - }, - jsx:decode(Connector2) - ), - - %% list all connectors again, assert Connector2 is in it - {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), - ?assertMatch( - [ - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?CONNECTR_NAME, - <<"server">> := <<"127.0.0.1:1883">>, - <<"username">> := User2, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - } - ], - jsx:decode(Connector2Str) - ), - - %% get the connector by id - {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []), - ?assertMatch( - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?CONNECTR_NAME, - <<"server">> := <<"127.0.0.1:1883">>, - <<"username">> := User2, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - }, - jsx:decode(Connector3Str) - ), - - %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - - %% update a deleted connector returns an error - {ok, 404, ErrMsg2} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR(User2) - ), - ?assertMatch( - #{ - <<"code">> := _, - <<"message">> := <<"connector not found">> - }, - jsx:decode(ErrMsg2) - ), - ok. - -t_mqtt_conn_bridge_ingress(_) -> - %% then we add a mqtt connector, using POST - User1 = <<"user1">>, - {ok, 201, Connector} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(User1)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?CONNECTR_NAME, - <<"server">> := <<"127.0.0.1:1883">>, - <<"num_of_bridges">> := 0, - <<"username">> := User1, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - } = jsx:decode(Connector), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% ... and a MQTT bridge, using POST - %% we bind this bridge to the connector created just now - timer:sleep(50), - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_INGRESS - } - ), - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?BRIDGE_NAME_INGRESS, - <<"connector">> := ConnctorID - } = jsx:decode(Bridge), - BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), - wait_for_resource_ready(BridgeIDIngress, 5), - - %% we now test if the bridge works as expected - RemoteTopic = <<"remote_topic/1">>, - LocalTopic = <<"local_topic/", RemoteTopic/binary>>, - Payload = <<"hello">>, - emqx:subscribe(LocalTopic), - timer:sleep(100), - %% PUBLISH a message to the 'remote' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic - ?assert( - receive - {deliver, LocalTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), - - %% get the connector by id, verify the num_of_bridges now is 1 - {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []), - ?assertMatch(#{<<"num_of_bridges">> := 1}, jsx:decode(Connector1Str)), - - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - - %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - ok. - -t_mqtt_conn_bridge_egress(_) -> - %% then we add a mqtt connector, using POST - User1 = <<"user1">>, - {ok, 201, Connector} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(User1)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - - %ct:pal("---connector: ~p", [Connector]), - #{ - <<"server">> := <<"127.0.0.1:1883">>, - <<"username">> := User1, - <<"password">> := <<"">>, - <<"proto_ver">> := <<"v4">>, - <<"ssl">> := #{<<"enable">> := false} - } = jsx:decode(Connector), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% ... and a MQTT bridge, using POST - %% we bind this bridge to the connector created just now - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?BRIDGE_NAME_EGRESS, - <<"connector">> := ConnctorID - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), - wait_for_resource_ready(BridgeIDEgress, 5), - - %% we now test if the bridge works as expected - LocalTopic = <<"local_topic/1">>, - RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, - Payload = <<"hello">>, - emqx:subscribe(RemoteTopic), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload)), - - %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), - - %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), - ?assertMatch( - #{ - <<"metrics">> := ?metrics(1, 1, 0, _, _, _), - <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}] - }, - jsx:decode(BridgeStr) - ), - - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - - %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []), - ok. - -%% t_mqtt_conn_update: -%% - update a connector should also update all of the the bridges -%% - cannot delete a connector that is used by at least one bridge -t_mqtt_conn_update(_) -> - %% then we add a mqtt connector, using POST - {ok, 201, Connector} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - - %ct:pal("---connector: ~p", [Connector]), - #{<<"server">> := <<"127.0.0.1:1883">>} = jsx:decode(Connector), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% ... and a MQTT bridge, using POST - %% we bind this bridge to the connector created just now - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?BRIDGE_NAME_EGRESS, - <<"connector">> := ConnctorID - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), - wait_for_resource_ready(BridgeIDEgress, 5), - - %% Then we try to update 'server' of the connector, to an unavailable IP address - %% The update OK, we recreate the resource even if the resource is current connected, - %% and the target resource we're going to update is unavailable. - {ok, 200, _} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>) - ), - %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR2(<<"127.0.0.1 : 1883">>) - ), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - - %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). - -t_mqtt_conn_update2(_) -> - %% then we add a mqtt connector, using POST - %% but this connector is point to a unreachable server "2603" - {ok, 201, Connector} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:2603">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - - #{<<"server">> := <<"127.0.0.1:2603">>} = jsx:decode(Connector), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% ... and a MQTT bridge, using POST - %% we bind this bridge to the connector created just now - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - #{ - <<"type">> := ?CONNECTR_TYPE, - <<"name">> := ?BRIDGE_NAME_EGRESS, - <<"status">> := <<"disconnected">>, - <<"connector">> := ConnctorID - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), - %% We try to fix the 'server' parameter, to another unavailable server.. - %% The update should success: we don't check the connectivity of the new config - %% if the resource is now disconnected. - {ok, 200, _} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:2604">>) - ), - %% we fix the 'server' parameter to a normal one, it should work - {ok, 200, _} = request( - put, - uri(["connectors", ConnctorID]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>) - ), - wait_for_resource_ready(BridgeIDEgress, 5), - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(BridgeStr)), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - - %% delete the connector - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []), - {ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []). - -t_mqtt_conn_update3(_) -> - %% we add a mqtt connector, using POST - {ok, 201, _} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - %% ... and a MQTT bridge, using POST - %% we bind this bridge to the connector created just now - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - #{<<"connector">> := ConnctorID} = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), - wait_for_resource_ready(BridgeIDEgress, 5), - - %% delete the connector should fail because it is in use by a bridge - {ok, 403, _} = request(delete, uri(["connectors", ConnctorID]), []), - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - %% the connector now can be deleted without problems - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). - -t_mqtt_conn_testing(_) -> - %% APIs for testing the connectivity - %% then we add a mqtt connector, using POST - {ok, 204, <<>>} = request( - post, - uri(["connectors_test"]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:1883">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - {ok, 400, _} = request( - post, - uri(["connectors_test"]), - ?MQTT_CONNECTOR2(<<"127.0.0.1:2883">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ). - -t_ingress_mqtt_bridge_with_rules(_) -> - {ok, 201, _} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(<<"user1">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - - {ok, 201, _} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_INGRESS - } - ), - BridgeIDIngress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_INGRESS), - - {ok, 201, Rule} = request( - post, - uri(["rules"]), - #{ - <<"name">> => <<"A_rule_get_messages_from_a_source_mqtt_bridge">>, - <<"enable">> => true, - <<"actions">> => [#{<<"function">> => "emqx_connector_api_SUITE:inspect"}], - <<"sql">> => <<"SELECT * from \"$bridges/", BridgeIDIngress/binary, "\"">> - } - ), - #{<<"id">> := RuleId} = jsx:decode(Rule), - - %% we now test if the bridge works as expected - - RemoteTopic = <<"remote_topic/1">>, - LocalTopic = <<"local_topic/", RemoteTopic/binary>>, - Payload = <<"hello">>, - emqx:subscribe(LocalTopic), - timer:sleep(100), - %% PUBLISH a message to the 'remote' broker, as we have only one broker, - %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDIngress, 5), - emqx:publish(emqx_message:make(RemoteTopic, Payload)), - %% we should receive a message on the local broker, with specified topic - ?assert( - receive - {deliver, LocalTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), - %% and also the rule should be matched, with matched + 1: - {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), - #{ - <<"id">> := RuleId, - <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"matched.rate">> := _, - <<"matched.rate.max">> := _, - <<"matched.rate.last5m">> := _, - <<"actions.total">> := 1, - <<"actions.success">> := 1, - <<"actions.failed">> := 0, - <<"actions.failed.out_of_service">> := 0, - <<"actions.failed.unknown">> := 0 - } - } = jsx:decode(Rule1), - %% we also check if the actions of the rule is triggered - ?assertMatch( - #{ - inspect := #{ - event := <<"$bridges/mqtt", _/binary>>, - id := MsgId, - payload := Payload, - topic := RemoteTopic, - qos := 0, - dup := false, - retain := false, - pub_props := #{}, - timestamp := _ - } - } when is_binary(MsgId), - persistent_term:get(?MODULE) - ), - - {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). - -t_egress_mqtt_bridge_with_rules(_) -> - {ok, 201, _} = request( - post, - uri(["connectors"]), - ?MQTT_CONNECTOR(<<"user1">>)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?CONNECTR_NAME - } - ), - ConnctorID = emqx_connector:connector_id(?CONNECTR_TYPE, ?CONNECTR_NAME), - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ - <<"type">> => ?CONNECTR_TYPE, - <<"name">> => ?BRIDGE_NAME_EGRESS - } - ), - #{<<"type">> := ?CONNECTR_TYPE, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?CONNECTR_TYPE, ?BRIDGE_NAME_EGRESS), - - {ok, 201, Rule} = request( - post, - uri(["rules"]), - #{ - <<"name">> => <<"A_rule_send_messages_to_a_sink_mqtt_bridge">>, - <<"enable">> => true, - <<"actions">> => [BridgeIDEgress], - <<"sql">> => <<"SELECT * from \"t/1\"">> - } - ), - #{<<"id">> := RuleId} = jsx:decode(Rule), - - %% we now test if the bridge works as expected - LocalTopic = <<"local_topic/1">>, - RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, - Payload = <<"hello">>, - emqx:subscribe(RemoteTopic), - timer:sleep(100), - %% PUBLISH a message to the 'local' broker, as we have only one broker, - %% the remote broker is also the local one. - wait_for_resource_ready(BridgeIDEgress, 5), - emqx:publish(emqx_message:make(LocalTopic, Payload)), - %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = Payload}} -> - ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), - emqx:unsubscribe(RemoteTopic), - - %% PUBLISH a message to the rule. - Payload2 = <<"hi">>, - RuleTopic = <<"t/1">>, - RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, - emqx:subscribe(RemoteTopic2), - timer:sleep(100), - wait_for_resource_ready(BridgeIDEgress, 5), - emqx:publish(emqx_message:make(RuleTopic, Payload2)), - {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), - #{ - <<"id">> := RuleId, - <<"metrics">> := #{ - <<"matched">> := 1, - <<"passed">> := 1, - <<"failed">> := 0, - <<"failed.exception">> := 0, - <<"failed.no_result">> := 0, - <<"matched.rate">> := _, - <<"matched.rate.max">> := _, - <<"matched.rate.last5m">> := _, - <<"actions.total">> := 1, - <<"actions.success">> := 1, - <<"actions.failed">> := 0, - <<"actions.failed.out_of_service">> := 0, - <<"actions.failed.unknown">> := 0 - } - } = jsx:decode(Rule1), - %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic2, #message{payload = Payload2}} -> - ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), - - %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), - ?assertMatch( - #{ - <<"metrics">> := ?metrics(2, 2, 0, _, _, _), - <<"node_metrics">> := - [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] - }, - jsx:decode(BridgeStr) - ), - - {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). - -request(Method, Url, Body) -> - request(<<"connector_admin">>, Method, Url, Body). - -wait_for_resource_ready(InstId, 0) -> - ct:pal("--- bridge ~p: ~p", [InstId, emqx_bridge:lookup(InstId)]), - ct:fail(wait_resource_timeout); -wait_for_resource_ready(InstId, Retry) -> - case emqx_bridge:lookup(InstId) of - {ok, #{resource_data := #{status := connected}}} -> - ok; - _ -> - timer:sleep(100), - wait_for_resource_ready(InstId, Retry - 1) - end.