feat(mqttconn): stop using gproc in hot path

Also drop fiddling with `mountpoint` since this option seems not to be
used anywhere.
This commit is contained in:
Andrew Mayorov 2023-05-18 20:53:42 +03:00
parent 4da0d83faf
commit bd956d00b6
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 186 additions and 232 deletions

View File

@ -256,11 +256,11 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
}
),
{ok, _, #{state := #{name := WorkerName}}} =
{ok, _, #{state := #{worker := WorkerPid}}} =
emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
?assertMatch(
#{clean_start := true},
maps:from_list(emqx_connector_mqtt_worker:info(WorkerName))
maps:from_list(emqx_connector_mqtt_worker:info(WorkerPid))
),
%% delete the bridge

View File

@ -25,8 +25,8 @@
callback_mode/0,
start_link/0,
init/1,
create_bridge/1,
drop_bridge/1,
create_bridge/2,
remove_bridge/1,
bridges/0
]).
@ -56,11 +56,10 @@ init([]) ->
},
{ok, {SupFlag, []}}.
bridge_spec(Config) ->
{Name, NConfig} = maps:take(name, Config),
bridge_spec(Name, Options) ->
#{
id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]},
start => {emqx_connector_mqtt_worker, start_link, [Name, Options]},
restart => temporary,
shutdown => 1000
}.
@ -72,10 +71,10 @@ bridges() ->
|| {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)
].
create_bridge(Config) ->
supervisor:start_child(?MODULE, bridge_spec(Config)).
create_bridge(Name, Options) ->
supervisor:start_child(?MODULE, bridge_spec(Name, Options)).
drop_bridge(Name) ->
remove_bridge(Name) ->
case supervisor:terminate_child(?MODULE, Name) of
ok ->
supervisor:delete_child(?MODULE, Name);
@ -95,36 +94,39 @@ on_message_received(Msg, HookPoint, ResId) ->
%% ===================================================================
callback_mode() -> async_if_possible.
on_start(InstanceId, Conf) ->
on_start(ResourceId, Conf) ->
?SLOG(info, #{
msg => "starting_mqtt_connector",
connector => InstanceId,
connector => ResourceId,
config => emqx_utils:redact(Conf)
}),
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
clientid => clientid(InstanceId, Conf),
subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
BridgeOpts = BasicConf#{
clientid => clientid(ResourceId, Conf),
subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId),
forwards => maps:get(egress, Conf, #{})
},
case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} ->
ensure_mqtt_worker_started(InstanceId, BridgeConf);
case create_bridge(ResourceId, BridgeOpts) of
{ok, Pid, {ConnProps, WorkerConf}} ->
{ok, #{
name => ResourceId,
worker => Pid,
config => WorkerConf,
props => ConnProps
}};
{error, {already_started, _Pid}} ->
ok = ?MODULE:drop_bridge(InstanceId),
{ok, _} = ?MODULE:create_bridge(BridgeConf),
ensure_mqtt_worker_started(InstanceId, BridgeConf);
ok = remove_bridge(ResourceId),
on_start(ResourceId, Conf);
{error, Reason} ->
{error, Reason}
end.
on_stop(_InstId, #{name := InstanceId}) ->
on_stop(ResourceId, #{}) ->
?SLOG(info, #{
msg => "stopping_mqtt_connector",
connector => InstanceId
connector => ResourceId
}),
case ?MODULE:drop_bridge(InstanceId) of
case remove_bridge(ResourceId) of
ok ->
ok;
{error, not_found} ->
@ -132,24 +134,24 @@ on_stop(_InstId, #{name := InstanceId}) ->
{error, Reason} ->
?SLOG(error, #{
msg => "stop_mqtt_connector_error",
connector => InstanceId,
connector => ResourceId,
reason => Reason
})
end.
on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of
on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
case emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config) of
ok ->
ok;
{error, Reason} ->
classify_error(Reason)
end.
on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
Callback = {fun on_async_result/2, [CallbackIn]},
case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of
case emqx_connector_mqtt_worker:send_to_remote_async(Pid, Msg, Callback, Config) of
ok ->
ok;
{ok, Pid} ->
@ -172,8 +174,8 @@ apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
erlang:apply(M, F, A ++ [Result]).
on_get_status(_InstId, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:status(InstanceId).
on_get_status(_ResourceId, #{worker := Pid}) ->
emqx_connector_mqtt_worker:status(Pid).
classify_error(disconnected = Reason) ->
{error, {recoverable_error, Reason}};
@ -186,33 +188,13 @@ classify_error(shutdown = Reason) ->
classify_error(Reason) ->
{error, {unrecoverable_error, Reason}}.
ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
case emqx_connector_mqtt_worker:connect(InstanceId) of
{ok, Properties} ->
{ok, #{name => InstanceId, config => BridgeConf, props => Properties}};
{error, Reason} ->
{error, Reason}
end.
make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined;
make_sub_confs(undefined, _Conf, _) ->
undefined;
make_sub_confs(SubRemoteConf, Conf, ResourceId) ->
case maps:find(hookpoint, Conf) of
error ->
error({no_hookpoint_provided, Conf});
{ok, HookPoint} ->
MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
SubRemoteConf#{on_message_received => MFA}
end.
make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
undefined;
make_forward_confs(undefined) ->
undefined;
make_forward_confs(FrowardConf) ->
FrowardConf.
make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 ->
Subscriptions;
make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) ->
MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
Subscriptions#{on_message_received => MFA};
make_sub_confs(_SubRemoteConf, Conf, ResourceId) ->
error({no_hookpoint_provided, ResourceId, Conf}).
basic_config(
#{
@ -227,17 +209,14 @@ basic_config(
} = Conf
) ->
BasicConf = #{
%% connection opts
server => Server,
%% 30s
connect_timeout => 30,
auto_reconnect => true,
proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message.
%% Opening a connection in bridge mode will form a non-standard mqtt connection message.
%% A load balancing server (such as haproxy) is often set up before the emqx broker server.
%% When the load balancing server enables mqtt connection packet inspection,
%% non-standard mqtt connection packets will be filtered out by LB.
%% So let's disable bridge_mode.
%% non-standard mqtt connection packets might be filtered out by LB.
bridge_mode => BridgeMode,
keepalive => ms_to_s(KeepAlive),
clean_start => CleanStart,
@ -246,18 +225,9 @@ basic_config(
ssl => EnableSsl,
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
},
maybe_put_fields([username, password], Conf, BasicConf).
maybe_put_fields(Fields, Conf, Acc0) ->
lists:foldl(
fun(Key, Acc) ->
case maps:find(Key, Conf) of
error -> Acc;
{ok, Val} -> Acc#{Key => Val}
end
end,
Acc0,
Fields
maps:merge(
BasicConf,
maps:with([username, password], Conf)
).
ms_to_s(Ms) ->

View File

@ -17,7 +17,6 @@
-module(emqx_connector_mqtt_msg).
-export([
make_pub_vars/2,
to_remote_msg/2,
to_broker_msg/3
]).
@ -46,11 +45,6 @@
remote := remote_config()
}.
make_pub_vars(_, undefined) ->
undefined;
make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
Conf#{mountpoint => Mountpoint}.
%% @doc Make export format:
%% 1. Mount topic to a prefix
%% 2. Fix QoS to 1
@ -70,8 +64,7 @@ to_remote_msg(MapMsg, #{
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Remote,
mountpoint := Mountpoint
} = Remote
}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(Remote, MapMsg),
@ -81,12 +74,10 @@ to_remote_msg(MapMsg, #{
#mqtt_msg{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
topic = Topic,
props = emqx_utils:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.
}.
%% published from remote node over a MQTT connection
to_broker_msg(Msg, Vars, undefined) ->
@ -98,8 +89,7 @@ to_broker_msg(
topic := TopicToken,
qos := QoSToken,
retain := RetainToken
} = Local,
mountpoint := Mountpoint
} = Local
},
Props
) ->
@ -112,7 +102,7 @@ to_broker_msg(
Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)
emqx_message:make(bridge, QoS, Topic, Payload)
)
).
@ -142,5 +132,3 @@ replace_simple_var(Val, _Data) ->
set_headers(Val, Msg) ->
emqx_message:set_headers(Val, Msg).
topic(undefined, Topic) -> Topic;
topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic).

