fix(mqtt_bridge): render valid messages from incomplete rule data
This commit is contained in:
parent
a8c6280a5e
commit
adf22f1f10
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_bridge_mqtt_msg).
|
||||
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
-export([parse/1]).
|
||||
-export([render/2]).
|
||||
|
||||
|
@ -66,8 +68,8 @@ render(
|
|||
#{
|
||||
topic => render_string(TopicToken, Msg),
|
||||
payload => render_payload(Vars, Msg),
|
||||
qos => render_simple_var(QoSToken, Msg),
|
||||
retain => render_simple_var(RetainToken, Msg)
|
||||
qos => render_simple_var(QoSToken, Msg, ?QOS_0),
|
||||
retain => render_simple_var(RetainToken, Msg, false)
|
||||
}.
|
||||
|
||||
render_payload(From, MapMsg) ->
|
||||
|
@ -80,16 +82,23 @@ do_render_payload(Tks, Msg) ->
|
|||
|
||||
%% Replace a string contains vars to another string in which the placeholders are replace by the
|
||||
%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
|
||||
%% "a: 1".
|
||||
%% "a: 1". Undefined vars will be replaced by empty strings.
|
||||
render_string(Tokens, Data) when is_list(Tokens) ->
|
||||
emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary});
|
||||
emqx_placeholder:proc_tmpl(Tokens, Data, #{
|
||||
return => full_binary, var_trans => fun undefined_as_empty/1
|
||||
});
|
||||
render_string(Val, _Data) ->
|
||||
Val.
|
||||
|
||||
undefined_as_empty(undefined) ->
|
||||
<<>>;
|
||||
undefined_as_empty(Val) ->
|
||||
emqx_utils_conv:bin(Val).
|
||||
|
||||
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
|
||||
%% value will be an integer 1.
|
||||
render_simple_var(Tokens, Data) when is_list(Tokens) ->
|
||||
render_simple_var(Tokens, Data, Default) when is_list(Tokens) ->
|
||||
[Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}),
|
||||
Var;
|
||||
render_simple_var(Val, _Data) ->
|
||||
emqx_maybe:define(Var, Default);
|
||||
render_simple_var(Val, _Data, _Default) ->
|
||||
Val.
|
||||
|
|
|
@ -836,6 +836,40 @@ t_egress_mqtt_bridge_with_rules(_) ->
|
|||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
|
||||
|
||||
t_egress_mqtt_bridge_with_dummy_rule(_) ->
|
||||
BridgeIDEgress = create_bridge(
|
||||
?SERVER_CONF#{
|
||||
<<"name">> => ?BRIDGE_NAME_EGRESS,
|
||||
<<"egress">> => ?EGRESS_CONF
|
||||
}
|
||||
),
|
||||
|
||||
{ok, 201, Rule} = request(
|
||||
post,
|
||||
uri(["rules"]),
|
||||
#{
|
||||
<<"name">> => <<"A_rule_send_empty_messages_to_a_sink_mqtt_bridge">>,
|
||||
<<"enable">> => true,
|
||||
<<"actions">> => [BridgeIDEgress],
|
||||
%% select something useless from what a message cannot be composed
|
||||
<<"sql">> => <<"SELECT x from \"t/1\"">>
|
||||
}
|
||||
),
|
||||
#{<<"id">> := RuleId} = emqx_utils_json:decode(Rule),
|
||||
|
||||
%% PUBLISH a message to the rule.
|
||||
Payload = <<"hi">>,
|
||||
RuleTopic = <<"t/1">>,
|
||||
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/">>,
|
||||
emqx:subscribe(RemoteTopic),
|
||||
timer:sleep(100),
|
||||
emqx:publish(emqx_message:make(RuleTopic, Payload)),
|
||||
%% we should receive a message on the "remote" broker, with specified topic
|
||||
assert_mqtt_msg_received(RemoteTopic, <<>>),
|
||||
|
||||
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
|
||||
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
|
||||
|
||||
t_mqtt_conn_bridge_egress_reconnect(_) ->
|
||||
%% then we add a mqtt connector, using POST
|
||||
BridgeIDEgress = create_bridge(
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
Always render valid messages for egress MQTT data bridge from the data fetched by Rule SQL, even if the data is incomplete and placeholders used in the bridge configuration are missing.
|
||||
Previously, some messages were rendered as invalid and were discarded by the MQTT egress data bridge.
|
||||
|
||||
Render undefined variables as empty strings in `payload` and `topic` templates of the MQTT egress data bridge. Previously, undefined variables were rendered as `undefined` strings.
|
Loading…
Reference in New Issue