feat(mqttconn): employ ecpool instead of a single worker

This commit is contained in:
Andrew Mayorov 2023-05-23 18:54:31 +03:00
parent 6967f621d8
commit 81e78516aa
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 120 additions and 168 deletions

View File

@ -256,11 +256,15 @@ t_mqtt_egress_bridge_ignores_clean_start(_) ->
}
),
{ok, _, #{state := #{worker := WorkerPid}}} =
emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)),
ResourceID = emqx_bridge_resource:resource_id(BridgeID),
ClientInfo = ecpool:pick_and_do(
ResourceID,
{emqx_connector_mqtt_worker, info, []},
no_handover
),
?assertMatch(
#{clean_start := true},
maps:from_list(emqx_connector_mqtt_worker:info(WorkerPid))
maps:from_list(ClientInfo)
),
%% delete the bridge

View File

@ -18,23 +18,13 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
-behaviour(supervisor).
-behaviour(emqx_resource).
%% API and callbacks for supervisor
-export([
callback_mode/0,
start_link/0,
init/1,
create_bridge/2,
remove_bridge/1,
bridges/0
]).
-export([on_message_received/3]).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
@ -44,46 +34,7 @@
-export([on_async_result/2]).
%% ===================================================================
%% supervisor APIs
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
SupFlag = #{
strategy => one_for_one,
intensity => 100,
period => 10
},
{ok, {SupFlag, []}}.
bridge_spec(Name, Options) ->
#{
id => Name,
start => {emqx_connector_mqtt_worker, start_link, [Name, Options]},
restart => temporary,
shutdown => 1000
}.
-spec bridges() -> [{_Name, _Status}].
bridges() ->
[
{Name, emqx_connector_mqtt_worker:status(Name)}
|| {Name, _Pid, _, _} <- supervisor:which_children(?MODULE)
].
create_bridge(Name, Options) ->
supervisor:start_child(?MODULE, bridge_spec(Name, Options)).
remove_bridge(Name) ->
case supervisor:terminate_child(?MODULE, Name) of
ok ->
supervisor:delete_child(?MODULE, Name);
{error, not_found} ->
ok;
{error, Error} ->
{error, Error}
end.
-define(HEALTH_CHECK_TIMEOUT, 1000).
%% ===================================================================
%% When use this bridge as a data source, ?MODULE:on_message_received will be called
@ -101,24 +52,16 @@ on_start(ResourceId, Conf) ->
connector => ResourceId,
config => emqx_utils:redact(Conf)
}),
BasicConf = basic_config(Conf),
BridgeOpts = BasicConf#{
clientid => clientid(ResourceId, Conf),
BasicOpts = mk_worker_opts(ResourceId, Conf),
BridgeOpts = BasicOpts#{
subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId),
forwards => maps:get(egress, Conf, #{})
},
case create_bridge(ResourceId, BridgeOpts) of
{ok, Pid, {ConnProps, WorkerConf}} ->
{ok, #{
name => ResourceId,
worker => Pid,
config => WorkerConf,
props => ConnProps
}};
{error, {already_started, _Pid}} ->
ok = remove_bridge(ResourceId),
on_start(ResourceId, Conf);
{error, Reason} ->
{ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts),
case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of
ok ->
{ok, #{config => WorkerConf}};
{error, {start_pool_failed, _, Reason}} ->
{error, Reason}
end.
@ -127,34 +70,34 @@ on_stop(ResourceId, #{}) ->
msg => "stopping_mqtt_connector",
connector => ResourceId
}),
case remove_bridge(ResourceId) of
ok ->
ok;
{error, not_found} ->
ok;
{error, Reason} ->
?SLOG(error, #{
msg => "stop_mqtt_connector_error",
connector => ResourceId,
reason => Reason
})
end.
emqx_resource_pool:stop(ResourceId).
on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) ->
on_query(ResourceId, {send_message, Msg}, #{config := Config}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
Result = emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config),
handle_send_result(Result).
handle_send_result(with_worker(ResourceId, send_to_remote, [Msg, Config])).
on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) ->
on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config}) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}),
Callback = {fun on_async_result/2, [CallbackIn]},
case emqx_connector_mqtt_worker:send_to_remote_async(Pid, Msg, Callback, Config) of
Result = with_worker(ResourceId, send_to_remote_async, [Msg, Callback, Config]),
case Result of
ok ->
ok;
{ok, Pid} ->
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, _} = Error ->
handle_send_result(Error)
{error, Reason} ->
{error, classify_error(Reason)}
end.
with_worker(ResourceId, Fun, Args) ->
Worker = ecpool:get_client(ResourceId),
case is_pid(Worker) andalso ecpool_worker:client(Worker) of
{ok, Client} ->
erlang:apply(emqx_connector_mqtt_worker, Fun, [Client | Args]);
{error, Reason} ->
{error, Reason};
false ->
{error, disconnected}
end.
on_async_result(Callback, Result) ->
@ -167,9 +110,6 @@ apply_callback_function({F, A}, Result) when is_function(F), is_list(A) ->
apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) ->
erlang:apply(M, F, A ++ [Result]).
on_get_status(_ResourceId, #{worker := Pid}) ->
emqx_connector_mqtt_worker:status(Pid).
handle_send_result(ok) ->
ok;
handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) ->
@ -195,6 +135,36 @@ classify_error(shutdown = Reason) ->
classify_error(Reason) ->
{unrecoverable_error, Reason}.
on_get_status(ResourceId, #{}) ->
Workers = [Worker || {_Name, Worker} <- ecpool:workers(ResourceId)],
try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of
Statuses ->
combine_status(Statuses)
catch
exit:timeout ->
connecting
end.
get_status(Worker) ->
case ecpool_worker:client(Worker) of
{ok, Client} ->
emqx_connector_mqtt_worker:status(Client);
{error, _} ->
disconnected
end.
combine_status(Statuses) ->
%% NOTE
%% Natural order of statuses: [connected, connecting, disconnected]
%% * `disconnected` wins over any other status
%% * `connecting` wins over `connected`
case lists:reverse(lists:usort(Statuses)) of
[Status | _] ->
Status;
[] ->
disconnected
end.
make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 ->
Subscriptions;
make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) ->
@ -203,7 +173,8 @@ make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) ->
make_sub_confs(_SubRemoteConf, Conf, ResourceId) ->
error({no_hookpoint_provided, ResourceId, Conf}).
basic_config(
mk_worker_opts(
ResourceId,
#{
server := Server,
proto_ver := ProtoVer,
@ -215,7 +186,7 @@ basic_config(
ssl := #{enable := EnableSsl} = Ssl
} = Conf
) ->
BasicConf = #{
Options = #{
server => Server,
%% 30s
connect_timeout => 30,
@ -224,6 +195,7 @@ basic_config(
%% A load balancing server (such as haproxy) is often set up before the emqx broker server.
%% When the load balancing server enables mqtt connection packet inspection,
%% non-standard mqtt connection packets might be filtered out by LB.
clientid => clientid(ResourceId, Conf),
bridge_mode => BridgeMode,
keepalive => ms_to_s(KeepAlive),
clean_start => CleanStart,
@ -233,7 +205,7 @@ basic_config(
ssl_opts => maps:to_list(maps:remove(enable, Ssl))
},
maps:merge(
BasicConf,
Options,
maps:with([username, password], Conf)
).

View File

@ -33,7 +33,6 @@ init([]) ->
period => 20
},
ChildSpecs = [
child_spec(emqx_connector_mqtt),
child_spec(emqx_connector_jwt_sup)
],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -14,51 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Bridge works in two layers (1) batching layer (2) transport layer
%% The `bridge' batching layer collects local messages in batches and sends over
%% to remote MQTT node/cluster via `connection' transport layer.
%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be
%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection'
%% has to be `emqx_connector_mqtt_mod'.
%%
%% ```
%% +------+ +--------+
%% | EMQX | | REMOTE |
%% | | | |
%% | (bridge) <==(connection)==> | |
%% | | | |
%% | | | |
%% +------+ +--------+
%% '''
%%
%%
%% This module implements 2 kinds of APIs with regards to batching and
%% messaging protocol. (1) A `gen_statem' based local batch collector;
%% (2) APIs for incoming remote batches/messages.
%%
%% Batch collector state diagram
%%
%% [idle] --(0) --> [connecting] --(2)--> [connected]
%% | ^ |
%% | | |
%% '--(1)---'--------(3)------'
%%
%% (0): auto or manual start
%% (1): retry timeout
%% (2): successfully connected to remote node/cluster
%% (3): received {disconnected, Reason} OR
%% failed to send to remote node/cluster.
%%
%% NOTE: A bridge worker may subscribe to multiple (including wildcard)
%% local topics, and the underlying `emqx_bridge_connect' may subscribe to
%% multiple remote topics, however, worker/connections are not designed
%% to support automatic load-balancing, i.e. in case it can not keep up
%% with the amount of messages coming in, administrator should split and
%% balance topics between worker/connections manually.
%%
%% NOTES:
%% * Local messages are all normalised to QoS-1 when exporting to remote
-module(emqx_connector_mqtt_worker).
-include_lib("emqx/include/logger.hrl").
@ -66,7 +21,8 @@
%% APIs
-export([
start_link/2,
init/2,
connect/1,
stop/1
]).
@ -99,6 +55,7 @@
max_inflight := pos_integer(),
connect_timeout := pos_integer(),
retry_interval := timeout(),
keepalive := non_neg_integer(),
bridge_mode := boolean(),
ssl := boolean(),
ssl_opts := proplists:proplist(),
@ -107,6 +64,11 @@
forwards := map()
}.
-type client_option() ::
emqtt:option()
| {name, name()}
| {subscriptions, subscriptions() | undefined}.
-type config() :: #{
subscriptions := subscriptions() | undefined,
forwards := forwards() | undefined
@ -138,50 +100,64 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% @doc Start a bridge worker. Supported configs:
%% mountpoint: The topic mount point for messages sent to remote node/cluster
%% `undefined', `<<>>' or `""' to disable
%% forwards: Local topics to subscribe.
%%
%% Find more connection specific configs in the callback modules
%% of emqx_bridge_connect behaviour.
-spec start_link(name(), options()) ->
{ok, pid(), {emqtt:properties(), config()}} | {error, _Reason}.
start_link(Name, BridgeOpts) ->
-spec init(name(), options()) ->
{ok, [client_option()], config()}.
init(Name, BridgeOpts) ->
Config = init_config(Name, BridgeOpts),
ClientOpts0 = mk_client_options(Config, BridgeOpts),
ClientOpts = ClientOpts0#{
name => Name,
subscriptions => maps:get(subscriptions, Config)
},
{ok, maps:to_list(ClientOpts), Config}.
%% @doc Start a bridge worker.
-spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) ->
{ok, pid()} | {error, _Reason}.
connect(ClientOpts0) ->
?SLOG(debug, #{
msg => "client_starting",
name => Name,
options => BridgeOpts
options => emqx_utils:redact(ClientOpts0)
}),
Config = init_config(Name, BridgeOpts),
Options = mk_client_options(Config, BridgeOpts),
case emqtt:start_link(Options) of
{value, {_, Name}, ClientOpts1} = lists:keytake(name, 1, ClientOpts0),
{value, {_, WorkerId}, ClientOpts} = lists:keytake(ecpool_worker_id, 1, ClientOpts1),
case emqtt:start_link(mk_emqtt_opts(WorkerId, ClientOpts)) of
{ok, Pid} ->
connect(Pid, Name, Config);
connect(Pid, Name, WorkerId, ClientOpts);
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_start_failed",
config => emqx_utils:redact(BridgeOpts),
config => emqx_utils:redact(ClientOpts),
reason => Reason
}),
Error
end.
connect(Pid, Name, Config = #{subscriptions := Subscriptions}) ->
mk_emqtt_opts(WorkerId, ClientOpts) ->
{_, ClientId} = lists:keyfind(clientid, 1, ClientOpts),
lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}).
mk_clientid(WorkerId, ClientId) ->
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
connect(Pid, Name, WorkerId, ClientOpts) ->
case emqtt:connect(Pid) of
{ok, Props} ->
case subscribe_remote_topics(Pid, Subscriptions) of
ok ->
{ok, Pid, {Props, Config}};
{ok, _Props} ->
% NOTE
% Subscribe to remote topics only when the first worker is started.
Subscriptions = proplists:get_value(subscriptions, ClientOpts),
case WorkerId =:= 1 andalso subscribe_remote_topics(Pid, Subscriptions) of
false ->
{ok, Pid};
{ok, _, _RCs} ->
{ok, Pid, {Props, Config}};
{ok, Pid};
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_subscribe_failed",
subscriptions => Subscriptions,
reason => Reason
}),
_ = emqtt:stop(Pid),
_ = catch emqtt:stop(Pid),
Error
end;
{error, Reason} = Error ->
@ -190,14 +166,14 @@ connect(Pid, Name, Config = #{subscriptions := Subscriptions}) ->
reason => Reason,
name => Name
}),
_ = emqtt:stop(Pid),
_ = catch emqtt:stop(Pid),
Error
end.
subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
emqtt:subscribe(Pid, RemoteTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
ok.
false.
init_config(Name, Opts) ->
Subscriptions = maps:get(subscriptions, Opts, undefined),
@ -230,6 +206,7 @@ mk_client_options(Config, BridgeOpts) ->
max_inflight,
connect_timeout,
retry_interval,
keepalive,
bridge_mode,
ssl,
ssl_opts