View File

@ -72,34 +72,75 @@
%% management APIs
-export([
connect/1,
status/1,
ping/1,
info/1,
send_to_remote/2,
send_to_remote_async/3
send_to_remote/3,
send_to_remote_async/4
]).
-export([handle_publish/3]).
-export([handle_disconnect/1]).
-export_type([
config/0,
ack_ref/0
]).
-export_type([config/0]).
-type template() :: emqx_plugin_libs_rule:tmpl_token().
-type name() :: term().
% -type qos() :: emqx_types:qos().
-type config() :: map().
-type ack_ref() :: term().
% -type topic() :: emqx_types:topic().
-type options() :: #{
% endpoint
server := iodata(),
% emqtt client options
proto_ver := v3 | v4 | v5,
username := binary(),
password := binary(),
clientid := binary(),
clean_start := boolean(),
max_inflight := pos_integer(),
connect_timeout := pos_integer(),
retry_interval := timeout(),
bridge_mode := boolean(),
ssl := boolean(),
ssl_opts := proplists:proplist(),
% bridge options
subscriptions := map(),
forwards := map()
}.
-type config() :: #{
subscriptions := subscriptions() | undefined,
forwards := forwards() | undefined
}.
-type subscriptions() :: #{
remote := #{
topic := emqx_topic:topic(),
qos => emqx_types:qos()
},
local := #{
topic => template(),
qos => template() | emqx_types:qos(),
retain => template() | boolean(),
payload => template() | undefined
},
on_message_received := {module(), atom(), [term()]}
}.
-type forwards() :: #{
local => #{
topic => emqx_topic:topic()
},
remote := #{
topic := template(),
qos => template() | emqx_types:qos(),
retain => template() | boolean(),
payload => template() | undefined
}
}.
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-define(REF(Name), {via, gproc, ?NAME(Name)}).
-define(NAME(Name), {n, l, Name}).
%% @doc Start a bridge worker. Supported configs:
%% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable
@ -107,20 +148,19 @@
%%
%% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour.
-spec start_link(name(), map()) ->
{ok, pid()} | {error, _Reason}.
-spec start_link(name(), options()) ->
{ok, pid(), {emqtt:properties(), config()}} | {error, _Reason}.
start_link(Name, BridgeOpts) ->
?SLOG(debug, #{
msg => "client_starting",
name => Name,
options => BridgeOpts
}),
Conf = init_config(Name, BridgeOpts),
Options = mk_client_options(Conf, BridgeOpts),
Config = init_config(Name, BridgeOpts),
Options = mk_client_options(Config, BridgeOpts),
case emqtt:start_link(Options) of
{ok, Pid} ->
true = gproc:reg_other(?NAME(Name), Pid, Conf),
{ok, Pid};
connect(Pid, Name, Config);
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_start_failed",
@ -130,22 +170,50 @@ start_link(Name, BridgeOpts) ->
Error
end.
connect(Pid, Name, Config = #{subscriptions := Subscriptions}) ->
case emqtt:connect(Pid) of
{ok, Props} ->
case subscribe_remote_topics(Pid, Subscriptions) of
ok ->
{ok, Pid, {Props, Config}};
{ok, _, _RCs} ->
{ok, Pid, {Props, Config}};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_subscribe_failed",
subscriptions => Subscriptions,
reason => Reason
}),
_ = emqtt:stop(Pid),
Error
end;
{error, Reason} = Error ->
?SLOG(warning, #{
msg => "client_connect_failed",
reason => Reason,
name => Name
}),
_ = emqtt:stop(Pid),
Error
end.
subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
emqtt:subscribe(Pid, RemoteTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
ok.
init_config(Name, Opts) ->
Mountpoint = maps:get(forward_mountpoint, Opts, undefined),
Subscriptions = maps:get(subscriptions, Opts, undefined),
Forwards = maps:get(forwards, Opts, undefined),
#{
mountpoint => format_mountpoint(Mountpoint),
subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
forwards => pre_process_forwards(Forwards)
}.
mk_client_options(Conf, BridgeOpts) ->
mk_client_options(Config, BridgeOpts) ->
Server = iolist_to_binary(maps:get(server, BridgeOpts)),
HostPort = emqx_connector_mqtt_schema:parse_server(Server),
Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined),
Subscriptions = maps:get(subscriptions, Conf),
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions),
Subscriptions = maps:get(subscriptions, Config),
CleanStart =
case Subscriptions of
#{remote := _} ->
@ -156,24 +224,26 @@ mk_client_options(Conf, BridgeOpts) ->
%% to ensure proper session recovery according to the MQTT spec.
true
end,
Opts = maps:without(
Opts = maps:with(
[
address,
auto_reconnect,
conn_type,
mountpoint,
forwards,
receive_mountpoint,
subscriptions
proto_ver,
username,
password,
clientid,
max_inflight,
connect_timeout,
retry_interval,
bridge_mode,
ssl,
ssl_opts
],
BridgeOpts
),
Opts#{
msg_handler => mk_client_event_handler(Vars, #{server => Server}),
msg_handler => mk_client_event_handler(Subscriptions, #{server => Server}),
hosts => [HostPort],
clean_start => CleanStart,
force_ping => true,
proto_ver => maps:get(proto_ver, BridgeOpts, v4)
force_ping => true
}.
mk_client_event_handler(Vars, Opts) when Vars /= undefined ->
@ -184,45 +254,15 @@ mk_client_event_handler(Vars, Opts) when Vars /= undefined ->
mk_client_event_handler(undefined, _Opts) ->
undefined.
connect(Name) ->
#{subscriptions := Subscriptions} = get_config(Name),
case emqtt:connect(get_pid(Name)) of
{ok, Properties} ->
case subscribe_remote_topics(Name, Subscriptions) of
ok ->
{ok, Properties};
{ok, _, _RCs} ->
{ok, Properties};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_subscribe_failed",
subscriptions => Subscriptions,
reason => Reason
}),
Error
end;
{error, Reason} = Error ->
?SLOG(warning, #{
msg => "client_connect_failed",
reason => Reason,
name => Name
}),
Error
end.
subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) ->
emqtt:subscribe(ref(Ref), FromTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
ok.
stop(Pid) ->
emqtt:stop(Pid).
stop(Ref) ->
emqtt:stop(ref(Ref)).
info(Pid) ->
emqtt:info(Pid).
info(Ref) ->
emqtt:info(ref(Ref)).
status(Ref) ->
status(Pid) ->
try
case proplists:get_value(socket, info(Ref)) of
case proplists:get_value(socket, info(Pid)) of
Socket when Socket /= undefined ->
connected;
undefined ->
@ -233,14 +273,14 @@ status(Ref) ->
disconnected
end.
ping(Ref) ->
emqtt:ping(ref(Ref)).
ping(Pid) ->
emqtt:ping(Pid).
send_to_remote(Name, MsgIn) ->
trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end).
send_to_remote(Pid, MsgIn, Conf) ->
do_send(Pid, export_msg(MsgIn, Conf)).
do_send(Name, {true, Msg}) ->
case emqtt:publish(get_pid(Name), Msg) of
do_send(Pid, {true, Msg}) ->
case emqtt:publish(Pid, Msg) of
ok ->
ok;
{ok, #{reason_code := RC}} when
@ -266,36 +306,15 @@ do_send(Name, {true, Msg}) ->
do_send(_Name, false) ->
ok.
send_to_remote_async(Name, MsgIn, Callback) ->
trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end).
send_to_remote_async(Pid, MsgIn, Callback, Conf) ->
do_send_async(Pid, export_msg(MsgIn, Conf), Callback).
do_send_async(Name, {true, Msg}, Callback) ->
Pid = get_pid(Name),
do_send_async(Pid, {true, Msg}, Callback) ->
ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback),
{ok, Pid};
do_send_async(_Name, false, _Callback) ->
do_send_async(_Pid, false, _Callback) ->
ok.
ref(Pid) when is_pid(Pid) ->
Pid;
ref(Term) ->
?REF(Term).
trycall(Fun) ->
try
Fun()
catch
throw:noproc ->
{error, disconnected};
exit:{noproc, _} ->
{error, disconnected}
end.
format_mountpoint(undefined) ->
undefined;
format_mountpoint(Prefix) ->
binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
pre_process_subscriptions(undefined, _, _) ->
undefined;
pre_process_subscriptions(
@ -356,38 +375,15 @@ downgrade_ingress_qos(2) ->
downgrade_ingress_qos(QoS) ->
QoS.
get_pid(Name) ->
case gproc:where(?NAME(Name)) of
Pid when is_pid(Pid) ->
Pid;
undefined ->
throw(noproc)
end.
get_config(Name) ->
try
gproc:lookup_value(?NAME(Name))
catch
error:badarg ->
throw(noproc)
end.
export_msg(Name, Msg) ->
case get_config(Name) of
#{forwards := Forwards = #{}, mountpoint := Mountpoint} ->
{true, export_msg(Mountpoint, Forwards, Msg)};
#{forwards := undefined} ->
?SLOG(error, #{
msg => "forwarding_unavailable",
message => Msg,
reason => "egress is not configured"
}),
false
end.
export_msg(Mountpoint, Forwards, Msg) ->
Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards),
emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars).
export_msg(Msg, #{forwards := Forwards = #{}}) ->
{true, emqx_connector_mqtt_msg:to_remote_msg(Msg, Forwards)};
export_msg(Msg, #{forwards := undefined}) ->
?SLOG(error, #{
msg => "forwarding_unavailable",
message => Msg,
reason => "egress is not configured"
}),
false.
%%