feat(mqtt-bridge): avoid middleman process

Instead, supervise `emqtt` client process directly.
This commit is contained in:
Andrew Mayorov 2023-01-30 18:35:17 +03:00
parent 4d146c521b
commit d0c10b59aa
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
7 changed files with 338 additions and 773 deletions

View File

@ -640,7 +640,7 @@ t_bridges_probe(Config) ->
?assertMatch( ?assertMatch(
#{ #{
<<"code">> := <<"TEST_FAILED">>, <<"code">> := <<"TEST_FAILED">>,
<<"message">> := <<"#{reason => econnrefused", _/binary>> <<"message">> := <<"econnrefused">>
}, },
jsx:decode(ConnRefused) jsx:decode(ConnRefused)
), ),

View File

@ -825,15 +825,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
<<"type">> => ?TYPE_MQTT, <<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS, <<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF, <<"egress">> => ?EGRESS_CONF,
%% to make it reconnect quickly
<<"reconnect_interval">> => <<"1s">>,
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"worker_pool_size">> => 2, <<"worker_pool_size">> => 2,
<<"query_mode">> => <<"sync">>, <<"query_mode">> => <<"sync">>,
%% using a long time so we can test recovery %% using a long time so we can test recovery
<<"request_timeout">> => <<"15s">>, <<"request_timeout">> => <<"15s">>,
%% to make it check the healthy quickly %% to make it check the healthy quickly
<<"health_check_interval">> => <<"0.5s">> <<"health_check_interval">> => <<"0.5s">>,
%% to make it reconnect quickly
<<"auto_restart_interval">> => <<"1s">>
} }
} }
), ),
@ -911,7 +911,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
Decoded1 = jsx:decode(BridgeStr1), Decoded1 = jsx:decode(BridgeStr1),
DecodedMetrics1 = jsx:decode(BridgeMetricsStr1), DecodedMetrics1 = jsx:decode(BridgeMetricsStr1),
?assertMatch( ?assertMatch(
Status when (Status == <<"connected">> orelse Status == <<"connecting">>), Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>),
maps:get(<<"status">>, Decoded1) maps:get(<<"status">>, Decoded1)
), ),
%% matched >= 3 because of possible retries. %% matched >= 3 because of possible retries.

View File

