diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl deleted file mode 100644 index 004819678..000000000 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ /dev/null @@ -1,134 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_mqtt_msg). - --export([ - to_remote_msg/2, - to_broker_msg/3 -]). - --export([ - replace_vars_in_str/2, - replace_simple_var/2 -]). - --export_type([msg/0]). - --include_lib("emqx/include/emqx.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). - --type msg() :: emqx_types:message(). --type exp_msg() :: emqx_types:message() | #mqtt_msg{}. --type remote_config() :: #{ - topic := binary(), - qos := original | integer(), - retain := original | boolean(), - payload := binary() -}. --type variables() :: #{ - mountpoint := undefined | binary(), - remote := remote_config() -}. - -%% @doc Make export format: -%% 1. Mount topic to a prefix -%% 2. Fix QoS to 1 -%% @end -%% Shame that we have to know the callback module here -%% would be great if we can get rid of #mqtt_msg{} record -%% and use #message{} in all places. --spec to_remote_msg(msg() | map(), variables()) -> - exp_msg(). -to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> - Retain0 = maps:get(retain, Flags0, false), - {Columns, _} = emqx_rule_events:eventmsg_publish(Msg), - MapMsg = maps:put(retain, Retain0, Columns), - to_remote_msg(MapMsg, Vars); -to_remote_msg(MapMsg, #{ - remote := #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Remote -}) when is_map(MapMsg) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Remote, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - #mqtt_msg{ - qos = QoS, - retain = Retain, - topic = Topic, - props = emqx_utils:pub_props_to_packet(PubProps), - payload = Payload - }. - -%% published from remote node over a MQTT connection -to_broker_msg(Msg, Vars, undefined) -> - to_broker_msg(Msg, Vars, #{}); -to_broker_msg( - #{dup := Dup} = MapMsg, - #{ - local := #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Local - }, - Props -) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Local, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - set_headers( - Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, - emqx_message:set_flags( - #{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, Topic, Payload) - ) - ). - -process_payload(From, MapMsg) -> - do_process_payload(maps:get(payload, From, undefined), MapMsg). - -do_process_payload(undefined, Msg) -> - emqx_utils_json:encode(Msg); -do_process_payload(Tks, Msg) -> - replace_vars_in_str(Tks, Msg). - -%% Replace a string contains vars to another string in which the placeholders are replace by the -%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: -%% "a: 1". -replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); -replace_vars_in_str(Val, _Data) -> - Val. - -%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result -%% value will be an integer 1. -replace_simple_var(Tokens, Data) when is_list(Tokens) -> - [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), - Var; -replace_simple_var(Val, _Data) -> - Val. - -set_headers(Val, Msg) -> - emqx_message:set_headers(Val, Msg). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index c49d5b180..4169c7f69 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -61,8 +61,8 @@ -module(emqx_connector_mqtt_worker). --include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). %% APIs -export([ @@ -79,7 +79,7 @@ send_to_remote_async/4 ]). --export([handle_publish/3]). +-export([handle_publish/4]). -export([handle_disconnect/1]). -export_type([config/0]). @@ -117,12 +117,7 @@ topic := emqx_topic:topic(), qos => emqx_types:qos() }, - local := #{ - topic => template(), - qos => template() | emqx_types:qos(), - retain => template() | boolean(), - payload => template() | undefined - }, + local := msgvars(), on_message_received := {module(), atom(), [term()]} }. @@ -130,12 +125,14 @@ local => #{ topic => emqx_topic:topic() }, - remote := #{ - topic := template(), - qos => template() | emqx_types:qos(), - retain => template() | boolean(), - payload => template() | undefined - } + remote := msgvars() +}. + +-type msgvars() :: #{ + topic => template(), + qos => template() | emqx_types:qos(), + retain => template() | boolean(), + payload => template() | undefined }. -include_lib("emqx/include/logger.hrl"). @@ -246,9 +243,17 @@ mk_client_options(Config, BridgeOpts) -> force_ping => true }. -mk_client_event_handler(Vars, Opts) when Vars /= undefined -> +mk_client_event_handler(Subscriptions = #{}, Opts) -> + OnMessage = maps:get(on_message_received, Subscriptions, undefined), + LocalPublish = + case Subscriptions of + #{local := Local = #{topic := _}} -> + Local; + #{} -> + undefined + end, #{ - publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, + publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, Opts]}, disconnected => {fun ?MODULE:handle_disconnect/1, []} }; mk_client_event_handler(undefined, _Opts) -> @@ -279,7 +284,7 @@ ping(Pid) -> send_to_remote(Pid, MsgIn, Conf) -> do_send(Pid, export_msg(MsgIn, Conf)). -do_send(Pid, {true, Msg}) -> +do_send(Pid, Msg) when Msg /= undefined -> case emqtt:publish(Pid, Msg) of ok -> ok; @@ -303,20 +308,18 @@ do_send(Pid, {true, Msg}) -> }), {error, Reason} end; -do_send(_Name, false) -> +do_send(_Name, undefined) -> ok. send_to_remote_async(Pid, MsgIn, Callback, Conf) -> do_send_async(Pid, export_msg(MsgIn, Conf), Callback). -do_send_async(Pid, {true, Msg}, Callback) -> +do_send_async(Pid, Msg, Callback) when Msg /= undefined -> ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback), {ok, Pid}; -do_send_async(_Pid, false, _Callback) -> +do_send_async(_Pid, undefined, _Callback) -> ok. -pre_process_subscriptions(undefined, _, _) -> - undefined; pre_process_subscriptions( #{remote := RC, local := LC} = Conf, BridgeName, @@ -330,8 +333,6 @@ pre_process_subscriptions(Conf, _, _) when is_map(Conf) -> %% have no 'local' field in the config undefined. -pre_process_forwards(undefined) -> - undefined; pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) -> Conf#{remote => pre_process_in_out_common(RC)}; pre_process_forwards(Conf) when is_map(Conf) -> @@ -375,44 +376,39 @@ downgrade_ingress_qos(2) -> downgrade_ingress_qos(QoS) -> QoS. -export_msg(Msg, #{forwards := Forwards = #{}}) -> - {true, emqx_connector_mqtt_msg:to_remote_msg(Msg, Forwards)}; +export_msg(Msg, #{forwards := #{remote := Remote}}) -> + to_remote_msg(Msg, Remote); export_msg(Msg, #{forwards := undefined}) -> ?SLOG(error, #{ msg => "forwarding_unavailable", message => Msg, reason => "egress is not configured" }), - false. + undefined. %% -handle_publish(#{properties := Props} = MsgIn, Vars, Opts) -> +handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, Opts) -> Msg = import_msg(MsgIn, Opts), ?SLOG(debug, #{ msg => "publish_local", - message => Msg, - vars => Vars + message => Msg }), - case Vars of - #{on_message_received := {Mod, Func, Args}} -> - _ = erlang:apply(Mod, Func, [Msg | Args]); - _ -> - ok - end, - maybe_publish_local(Msg, Vars, Props). + maybe_on_message_received(Msg, OnMessage), + maybe_publish_local(Msg, LocalPublish, Props). handle_disconnect(_Reason) -> ok. -maybe_publish_local(Msg, Vars, Props) -> - case emqx_utils_maps:deep_get([local, topic], Vars, undefined) of - %% local topic is not set, discard it - undefined -> - ok; - _ -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) - end. +maybe_on_message_received(Msg, {Mod, Func, Args}) -> + erlang:apply(Mod, Func, [Msg | Args]); +maybe_on_message_received(_Msg, undefined) -> + ok. + +maybe_publish_local(Msg, Local = #{}, Props) -> + emqx_broker:publish(to_broker_msg(Msg, Local, Props)); +maybe_publish_local(_Msg, undefined, _Props) -> + ok. import_msg( #{ @@ -459,3 +455,84 @@ printable_maps(Headers) -> #{}, Headers ). + +%% Shame that we have to know the callback module here +%% would be great if we can get rid of #mqtt_msg{} record +%% and use #message{} in all places. +-spec to_remote_msg(emqx_types:message() | map(), msgvars()) -> + #mqtt_msg{}. +to_remote_msg(#message{flags = Flags} = Msg, Vars) -> + {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); +to_remote_msg( + MapMsg, + #{ + topic := TopicToken, + qos := QoSToken, + retain := RetainToken + } = Remote +) when is_map(MapMsg) -> + Topic = replace_vars_in_str(TopicToken, MapMsg), + Payload = process_payload(Remote, MapMsg), + QoS = replace_simple_var(QoSToken, MapMsg), + Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), + #mqtt_msg{ + qos = QoS, + retain = Retain, + topic = Topic, + props = emqx_utils:pub_props_to_packet(PubProps), + payload = Payload + }. + +%% published from remote node over a MQTT connection +to_broker_msg(Msg, Vars, undefined) -> + to_broker_msg(Msg, Vars, #{}); +to_broker_msg( + #{dup := Dup} = MapMsg, + #{ + topic := TopicToken, + qos := QoSToken, + retain := RetainToken + } = Local, + Props +) -> + Topic = replace_vars_in_str(TopicToken, MapMsg), + Payload = process_payload(Local, MapMsg), + QoS = replace_simple_var(QoSToken, MapMsg), + Retain = replace_simple_var(RetainToken, MapMsg), + PubProps = maps:get(pub_props, MapMsg, #{}), + set_headers( + Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, + emqx_message:set_flags( + #{dup => Dup, retain => Retain}, + emqx_message:make(bridge, QoS, Topic, Payload) + ) + ). + +process_payload(From, MapMsg) -> + do_process_payload(maps:get(payload, From, undefined), MapMsg). + +do_process_payload(undefined, Msg) -> + emqx_utils_json:encode(Msg); +do_process_payload(Tks, Msg) -> + replace_vars_in_str(Tks, Msg). + +%% Replace a string contains vars to another string in which the placeholders are replace by the +%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: +%% "a: 1". +replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> + emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); +replace_vars_in_str(Val, _Data) -> + Val. + +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +replace_simple_var(Tokens, Data) when is_list(Tokens) -> + [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +replace_simple_var(Val, _Data) -> + Val. + +set_headers(Val, Msg) -> + emqx_message:set_headers(Val, Msg).