From b7259d9a20d3083e77cbd7633860fe8fea5ea281 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 3 Jan 2023 18:11:29 +0800 Subject: [PATCH] test(mqtt-bridge): use empty payload template for ingress/egress mqtt bridge --- .../test/emqx_bridge_mqtt_SUITE.erl | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ae4fc4692..cc5e8a97a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -75,6 +75,29 @@ } }). +-define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ + <<"remote">> => #{ + <<"topic">> => <>, + <<"qos">> => 2 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } +}). + +-define(EGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ + <<"local">> => #{ + <<"topic">> => <> + }, + <<"remote">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } +}). + inspect(Selected, _Envs, _Args) -> persistent_term:put(?MODULE, #{inspect => Selected}). @@ -209,6 +232,76 @@ t_mqtt_conn_bridge_ingress(_) -> ok. +t_mqtt_conn_bridge_ingress_no_payload_template(_) -> + User1 = <<"user1">>, + %% create an MQTT bridge, using POST + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_INGRESS, + <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_INGRESS + } = jsx:decode(Bridge), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + + %% we now test if the bridge works as expected + RemoteTopic = <>, + LocalTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + timer:sleep(100), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = MapMsg}} -> + ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]), + %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. + case jsx:decode(MapMsg) of + #{<<"payload">> := Payload} -> + true; + _ -> + false + end; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, @@ -276,6 +369,80 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. +t_mqtt_conn_bridge_egress_no_payload_template(_) -> + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <>, + RemoteTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} -> + ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]), + %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. + Size = byte_size(ResourceID), + ?assertMatch(<>, From), + case jsx:decode(MapMsg) of + #{<<"payload">> := Payload} -> + true; + _ -> + false + end; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_egress_custom_clientid_prefix(_Config) -> User1 = <<"user1">>, {ok, 201, Bridge} = request(