@ -105,16 +105,15 @@ init([]) ->
{ok, {SupFlag, []}}. {ok, {SupFlag, []}}.
bridge_spec(Config) -> bridge_spec(Config) ->
{Name, NConfig} = maps:take(name, Config),
#{ #{
id => maps:get(name, Config), id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Config]}, start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
restart => permanent, restart => temporary,
shutdown => 5000, shutdown => 5000
type => worker,
modules => [emqx_connector_mqtt_worker]
}. }.
-spec bridges() -> [{node(), map()}]. -spec bridges() -> [{_Name, _Status}].
bridges() -> bridges() ->
[ [
{Name, emqx_connector_mqtt_worker:status(Name)} {Name, emqx_connector_mqtt_worker:status(Name)}
@ -144,8 +143,7 @@ on_message_received(Msg, HookPoint, ResId) ->
%% =================================================================== %% ===================================================================
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
on_start(InstId, Conf) -> on_start(InstanceId, Conf) ->
InstanceId = binary_to_atom(InstId, utf8),
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_mqtt_connector", msg => "starting_mqtt_connector",
connector => InstanceId, connector => InstanceId,
@ -154,8 +152,8 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{ BridgeConf = BasicConf#{
name => InstanceId, name => InstanceId,
clientid => clientid(InstId, Conf), clientid => clientid(InstanceId, Conf),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId), subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined)) forwards => make_forward_confs(maps:get(egress, Conf, undefined))
}, },
case ?MODULE:create_bridge(BridgeConf) of case ?MODULE:create_bridge(BridgeConf) of
@ -189,35 +187,49 @@ on_stop(_InstId, #{name := InstanceId}) ->
on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg). case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of
ok ->
on_query_async( ok;
_InstId, {error, Reason} ->
{send_message, Msg}, classify_error(Reason)
{ReplyFun, Args},
#{name := InstanceId}
) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId}) ->
case emqx_connector_mqtt_worker:status(InstanceId) of
connected -> connected;
_ -> connecting
end. end.
on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
ok ->
% TODO this is racy
{ok, emqx_connector_mqtt_worker:pid(InstanceId)};
{error, Reason} ->
classify_error(Reason)
end.
on_get_status(_InstId, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:status(InstanceId).
classify_error(disconnected = Reason) ->
{error, {recoverable_error, Reason}};
classify_error({disconnected, _RC, _} = Reason) ->
{error, {recoverable_error, Reason}};
classify_error({shutdown, _} = Reason) ->
{error, {recoverable_error, Reason}};
classify_error(Reason) ->
{error, {unrecoverable_error, Reason}}.
ensure_mqtt_worker_started(InstanceId, BridgeConf) -> ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of case emqx_connector_mqtt_worker:connect(InstanceId) of
ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}}; {ok, Properties} ->
{error, Reason} -> {error, Reason} {ok, #{name => InstanceId, config => BridgeConf, props => Properties}};
{error, Reason} ->
{error, Reason}
end. end.
make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined; undefined;
make_sub_confs(undefined, _Conf, _) -> make_sub_confs(undefined, _Conf, _) ->
undefined; undefined;
make_sub_confs(SubRemoteConf, Conf, InstId) -> make_sub_confs(SubRemoteConf, Conf, InstanceId) ->
ResId = emqx_resource_manager:manager_id_to_resource_id(InstId), ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId),
case maps:find(hookpoint, Conf) of case maps:find(hookpoint, Conf) of
error -> error ->
error({no_hookpoint_provided, Conf}); error({no_hookpoint_provided, Conf});
@ -251,7 +263,6 @@ basic_config(
%% 30s %% 30s
connect_timeout => 30, connect_timeout => 30,
auto_reconnect => true, auto_reconnect => true,
reconnect_interval => ?AUTO_RECONNECT_INTERVAL,
proto_ver => ProtoVer, proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message. %% Opening bridge_mode will form a non-standard mqtt connection message.
%% A load balancing server (such as haproxy) is often set up before the emqx broker server. %% A load balancing server (such as haproxy) is often set up before the emqx broker server.
@ -264,8 +275,7 @@ basic_config(
retry_interval => RetryIntv, retry_interval => RetryIntv,
max_inflight => MaxInflight, max_inflight => MaxInflight,
ssl => EnableSsl, ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl)), ssl_opts => maps:to_list(maps:remove(enable, Ssl))
if_record_metrics => true
}, },
maybe_put_fields([username, password], Conf, BasicConf). maybe_put_fields([username, password], Conf, BasicConf).

View File

