From 81e78516aa907f14853722a0256af785ce0baff6 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 23 May 2023 18:54:31 +0300 Subject: [PATCH] feat(mqttconn): employ ecpool instead of a single worker --- .../test/emqx_bridge_mqtt_SUITE.erl | 10 +- .../src/emqx_connector_mqtt.erl | 156 +++++++----------- .../emqx_connector/src/emqx_connector_sup.erl | 1 - .../src/mqtt/emqx_connector_mqtt_worker.erl | 121 ++++++-------- 4 files changed, 120 insertions(+), 168 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index c00eb6b14..67fb5d019 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -256,11 +256,15 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> } ), - {ok, _, #{state := #{worker := WorkerPid}}} = - emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)), + ResourceID = emqx_bridge_resource:resource_id(BridgeID), + ClientInfo = ecpool:pick_and_do( + ResourceID, + {emqx_connector_mqtt_worker, info, []}, + no_handover + ), ?assertMatch( #{clean_start := true}, - maps:from_list(emqx_connector_mqtt_worker:info(WorkerPid)) + maps:from_list(ClientInfo) ), %% delete the bridge diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index cc40b1606..30791afe3 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -18,23 +18,13 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). --behaviour(supervisor). -behaviour(emqx_resource). -%% API and callbacks for supervisor --export([ - callback_mode/0, - start_link/0, - init/1, - create_bridge/2, - remove_bridge/1, - bridges/0 -]). - -export([on_message_received/3]). %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -44,46 +34,7 @@ -export([on_async_result/2]). -%% =================================================================== -%% supervisor APIs -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - SupFlag = #{ - strategy => one_for_one, - intensity => 100, - period => 10 - }, - {ok, {SupFlag, []}}. - -bridge_spec(Name, Options) -> - #{ - id => Name, - start => {emqx_connector_mqtt_worker, start_link, [Name, Options]}, - restart => temporary, - shutdown => 1000 - }. - --spec bridges() -> [{_Name, _Status}]. -bridges() -> - [ - {Name, emqx_connector_mqtt_worker:status(Name)} - || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE) - ]. - -create_bridge(Name, Options) -> - supervisor:start_child(?MODULE, bridge_spec(Name, Options)). - -remove_bridge(Name) -> - case supervisor:terminate_child(?MODULE, Name) of - ok -> - supervisor:delete_child(?MODULE, Name); - {error, not_found} -> - ok; - {error, Error} -> - {error, Error} - end. +-define(HEALTH_CHECK_TIMEOUT, 1000). %% =================================================================== %% When use this bridge as a data source, ?MODULE:on_message_received will be called @@ -101,24 +52,16 @@ on_start(ResourceId, Conf) -> connector => ResourceId, config => emqx_utils:redact(Conf) }), - BasicConf = basic_config(Conf), - BridgeOpts = BasicConf#{ - clientid => clientid(ResourceId, Conf), + BasicOpts = mk_worker_opts(ResourceId, Conf), + BridgeOpts = BasicOpts#{ subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId), forwards => maps:get(egress, Conf, #{}) }, - case create_bridge(ResourceId, BridgeOpts) of - {ok, Pid, {ConnProps, WorkerConf}} -> - {ok, #{ - name => ResourceId, - worker => Pid, - config => WorkerConf, - props => ConnProps - }}; - {error, {already_started, _Pid}} -> - ok = remove_bridge(ResourceId), - on_start(ResourceId, Conf); - {error, Reason} -> + {ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts), + case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of + ok -> + {ok, #{config => WorkerConf}}; + {error, {start_pool_failed, _, Reason}} -> {error, Reason} end. @@ -127,34 +70,34 @@ on_stop(ResourceId, #{}) -> msg => "stopping_mqtt_connector", connector => ResourceId }), - case remove_bridge(ResourceId) of - ok -> - ok; - {error, not_found} -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_mqtt_connector_error", - connector => ResourceId, - reason => Reason - }) - end. + emqx_resource_pool:stop(ResourceId). -on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) -> +on_query(ResourceId, {send_message, Msg}, #{config := Config}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), - Result = emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config), - handle_send_result(Result). + handle_send_result(with_worker(ResourceId, send_to_remote, [Msg, Config])). -on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) -> +on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), Callback = {fun on_async_result/2, [CallbackIn]}, - case emqx_connector_mqtt_worker:send_to_remote_async(Pid, Msg, Callback, Config) of + Result = with_worker(ResourceId, send_to_remote_async, [Msg, Callback, Config]), + case Result of ok -> ok; - {ok, Pid} -> + {ok, Pid} when is_pid(Pid) -> {ok, Pid}; - {error, _} = Error -> - handle_send_result(Error) + {error, Reason} -> + {error, classify_error(Reason)} + end. + +with_worker(ResourceId, Fun, Args) -> + Worker = ecpool:get_client(ResourceId), + case is_pid(Worker) andalso ecpool_worker:client(Worker) of + {ok, Client} -> + erlang:apply(emqx_connector_mqtt_worker, Fun, [Client | Args]); + {error, Reason} -> + {error, Reason}; + false -> + {error, disconnected} end. on_async_result(Callback, Result) -> @@ -167,9 +110,6 @@ apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> erlang:apply(M, F, A ++ [Result]). -on_get_status(_ResourceId, #{worker := Pid}) -> - emqx_connector_mqtt_worker:status(Pid). - handle_send_result(ok) -> ok; handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) -> @@ -195,6 +135,36 @@ classify_error(shutdown = Reason) -> classify_error(Reason) -> {unrecoverable_error, Reason}. +on_get_status(ResourceId, #{}) -> + Workers = [Worker || {_Name, Worker} <- ecpool:workers(ResourceId)], + try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of + Statuses -> + combine_status(Statuses) + catch + exit:timeout -> + connecting + end. + +get_status(Worker) -> + case ecpool_worker:client(Worker) of + {ok, Client} -> + emqx_connector_mqtt_worker:status(Client); + {error, _} -> + disconnected + end. + +combine_status(Statuses) -> + %% NOTE + %% Natural order of statuses: [connected, connecting, disconnected] + %% * `disconnected` wins over any other status + %% * `connecting` wins over `connected` + case lists:reverse(lists:usort(Statuses)) of + [Status | _] -> + Status; + [] -> + disconnected + end. + make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 -> Subscriptions; make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) -> @@ -203,7 +173,8 @@ make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) -> make_sub_confs(_SubRemoteConf, Conf, ResourceId) -> error({no_hookpoint_provided, ResourceId, Conf}). -basic_config( +mk_worker_opts( + ResourceId, #{ server := Server, proto_ver := ProtoVer, @@ -215,7 +186,7 @@ basic_config( ssl := #{enable := EnableSsl} = Ssl } = Conf ) -> - BasicConf = #{ + Options = #{ server => Server, %% 30s connect_timeout => 30, @@ -224,6 +195,7 @@ basic_config( %% A load balancing server (such as haproxy) is often set up before the emqx broker server. %% When the load balancing server enables mqtt connection packet inspection, %% non-standard mqtt connection packets might be filtered out by LB. + clientid => clientid(ResourceId, Conf), bridge_mode => BridgeMode, keepalive => ms_to_s(KeepAlive), clean_start => CleanStart, @@ -233,7 +205,7 @@ basic_config( ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }, maps:merge( - BasicConf, + Options, maps:with([username, password], Conf) ). diff --git a/apps/emqx_connector/src/emqx_connector_sup.erl b/apps/emqx_connector/src/emqx_connector_sup.erl index 13516813f..21c0f2677 100644 --- a/apps/emqx_connector/src/emqx_connector_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_sup.erl @@ -33,7 +33,6 @@ init([]) -> period => 20 }, ChildSpecs = [ - child_spec(emqx_connector_mqtt), child_spec(emqx_connector_jwt_sup) ], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 8e3ca3136..7e33a55ca 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -14,51 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Bridge works in two layers (1) batching layer (2) transport layer -%% The `bridge' batching layer collects local messages in batches and sends over -%% to remote MQTT node/cluster via `connection' transport layer. -%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be -%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection' -%% has to be `emqx_connector_mqtt_mod'. -%% -%% ``` -%% +------+ +--------+ -%% | EMQX | | REMOTE | -%% | | | | -%% | (bridge) <==(connection)==> | | -%% | | | | -%% | | | | -%% +------+ +--------+ -%% ''' -%% -%% -%% This module implements 2 kinds of APIs with regards to batching and -%% messaging protocol. (1) A `gen_statem' based local batch collector; -%% (2) APIs for incoming remote batches/messages. -%% -%% Batch collector state diagram -%% -%% [idle] --(0) --> [connecting] --(2)--> [connected] -%% | ^ | -%% | | | -%% '--(1)---'--------(3)------' -%% -%% (0): auto or manual start -%% (1): retry timeout -%% (2): successfully connected to remote node/cluster -%% (3): received {disconnected, Reason} OR -%% failed to send to remote node/cluster. -%% -%% NOTE: A bridge worker may subscribe to multiple (including wildcard) -%% local topics, and the underlying `emqx_bridge_connect' may subscribe to -%% multiple remote topics, however, worker/connections are not designed -%% to support automatic load-balancing, i.e. in case it can not keep up -%% with the amount of messages coming in, administrator should split and -%% balance topics between worker/connections manually. -%% -%% NOTES: -%% * Local messages are all normalised to QoS-1 when exporting to remote - -module(emqx_connector_mqtt_worker). -include_lib("emqx/include/logger.hrl"). @@ -66,7 +21,8 @@ %% APIs -export([ - start_link/2, + init/2, + connect/1, stop/1 ]). @@ -99,6 +55,7 @@ max_inflight := pos_integer(), connect_timeout := pos_integer(), retry_interval := timeout(), + keepalive := non_neg_integer(), bridge_mode := boolean(), ssl := boolean(), ssl_opts := proplists:proplist(), @@ -107,6 +64,11 @@ forwards := map() }. +-type client_option() :: + emqtt:option() + | {name, name()} + | {subscriptions, subscriptions() | undefined}. + -type config() :: #{ subscriptions := subscriptions() | undefined, forwards := forwards() | undefined @@ -138,50 +100,64 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -%% @doc Start a bridge worker. Supported configs: -%% mountpoint: The topic mount point for messages sent to remote node/cluster -%% `undefined', `<<>>' or `""' to disable -%% forwards: Local topics to subscribe. -%% -%% Find more connection specific configs in the callback modules -%% of emqx_bridge_connect behaviour. --spec start_link(name(), options()) -> - {ok, pid(), {emqtt:properties(), config()}} | {error, _Reason}. -start_link(Name, BridgeOpts) -> +-spec init(name(), options()) -> + {ok, [client_option()], config()}. +init(Name, BridgeOpts) -> + Config = init_config(Name, BridgeOpts), + ClientOpts0 = mk_client_options(Config, BridgeOpts), + ClientOpts = ClientOpts0#{ + name => Name, + subscriptions => maps:get(subscriptions, Config) + }, + {ok, maps:to_list(ClientOpts), Config}. + +%% @doc Start a bridge worker. +-spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(ClientOpts0) -> ?SLOG(debug, #{ msg => "client_starting", - name => Name, - options => BridgeOpts + options => emqx_utils:redact(ClientOpts0) }), - Config = init_config(Name, BridgeOpts), - Options = mk_client_options(Config, BridgeOpts), - case emqtt:start_link(Options) of + {value, {_, Name}, ClientOpts1} = lists:keytake(name, 1, ClientOpts0), + {value, {_, WorkerId}, ClientOpts} = lists:keytake(ecpool_worker_id, 1, ClientOpts1), + case emqtt:start_link(mk_emqtt_opts(WorkerId, ClientOpts)) of {ok, Pid} -> - connect(Pid, Name, Config); + connect(Pid, Name, WorkerId, ClientOpts); {error, Reason} = Error -> ?SLOG(error, #{ msg => "client_start_failed", - config => emqx_utils:redact(BridgeOpts), + config => emqx_utils:redact(ClientOpts), reason => Reason }), Error end. -connect(Pid, Name, Config = #{subscriptions := Subscriptions}) -> +mk_emqtt_opts(WorkerId, ClientOpts) -> + {_, ClientId} = lists:keyfind(clientid, 1, ClientOpts), + lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}). + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +connect(Pid, Name, WorkerId, ClientOpts) -> case emqtt:connect(Pid) of - {ok, Props} -> - case subscribe_remote_topics(Pid, Subscriptions) of - ok -> - {ok, Pid, {Props, Config}}; + {ok, _Props} -> + % NOTE + % Subscribe to remote topics only when the first worker is started. + Subscriptions = proplists:get_value(subscriptions, ClientOpts), + case WorkerId =:= 1 andalso subscribe_remote_topics(Pid, Subscriptions) of + false -> + {ok, Pid}; {ok, _, _RCs} -> - {ok, Pid, {Props, Config}}; + {ok, Pid}; {error, Reason} = Error -> ?SLOG(error, #{ msg => "client_subscribe_failed", subscriptions => Subscriptions, reason => Reason }), - _ = emqtt:stop(Pid), + _ = catch emqtt:stop(Pid), Error end; {error, Reason} = Error -> @@ -190,14 +166,14 @@ connect(Pid, Name, Config = #{subscriptions := Subscriptions}) -> reason => Reason, name => Name }), - _ = emqtt:stop(Pid), + _ = catch emqtt:stop(Pid), Error end. subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> emqtt:subscribe(Pid, RemoteTopic, QoS); subscribe_remote_topics(_Ref, undefined) -> - ok. + false. init_config(Name, Opts) -> Subscriptions = maps:get(subscriptions, Opts, undefined), @@ -230,6 +206,7 @@ mk_client_options(Config, BridgeOpts) -> max_inflight, connect_timeout, retry_interval, + keepalive, bridge_mode, ssl, ssl_opts