From 59ac0b14247723fb4ffc0a99a2f5d70624b5f8b6 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 3 Jan 2023 17:54:07 +0800 Subject: [PATCH 1/2] fix(mqtt-bridge): transmit raw msg payload with empty template --- .../src/mqtt/emqx_connector_mqtt_msg.erl | 17 +++++++++-------- changes/v5.0.14/fix-9672.en.md | 1 + changes/v5.0.14/fix-9672.zh.md | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) create mode 100644 changes/v5.0.14/fix-9672.en.md create mode 100644 changes/v5.0.14/fix-9672.zh.md diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index bdd516db6..defbbaea2 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -71,14 +71,13 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(MapMsg, #{ remote := #{ topic := TopicToken, - payload := PayloadToken, qos := QoSToken, retain := RetainToken - }, + } = Remote, mountpoint := Mountpoint }) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(PayloadToken, MapMsg), + Payload = process_payload(Remote, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), PubProps = maps:get(pub_props, MapMsg, #{}), @@ -100,16 +99,15 @@ to_broker_msg( #{ local := #{ topic := TopicToken, - payload := PayloadToken, qos := QoSToken, retain := RetainToken - }, + } = Local, mountpoint := Mountpoint }, Props ) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(PayloadToken, MapMsg), + Payload = process_payload(Local, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), PubProps = maps:get(pub_props, MapMsg, #{}), @@ -121,9 +119,12 @@ to_broker_msg( ) ). -process_payload([], Msg) -> +process_payload(From, MapMsg) -> + do_process_payload(maps:get(payload, From, undefined), MapMsg). + +do_process_payload(undefined, Msg) -> emqx_json:encode(Msg); -process_payload(Tks, Msg) -> +do_process_payload(Tks, Msg) -> replace_vars_in_str(Tks, Msg). %% Replace a string contains vars to another string in which the placeholders are replace by the diff --git a/changes/v5.0.14/fix-9672.en.md b/changes/v5.0.14/fix-9672.en.md new file mode 100644 index 000000000..01724801e --- /dev/null +++ b/changes/v5.0.14/fix-9672.en.md @@ -0,0 +1 @@ +Fix the problem that the bridge is not available when the Payload template is empty in the MQTT bridge. diff --git a/changes/v5.0.14/fix-9672.zh.md b/changes/v5.0.14/fix-9672.zh.md new file mode 100644 index 000000000..86106cc1d --- /dev/null +++ b/changes/v5.0.14/fix-9672.zh.md @@ -0,0 +1 @@ +修复 MQTT 桥接中 Payload 模板为空时桥接不可用的问题。 From b7259d9a20d3083e77cbd7633860fe8fea5ea281 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 3 Jan 2023 18:11:29 +0800 Subject: [PATCH 2/2] test(mqtt-bridge): use empty payload template for ingress/egress mqtt bridge --- .../test/emqx_bridge_mqtt_SUITE.erl | 167 ++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index ae4fc4692..cc5e8a97a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -75,6 +75,29 @@ } }). +-define(INGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ + <<"remote">> => #{ + <<"topic">> => <>, + <<"qos">> => 2 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } +}). + +-define(EGRESS_CONF_NO_PAYLOAD_TEMPLATE, #{ + <<"local">> => #{ + <<"topic">> => <> + }, + <<"remote">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => <<"${retain}">> + } +}). + inspect(Selected, _Envs, _Args) -> persistent_term:put(?MODULE, #{inspect => Selected}). @@ -209,6 +232,76 @@ t_mqtt_conn_bridge_ingress(_) -> ok. +t_mqtt_conn_bridge_ingress_no_payload_template(_) -> + User1 = <<"user1">>, + %% create an MQTT bridge, using POST + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_INGRESS, + <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_INGRESS + } = jsx:decode(Bridge), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + + %% we now test if the bridge works as expected + RemoteTopic = <>, + LocalTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(LocalTopic), + timer:sleep(100), + %% PUBLISH a message to the 'remote' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(RemoteTopic, Payload)), + %% we should receive a message on the local broker, with specified topic + ?assert( + receive + {deliver, LocalTopic, #message{payload = MapMsg}} -> + ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]), + %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. + case jsx:decode(MapMsg) of + #{<<"payload">> := Payload} -> + true; + _ -> + false + end; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 0, <<"received">> := 1} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, @@ -276,6 +369,80 @@ t_mqtt_conn_bridge_egress(_) -> {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. +t_mqtt_conn_bridge_egress_no_payload_template(_) -> + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <>, + RemoteTopic = <>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + ?assert( + receive + {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} -> + ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]), + %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. + Size = byte_size(ResourceID), + ?assertMatch(<>, From), + case jsx:decode(MapMsg) of + #{<<"payload">> := Payload} -> + true; + _ -> + false + end; + Msg -> + ct:pal("Msg: ~p", [Msg]), + false + after 100 -> + false + end + ), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + + ok. + t_egress_custom_clientid_prefix(_Config) -> User1 = <<"user1">>, {ok, 201, Bridge} = request(