@ -1,235 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol
-module(emqx_connector_mqtt_mod).
-export([
start/1,
send/2,
send_async/3,
stop/1,
ping/1
]).
-export([info/2]).
-export([
ensure_subscribed/3,
ensure_unsubscribed/2
]).
%% callbacks for emqtt
-export([
handle_publish/3,
handle_disconnected/2
]).
-include_lib("emqx/include/logger.hrl").
%%--------------------------------------------------------------------
%% emqx_bridge_connect callbacks
%%--------------------------------------------------------------------
start(Config) ->
Parent = self(),
ServerStr = iolist_to_binary(maps:get(server, Config)),
{Server, Port} = emqx_connector_mqtt_schema:parse_server(ServerStr),
Mountpoint = maps:get(receive_mountpoint, Config, undefined),
Subscriptions = maps:get(subscriptions, Config, undefined),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}),
Config1 = Config#{
msg_handler => Handlers,
host => Server,
port => Port,
force_ping => true,
proto_ver => maps:get(proto_ver, Config, v4)
},
case emqtt:start_link(process_config(Config1)) of
{ok, Pid} ->
case emqtt:connect(Pid) of
{ok, _} ->
try
ok = sub_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch
throw:Reason ->
ok = stop(#{client_pid => Pid}),
{error, error_reason(Reason, ServerStr)}
end;
{error, Reason} ->
ok = stop(#{client_pid => Pid}),
{error, error_reason(Reason, ServerStr)}
end;
{error, Reason} ->
{error, error_reason(Reason, ServerStr)}
end.
error_reason(Reason, ServerStr) ->
#{reason => Reason, server => ServerStr}.
stop(#{client_pid := Pid}) ->
safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000),
ok.
ping(undefined) ->
pang;
ping(#{client_pid := Pid}) ->
emqtt:ping(Pid).
info(pid, #{client_pid := Pid}) ->
Pid.
ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when
is_pid(Pid)
->
case emqtt:subscribe(Pid, Topic, QoS) of
{ok, _, _} -> Conn#{subscriptions => [{Topic, QoS} | Subs]};
Error -> {error, Error}
end;
ensure_subscribed(_Conn, _Topic, _QoS) ->
%% return ok for now
%% next re-connect should should call start with new topic added to config
ok.
ensure_unsubscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic) when is_pid(Pid) ->
case emqtt:unsubscribe(Pid, Topic) of
{ok, _, _} -> Conn#{subscriptions => lists:keydelete(Topic, 1, Subs)};
Error -> {error, Error}
end;
ensure_unsubscribed(Conn, _) ->
%% return ok for now
%% next re-connect should should call start with this topic deleted from config
Conn.
safe_stop(Pid, StopF, Timeout) ->
MRef = monitor(process, Pid),
unlink(Pid),
try
StopF()
catch
_:_ ->
ok
end,
receive
{'DOWN', MRef, _, _, _} ->
ok
after Timeout ->
exit(Pid, kill)
end.
send(#{client_pid := ClientPid}, Msg) ->
emqtt:publish(ClientPid, Msg).
send_async(#{client_pid := ClientPid}, Msg, Callback) ->
emqtt:publish_async(ClientPid, Msg, infinity, Callback).
handle_publish(Msg, undefined, _Opts) ->
?SLOG(error, #{
msg =>
"cannot_publish_to_local_broker_as"
"_'ingress'_is_not_configured",
message => Msg
});
handle_publish(#{properties := Props} = Msg0, Vars, Opts) ->
Msg = format_msg_received(Msg0, Opts),
?SLOG(debug, #{
msg => "publish_to_local_broker",
message => Msg,
vars => Vars
}),
case Vars of
#{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ ->
ok
end,
maybe_publish_to_local_broker(Msg, Vars, Props).
handle_disconnected(Reason, Parent) ->
Parent ! {disconnected, self(), Reason}.
make_hdlr(Parent, Vars, Opts) ->
#{
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
sub_remote_topics(_ClientPid, undefined) ->
ok;
sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)
end.
process_config(Config) ->
maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config).
maybe_publish_to_local_broker(Msg, Vars, Props) ->
case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
%% local topic is not set, discard it
undefined -> ok;
_ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
end.
format_msg_received(
#{
dup := Dup,
payload := Payload,
properties := Props,
qos := QoS,
retain := Retain,
topic := Topic
},
#{server := Server}
) ->
#{
id => emqx_guid:to_hexstr(emqx_guid:gen()),
server => Server,
payload => Payload,
topic => Topic,
qos => QoS,
dup => Dup,
retain => Retain,
pub_props => printable_maps(Props),
message_received_at => erlang:system_time(millisecond)
}.
printable_maps(undefined) ->
#{};
printable_maps(Headers) ->
maps:fold(
fun
('User-Property', V0, AccIn) when is_list(V0) ->
AccIn#{
'User-Property' => maps:from_list(V0),
'User-Property-Pairs' => [
#{
key => Key,
value => Value
}
|| {Key, Value} <- V0
]
};
(K, V0, AccIn) ->
AccIn#{K => V0}
end,
#{},
Headers
).

View File

