diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2b9bd48aa..21c06284d 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -266,11 +266,16 @@ process_request(#{ } = Conf, Msg) -> Conf#{ method => make_method(emqx_plugin_libs_rule:proc_tmpl(MethodTks, Msg)) , path => emqx_plugin_libs_rule:proc_tmpl(PathTks, Msg) - , body => emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg) + , body => process_request_body(BodyTks, Msg) , headers => maps:to_list(proc_headers(HeadersTks, Msg)) , request_timeout => ReqTimeout }. +process_request_body([], Msg) -> + emqx_json:encode(Msg); +process_request_body(BodyTks, Msg) -> + emqx_plugin_libs_rule:proc_tmpl(BodyTks, Msg). + proc_headers(HeaderTks, Msg) -> maps:fold(fun(K, V, Acc) -> Acc#{emqx_plugin_libs_rule:proc_tmpl(K, Msg) => diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index eb483dcc5..1357037ee 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -66,7 +66,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), #mqtt_msg{qos = QoS, @@ -82,13 +82,18 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, #{local_topic := TopicToken, payload := PayloadToken, local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = replace_vars_in_str(PayloadToken, MapMsg), + Payload = process_payload(PayloadToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg), Retain = replace_simple_var(RetainToken, MapMsg), set_headers(Props, emqx_message:set_flags(#{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))). +process_payload([], Msg) -> + emqx_json:encode(Msg); +process_payload(Tks, Msg) -> + replace_vars_in_str(Tks, Msg). + %% Replace a string contains vars to another string in which the placeholders are replace by the %% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: %% "a: 1".