Merge pull request #12347 from savonarola/0117-render-valid-mqtt-messages-from-rules

fix(mqtt_bridge): render valid messages from incomplete rule data
This commit is contained in:
Ilya Averyanov 2024-01-18 14:24:05 +02:00 committed by GitHub
commit 3be3677005
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 55 additions and 8 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge_mqtt, [
{description, "EMQX MQTT Broker Bridge"},
{vsn, "0.1.7"},
{vsn, "0.1.8"},
{registered, []},
{applications, [
kernel,

View File

@ -16,6 +16,8 @@
-module(emqx_bridge_mqtt_msg).
-include_lib("emqx/include/emqx_mqtt.hrl").
-export([parse/1]).
-export([render/2]).
@ -66,8 +68,8 @@ render(
#{
topic => render_string(TopicToken, Msg),
payload => render_payload(Vars, Msg),
qos => render_simple_var(QoSToken, Msg),
retain => render_simple_var(RetainToken, Msg)
qos => render_simple_var(QoSToken, Msg, ?QOS_0),
retain => render_simple_var(RetainToken, Msg, false)
}.
render_payload(From, MapMsg) ->
@ -80,16 +82,23 @@ do_render_payload(Tks, Msg) ->
%% Replace a string contains vars to another string in which the placeholders are replace by the
%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
%% "a: 1".
%% "a: 1". Undefined vars will be replaced by empty strings.
render_string(Tokens, Data) when is_list(Tokens) ->
emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary});
emqx_placeholder:proc_tmpl(Tokens, Data, #{
return => full_binary, var_trans => fun undefined_as_empty/1
});
render_string(Val, _Data) ->
Val.
undefined_as_empty(undefined) ->
<<>>;
undefined_as_empty(Val) ->
emqx_utils_conv:bin(Val).
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
%% value will be an integer 1.
render_simple_var(Tokens, Data) when is_list(Tokens) ->
render_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
[Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
Var;
render_simple_var(Val, _Data) ->
emqx_maybe:define(Var, Default);
render_simple_var(Val, _Data, _Default) ->
Val.

View File

@ -839,6 +839,40 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
t_egress_mqtt_bridge_with_dummy_rule(_) ->
BridgeIDEgress = create_bridge(
?SERVER_CONF#{
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF
}
),
{ok, 201, Rule} = request(
post,
uri(["rules"]),
#{
<<"name">> => <<"A_rule_send_empty_messages_to_a_sink_mqtt_bridge">>,
<<"enable">> => true,
<<"actions">> => [BridgeIDEgress],
%% select something useless from what a message cannot be composed
<<"sql">> => <<"SELECT x from \"t/1\"">>
}
),
#{<<"id">> := RuleId} = emqx_utils_json:decode(Rule),
%% PUBLISH a message to the rule.
Payload = <<"hi">>,
RuleTopic = <<"t/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/">>,
emqx:subscribe(RemoteTopic),
timer:sleep(100),
emqx:publish(emqx_message:make(RuleTopic, Payload)),
%% we should receive a message on the "remote" broker, with specified topic
assert_mqtt_msg_received(RemoteTopic, <<>>),
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
t_mqtt_conn_bridge_egress_reconnect(_) ->
%% then we add a mqtt connector, using POST
BridgeIDEgress = create_bridge(

View File

@ -0,0 +1,4 @@
Always render valid messages for egress MQTT data bridge from the data fetched by Rule SQL, even if the data is incomplete and placeholders used in the bridge configuration are missing.
Previously, some messages were rendered as invalid and were discarded by the MQTT egress data bridge.
Render undefined variables as empty strings in `payload` and `topic` templates of the MQTT egress data bridge. Previously, undefined variables were rendered as `undefined` strings.