@ -60,172 +60,252 @@
%% * Local messages are all normalised to QoS-1 when exporting to remote %% * Local messages are all normalised to QoS-1 when exporting to remote
-module(emqx_connector_mqtt_worker). -module(emqx_connector_mqtt_worker).
-behaviour(gen_statem).
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% APIs %% APIs
-export([ -export([
start_link/1, start_link/2,
stop/1 stop/1,
]). pid/1
%% gen_statem callbacks
-export([
terminate/3,
code_change/4,
init/1,
callback_mode/0
]).
%% state functions
-export([
idle/3,
connected/3
]). ]).
%% management APIs %% management APIs
-export([ -export([
ensure_started/1, connect/1,
ensure_stopped/1,
status/1, status/1,
ping/1, ping/1,
send_to_remote/2, send_to_remote/2,
send_to_remote_async/3 send_to_remote_async/3
]). ]).
-export([get_forwards/1]). -export([handle_publish/3]).
-export([handle_disconnect/1]).
-export([get_subscriptions/1]).
-export_type([ -export_type([
config/0, config/0,
ack_ref/0 ack_ref/0
]). ]).
-type id() :: atom() | string() | pid(). -type name() :: term().
-type qos() :: emqx_types:qos(). % -type qos() :: emqx_types:qos().
-type config() :: map(). -type config() :: map().
-type ack_ref() :: term(). -type ack_ref() :: term().
-type topic() :: emqx_types:topic(). % -type topic() :: emqx_types:topic().
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
%% same as default in-flight limit for emqtt -define(REF(Name), {via, gproc, ?NAME(Name)}).
-define(DEFAULT_INFLIGHT_SIZE, 32). -define(NAME(Name), {n, l, Name}).
-define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)).
%% @doc Start a bridge worker. Supported configs: %% @doc Start a bridge worker. Supported configs:
%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay
%% at 'idle' state until a manual call to start it.
%% connect_module: The module which implements emqx_bridge_connect behaviour
%% and work as message batch transport layer
%% reconnect_interval: Delay in milli-seconds for the bridge worker to retry
%% in case of transportation failure.
%% max_inflight: Max number of batches allowed to send-ahead before receiving
%% confirmation from remote node/cluster
%% mountpoint: The topic mount point for messages sent to remote node/cluster %% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable %% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe. %% forwards: Local topics to subscribe.
%% %%
%% Find more connection specific configs in the callback modules %% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour. %% of emqx_bridge_connect behaviour.
start_link(Opts) when is_list(Opts) -> -spec start_link(name(), map()) ->
start_link(maps:from_list(Opts)); {ok, pid()} | {error, _Reason}.
start_link(Opts) -> start_link(Name, BridgeOpts) ->
case maps:get(name, Opts, undefined) of ?SLOG(debug, #{
undefined -> msg => "client_starting",
gen_statem:start_link(?MODULE, Opts, []); name => Name,
Name -> options => BridgeOpts
Name1 = name(Name), }),
gen_statem:start_link({local, Name1}, ?MODULE, Opts#{name => Name1}, []) Conf = init_config(BridgeOpts),
Options = mk_client_options(Conf, BridgeOpts),
case emqtt:start_link(Options) of
{ok, Pid} ->
true = gproc:reg_other(?NAME(Name), Pid, Conf),
{ok, Pid};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_start_failed",
config => emqx_misc:redact(BridgeOpts),
reason => Reason
}),
Error
end. end.
ensure_started(Name) -> init_config(Opts) ->
gen_statem:call(name(Name), ensure_started).
%% @doc Manually stop bridge worker. State idempotency ensured.
ensure_stopped(Name) ->
gen_statem:call(name(Name), ensure_stopped, 5000).
stop(Pid) -> gen_statem:stop(Pid).
status(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, status);
status(Name) ->
gen_statem:call(name(Name), status).
ping(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, ping);
ping(Name) ->
gen_statem:call(name(Name), ping).
send_to_remote(Pid, Msg) when is_pid(Pid) ->
gen_statem:call(Pid, {send_to_remote, Msg});
send_to_remote(Name, Msg) ->
gen_statem:call(name(Name), {send_to_remote, Msg}).
send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) ->
gen_statem:call(Pid, {send_to_remote_async, Msg, Callback});
send_to_remote_async(Name, Msg, Callback) ->
gen_statem:call(name(Name), {send_to_remote_async, Msg, Callback}).
%% @doc Return all forwards (local subscriptions).
-spec get_forwards(id()) -> [topic()].
get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)).
%% @doc Return all subscriptions (subscription over mqtt connection to remote broker).
-spec get_subscriptions(id()) -> [{emqx_types:topic(), qos()}].
get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions).
callback_mode() -> [state_functions].
%% @doc Config should be a map().
init(#{name := Name} = ConnectOpts) ->
?SLOG(debug, #{
msg => "starting_bridge_worker",
name => Name
}),
erlang:process_flag(trap_exit, true),
State = init_state(ConnectOpts),
self() ! idle,
{ok, idle, State#{
connect_opts => pre_process_opts(ConnectOpts)
}}.
init_state(Opts) ->
ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS),
StartType = maps:get(start_type, Opts, manual),
Mountpoint = maps:get(forward_mountpoint, Opts, undefined), Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE), Subscriptions = maps:get(subscriptions, Opts, undefined),
Name = maps:get(name, Opts, undefined), Forwards = maps:get(forwards, Opts, undefined),
#{ #{
start_type => StartType,
reconnect_interval => ReconnDelayMs,
mountpoint => format_mountpoint(Mountpoint), mountpoint => format_mountpoint(Mountpoint),
max_inflight => MaxInflightSize, subscriptions => pre_process_subscriptions(Subscriptions),
connection => undefined, forwards => pre_process_forwards(Forwards)
name => Name
}. }.
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> mk_client_options(Conf, BridgeOpts) ->
ConnectOpts#{ Server = iolist_to_binary(maps:get(server, BridgeOpts)),
subscriptions => pre_process_in_out(in, InConf), HostPort = emqx_connector_mqtt_schema:parse_server(Server),
forwards => pre_process_in_out(out, OutConf) Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
Subscriptions = maps:get(subscriptions, Conf),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
Opts = maps:without(
[
address,
auto_reconnect,
conn_type,
mountpoint,
forwards,
receive_mountpoint,
subscriptions
],
BridgeOpts
),
Opts#{
msg_handler => mk_client_event_handler(Vars, #{server => Server}),
hosts => [HostPort],
force_ping => true,
proto_ver => maps:get(proto_ver, BridgeOpts, v4)
}. }.
pre_process_in_out(_, undefined) -> mk_client_event_handler(Vars, Opts) when Vars /= undefined ->
#{
publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]},
disconnected => {fun ?MODULE:handle_disconnect/1, []}
};
mk_client_event_handler(undefined, _Opts) ->
undefined.
connect(Name) ->
#{subscriptions := Subscriptions} = get_config(Name),
case emqtt:connect(pid(Name)) of
{ok, Properties} ->
case subscribe_remote_topics(Name, Subscriptions) of
ok ->
{ok, Properties};
{ok, _, _RCs} ->
{ok, Properties};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_subscribe_failed",
subscriptions => Subscriptions,
reason => Reason
}),
Error
end;
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_connect_failed",
reason => Reason
}),
Error
end.
subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
emqtt:subscribe(ref(Ref), FromTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
ok.
stop(Ref) ->
emqtt:stop(ref(Ref)).
pid(Name) ->
gproc:lookup_pid(?NAME(Name)).
status(Ref) ->
trycall(
fun() ->
Info = emqtt:info(ref(Ref)),
case proplists:get_value(socket, Info) of
Socket when Socket /= undefined ->
connected;
undefined ->
connecting
end
end,
#{noproc => disconnected}
).
ping(Ref) ->
emqtt:ping(ref(Ref)).
send_to_remote(Name, MsgIn) ->
trycall(
fun() -> do_send(Name, export_msg(Name, MsgIn)) end,
#{
badarg => {error, disconnected},
noproc => {error, disconnected}
}
).
do_send(Name, {true, Msg}) ->
case emqtt:publish(pid(Name), Msg) of
ok ->
ok;
{ok, #{reason_code := RC}} when
RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
->
ok;
{ok, #{reason_code := RC, reason_code_name := Reason}} ->
?SLOG(warning, #{
msg => "remote_publish_failed",
message => Msg,
reason_code => RC,
reason_code_name => Reason
}),
{error, Reason};
{error, Reason} ->
?SLOG(info, #{
msg => "client_failed",
reason => Reason
}),
{error, Reason}
end;
do_send(_Name, false) ->
ok.
send_to_remote_async(Name, MsgIn, Callback) ->
trycall(
fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end,
#{badarg => {error, disconnected}}
).
do_send_async(Name, {true, Msg}, Callback) ->
emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback);
do_send_async(_Name, false, _Callback) ->
ok.
ref(Pid) when is_pid(Pid) ->
Pid;
ref(Term) ->
?REF(Term).
trycall(Fun, Else) ->
try
Fun()
catch
error:badarg ->
maps:get(badarg, Else);
exit:{noproc, _} ->
maps:get(noproc, Else)
end.
format_mountpoint(undefined) ->
undefined; undefined;
pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) -> format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
pre_process_subscriptions(undefined) ->
undefined;
pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) ->
Conf#{local => pre_process_in_out_common(LC)}; Conf#{local => pre_process_in_out_common(LC)};
pre_process_in_out(in, Conf) when is_map(Conf) -> pre_process_subscriptions(Conf) when is_map(Conf) ->
%% have no 'local' field in the config %% have no 'local' field in the config
undefined.
pre_process_forwards(undefined) ->
undefined; undefined;
pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) -> pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_in_out_common(RC)}; Conf#{remote => pre_process_in_out_common(RC)};
pre_process_in_out(out, Conf) when is_map(Conf) -> pre_process_forwards(Conf) when is_map(Conf) ->
%% have no 'remote' field in the config %% have no 'remote' field in the config
undefined. undefined.
@ -245,241 +325,112 @@ pre_process_conf(Key, Conf) ->
Conf#{Key => Val} Conf#{Key => Val}
end. end.
code_change(_Vsn, State, Data, _Extra) -> get_config(Name) ->
{ok, State, Data}. gproc:lookup_value(?NAME(Name)).
terminate(_Reason, _StateName, State) -> export_msg(Name, Msg) ->
_ = disconnect(State), case get_config(Name) of
maybe_destroy_session(State). #{forwards := Forwards = #{}, mountpoint := Mountpoint} ->
{true, export_msg(Mountpoint, Forwards, Msg)};
#{forwards := undefined} ->
?SLOG(error, #{
msg => "forwarding_unavailable",
message => Msg,
reason => "egress is not configured"
}),
false
end.
maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> export_msg(Mountpoint, Forwards, Msg) ->
try Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
%% Destroy session if clean_start is not set. emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars).
%% Ignore any crashes, just refresh the clean_start = true.
_ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}), %%
_ = disconnect(State),
ok handle_publish(#{properties := Props} = MsgIn, Vars, Opts) ->
catch Msg = import_msg(MsgIn, Opts),
_:_ -> ?SLOG(debug, #{
msg => "publish_local",
message => Msg,
vars => Vars
}),
case Vars of
#{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ ->
ok ok
end; end,
maybe_destroy_session(_State) -> maybe_publish_local(Msg, Vars, Props).
handle_disconnect(_Reason) ->
ok. ok.
%% ensure_started will be deprecated in the future maybe_publish_local(Msg, Vars, Props) ->
idle({call, From}, ensure_started, State) -> case emqx_map_lib:deep_get([local, topic], Vars, undefined) of
case do_connect(State) of %% local topic is not set, discard it
{ok, State1} -> undefined ->
{next_state, connected, State1, {reply, From, ok}}; ok;
{error, Reason, _State} ->
{keep_state_and_data, {reply, From, {error, Reason}}}
end;
idle({call, From}, {send_to_remote, _}, _State) ->
{keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
idle({call, From}, {send_to_remote_async, _, _}, _State) ->
{keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
%% @doc Standing by for manual start.
idle(info, idle, #{start_type := manual}) ->
keep_state_and_data;
%% @doc Standing by for auto start.
idle(info, idle, #{start_type := auto} = State) ->
connecting(State);
idle(state_timeout, reconnect, State) ->
connecting(State);
idle(Type, Content, State) ->
common(idle, Type, Content, State).
connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
case do_connect(State) of
{ok, State1} ->
{next_state, connected, State1};
_ -> _ ->
{keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props))
end. end.
connected({call, From}, {send_to_remote, Msg}, State) -> import_msg(
case do_send(State, Msg) of
{ok, NState} ->
{keep_state, NState, {reply, From, ok}};
{error, Reason} ->
{keep_state_and_data, {reply, From, {error, Reason}}}
end;
connected(
{call, From},
{send_to_remote_async, Msg, Callback},
State = #{connection := Connection}
) ->
_ = do_send_async(State, Msg, Callback),
{keep_state, State, {reply, From, {ok, emqx_connector_mqtt_mod:info(pid, Connection)}}};
connected(
info,
{disconnected, Conn, Reason},
#{connection := Connection, name := Name, reconnect_interval := ReconnectDelayMs} = State
) ->
?tp(info, disconnected, #{name => Name, reason => Reason}),
case Conn =:= maps:get(client_pid, Connection, undefined) of
true ->
{next_state, idle, State#{connection => undefined},
{state_timeout, ReconnectDelayMs, reconnect}};
false ->
keep_state_and_data
end;
connected(Type, Content, State) ->
common(connected, Type, Content, State).
%% Common handlers
common(StateName, {call, From}, status, _State) ->
{keep_state_and_data, {reply, From, StateName}};
common(_StateName, {call, From}, ping, #{connection := Conn} = _State) ->
Reply = emqx_connector_mqtt_mod:ping(Conn),
{keep_state_and_data, {reply, From, Reply}};
common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) ->
{keep_state_and_data, {reply, From, ok}};
common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) ->
Reply = emqx_connector_mqtt_mod:stop(Conn),
{next_state, idle, State#{connection => undefined}, {reply, From, Reply}};
common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) ->
{keep_state_and_data, {reply, From, Forwards}};
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, {reply, From, maps:get(subscriptions, Connection, #{})}};
common(_StateName, {call, From}, Req, _State) ->
{keep_state_and_data, {reply, From, {error, {unsupported_request, Req}}}};
common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State};
common(StateName, Type, Content, #{name := Name} = State) ->
?SLOG(error, #{
msg => "bridge_discarded_event",
name => Name,
type => Type,
state_name => StateName,
content => Content
}),
{keep_state, State}.
do_connect(
#{ #{
connect_opts := ConnectOpts, dup := Dup,
name := Name payload := Payload,
} = State properties := Props,
) -> qos := QoS,
case emqx_connector_mqtt_mod:start(ConnectOpts) of retain := Retain,
{ok, Conn} -> topic := Topic
?tp(info, connected, #{name => Name}),
{ok, State#{connection => Conn}};
{error, Reason} ->
ConnectOpts1 = obfuscate(ConnectOpts),
?SLOG(error, #{
msg => "failed_to_connect",
config => ConnectOpts1,
reason => Reason
}),
{error, Reason, State}
end.
do_send(#{connect_opts := #{forwards := undefined}}, Msg) ->
?SLOG(error, #{
msg =>
"cannot_forward_messages_to_remote_broker"
"_as_'egress'_is_not_configured",
messages => Msg
});
do_send(
#{
connection := Connection,
mountpoint := Mountpoint,
connect_opts := #{forwards := Forwards}
} = State,
Msg
) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars),
?SLOG(debug, #{
msg => "publish_to_remote_broker",
message => Msg,
vars => Vars
}),
case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of
ok ->
{ok, State};
{ok, #{reason_code := RC}} when
RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS
->
{ok, State};
{ok, #{reason_code := RC, reason_code_name := RCN}} ->
?SLOG(warning, #{
msg => "publish_to_remote_node_falied",
message => Msg,
reason_code => RC,
reason_code_name => RCN
}),
{error, RCN};
{error, Reason} ->
?SLOG(info, #{
msg => "mqtt_bridge_produce_failed",
reason => Reason
}),
{error, Reason}
end.
do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) ->
%% TODO: eval callback with undefined error
?SLOG(error, #{
msg =>
"cannot_forward_messages_to_remote_broker"
"_as_'egress'_is_not_configured",
messages => Msg
});
do_send_async(
#{
connection := Connection,
mountpoint := Mountpoint,
connect_opts := #{forwards := Forwards}
}, },
Msg, #{server := Server}
Callback
) -> ) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), #{
ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), id => emqx_guid:to_hexstr(emqx_guid:gen()),
?SLOG(debug, #{ server => Server,
msg => "publish_to_remote_broker", payload => Payload,
message => Msg, topic => Topic,
vars => Vars qos => QoS,
}), dup => Dup,
emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback). retain => Retain,
pub_props => printable_maps(Props),
message_received_at => erlang:system_time(millisecond)
}.
disconnect(#{connection := Conn} = State) when Conn =/= undefined -> printable_maps(undefined) ->
emqx_connector_mqtt_mod:stop(Conn), #{};
State#{connection => undefined}; printable_maps(Headers) ->
disconnect(State) ->
State.
format_mountpoint(undefined) ->
undefined;
format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
name(Id) -> list_to_atom(str(Id)).
obfuscate(Map) ->
maps:fold( maps:fold(
fun(K, V, Acc) -> fun
case is_sensitive(K) of ('User-Property', V0, AccIn) when is_list(V0) ->
true -> [{K, '***'} | Acc]; AccIn#{
false -> [{K, V} | Acc] 'User-Property' => maps:from_list(V0),
end 'User-Property-Pairs' => [
#{
key => Key,
value => Value
}
|| {Key, Value} <- V0
]
};
(K, V0, AccIn) ->
AccIn#{K => V0}
end, end,
[], #{},
Map Headers
). ).
is_sensitive(password) -> true; %% TODO
is_sensitive(ssl_opts) -> true; % maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) ->
is_sensitive(_) -> false. % try
% %% Destroy session if clean_start is not set.
str(A) when is_atom(A) -> % %% Ignore any crashes, just refresh the clean_start = true.
atom_to_list(A); % _ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}),
str(B) when is_binary(B) -> % _ = disconnect(State),
binary_to_list(B); % ok
str(S) when is_list(S) -> % catch
S. % _:_ ->
% ok
% end;
% maybe_destroy_session(_State) ->
% ok.

