refactor(mqttconn): simplify mqtt connector

Inline `emqx_connector_mqtt_msg` module code into
`emqx_connector_mqtt_worker` module, since it's not really used
anywhere else and does not provide any reusable abstractions.
This commit is contained in:
Andrew Mayorov 2023-05-19 16:32:22 +03:00
parent bd956d00b6
commit 67d703f8c5
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 122 additions and 179 deletions

View File

@ -1,134 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_msg).
-export([
to_remote_msg/2,
to_broker_msg/3
]).
-export([
replace_vars_in_str/2,
replace_simple_var/2
]).
-export_type([msg/0]).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqtt/include/emqtt.hrl").
-type msg() :: emqx_types:message().
-type exp_msg() :: emqx_types:message() | #mqtt_msg{}.
-type remote_config() :: #{
topic := binary(),
qos := original | integer(),
retain := original | boolean(),
payload := binary()
}.
-type variables() :: #{
mountpoint := undefined | binary(),
remote := remote_config()
}.
%% @doc Make export format:
%% 1. Mount topic to a prefix
%% 2. Fix QoS to 1
%% @end
%% Shame that we have to know the callback module here
%% would be great if we can get rid of #mqtt_msg{} record
%% and use #message{} in all places.
-spec to_remote_msg(msg() | map(), variables()) ->
exp_msg().
to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
{Columns, _} = emqx_rule_events:eventmsg_publish(Msg),
MapMsg = maps:put(retain, Retain0, Columns),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{
remote := #{
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Remote
}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(Remote, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = Topic,
props = emqx_utils:pub_props_to_packet(PubProps),
payload = Payload
}.
%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
local := #{
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Local
},
Props
) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(Local, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, Topic, Payload)
)
).
process_payload(From, MapMsg) ->
do_process_payload(maps:get(payload, From, undefined), MapMsg).
do_process_payload(undefined, Msg) ->
emqx_utils_json:encode(Msg);
do_process_payload(Tks, Msg) ->
replace_vars_in_str(Tks, Msg).
%% Replace a string contains vars to another string in which the placeholders are replace by the
%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
%% "a: 1".
replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
replace_vars_in_str(Val, _Data) ->
Val.
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
%% value will be an integer 1.
replace_simple_var(Tokens, Data) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
Var;
replace_simple_var(Val, _Data) ->
Val.
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).

View File

