From a5fc26736deed3675c6614c56f1f44e891b240fc Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 29 May 2023 19:43:20 +0300 Subject: [PATCH] refactor(mqttconn): split ingress/egress into 2 separate pools Each with a more refined set of responsibilities, at the cost of slight code duplication. Also provide two different config fields for each pool size. --- .../test/emqx_bridge_api_SUITE.erl | 9 +- .../test/emqx_bridge_mqtt_SUITE.erl | 10 +- .../src/emqx_connector_mqtt.erl | 244 ++++++--- .../src/mqtt/emqx_connector_mqtt_egress.erl | 162 ++++++ .../src/mqtt/emqx_connector_mqtt_ingress.erl | 272 ++++++++++ .../src/mqtt/emqx_connector_mqtt_msg.erl | 95 ++++ .../src/mqtt/emqx_connector_mqtt_schema.erl | 21 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 490 ------------------ apps/emqx_resource/src/emqx_resource.erl | 3 +- rel/i18n/emqx_connector_mqtt_schema.hocon | 18 + 10 files changed, 761 insertions(+), 563 deletions(-) create mode 100644 apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl create mode 100644 apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl create mode 100644 apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl delete mode 100644 apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 1ac6750a4..ecab986e8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -47,7 +47,14 @@ <<"server">> => SERVER, <<"username">> => <<"user1">>, <<"password">> => <<"">>, - <<"proto_ver">> => <<"v5">> + <<"proto_ver">> => <<"v5">>, + <<"egress">> => #{ + <<"remote">> => #{ + <<"topic">> => <<"emqx/${topic}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => false + } + } }). -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index aecb04e03..6c36e08e7 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -22,9 +22,7 @@ -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include("emqx_dashboard/include/emqx_dashboard.hrl"). %% output functions -export([inspect/3]). @@ -259,8 +257,8 @@ t_mqtt_conn_bridge_ingress_shared_subscription(_) -> ?SERVER_CONF(<<>>)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => BridgeName, - <<"pool_size">> => PoolSize, <<"ingress">> => #{ + <<"pool_size">> => PoolSize, <<"remote">> => #{ <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, <<"qos">> => 1 @@ -305,9 +303,11 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> ), ResourceID = emqx_bridge_resource:resource_id(BridgeID), + {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} = + emqx_resource_manager:lookup_cached(ResourceID), ClientInfo = ecpool:pick_and_do( - ResourceID, - {emqx_connector_mqtt_worker, info, []}, + EgressPoolName, + {emqx_connector_mqtt_egress, info, []}, no_handover ), ?assertMatch( diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 0658c28a7..a75f4db39 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -52,34 +52,134 @@ on_start(ResourceId, Conf) -> connector => ResourceId, config => emqx_utils:redact(Conf) }), - BasicOpts = mk_worker_opts(ResourceId, Conf), - BridgeOpts = BasicOpts#{ - ingress => mk_ingress_config(maps:get(ingress, Conf, #{}), Conf, ResourceId), - egress => maps:get(egress, Conf, #{}) - }, - {ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts), - case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of + case start_ingress(ResourceId, Conf) of + {ok, Result1} -> + case start_egress(ResourceId, Conf) of + {ok, Result2} -> + {ok, maps:merge(Result1, Result2)}; + {error, Reason} -> + _ = stop_ingress(Result1), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +start_ingress(ResourceId, Conf) -> + ClientOpts = mk_client_opts(ResourceId, "ingress", Conf), + case mk_ingress_config(ResourceId, Conf) of + Ingress = #{} -> + start_ingress(ResourceId, Ingress, ClientOpts); + undefined -> + {ok, #{}} + end. + +start_ingress(ResourceId, Ingress, ClientOpts) -> + PoolName = <>, + PoolSize = choose_ingress_pool_size(Ingress), + Options = [ + {name, PoolName}, + {pool_size, PoolSize}, + {ingress, Ingress}, + {client_opts, ClientOpts} + ], + case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_ingress, Options) of ok -> - {ok, #{config => WorkerConf}}; + {ok, #{ingress_pool_name => PoolName}}; {error, {start_pool_failed, _, Reason}} -> {error, Reason} end. -on_stop(ResourceId, #{}) -> +choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}) -> + case emqx_topic:parse(RemoteTopic) of + {_Filter, #{share := _Name}} -> + % NOTE: this is shared subscription, many workers may subscribe + PoolSize; + {_Filter, #{}} -> + % NOTE: this is regular subscription, only one worker should subscribe + ?SLOG(warning, #{ + msg => "ingress_pool_size_ignored", + reason => + "Remote topic filter is not a shared subscription, " + "ingress pool will start with a single worker", + config_pool_size => PoolSize, + pool_size => 1 + }), + 1 + end. + +start_egress(ResourceId, Conf) -> + % NOTE + % We are ignoring the user configuration here because there's currently no reliable way + % to ensure proper session recovery according to the MQTT spec. + ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)), + case mk_egress_config(Conf) of + Egress = #{} -> + start_egress(ResourceId, Egress, ClientOpts); + undefined -> + {ok, #{}} + end. + +start_egress(ResourceId, Egress, ClientOpts) -> + PoolName = <>, + PoolSize = maps:get(pool_size, Egress), + Options = [ + {name, PoolName}, + {pool_size, PoolSize}, + {client_opts, ClientOpts} + ], + case emqx_resource_pool:start(PoolName, emqx_connector_mqtt_egress, Options) of + ok -> + {ok, #{ + egress_pool_name => PoolName, + egress_config => emqx_connector_mqtt_egress:config(Egress) + }}; + {error, {start_pool_failed, _, Reason}} -> + {error, Reason} + end. + +on_stop(ResourceId, State) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", connector => ResourceId }), - emqx_resource_pool:stop(ResourceId). + ok = stop_ingress(State), + ok = stop_egress(State). -on_query(ResourceId, {send_message, Msg}, #{config := Config}) -> +stop_ingress(#{ingress_pool_name := PoolName}) -> + emqx_resource_pool:stop(PoolName); +stop_ingress(#{}) -> + ok. + +stop_egress(#{egress_pool_name := PoolName}) -> + emqx_resource_pool:stop(PoolName); +stop_egress(#{}) -> + ok. + +on_query( + ResourceId, + {send_message, Msg}, + #{egress_pool_name := PoolName, egress_config := Config} +) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), - handle_send_result(with_worker(ResourceId, send_to_remote, [Msg, Config])). + handle_send_result(with_worker(PoolName, send, [Msg, Config])); +on_query(ResourceId, {send_message, Msg}, #{}) -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + connector => ResourceId, + message => Msg, + reason => "Egress is not configured" + }). -on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config}) -> +on_query_async( + ResourceId, + {send_message, Msg}, + CallbackIn, + #{egress_pool_name := PoolName, egress_config := Config} +) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), Callback = {fun on_async_result/2, [CallbackIn]}, - Result = with_worker(ResourceId, send_to_remote_async, [Msg, Callback, Config]), + Result = with_worker(PoolName, send_async, [Msg, Callback, Config]), case Result of ok -> ok; @@ -87,13 +187,20 @@ on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{config := Config}) {ok, Pid}; {error, Reason} -> {error, classify_error(Reason)} - end. + end; +on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + connector => ResourceId, + message => Msg, + reason => "Egress is not configured" + }). with_worker(ResourceId, Fun, Args) -> Worker = ecpool:get_client(ResourceId), case is_pid(Worker) andalso ecpool_worker:client(Worker) of {ok, Client} -> - erlang:apply(emqx_connector_mqtt_worker, Fun, [Client | Args]); + erlang:apply(emqx_connector_mqtt_egress, Fun, [Client | Args]); {error, Reason} -> {error, Reason}; false -> @@ -135,8 +242,9 @@ classify_error(shutdown = Reason) -> classify_error(Reason) -> {unrecoverable_error, Reason}. -on_get_status(ResourceId, #{}) -> - Workers = [Worker || {_Name, Worker} <- ecpool:workers(ResourceId)], +on_get_status(_ResourceId, State) -> + Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)), + Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of Statuses -> combine_status(Statuses) @@ -145,10 +253,12 @@ on_get_status(ResourceId, #{}) -> connecting end. -get_status(Worker) -> +get_status({Pool, Worker}) -> case ecpool_worker:client(Worker) of - {ok, Client} -> - emqx_connector_mqtt_worker:status(Client); + {ok, Client} when Pool == ingress_pool_name -> + emqx_connector_mqtt_ingress:status(Client); + {ok, Client} when Pool == egress_pool_name -> + emqx_connector_mqtt_egress:status(Client); {error, _} -> disconnected end. @@ -165,56 +275,68 @@ combine_status(Statuses) -> disconnected end. -mk_ingress_config(Ingress, _Conf, _) when map_size(Ingress) == 0 -> - Ingress; -mk_ingress_config(Ingress, #{hookpoint := HookPoint}, ResourceId) -> - MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, - Ingress#{on_message_received => MFA}; -mk_ingress_config(_Ingress, Conf, ResourceId) -> - error({no_hookpoint_provided, ResourceId, Conf}). - -mk_worker_opts( +mk_ingress_config( ResourceId, #{ + ingress := Ingress = #{remote := _}, server := Server, - pool_size := PoolSize, - proto_ver := ProtoVer, - bridge_mode := BridgeMode, - clean_start := CleanStart, - keepalive := KeepAlive, - retry_interval := RetryIntv, - max_inflight := MaxInflight, - ssl := #{enable := EnableSsl} = Ssl - } = Conf + hookpoint := HookPoint + } ) -> - Options = #{ + Ingress#{ server => Server, - pool_size => PoolSize, - %% 30s + on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]} + }; +mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) -> + error({no_hookpoint_provided, ResourceId, Conf}); +mk_ingress_config(_ResourceId, #{}) -> + undefined. + +mk_egress_config(#{egress := Egress = #{remote := _}}) -> + Egress; +mk_egress_config(#{}) -> + undefined. + +mk_client_opts( + ResourceId, + ClientScope, + Config = #{ + server := Server, + keepalive := KeepAlive, + ssl := #{enable := EnableSsl} = Ssl + } +) -> + HostPort = emqx_connector_mqtt_schema:parse_server(Server), + Options = maps:with( + [ + proto_ver, + username, + password, + clean_start, + retry_interval, + max_inflight, + % Opening a connection in bridge mode will form a non-standard mqtt connection message. + % A load balancing server (such as haproxy) is often set up before the emqx broker server. + % When the load balancing server enables mqtt connection packet inspection, + % non-standard mqtt connection packets might be filtered out by LB. + bridge_mode + ], + Config + ), + Options#{ + hosts => [HostPort], + clientid => clientid(ResourceId, ClientScope, Config), connect_timeout => 30, - proto_ver => ProtoVer, - %% Opening a connection in bridge mode will form a non-standard mqtt connection message. - %% A load balancing server (such as haproxy) is often set up before the emqx broker server. - %% When the load balancing server enables mqtt connection packet inspection, - %% non-standard mqtt connection packets might be filtered out by LB. - clientid => clientid(ResourceId, Conf), - bridge_mode => BridgeMode, keepalive => ms_to_s(KeepAlive), - clean_start => CleanStart, - retry_interval => RetryIntv, - max_inflight => MaxInflight, + force_ping => true, ssl => EnableSsl, ssl_opts => maps:to_list(maps:remove(enable, Ssl)) - }, - maps:merge( - Options, - maps:with([username, password], Conf) - ). + }. ms_to_s(Ms) -> erlang:ceil(Ms / 1000). -clientid(Id, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> - iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]); -clientid(Id, _Conf) -> - iolist_to_binary([Id, ":", atom_to_list(node())]). +clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> + iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]); +clientid(Id, ClientScope, _Conf) -> + iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl new file mode 100644 index 000000000..0e413cbc9 --- /dev/null +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_egress.erl @@ -0,0 +1,162 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_mqtt_egress). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-behaviour(ecpool_worker). + +%% ecpool +-export([connect/1]). + +-export([ + config/1, + send/3, + send_async/4 +]). + +%% management APIs +-export([ + status/1, + info/1 +]). + +-type name() :: term(). +-type message() :: emqx_types:message() | map(). +-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. +-type remote_message() :: #mqtt_msg{}. + +-type option() :: + {name, name()} + %% see `emqtt:option()` + | {client_opts, map()}. + +-type egress() :: #{ + local => #{ + topic => emqx_topic:topic() + }, + remote := emqx_connector_mqtt_msg:msgvars() +}. + +%% @doc Start an ingress bridge worker. +-spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(Options) -> + ?SLOG(debug, #{ + msg => "egress_client_starting", + options => emqx_utils:redact(Options) + }), + Name = proplists:get_value(name, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + ClientOpts = proplists:get_value(client_opts, Options), + case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of + {ok, Pid} -> + connect(Pid, Name); + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "egress_client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) -> + ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +connect(Pid, Name) -> + case emqtt:connect(Pid) of + {ok, _Props} -> + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "egress_client_connect_failed", + reason => Reason, + name => Name + }), + _ = catch emqtt:stop(Pid), + Error + end. + +%% + +-spec config(map()) -> + egress(). +config(#{remote := RC = #{}} = Conf) -> + Conf#{remote => emqx_connector_mqtt_msg:parse(RC)}. + +-spec send(pid(), message(), egress()) -> + ok. +send(Pid, MsgIn, Egress) -> + emqtt:publish(Pid, export_msg(MsgIn, Egress)). + +-spec send_async(pid(), message(), callback(), egress()) -> + ok | {ok, pid()}. +send_async(Pid, MsgIn, Callback, Egress) -> + ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), + {ok, Pid}. + +export_msg(Msg, #{remote := Remote}) -> + to_remote_msg(Msg, Remote). + +-spec to_remote_msg(message(), emqx_connector_mqtt_msg:msgvars()) -> + remote_message(). +to_remote_msg(#message{flags = Flags} = Msg, Vars) -> + {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); +to_remote_msg(Msg = #{}, Remote) -> + #{ + topic := Topic, + payload := Payload, + qos := QoS, + retain := Retain + } = emqx_connector_mqtt_msg:render(Msg, Remote), + PubProps = maps:get(pub_props, Msg, #{}), + #mqtt_msg{ + qos = QoS, + retain = Retain, + topic = Topic, + props = emqx_utils:pub_props_to_packet(PubProps), + payload = Payload + }. + +%% + +-spec info(pid()) -> + [{atom(), term()}]. +info(Pid) -> + emqtt:info(Pid). + +-spec status(pid()) -> + emqx_resource:resource_status(). +status(Pid) -> + try + case proplists:get_value(socket, info(Pid)) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl new file mode 100644 index 000000000..c11895c49 --- /dev/null +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_ingress.erl @@ -0,0 +1,272 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_mqtt_ingress). + +-include_lib("emqx/include/logger.hrl"). + +-behaviour(ecpool_worker). + +%% ecpool +-export([connect/1]). + +%% management APIs +-export([ + status/1, + info/1 +]). + +-export([handle_publish/4]). +-export([handle_disconnect/1]). + +-type name() :: term(). + +-type option() :: + {name, name()} + | {ingress, map()} + %% see `emqtt:option()` + | {client_opts, map()}. + +-type ingress() :: #{ + server := string(), + remote := #{ + topic := emqx_topic:topic(), + qos => emqx_types:qos() + }, + local := emqx_connector_mqtt_msg:msgvars(), + on_message_received := {module(), atom(), [term()]} +}. + +%% @doc Start an ingress bridge worker. +-spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(Options) -> + ?SLOG(debug, #{ + msg => "ingress_client_starting", + options => emqx_utils:redact(Options) + }), + Name = proplists:get_value(name, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + Ingress = config(proplists:get_value(ingress, Options), Name), + ClientOpts = proplists:get_value(client_opts, Options), + case emqtt:start_link(mk_client_opts(WorkerId, Ingress, ClientOpts)) of + {ok, Pid} -> + connect(Pid, Name, Ingress); + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +mk_client_opts(WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> + ClientOpts#{ + clientid := mk_clientid(WorkerId, ClientId), + msg_handler => mk_client_event_handler(Ingress) + }. + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +mk_client_event_handler(Ingress = #{}) -> + IngressVars = maps:with([server], Ingress), + OnMessage = maps:get(on_message_received, Ingress, undefined), + LocalPublish = + case Ingress of + #{local := Local = #{topic := _}} -> + Local; + #{} -> + undefined + end, + #{ + publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, IngressVars]}, + disconnected => {fun ?MODULE:handle_disconnect/1, []} + }. + +-spec connect(pid(), name(), ingress()) -> + {ok, pid()} | {error, _Reason}. +connect(Pid, Name, Ingress) -> + case emqtt:connect(Pid) of + {ok, _Props} -> + case subscribe_remote_topic(Pid, Ingress) of + {ok, _, _RCs} -> + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "ingress_client_subscribe_failed", + ingress => Ingress, + reason => Reason + }), + _ = catch emqtt:stop(Pid), + Error + end; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }), + _ = catch emqtt:stop(Pid), + Error + end. + +subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> + emqtt:subscribe(Pid, RemoteTopic, QoS). + +%% + +-spec config(map(), name()) -> + ingress(). +config(#{remote := RC, local := LC} = Conf, BridgeName) -> + Conf#{ + remote => parse_remote(RC, BridgeName), + local => emqx_connector_mqtt_msg:parse(LC) + }. + +parse_remote(#{qos := QoSIn} = Conf, BridgeName) -> + QoS = downgrade_ingress_qos(QoSIn), + case QoS of + QoSIn -> + ok; + _ -> + ?SLOG(warning, #{ + msg => "downgraded_unsupported_ingress_qos", + qos_configured => QoSIn, + qos_used => QoS, + name => BridgeName + }) + end, + Conf#{qos => QoS}. + +downgrade_ingress_qos(2) -> + 1; +downgrade_ingress_qos(QoS) -> + QoS. + +%% + +-spec info(pid()) -> + [{atom(), term()}]. +info(Pid) -> + emqtt:info(Pid). + +-spec status(pid()) -> + emqx_resource:resource_status(). +status(Pid) -> + try + case proplists:get_value(socket, info(Pid)) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. + +%% + +handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, IngressVars) -> + Msg = import_msg(MsgIn, IngressVars), + ?SLOG(debug, #{ + msg => "publish_local", + message => Msg + }), + maybe_on_message_received(Msg, OnMessage), + maybe_publish_local(Msg, LocalPublish, Props). + +handle_disconnect(_Reason) -> + ok. + +maybe_on_message_received(Msg, {Mod, Func, Args}) -> + erlang:apply(Mod, Func, [Msg | Args]); +maybe_on_message_received(_Msg, undefined) -> + ok. + +maybe_publish_local(Msg, Local = #{}, Props) -> + emqx_broker:publish(to_broker_msg(Msg, Local, Props)); +maybe_publish_local(_Msg, undefined, _Props) -> + ok. + +%% + +import_msg( + #{ + dup := Dup, + payload := Payload, + properties := Props, + qos := QoS, + retain := Retain, + topic := Topic + }, + #{server := Server} +) -> + #{ + id => emqx_guid:to_hexstr(emqx_guid:gen()), + server => Server, + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + message_received_at => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> + #{}; +printable_maps(Headers) -> + maps:fold( + fun + ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [ + #{ + key => Key, + value => Value + } + || {Key, Value} <- V0 + ] + }; + (K, V0, AccIn) -> + AccIn#{K => V0} + end, + #{}, + Headers + ). + +%% published from remote node over a MQTT connection +to_broker_msg(Msg, Vars, undefined) -> + to_broker_msg(Msg, Vars, #{}); +to_broker_msg(#{dup := Dup} = Msg, Local, Props) -> + #{ + topic := Topic, + payload := Payload, + qos := QoS, + retain := Retain + } = emqx_connector_mqtt_msg:render(Msg, Local), + PubProps = maps:get(pub_props, Msg, #{}), + emqx_message:set_headers( + Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, + emqx_message:set_flags( + #{dup => Dup, retain => Retain}, + emqx_message:make(bridge, QoS, Topic, Payload) + ) + ). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl new file mode 100644 index 000000000..b57d69df6 --- /dev/null +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_mqtt_msg). + +-export([parse/1]). +-export([render/2]). + +-export_type([msgvars/0]). + +-type template() :: emqx_plugin_libs_rule:tmpl_token(). + +-type msgvars() :: #{ + topic => template(), + qos => template() | emqx_types:qos(), + retain => template() | boolean(), + payload => template() | undefined +}. + +%% + +-spec parse(#{ + topic => iodata(), + qos => iodata() | emqx_types:qos(), + retain => iodata() | boolean(), + payload => iodata() +}) -> + msgvars(). +parse(Conf) -> + Acc1 = parse_field(topic, Conf, Conf), + Acc2 = parse_field(qos, Conf, Acc1), + Acc3 = parse_field(payload, Conf, Acc2), + parse_field(retain, Conf, Acc3). + +parse_field(Key, Conf, Acc) -> + case Conf of + #{Key := Val} when is_binary(Val) -> + Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; + #{Key := Val} -> + Acc#{Key => Val}; + #{} -> + Acc + end. + +render( + Msg, + #{ + topic := TopicToken, + qos := QoSToken, + retain := RetainToken + } = Vars +) -> + #{ + topic => render_string(TopicToken, Msg), + payload => render_payload(Vars, Msg), + qos => render_simple_var(QoSToken, Msg), + retain => render_simple_var(RetainToken, Msg) + }. + +render_payload(From, MapMsg) -> + do_render_payload(maps:get(payload, From, undefined), MapMsg). + +do_render_payload(undefined, Msg) -> + emqx_utils_json:encode(Msg); +do_render_payload(Tks, Msg) -> + render_string(Tks, Msg). + +%% Replace a string contains vars to another string in which the placeholders are replace by the +%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: +%% "a: 1". +render_string(Tokens, Data) when is_list(Tokens) -> + emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary}); +render_string(Val, _Data) -> + Val. + +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +render_simple_var(Tokens, Data) when is_list(Tokens) -> + [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +render_simple_var(Val, _Data) -> + Val. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index f02fd19ad..6d06029b9 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -73,7 +73,6 @@ fields("server_configs") -> } )}, {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, - {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, {reconnect_interval, mk(string(), #{deprecated => {since, "v5.0.16"}})}, {proto_ver, @@ -135,12 +134,13 @@ fields("server_configs") -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress") -> [ - {"remote", + {pool_size, fun ingress_pool_size/1}, + {remote, mk( ref(?MODULE, "ingress_remote"), #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")} )}, - {"local", + {local, mk( ref(?MODULE, "ingress_local"), #{ @@ -206,7 +206,8 @@ fields("ingress_local") -> ]; fields("egress") -> [ - {"local", + {pool_size, fun egress_pool_size/1}, + {local, mk( ref(?MODULE, "egress_local"), #{ @@ -214,7 +215,7 @@ fields("egress") -> required => false } )}, - {"remote", + {remote, mk( ref(?MODULE, "egress_remote"), #{ @@ -274,6 +275,16 @@ fields("egress_remote") -> )} ]. +ingress_pool_size(desc) -> + ?DESC("ingress_pool_size"); +ingress_pool_size(Prop) -> + emqx_connector_schema_lib:pool_size(Prop). + +egress_pool_size(desc) -> + ?DESC("egress_pool_size"); +egress_pool_size(Prop) -> + emqx_connector_schema_lib:pool_size(Prop). + desc("server_configs") -> ?DESC("server_configs"); desc("ingress") -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl deleted file mode 100644 index 223d4d058..000000000 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ /dev/null @@ -1,490 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_mqtt_worker). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx.hrl"). - -%% APIs --export([ - init/2, - connect/1, - stop/1 -]). - -%% management APIs --export([ - status/1, - ping/1, - info/1, - send_to_remote/3, - send_to_remote_async/4 -]). - --export([handle_publish/4]). --export([handle_disconnect/1]). - --export_type([config/0]). - --type template() :: emqx_plugin_libs_rule:tmpl_token(). - --type name() :: term(). --type options() :: #{ - % endpoint - server := iodata(), - pool_size := pos_integer(), - % emqtt client options - proto_ver := v3 | v4 | v5, - username := binary(), - password := binary(), - clientid := binary(), - clean_start := boolean(), - max_inflight := pos_integer(), - connect_timeout := pos_integer(), - retry_interval := timeout(), - keepalive := non_neg_integer(), - bridge_mode := boolean(), - ssl := boolean(), - ssl_opts := proplists:proplist(), - % bridge options - ingress := map(), - egress := map() -}. - --type client_option() :: - emqtt:option() - | {pool_size, pos_integer()} - | {name, name()} - | {ingress, ingress() | undefined}. - --type config() :: egress() | undefined. - --type ingress() :: #{ - remote := #{ - topic := emqx_topic:topic(), - qos => emqx_types:qos() - }, - local := msgvars(), - on_message_received := {module(), atom(), [term()]} -}. - --type egress() :: #{ - local => #{ - topic => emqx_topic:topic() - }, - remote := msgvars() -}. - --type msgvars() :: #{ - topic => template(), - qos => template() | emqx_types:qos(), - retain => template() | boolean(), - payload => template() | undefined -}. - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - --spec init(name(), options()) -> - {ok, [client_option()], config()}. -init(Name, BridgeOpts) -> - Ingress = pre_process_ingress(maps:get(ingress, BridgeOpts), Name, BridgeOpts), - Egress = pre_process_egress(maps:get(egress, BridgeOpts)), - ClientOpts = mk_client_options(Name, Ingress, BridgeOpts), - {ok, maps:to_list(ClientOpts), Egress}. - -%% @doc Start a bridge worker. --spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) -> - {ok, pid()} | {error, _Reason}. -connect(ClientOpts0) -> - ?SLOG(debug, #{ - msg => "client_starting", - options => emqx_utils:redact(ClientOpts0) - }), - {value, {_, Name}, ClientOpts1} = lists:keytake(name, 1, ClientOpts0), - {value, {_, WorkerId}, ClientOpts} = lists:keytake(ecpool_worker_id, 1, ClientOpts1), - case emqtt:start_link(mk_emqtt_opts(WorkerId, ClientOpts)) of - {ok, Pid} -> - connect(Pid, Name, WorkerId, ClientOpts); - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "client_start_failed", - config => emqx_utils:redact(ClientOpts), - reason => Reason - }), - Error - end. - -mk_emqtt_opts(WorkerId, ClientOpts) -> - ClientId = proplists:get_value(clientid, ClientOpts), - lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}). - -mk_clientid(WorkerId, ClientId) -> - iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). - -connect(Pid, Name, WorkerId, ClientOpts) -> - case emqtt:connect(Pid) of - {ok, _Props} -> - Ingress = proplists:get_value(ingress, ClientOpts), - case subscribe_remote_topic(Pid, WorkerId, Ingress) of - false -> - {ok, Pid}; - {ok, _, _RCs} -> - {ok, Pid}; - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "client_subscribe_failed", - ingress => Ingress, - reason => Reason - }), - _ = catch emqtt:stop(Pid), - Error - end; - {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => "client_connect_failed", - reason => Reason, - name => Name - }), - _ = catch emqtt:stop(Pid), - Error - end. - -subscribe_remote_topic(Pid, WorkerId, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> - case emqx_topic:parse(RemoteTopic) of - {_Filter, #{share := _Name}} -> - % NOTE: this is shared subscription, each worker may subscribe - emqtt:subscribe(Pid, RemoteTopic, QoS); - {_Filter, #{}} when WorkerId =:= 1 -> - % NOTE: this is regular subscription, only the first worker should subscribe - emqtt:subscribe(Pid, RemoteTopic, QoS); - {_Filter, #{}} -> - false - end; -subscribe_remote_topic(_Ref, _, undefined) -> - false. - -mk_client_options(Name, Ingress, BridgeOpts) -> - Server = iolist_to_binary(maps:get(server, BridgeOpts)), - HostPort = emqx_connector_mqtt_schema:parse_server(Server), - CleanStart = - case Ingress of - #{remote := _} -> - maps:get(clean_start, BridgeOpts); - undefined -> - %% NOTE - %% We are ignoring the user configuration here because there's currently no reliable way - %% to ensure proper session recovery according to the MQTT spec. - true - end, - Opts = maps:with( - [ - pool_size, - proto_ver, - username, - password, - clientid, - max_inflight, - connect_timeout, - retry_interval, - keepalive, - bridge_mode, - ssl, - ssl_opts - ], - BridgeOpts - ), - Opts#{ - name => Name, - ingress => Ingress, - msg_handler => mk_client_event_handler(Ingress, #{server => Server}), - hosts => [HostPort], - clean_start => CleanStart, - force_ping => true - }. - -mk_client_event_handler(Ingress = #{}, Opts) -> - OnMessage = maps:get(on_message_received, Ingress, undefined), - LocalPublish = - case Ingress of - #{local := Local = #{topic := _}} -> - Local; - #{} -> - undefined - end, - #{ - publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, Opts]}, - disconnected => {fun ?MODULE:handle_disconnect/1, []} - }; -mk_client_event_handler(undefined, _Opts) -> - undefined. - -stop(Pid) -> - emqtt:stop(Pid). - -info(Pid) -> - emqtt:info(Pid). - -status(Pid) -> - try - case proplists:get_value(socket, info(Pid)) of - Socket when Socket /= undefined -> - connected; - undefined -> - connecting - end - catch - exit:{noproc, _} -> - disconnected - end. - -ping(Pid) -> - emqtt:ping(Pid). - -send_to_remote(Pid, MsgIn, Conf) -> - do_send(Pid, export_msg(MsgIn, Conf)). - -do_send(Pid, Msg) when Msg /= undefined -> - emqtt:publish(Pid, Msg); -do_send(_Name, undefined) -> - ok. - -send_to_remote_async(Pid, MsgIn, Callback, Conf) -> - do_send_async(Pid, export_msg(MsgIn, Conf), Callback). - -do_send_async(Pid, Msg, Callback) when Msg /= undefined -> - ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback), - {ok, Pid}; -do_send_async(_Pid, undefined, _Callback) -> - ok. - -pre_process_ingress( - #{remote := RC, local := LC} = Conf, - BridgeName, - BridgeOpts -) when is_map(Conf) -> - Conf#{ - remote => pre_process_in_remote(RC, BridgeName, BridgeOpts), - local => pre_process_common(LC) - }; -pre_process_ingress(Conf, _, _) when is_map(Conf) -> - %% have no 'local' field in the config - undefined. - -pre_process_egress(#{remote := RC} = Conf) when is_map(Conf) -> - Conf#{remote => pre_process_common(RC)}; -pre_process_egress(Conf) when is_map(Conf) -> - %% have no 'remote' field in the config - undefined. - -pre_process_common(Conf0) -> - Conf1 = pre_process_conf(topic, Conf0), - Conf2 = pre_process_conf(qos, Conf1), - Conf3 = pre_process_conf(payload, Conf2), - pre_process_conf(retain, Conf3). - -pre_process_conf(Key, Conf) -> - case maps:find(Key, Conf) of - error -> - Conf; - {ok, Val} when is_binary(Val) -> - Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; - {ok, Val} -> - Conf#{Key => Val} - end. - -pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) -> - QoS = downgrade_ingress_qos(QoSIn), - case QoS of - QoSIn -> - ok; - _ -> - ?SLOG(warning, #{ - msg => "downgraded_unsupported_ingress_qos", - qos_configured => QoSIn, - qos_used => QoS, - name => BridgeName, - options => BridgeOpts - }) - end, - Conf#{qos => QoS}. - -downgrade_ingress_qos(2) -> - 1; -downgrade_ingress_qos(QoS) -> - QoS. - -export_msg(Msg, #{remote := Remote}) -> - to_remote_msg(Msg, Remote); -export_msg(Msg, undefined) -> - ?SLOG(error, #{ - msg => "forwarding_unavailable", - message => Msg, - reason => "egress is not configured" - }), - undefined. - -%% - -handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, Opts) -> - Msg = import_msg(MsgIn, Opts), - ?SLOG(debug, #{ - msg => "publish_local", - message => Msg - }), - maybe_on_message_received(Msg, OnMessage), - maybe_publish_local(Msg, LocalPublish, Props). - -handle_disconnect(_Reason) -> - ok. - -maybe_on_message_received(Msg, {Mod, Func, Args}) -> - erlang:apply(Mod, Func, [Msg | Args]); -maybe_on_message_received(_Msg, undefined) -> - ok. - -maybe_publish_local(Msg, Local = #{}, Props) -> - emqx_broker:publish(to_broker_msg(Msg, Local, Props)); -maybe_publish_local(_Msg, undefined, _Props) -> - ok. - -import_msg( - #{ - dup := Dup, - payload := Payload, - properties := Props, - qos := QoS, - retain := Retain, - topic := Topic - }, - #{server := Server} -) -> - #{ - id => emqx_guid:to_hexstr(emqx_guid:gen()), - server => Server, - payload => Payload, - topic => Topic, - qos => QoS, - dup => Dup, - retain => Retain, - pub_props => printable_maps(Props), - message_received_at => erlang:system_time(millisecond) - }. - -printable_maps(undefined) -> - #{}; -printable_maps(Headers) -> - maps:fold( - fun - ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [ - #{ - key => Key, - value => Value - } - || {Key, Value} <- V0 - ] - }; - (K, V0, AccIn) -> - AccIn#{K => V0} - end, - #{}, - Headers - ). - -%% Shame that we have to know the callback module here -%% would be great if we can get rid of #mqtt_msg{} record -%% and use #message{} in all places. --spec to_remote_msg(emqx_types:message() | map(), msgvars()) -> - #mqtt_msg{}. -to_remote_msg(#message{flags = Flags} = Msg, Vars) -> - {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), - to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); -to_remote_msg( - MapMsg, - #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Remote -) when is_map(MapMsg) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Remote, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - #mqtt_msg{ - qos = QoS, - retain = Retain, - topic = Topic, - props = emqx_utils:pub_props_to_packet(PubProps), - payload = Payload - }. - -%% published from remote node over a MQTT connection -to_broker_msg(Msg, Vars, undefined) -> - to_broker_msg(Msg, Vars, #{}); -to_broker_msg( - #{dup := Dup} = MapMsg, - #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Local, - Props -) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Local, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - set_headers( - Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, - emqx_message:set_flags( - #{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, Topic, Payload) - ) - ). - -process_payload(From, MapMsg) -> - do_process_payload(maps:get(payload, From, undefined), MapMsg). - -do_process_payload(undefined, Msg) -> - emqx_utils_json:encode(Msg); -do_process_payload(Tks, Msg) -> - replace_vars_in_str(Tks, Msg). - -%% Replace a string contains vars to another string in which the placeholders are replace by the -%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: -%% "a: 1". -replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); -replace_vars_in_str(Val, _Data) -> - Val. - -%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result -%% value will be an integer 1. -replace_simple_var(Tokens, Data) when is_list(Tokens) -> - [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), - Var; -replace_simple_var(Val, _Data) -> - Val. - -set_headers(Val, Msg) -> - emqx_message:set_headers(Val, Msg). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 840c6cfec..37d7b1696 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -121,7 +121,8 @@ -export_type([ resource_id/0, - resource_data/0 + resource_data/0, + resource_status/0 ]). -optional_callbacks([ diff --git a/rel/i18n/emqx_connector_mqtt_schema.hocon b/rel/i18n/emqx_connector_mqtt_schema.hocon index e37e87e49..509fc4209 100644 --- a/rel/i18n/emqx_connector_mqtt_schema.hocon +++ b/rel/i18n/emqx_connector_mqtt_schema.hocon @@ -32,6 +32,14 @@ is configured, then both the data got from the rule and the MQTT messages that m egress_desc.label: """Egress Configs""" +egress_pool_size.desc: +"""Size of the pool of MQTT clients that will publish messages to the remote broker.
+ Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}' + where 'n' is the number of a client inside the pool.""" + +egress_pool_size.label: +"""Pool Size""" + egress_local.desc: """The configs about receiving messages from local broker.""" @@ -75,6 +83,16 @@ ingress_desc.desc: ingress_desc.label: """Ingress Configs""" +ingress_pool_size.desc: +"""Size of the pool of MQTT clients that will ingest messages from the remote broker.
+ This value will be respected only if 'remote.topic' is a shared subscription topic filter, + otherwise only a single MQTT client will be used. + Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}' + where 'n' is the number of a client inside the pool.""" + +ingress_pool_size.label: +"""Pool Size""" + ingress_local.desc: """The configs about sending message to the local broker."""