View File

@ -1,60 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_tests).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
send_and_ack_test() ->
%% delegate from gen_rpc to rpc for unit test
meck:new(emqtt, [passthrough, no_history]),
meck:expect(
emqtt,
start_link,
1,
fun(_) ->
{ok, spawn_link(fun() -> ok end)}
end
),
meck:expect(emqtt, connect, 1, {ok, dummy}),
meck:expect(
emqtt,
stop,
1,
fun(Pid) -> Pid ! stop end
),
meck:expect(
emqtt,
publish,
2,
fun(Client, Msg) ->
Client ! {publish, Msg},
%% as packet id
{ok, Msg}
end
),
try
Max = 1,
Batch = lists:seq(1, Max),
{ok, Conn} = emqx_connector_mqtt_mod:start(#{server => "127.0.0.1:1883"}),
%% return last packet id as batch reference
{ok, _AckRef} = emqx_connector_mqtt_mod:send(Conn, Batch),
ok = emqx_connector_mqtt_mod:stop(Conn)
after
meck:unload(emqtt)
end.

View File

@ -1,101 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_mqtt_worker_tests).
-include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(BRIDGE_NAME, test).
-define(BRIDGE_REG_NAME, emqx_connector_mqtt_worker_test).
-define(WAIT(PATTERN, TIMEOUT),
receive
PATTERN ->
ok
after TIMEOUT ->
error(timeout)
end
).
-export([start/1, send/2, stop/1]).
start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) ->
case is_pid(Pid) of
true -> Pid ! {connection_start_attempt, Ref};
false -> ok
end,
Result.
send(SendFun, Batch) when is_function(SendFun, 2) ->
SendFun(Batch).
stop(_Pid) -> ok.
%% connect first, disconnect, then connect again
disturbance_test() ->
meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end),
meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end),
meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
try
emqx_metrics:start_link(),
Ref = make_ref(),
TestPid = self(),
Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
{ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => bridge_disturbance}),
?assertEqual(Pid, whereis(bridge_disturbance)),
?WAIT({connection_start_attempt, Ref}, 1000),
Pid ! {disconnected, TestPid, test},
?WAIT({connection_start_attempt, Ref}, 1000),
emqx_metrics:stop(),
ok = emqx_connector_mqtt_worker:stop(Pid)
after
meck:unload(emqx_connector_mqtt_mod)
end.
manual_start_stop_test() ->
meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]),
meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end),
meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end),
meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end),
try
emqx_metrics:start_link(),
Ref = make_ref(),
TestPid = self(),
BridgeName = manual_start_stop,
Config0 = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}),
Config = Config0#{start_type := manual},
{ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => BridgeName}),
%% call ensure_started again should yield the same result
ok = emqx_connector_mqtt_worker:ensure_started(BridgeName),
emqx_connector_mqtt_worker:ensure_stopped(BridgeName),
emqx_metrics:stop(),
ok = emqx_connector_mqtt_worker:stop(Pid)
after
meck:unload(emqx_connector_mqtt_mod)
end.
make_config(Ref, TestPid, Result) ->
#{
start_type => auto,
subscriptions => undefined,
forwards => undefined,
reconnect_interval => 50,
test_pid => TestPid,
test_ref => Ref,
connect_result => Result
}.