@ -61,8 +61,8 @@
-module(emqx_connector_mqtt_worker). -module(emqx_connector_mqtt_worker).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
%% APIs %% APIs
-export([ -export([
@ -79,7 +79,7 @@
send_to_remote_async/4 send_to_remote_async/4
]). ]).
-export([handle_publish/3]). -export([handle_publish/4]).
-export([handle_disconnect/1]). -export([handle_disconnect/1]).
-export_type([config/0]). -export_type([config/0]).
@ -117,12 +117,7 @@
topic := emqx_topic:topic(), topic := emqx_topic:topic(),
qos => emqx_types:qos() qos => emqx_types:qos()
}, },
local := #{ local := msgvars(),
topic => template(),
qos => template() | emqx_types:qos(),
retain => template() | boolean(),
payload => template() | undefined
},
on_message_received := {module(), atom(), [term()]} on_message_received := {module(), atom(), [term()]}
}. }.
@ -130,12 +125,14 @@
local => #{ local => #{
topic => emqx_topic:topic() topic => emqx_topic:topic()
}, },
remote := #{ remote := msgvars()
topic := template(), }.
qos => template() | emqx_types:qos(),
retain => template() | boolean(), -type msgvars() :: #{
payload => template() | undefined topic => template(),
} qos => template() | emqx_types:qos(),
retain => template() | boolean(),
payload => template() | undefined
}. }.
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -246,9 +243,17 @@ mk_client_options(Config, BridgeOpts) ->
force_ping => true force_ping => true
}. }.
mk_client_event_handler(Vars, Opts) when Vars /= undefined -> mk_client_event_handler(Subscriptions = #{}, Opts) ->
OnMessage = maps:get(on_message_received, Subscriptions, undefined),
LocalPublish =
case Subscriptions of
#{local := Local = #{topic := _}} ->
Local;
#{} ->
undefined
end,
#{ #{
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, Opts]},
disconnected => {fun ?MODULE:handle_disconnect/1, []} disconnected => {fun ?MODULE:handle_disconnect/1, []}
}; };
mk_client_event_handler(undefined, _Opts) -> mk_client_event_handler(undefined, _Opts) ->
@ -279,7 +284,7 @@ ping(Pid) ->
send_to_remote(Pid, MsgIn, Conf) -> send_to_remote(Pid, MsgIn, Conf) ->
do_send(Pid, export_msg(MsgIn, Conf)). do_send(Pid, export_msg(MsgIn, Conf)).
do_send(Pid, {true, Msg}) -> do_send(Pid, Msg) when Msg /= undefined ->
case emqtt:publish(Pid, Msg) of case emqtt:publish(Pid, Msg) of
ok -> ok ->
ok; ok;
@ -303,20 +308,18 @@ do_send(Pid, {true, Msg}) ->
}), }),
{error, Reason} {error, Reason}
end; end;
do_send(_Name, false) -> do_send(_Name, undefined) ->
ok. ok.
send_to_remote_async(Pid, MsgIn, Callback, Conf) -> send_to_remote_async(Pid, MsgIn, Callback, Conf) ->
do_send_async(Pid, export_msg(MsgIn, Conf), Callback). do_send_async(Pid, export_msg(MsgIn, Conf), Callback).
do_send_async(Pid, {true, Msg}, Callback) -> do_send_async(Pid, Msg, Callback) when Msg /= undefined ->
ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback), ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback),
{ok, Pid}; {ok, Pid};
do_send_async(_Pid, false, _Callback) -> do_send_async(_Pid, undefined, _Callback) ->
ok. ok.
pre_process_subscriptions(undefined, _, _) ->
undefined;
pre_process_subscriptions( pre_process_subscriptions(
#{remote := RC, local := LC} = Conf, #{remote := RC, local := LC} = Conf,
BridgeName, BridgeName,
@ -330,8 +333,6 @@ pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
%% have no 'local' field in the config %% have no 'local' field in the config
undefined. undefined.
pre_process_forwards(undefined) ->
undefined;
pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) -> pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_in_out_common(RC)}; Conf#{remote => pre_process_in_out_common(RC)};
pre_process_forwards(Conf) when is_map(Conf) -> pre_process_forwards(Conf) when is_map(Conf) ->
@ -375,44 +376,39 @@ downgrade_ingress_qos(2) ->
downgrade_ingress_qos(QoS) -> downgrade_ingress_qos(QoS) ->
QoS. QoS.
export_msg(Msg, #{forwards := Forwards = #{}}) -> export_msg(Msg, #{forwards := #{remote := Remote}}) ->
{true, emqx_connector_mqtt_msg:to_remote_msg(Msg, Forwards)}; to_remote_msg(Msg, Remote);
export_msg(Msg, #{forwards := undefined}) -> export_msg(Msg, #{forwards := undefined}) ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "forwarding_unavailable", msg => "forwarding_unavailable",
message => Msg, message => Msg,
reason => "egress is not configured" reason => "egress is not configured"
}), }),
false. undefined.
%% %%
handle_publish(#{properties := Props} = MsgIn, Vars, Opts) -> handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, Opts) ->
Msg = import_msg(MsgIn, Opts), Msg = import_msg(MsgIn, Opts),
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "publish_local", msg => "publish_local",
message => Msg, message => Msg
vars => Vars
}), }),
case Vars of maybe_on_message_received(Msg, OnMessage),
#{on_message_received := {Mod, Func, Args}} -> maybe_publish_local(Msg, LocalPublish, Props).
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ ->
ok
end,
maybe_publish_local(Msg, Vars, Props).
handle_disconnect(_Reason) -> handle_disconnect(_Reason) ->
ok. ok.
maybe_publish_local(Msg, Vars, Props) -> maybe_on_message_received(Msg, {Mod, Func, Args}) ->
case emqx_utils_maps:deep_get([local, topic], Vars, undefined) of erlang:apply(Mod, Func, [Msg | Args]);
%% local topic is not set, discard it maybe_on_message_received(_Msg, undefined) ->
undefined -> ok.
ok;
_ -> maybe_publish_local(Msg, Local = #{}, Props) ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) emqx_broker:publish(to_broker_msg(Msg, Local, Props));
end. maybe_publish_local(_Msg, undefined, _Props) ->
ok.
import_msg( import_msg(
#{ #{
@ -459,3 +455,84 @@ printable_maps(Headers) ->
#{}, #{},
Headers Headers
). ).
%% Shame that we have to know the callback module here
%% would be great if we can get rid of #mqtt_msg{} record
%% and use #message{} in all places.
-spec to_remote_msg(emqx_types:message() | map(), msgvars()) ->
#mqtt_msg{}.
to_remote_msg(#message{flags = Flags} = Msg, Vars) ->
{EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg),
to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars);
to_remote_msg(
MapMsg,
#{
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Remote
) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(Remote, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = Topic,
props = emqx_utils:pub_props_to_packet(PubProps),
payload = Payload
}.
%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
to_broker_msg(Msg, Vars, #{});
to_broker_msg(
#{dup := Dup} = MapMsg,
#{
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Local,
Props
) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(Local, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, Topic, Payload)
)
).
process_payload(From, MapMsg) ->
do_process_payload(maps:get(payload, From, undefined), MapMsg).
do_process_payload(undefined, Msg) ->
emqx_utils_json:encode(Msg);
do_process_payload(Tks, Msg) ->
replace_vars_in_str(Tks, Msg).
%% Replace a string contains vars to another string in which the placeholders are replace by the
%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be:
%% "a: 1".
replace_vars_in_str(Tokens, Data) when is_list(Tokens) ->
emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary});
replace_vars_in_str(Val, _Data) ->
Val.
%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result
%% value will be an integer 1.
replace_simple_var(Tokens, Data) when is_list(Tokens) ->
[Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}),
Var;
replace_simple_var(Val, _Data) ->
Val.
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).