From 925d46fe86433aef4973424e8403239ddd00673e Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sat, 1 Jan 2022 04:12:20 +0800 Subject: [PATCH] fix(connector): add testcase for binding egress mqtt bridge to rules --- apps/emqx_bridge/src/emqx_bridge.erl | 15 +++ .../test/emqx_connector_api_SUITE.erl | 91 +++++++++++++++++++ 2 files changed, 106 insertions(+) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0495f00e4..2e610b2b9 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -39,11 +39,14 @@ , lookup/3 , list/0 , list_bridges_by_connector/1 + , create/2 , create/3 , recreate/2 , recreate/3 , create_dry_run/2 + , remove/1 , remove/3 + , update/2 , update/3 , start/2 , stop/2 @@ -207,6 +210,10 @@ stop(Type, Name) -> restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). +create(BridgeId, Conf) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + create(BridgeType, BridgeName, Conf). + create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), @@ -217,6 +224,10 @@ create(Type, Name, Conf) -> {error, Reason} -> {error, Reason} end. +update(BridgeId, {OldConf, Conf}) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + update(BridgeType, BridgeName, {OldConf, Conf}). + update(Type, Name, {OldConf, Conf}) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% @@ -263,6 +274,10 @@ create_dry_run(Type, Conf) -> Error end. +remove(BridgeId) -> + {BridgeType, BridgeName} = parse_bridge_id(BridgeId), + remove(BridgeType, BridgeName, #{}). + remove(Type, Name, _Conf) -> ?SLOG(info, #{msg => "remove_bridge", type => Type, name => Name}), case emqx_resource:remove_local(resource_id(Type, Name)) of diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 96d793640..936982e75 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -110,8 +110,20 @@ init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> + clear_resources(), ok. +clear_resources() -> + lists:foreach(fun(#{id := Id}) -> + ok = emqx_rule_engine:delete_rule(Id) + end, emqx_rule_engine:get_rules()), + lists:foreach(fun(#{id := Id}) -> + ok = emqx_bridge:remove(Id) + end, emqx_bridge:list()), + lists:foreach(fun(#{<<"id">> := Id}) -> + ok = emqx_connector:delete(Id) + end, emqx_connector:list()). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -507,6 +519,85 @@ t_ingress_mqtt_bridge_with_rules(_) -> {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). +t_egress_mqtt_bridge_with_rules(_) -> + {ok, 201, Connector} = request(post, uri(["connectors"]), + ?MQTT_CONNECOTR(<<"user1">>)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), + #{ <<"id">> := ConnctorID } = jsx:decode(Connector), + + {ok, 201, Bridge} = request(post, uri(["bridges"]), + ?MQTT_BRIDGE_EGRESS(ConnctorID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), + #{ <<"id">> := BridgeIDEgress } = jsx:decode(Bridge), + + {ok, 201, Rule} = request(post, uri(["rules"]), + #{<<"name">> => <<"A rule send messages to a sink mqtt bridge">>, + <<"enable">> => true, + <<"outputs">> => [BridgeIDEgress], + <<"sql">> => <<"SELECT * from \"t/1\"">> + }), + #{<<"id">> := RuleId} = jsx:decode(Rule), + + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = Payload}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + emqx:unsubscribe(RemoteTopic), + + %% PUBLISH a message to the rule. + Payload2 = <<"hi">>, + RuleTopic = <<"t/1">>, + RemoteTopic2 = <<"remote_topic/", RuleTopic/binary>>, + emqx:subscribe(RemoteTopic2), + emqx:publish(emqx_message:make(RuleTopic, Payload2)), + {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), + #{ <<"id">> := RuleId + , <<"metrics">> := #{<<"matched">> := 1} + } = jsx:decode(Rule1), + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic2, #message{payload = Payload2}} -> + ct:pal("local broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), + true; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch(#{ <<"id">> := BridgeIDEgress + , <<"metrics">> := ?metrics(2, 2, 0, _, _, _) + , <<"node_metrics">> := + [#{<<"node">> := _, <<"metrics">> := ?metrics(2, 2, 0, _, _, _)}] + }, jsx:decode(BridgeStr)), + + {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []). + %%-------------------------------------------------------------------- %% HTTP Request %%--------------------------------------------------------------------