diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index fd67c622c..1a3edb484 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -58,14 +58,14 @@ ). -if(?EMQX_RELEASE_EDITION == ee). -bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; -bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; +bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http; bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType). -else. -bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; -bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"mqtt">>) -> emqx_bridge_mqtt_connector; +bridge_to_resource_type(mqtt) -> emqx_bridge_mqtt_connector; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http. -endif. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 1ac6750a4..ecab986e8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -47,7 +47,14 @@ <<"server">> => SERVER, <<"username">> => <<"user1">>, <<"password">> => <<"">>, - <<"proto_ver">> => <<"v5">> + <<"proto_ver">> => <<"v5">>, + <<"egress">> => #{ + <<"remote">> => #{ + <<"topic">> => <<"emqx/${topic}">>, + <<"qos">> => <<"${qos}">>, + <<"retain">> => false + } + } }). -define(MQTT_BRIDGE(SERVER), ?MQTT_BRIDGE(SERVER, <<"mqtt_egress_test_bridge">>)). diff --git a/apps/emqx_bridge_mqtt/README.md b/apps/emqx_bridge_mqtt/README.md new file mode 100644 index 000000000..f286913e7 --- /dev/null +++ b/apps/emqx_bridge_mqtt/README.md @@ -0,0 +1,32 @@ +# EMQX MQTT Broker Bridge + +This application connects EMQX to virtually any MQTT broker adhering to either [MQTTv3][1] or [MQTTv5][2] standard. The connection is facilitated through the _MQTT bridge_ abstraction, allowing for the flow of data in both directions: from the remote broker to EMQX (ingress) and from EMQX to the remote broker (egress). + +User can create a rule and easily ingest into a remote MQTT broker by leveraging [EMQX Rules][3]. + + +# Documentation + +- Refer to [Bridge Data into MQTT Broker][4] for how to use EMQX dashboard to set up ingress or egress bridge, or even both at the same time. + +- Refer to [EMQX Rules][3] for the EMQX rules engine introduction. + + +# HTTP APIs + +Several APIs are provided for bridge management, refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) for more detailed information. + + +# Contributing + +Please see our [contributing guide](../../CONTRIBUTING.md). + + +# License + +Apache License 2.0, see [LICENSE](../../APL.txt). + +[1]: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1 +[2]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html +[3]: https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html +[4]: https://www.emqx.io/docs/en/v5.0/data-integration/data-bridge-mqtt.html diff --git a/apps/emqx_bridge_mqtt/rebar.config b/apps/emqx_bridge_mqtt/rebar.config new file mode 100644 index 000000000..35ccc1a37 --- /dev/null +++ b/apps/emqx_bridge_mqtt/rebar.config @@ -0,0 +1,3 @@ +{deps, [ + {emqx, {path, "../../apps/emqx"}} +]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src new file mode 100644 index 000000000..54d7ffbed --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -0,0 +1,18 @@ +%% -*- mode: erlang -*- +{application, emqx_bridge_mqtt, [ + {description, "EMQX MQTT Broker Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx, + emqx_resource, + emqx_bridge, + emqtt + ]}, + {env, []}, + {modules, []}, + {licenses, ["Apache 2.0"]}, + {links, []} +]}. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl new file mode 100644 index 000000000..60a512011 --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -0,0 +1,340 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_mqtt_connector). + +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-behaviour(emqx_resource). + +-export([on_message_received/3]). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_query_async/4, + on_get_status/2 +]). + +-export([on_async_result/2]). + +-define(HEALTH_CHECK_TIMEOUT, 1000). + +%% =================================================================== +%% When use this bridge as a data source, ?MODULE:on_message_received will be called +%% if the bridge received msgs from the remote broker. +on_message_received(Msg, HookPoint, ResId) -> + emqx_resource_metrics:received_inc(ResId), + emqx:run_hook(HookPoint, [Msg]). + +%% =================================================================== +callback_mode() -> async_if_possible. + +on_start(ResourceId, Conf) -> + ?SLOG(info, #{ + msg => "starting_mqtt_connector", + connector => ResourceId, + config => emqx_utils:redact(Conf) + }), + case start_ingress(ResourceId, Conf) of + {ok, Result1} -> + case start_egress(ResourceId, Conf) of + {ok, Result2} -> + {ok, maps:merge(Result1, Result2)}; + {error, Reason} -> + _ = stop_ingress(Result1), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +start_ingress(ResourceId, Conf) -> + ClientOpts = mk_client_opts(ResourceId, "ingress", Conf), + case mk_ingress_config(ResourceId, Conf) of + Ingress = #{} -> + start_ingress(ResourceId, Ingress, ClientOpts); + undefined -> + {ok, #{}} + end. + +start_ingress(ResourceId, Ingress, ClientOpts) -> + PoolName = <>, + PoolSize = choose_ingress_pool_size(ResourceId, Ingress), + Options = [ + {name, PoolName}, + {pool_size, PoolSize}, + {ingress, Ingress}, + {client_opts, ClientOpts} + ], + case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_ingress, Options) of + ok -> + {ok, #{ingress_pool_name => PoolName}}; + {error, {start_pool_failed, _, Reason}} -> + {error, Reason} + end. + +choose_ingress_pool_size( + ResourceId, + #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} +) -> + case emqx_topic:parse(RemoteTopic) of + {_Filter, #{share := _Name}} -> + % NOTE: this is shared subscription, many workers may subscribe + PoolSize; + {_Filter, #{}} -> + % NOTE: this is regular subscription, only one worker should subscribe + ?SLOG(warning, #{ + msg => "mqtt_bridge_ingress_pool_size_ignored", + connector => ResourceId, + reason => + "Remote topic filter is not a shared subscription, " + "ingress pool will start with a single worker", + config_pool_size => PoolSize, + pool_size => 1 + }), + 1 + end. + +start_egress(ResourceId, Conf) -> + % NOTE + % We are ignoring the user configuration here because there's currently no reliable way + % to ensure proper session recovery according to the MQTT spec. + ClientOpts = maps:put(clean_start, true, mk_client_opts(ResourceId, "egress", Conf)), + case mk_egress_config(Conf) of + Egress = #{} -> + start_egress(ResourceId, Egress, ClientOpts); + undefined -> + {ok, #{}} + end. + +start_egress(ResourceId, Egress, ClientOpts) -> + PoolName = <>, + PoolSize = maps:get(pool_size, Egress), + Options = [ + {name, PoolName}, + {pool_size, PoolSize}, + {client_opts, ClientOpts} + ], + case emqx_resource_pool:start(PoolName, emqx_bridge_mqtt_egress, Options) of + ok -> + {ok, #{ + egress_pool_name => PoolName, + egress_config => emqx_bridge_mqtt_egress:config(Egress) + }}; + {error, {start_pool_failed, _, Reason}} -> + {error, Reason} + end. + +on_stop(ResourceId, State) -> + ?SLOG(info, #{ + msg => "stopping_mqtt_connector", + connector => ResourceId + }), + ok = stop_ingress(State), + ok = stop_egress(State). + +stop_ingress(#{ingress_pool_name := PoolName}) -> + emqx_resource_pool:stop(PoolName); +stop_ingress(#{}) -> + ok. + +stop_egress(#{egress_pool_name := PoolName}) -> + emqx_resource_pool:stop(PoolName); +stop_egress(#{}) -> + ok. + +on_query( + ResourceId, + {send_message, Msg}, + #{egress_pool_name := PoolName, egress_config := Config} +) -> + ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), + handle_send_result(with_egress_client(PoolName, send, [Msg, Config])); +on_query(ResourceId, {send_message, Msg}, #{}) -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + connector => ResourceId, + message => Msg, + reason => "Egress is not configured" + }). + +on_query_async( + ResourceId, + {send_message, Msg}, + CallbackIn, + #{egress_pool_name := PoolName, egress_config := Config} +) -> + ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), + Callback = {fun on_async_result/2, [CallbackIn]}, + Result = with_egress_client(PoolName, send_async, [Msg, Callback, Config]), + case Result of + ok -> + ok; + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, Reason} -> + {error, classify_error(Reason)} + end; +on_query_async(ResourceId, {send_message, Msg}, _Callback, #{}) -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + connector => ResourceId, + message => Msg, + reason => "Egress is not configured" + }). + +with_egress_client(ResourceId, Fun, Args) -> + ecpool:pick_and_do(ResourceId, {emqx_bridge_mqtt_egress, Fun, Args}, no_handover). + +on_async_result(Callback, Result) -> + apply_callback_function(Callback, handle_send_result(Result)). + +apply_callback_function(F, Result) when is_function(F) -> + erlang:apply(F, [Result]); +apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> + erlang:apply(F, A ++ [Result]); +apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> + erlang:apply(M, F, A ++ [Result]). + +handle_send_result(ok) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_SUCCESS}}) -> + ok; +handle_send_result({ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}}) -> + ok; +handle_send_result({ok, Reply}) -> + {error, classify_reply(Reply)}; +handle_send_result({error, Reason}) -> + {error, classify_error(Reason)}. + +classify_reply(Reply = #{reason_code := _}) -> + {unrecoverable_error, Reply}. + +classify_error(disconnected = Reason) -> + {recoverable_error, Reason}; +classify_error(ecpool_empty) -> + {recoverable_error, disconnected}; +classify_error({disconnected, _RC, _} = Reason) -> + {recoverable_error, Reason}; +classify_error({shutdown, _} = Reason) -> + {recoverable_error, Reason}; +classify_error(shutdown = Reason) -> + {recoverable_error, Reason}; +classify_error(Reason) -> + {unrecoverable_error, Reason}. + +on_get_status(_ResourceId, State) -> + Pools = maps:to_list(maps:with([ingress_pool_name, egress_pool_name], State)), + Workers = [{Pool, Worker} || {Pool, PN} <- Pools, {_Name, Worker} <- ecpool:workers(PN)], + try emqx_utils:pmap(fun get_status/1, Workers, ?HEALTH_CHECK_TIMEOUT) of + Statuses -> + combine_status(Statuses) + catch + exit:timeout -> + connecting + end. + +get_status({Pool, Worker}) -> + case ecpool_worker:client(Worker) of + {ok, Client} when Pool == ingress_pool_name -> + emqx_bridge_mqtt_ingress:status(Client); + {ok, Client} when Pool == egress_pool_name -> + emqx_bridge_mqtt_egress:status(Client); + {error, _} -> + disconnected + end. + +combine_status(Statuses) -> + %% NOTE + %% Natural order of statuses: [connected, connecting, disconnected] + %% * `disconnected` wins over any other status + %% * `connecting` wins over `connected` + case lists:reverse(lists:usort(Statuses)) of + [Status | _] -> + Status; + [] -> + disconnected + end. + +mk_ingress_config( + ResourceId, + #{ + ingress := Ingress = #{remote := _}, + server := Server, + hookpoint := HookPoint + } +) -> + Ingress#{ + server => Server, + on_message_received => {?MODULE, on_message_received, [HookPoint, ResourceId]} + }; +mk_ingress_config(ResourceId, #{ingress := #{remote := _}} = Conf) -> + error({no_hookpoint_provided, ResourceId, Conf}); +mk_ingress_config(_ResourceId, #{}) -> + undefined. + +mk_egress_config(#{egress := Egress = #{remote := _}}) -> + Egress; +mk_egress_config(#{}) -> + undefined. + +mk_client_opts( + ResourceId, + ClientScope, + Config = #{ + server := Server, + keepalive := KeepAlive, + ssl := #{enable := EnableSsl} = Ssl + } +) -> + HostPort = emqx_bridge_mqtt_connector_schema:parse_server(Server), + Options = maps:with( + [ + proto_ver, + username, + password, + clean_start, + retry_interval, + max_inflight, + % Opening a connection in bridge mode will form a non-standard mqtt connection message. + % A load balancing server (such as haproxy) is often set up before the emqx broker server. + % When the load balancing server enables mqtt connection packet inspection, + % non-standard mqtt connection packets might be filtered out by LB. + bridge_mode + ], + Config + ), + Options#{ + hosts => [HostPort], + clientid => clientid(ResourceId, ClientScope, Config), + connect_timeout => 30, + keepalive => ms_to_s(KeepAlive), + force_ping => true, + ssl => EnableSsl, + ssl_opts => maps:to_list(maps:remove(enable, Ssl)) + }. + +ms_to_s(Ms) -> + erlang:ceil(Ms / 1000). + +clientid(Id, ClientScope, _Conf = #{clientid_prefix := Prefix}) when is_binary(Prefix) -> + iolist_to_binary([Prefix, ":", Id, ":", ClientScope, ":", atom_to_list(node())]); +clientid(Id, ClientScope, _Conf) -> + iolist_to_binary([Id, ":", ClientScope, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl similarity index 91% rename from apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl index 2a40980af..1dc3ca5f8 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector_schema.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_connector_mqtt_schema). +-module(emqx_bridge_mqtt_connector_schema). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -68,7 +68,8 @@ fields("server_configs") -> hoconsc:enum([cluster_shareload]), #{ default => cluster_shareload, - desc => ?DESC("mode") + desc => ?DESC("mode"), + deprecated => {since, "v5.1.0 & e5.1.0"} } )}, {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, @@ -133,16 +134,17 @@ fields("server_configs") -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields("ingress") -> [ - {"remote", + {pool_size, fun ingress_pool_size/1}, + {remote, mk( ref(?MODULE, "ingress_remote"), - #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_remote")} + #{desc => ?DESC("ingress_remote")} )}, - {"local", + {local, mk( ref(?MODULE, "ingress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local") + desc => ?DESC("ingress_local") } )} ]; @@ -204,19 +206,20 @@ fields("ingress_local") -> ]; fields("egress") -> [ - {"local", + {pool_size, fun egress_pool_size/1}, + {local, mk( ref(?MODULE, "egress_local"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"), + desc => ?DESC("egress_local"), required => false } )}, - {"remote", + {remote, mk( ref(?MODULE, "egress_remote"), #{ - desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"), + desc => ?DESC("egress_remote"), required => true } )} @@ -272,6 +275,16 @@ fields("egress_remote") -> )} ]. +ingress_pool_size(desc) -> + ?DESC("ingress_pool_size"); +ingress_pool_size(Prop) -> + emqx_connector_schema_lib:pool_size(Prop). + +egress_pool_size(desc) -> + ?DESC("egress_pool_size"); +egress_pool_size(Prop) -> + emqx_connector_schema_lib:pool_size(Prop). + desc("server_configs") -> ?DESC("server_configs"); desc("ingress") -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl new file mode 100644 index 000000000..673a30726 --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_egress.erl @@ -0,0 +1,162 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_egress). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). + +-behaviour(ecpool_worker). + +%% ecpool +-export([connect/1]). + +-export([ + config/1, + send/3, + send_async/4 +]). + +%% management APIs +-export([ + status/1, + info/1 +]). + +-type name() :: term(). +-type message() :: emqx_types:message() | map(). +-type callback() :: {function(), [_Arg]} | {module(), atom(), [_Arg]}. +-type remote_message() :: #mqtt_msg{}. + +-type option() :: + {name, name()} + %% see `emqtt:option()` + | {client_opts, map()}. + +-type egress() :: #{ + local => #{ + topic => emqx_topic:topic() + }, + remote := emqx_bridge_mqtt_msg:msgvars() +}. + +%% @doc Start an ingress bridge worker. +-spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(Options) -> + ?SLOG(debug, #{ + msg => "egress_client_starting", + options => emqx_utils:redact(Options) + }), + Name = proplists:get_value(name, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + ClientOpts = proplists:get_value(client_opts, Options), + case emqtt:start_link(mk_client_opts(WorkerId, ClientOpts)) of + {ok, Pid} -> + connect(Pid, Name); + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "egress_client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +mk_client_opts(WorkerId, ClientOpts = #{clientid := ClientId}) -> + ClientOpts#{clientid := mk_clientid(WorkerId, ClientId)}. + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +connect(Pid, Name) -> + case emqtt:connect(Pid) of + {ok, _Props} -> + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "egress_client_connect_failed", + reason => Reason, + name => Name + }), + _ = catch emqtt:stop(Pid), + Error + end. + +%% + +-spec config(map()) -> + egress(). +config(#{remote := RC = #{}} = Conf) -> + Conf#{remote => emqx_bridge_mqtt_msg:parse(RC)}. + +-spec send(pid(), message(), egress()) -> + ok. +send(Pid, MsgIn, Egress) -> + emqtt:publish(Pid, export_msg(MsgIn, Egress)). + +-spec send_async(pid(), message(), callback(), egress()) -> + ok | {ok, pid()}. +send_async(Pid, MsgIn, Callback, Egress) -> + ok = emqtt:publish_async(Pid, export_msg(MsgIn, Egress), _Timeout = infinity, Callback), + {ok, Pid}. + +export_msg(Msg, #{remote := Remote}) -> + to_remote_msg(Msg, Remote). + +-spec to_remote_msg(message(), emqx_bridge_mqtt_msg:msgvars()) -> + remote_message(). +to_remote_msg(#message{flags = Flags} = Msg, Vars) -> + {EventMsg, _} = emqx_rule_events:eventmsg_publish(Msg), + to_remote_msg(EventMsg#{retain => maps:get(retain, Flags, false)}, Vars); +to_remote_msg(Msg = #{}, Remote) -> + #{ + topic := Topic, + payload := Payload, + qos := QoS, + retain := Retain + } = emqx_bridge_mqtt_msg:render(Msg, Remote), + PubProps = maps:get(pub_props, Msg, #{}), + #mqtt_msg{ + qos = QoS, + retain = Retain, + topic = Topic, + props = emqx_utils:pub_props_to_packet(PubProps), + payload = Payload + }. + +%% + +-spec info(pid()) -> + [{atom(), term()}]. +info(Pid) -> + emqtt:info(Pid). + +-spec status(pid()) -> + emqx_resource:resource_status(). +status(Pid) -> + try + case proplists:get_value(socket, info(Pid)) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl new file mode 100644 index 000000000..91ec27e74 --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -0,0 +1,274 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_ingress). + +-include_lib("emqx/include/logger.hrl"). + +-behaviour(ecpool_worker). + +%% ecpool +-export([connect/1]). + +%% management APIs +-export([ + status/1, + info/1 +]). + +-export([handle_publish/5]). +-export([handle_disconnect/1]). + +-type name() :: term(). + +-type option() :: + {name, name()} + | {ingress, map()} + %% see `emqtt:option()` + | {client_opts, map()}. + +-type ingress() :: #{ + server := string(), + remote := #{ + topic := emqx_topic:topic(), + qos => emqx_types:qos() + }, + local := emqx_bridge_mqtt_msg:msgvars(), + on_message_received := {module(), atom(), [term()]} +}. + +%% @doc Start an ingress bridge worker. +-spec connect([option() | {ecpool_worker_id, pos_integer()}]) -> + {ok, pid()} | {error, _Reason}. +connect(Options) -> + ?SLOG(debug, #{ + msg => "ingress_client_starting", + options => emqx_utils:redact(Options) + }), + Name = proplists:get_value(name, Options), + WorkerId = proplists:get_value(ecpool_worker_id, Options), + Ingress = config(proplists:get_value(ingress, Options), Name), + ClientOpts = proplists:get_value(client_opts, Options), + case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of + {ok, Pid} -> + connect(Pid, Name, Ingress); + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_start_failed", + config => emqx_utils:redact(ClientOpts), + reason => Reason + }), + Error + end. + +mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> + ClientOpts#{ + clientid := mk_clientid(WorkerId, ClientId), + msg_handler => mk_client_event_handler(Name, Ingress) + }. + +mk_clientid(WorkerId, ClientId) -> + iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). + +mk_client_event_handler(Name, Ingress = #{}) -> + IngressVars = maps:with([server], Ingress), + OnMessage = maps:get(on_message_received, Ingress, undefined), + LocalPublish = + case Ingress of + #{local := Local = #{topic := _}} -> + Local; + #{} -> + undefined + end, + #{ + publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]}, + disconnected => {fun ?MODULE:handle_disconnect/1, []} + }. + +-spec connect(pid(), name(), ingress()) -> + {ok, pid()} | {error, _Reason}. +connect(Pid, Name, Ingress) -> + case emqtt:connect(Pid) of + {ok, _Props} -> + case subscribe_remote_topic(Pid, Ingress) of + {ok, _, _RCs} -> + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "ingress_client_subscribe_failed", + ingress => Ingress, + name => Name, + reason => Reason + }), + _ = catch emqtt:stop(Pid), + Error + end; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "ingress_client_connect_failed", + reason => Reason, + name => Name + }), + _ = catch emqtt:stop(Pid), + Error + end. + +subscribe_remote_topic(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> + emqtt:subscribe(Pid, RemoteTopic, QoS). + +%% + +-spec config(map(), name()) -> + ingress(). +config(#{remote := RC, local := LC} = Conf, BridgeName) -> + Conf#{ + remote => parse_remote(RC, BridgeName), + local => emqx_bridge_mqtt_msg:parse(LC) + }. + +parse_remote(#{qos := QoSIn} = Conf, BridgeName) -> + QoS = downgrade_ingress_qos(QoSIn), + case QoS of + QoSIn -> + ok; + _ -> + ?SLOG(warning, #{ + msg => "downgraded_unsupported_ingress_qos", + qos_configured => QoSIn, + qos_used => QoS, + name => BridgeName + }) + end, + Conf#{qos => QoS}. + +downgrade_ingress_qos(2) -> + 1; +downgrade_ingress_qos(QoS) -> + QoS. + +%% + +-spec info(pid()) -> + [{atom(), term()}]. +info(Pid) -> + emqtt:info(Pid). + +-spec status(pid()) -> + emqx_resource:resource_status(). +status(Pid) -> + try + case proplists:get_value(socket, info(Pid)) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. + +%% + +handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) -> + Msg = import_msg(MsgIn, IngressVars), + ?SLOG(debug, #{ + msg => "ingress_publish_local", + message => Msg, + name => Name + }), + maybe_on_message_received(Msg, OnMessage), + maybe_publish_local(Msg, LocalPublish, Props). + +handle_disconnect(_Reason) -> + ok. + +maybe_on_message_received(Msg, {Mod, Func, Args}) -> + erlang:apply(Mod, Func, [Msg | Args]); +maybe_on_message_received(_Msg, undefined) -> + ok. + +maybe_publish_local(Msg, Local = #{}, Props) -> + emqx_broker:publish(to_broker_msg(Msg, Local, Props)); +maybe_publish_local(_Msg, undefined, _Props) -> + ok. + +%% + +import_msg( + #{ + dup := Dup, + payload := Payload, + properties := Props, + qos := QoS, + retain := Retain, + topic := Topic + }, + #{server := Server} +) -> + #{ + id => emqx_guid:to_hexstr(emqx_guid:gen()), + server => Server, + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + message_received_at => erlang:system_time(millisecond) + }. + +printable_maps(undefined) -> + #{}; +printable_maps(Headers) -> + maps:fold( + fun + ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [ + #{ + key => Key, + value => Value + } + || {Key, Value} <- V0 + ] + }; + (K, V0, AccIn) -> + AccIn#{K => V0} + end, + #{}, + Headers + ). + +%% published from remote node over a MQTT connection +to_broker_msg(Msg, Vars, undefined) -> + to_broker_msg(Msg, Vars, #{}); +to_broker_msg(#{dup := Dup} = Msg, Local, Props) -> + #{ + topic := Topic, + payload := Payload, + qos := QoS, + retain := Retain + } = emqx_bridge_mqtt_msg:render(Msg, Local), + PubProps = maps:get(pub_props, Msg, #{}), + emqx_message:set_headers( + Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, + emqx_message:set_flags( + #{dup => Dup, retain => Retain}, + emqx_message:make(bridge, QoS, Topic, Payload) + ) + ). diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl new file mode 100644 index 000000000..8a8cffe55 --- /dev/null +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_msg.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_msg). + +-export([parse/1]). +-export([render/2]). + +-export_type([msgvars/0]). + +-type template() :: emqx_plugin_libs_rule:tmpl_token(). + +-type msgvars() :: #{ + topic => template(), + qos => template() | emqx_types:qos(), + retain => template() | boolean(), + payload => template() | undefined +}. + +%% + +-spec parse(#{ + topic => iodata(), + qos => iodata() | emqx_types:qos(), + retain => iodata() | boolean(), + payload => iodata() +}) -> + msgvars(). +parse(Conf) -> + Acc1 = parse_field(topic, Conf, Conf), + Acc2 = parse_field(qos, Conf, Acc1), + Acc3 = parse_field(payload, Conf, Acc2), + parse_field(retain, Conf, Acc3). + +parse_field(Key, Conf, Acc) -> + case Conf of + #{Key := Val} when is_binary(Val) -> + Acc#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; + #{Key := Val} -> + Acc#{Key => Val}; + #{} -> + Acc + end. + +render( + Msg, + #{ + topic := TopicToken, + qos := QoSToken, + retain := RetainToken + } = Vars +) -> + #{ + topic => render_string(TopicToken, Msg), + payload => render_payload(Vars, Msg), + qos => render_simple_var(QoSToken, Msg), + retain => render_simple_var(RetainToken, Msg) + }. + +render_payload(From, MapMsg) -> + do_render_payload(maps:get(payload, From, undefined), MapMsg). + +do_render_payload(undefined, Msg) -> + emqx_utils_json:encode(Msg); +do_render_payload(Tks, Msg) -> + render_string(Tks, Msg). + +%% Replace a string contains vars to another string in which the placeholders are replace by the +%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: +%% "a: 1". +render_string(Tokens, Data) when is_list(Tokens) -> + emqx_placeholder:proc_tmpl(Tokens, Data, #{return => full_binary}); +render_string(Val, _Data) -> + Val. + +%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result +%% value will be an integer 1. +render_simple_var(Tokens, Data) when is_list(Tokens) -> + [Var] = emqx_placeholder:proc_tmpl(Tokens, Data, #{return => rawlist}), + Var; +render_simple_var(Val, _Data) -> + Val. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl similarity index 97% rename from apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl rename to apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl index 5cd1693c7..a312dfaa9 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_schema.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_schema.erl @@ -42,7 +42,7 @@ fields("config") -> } )} ] ++ - emqx_connector_mqtt_schema:fields("config"); + emqx_bridge_mqtt_connector_schema:fields("config"); fields("creation_opts") -> Opts = emqx_resource_schema:fields("creation_opts"), [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl similarity index 93% rename from apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl rename to apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl index f0de07da2..81f9a3573 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_mqtt_SUITE.erl @@ -22,9 +22,7 @@ -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include("emqx_dashboard/include/emqx_dashboard.hrl"). %% output functions -export([inspect/3]). @@ -132,13 +130,11 @@ suite() -> init_per_suite(Config) -> _ = application:load(emqx_conf), - %% some testcases (may from other app) already get emqx_connector started - _ = application:stop(emqx_resource), - _ = application:stop(emqx_connector), ok = emqx_common_test_helpers:start_apps( [ emqx_rule_engine, emqx_bridge, + emqx_bridge_mqtt, emqx_dashboard ], fun set_special_configs/1 @@ -152,9 +148,10 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_common_test_helpers:stop_apps([ - emqx_rule_engine, + emqx_dashboard, + emqx_bridge_mqtt, emqx_bridge, - emqx_dashboard + emqx_rule_engine ]), ok. @@ -221,6 +218,12 @@ t_mqtt_conn_bridge_ingress(_) -> request(put, uri(["bridges", BridgeIDIngress]), ServerConf) ), + %% non-shared subscription, verify that only one client is subscribed + ?assertEqual( + 1, + length(emqx:subscribers(<>)) + ), + %% we now test if the bridge works as expected RemoteTopic = <>, LocalTopic = <>, @@ -245,6 +248,48 @@ t_mqtt_conn_bridge_ingress(_) -> ok. +t_mqtt_conn_bridge_ingress_shared_subscription(_) -> + PoolSize = 4, + Ns = lists:seq(1, 10), + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF(<<>>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => BridgeName, + <<"ingress">> => #{ + <<"pool_size">> => PoolSize, + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } + } + ), + + RemoteTopic = <>, + LocalTopic = <>, + ok = emqx:subscribe(LocalTopic), + + _ = emqx_utils:pmap( + fun emqx:publish/1, + [emqx_message:make(RemoteTopic, <<>>) || _ <- Ns] + ), + _ = [assert_mqtt_msg_received(LocalTopic) || _ <- Ns], + + ?assertEqual( + PoolSize, + length(emqx_shared_sub:subscribers(<<"ingress">>, <>)) + ), + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok. + t_mqtt_egress_bridge_ignores_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( @@ -256,11 +301,17 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> } ), - {ok, _, #{state := #{name := WorkerName}}} = - emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)), + ResourceID = emqx_bridge_resource:resource_id(BridgeID), + {ok, _Group, #{state := #{egress_pool_name := EgressPoolName}}} = + emqx_resource_manager:lookup_cached(ResourceID), + ClientInfo = ecpool:pick_and_do( + EgressPoolName, + {emqx_bridge_mqtt_egress, info, []}, + no_handover + ), ?assertMatch( #{clean_start := true}, - maps:from_list(emqx_connector_mqtt_worker:info(WorkerName)) + maps:from_list(ClientInfo) ), %% delete the bridge diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index e9f79723a..283c27f31 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.24"}, + {vsn, "0.1.25"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl deleted file mode 100644 index bb8cc00d1..000000000 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ /dev/null @@ -1,319 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- --module(emqx_connector_mqtt). - --include("emqx_connector.hrl"). - --include_lib("typerefl/include/types.hrl"). --include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx/include/logger.hrl"). - --behaviour(supervisor). --behaviour(emqx_resource). - -%% API and callbacks for supervisor --export([ - callback_mode/0, - start_link/0, - init/1, - create_bridge/1, - drop_bridge/1, - bridges/0 -]). - --export([on_message_received/3]). - -%% callbacks of behaviour emqx_resource --export([ - on_start/2, - on_stop/2, - on_query/3, - on_query_async/4, - on_get_status/2 -]). - --export([on_async_result/2]). - --behaviour(hocon_schema). - --import(hoconsc, [mk/2]). - --export([ - roots/0, - fields/1 -]). - -%%===================================================================== -%% Hocon schema -roots() -> - fields("config"). - -fields("config") -> - emqx_connector_mqtt_schema:fields("config"); -fields("get") -> - [ - {num_of_bridges, - mk( - integer(), - #{desc => ?DESC("num_of_bridges")} - )} - ] ++ fields("post"); -fields("put") -> - emqx_connector_mqtt_schema:fields("server_configs"); -fields("post") -> - [ - {type, - mk( - mqtt, - #{ - required => true, - desc => ?DESC("type") - } - )}, - {name, - mk( - binary(), - #{ - required => true, - desc => ?DESC("name") - } - )} - ] ++ fields("put"). - -%% =================================================================== -%% supervisor APIs -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - SupFlag = #{ - strategy => one_for_one, - intensity => 100, - period => 10 - }, - {ok, {SupFlag, []}}. - -bridge_spec(Config) -> - {Name, NConfig} = maps:take(name, Config), - #{ - id => Name, - start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, - restart => temporary, - shutdown => 1000 - }. - --spec bridges() -> [{_Name, _Status}]. -bridges() -> - [ - {Name, emqx_connector_mqtt_worker:status(Name)} - || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE) - ]. - -create_bridge(Config) -> - supervisor:start_child(?MODULE, bridge_spec(Config)). - -drop_bridge(Name) -> - case supervisor:terminate_child(?MODULE, Name) of - ok -> - supervisor:delete_child(?MODULE, Name); - {error, not_found} -> - ok; - {error, Error} -> - {error, Error} - end. - -%% =================================================================== -%% When use this bridge as a data source, ?MODULE:on_message_received will be called -%% if the bridge received msgs from the remote broker. -on_message_received(Msg, HookPoint, ResId) -> - emqx_resource_metrics:received_inc(ResId), - emqx:run_hook(HookPoint, [Msg]). - -%% =================================================================== -callback_mode() -> async_if_possible. - -on_start(InstanceId, Conf) -> - ?SLOG(info, #{ - msg => "starting_mqtt_connector", - connector => InstanceId, - config => emqx_utils:redact(Conf) - }), - BasicConf = basic_config(Conf), - BridgeConf = BasicConf#{ - name => InstanceId, - clientid => clientid(InstanceId, Conf), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId), - forwards => make_forward_confs(maps:get(egress, Conf, undefined)) - }, - case ?MODULE:create_bridge(BridgeConf) of - {ok, _Pid} -> - ensure_mqtt_worker_started(InstanceId, BridgeConf); - {error, {already_started, _Pid}} -> - ok = ?MODULE:drop_bridge(InstanceId), - {ok, _} = ?MODULE:create_bridge(BridgeConf), - ensure_mqtt_worker_started(InstanceId, BridgeConf); - {error, Reason} -> - {error, Reason} - end. - -on_stop(_InstId, #{name := InstanceId}) -> - ?SLOG(info, #{ - msg => "stopping_mqtt_connector", - connector => InstanceId - }), - case ?MODULE:drop_bridge(InstanceId) of - ok -> - ok; - {error, not_found} -> - ok; - {error, Reason} -> - ?SLOG(error, #{ - msg => "stop_mqtt_connector_error", - connector => InstanceId, - reason => Reason - }) - end. - -on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> - ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of - ok -> - ok; - {error, Reason} -> - classify_error(Reason) - end. - -on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> - ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - Callback = {fun on_async_result/2, [CallbackIn]}, - case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of - ok -> - ok; - {ok, Pid} -> - {ok, Pid}; - {error, Reason} -> - classify_error(Reason) - end. - -on_async_result(Callback, ok) -> - apply_callback_function(Callback, ok); -on_async_result(Callback, {ok, _} = Ok) -> - apply_callback_function(Callback, Ok); -on_async_result(Callback, {error, Reason}) -> - apply_callback_function(Callback, classify_error(Reason)). - -apply_callback_function(F, Result) when is_function(F) -> - erlang:apply(F, [Result]); -apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> - erlang:apply(F, A ++ [Result]); -apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> - erlang:apply(M, F, A ++ [Result]). - -on_get_status(_InstId, #{name := InstanceId}) -> - emqx_connector_mqtt_worker:status(InstanceId). - -classify_error(disconnected = Reason) -> - {error, {recoverable_error, Reason}}; -classify_error({disconnected, _RC, _} = Reason) -> - {error, {recoverable_error, Reason}}; -classify_error({shutdown, _} = Reason) -> - {error, {recoverable_error, Reason}}; -classify_error(shutdown = Reason) -> - {error, {recoverable_error, Reason}}; -classify_error(Reason) -> - {error, {unrecoverable_error, Reason}}. - -ensure_mqtt_worker_started(InstanceId, BridgeConf) -> - case emqx_connector_mqtt_worker:connect(InstanceId) of - {ok, Properties} -> - {ok, #{name => InstanceId, config => BridgeConf, props => Properties}}; - {error, Reason} -> - {error, Reason} - end. - -make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> - undefined; -make_sub_confs(undefined, _Conf, _) -> - undefined; -make_sub_confs(SubRemoteConf, Conf, ResourceId) -> - case maps:find(hookpoint, Conf) of - error -> - error({no_hookpoint_provided, Conf}); - {ok, HookPoint} -> - MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, - SubRemoteConf#{on_message_received => MFA} - end. - -make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 -> - undefined; -make_forward_confs(undefined) -> - undefined; -make_forward_confs(FrowardConf) -> - FrowardConf. - -basic_config( - #{ - server := Server, - proto_ver := ProtoVer, - bridge_mode := BridgeMode, - clean_start := CleanStart, - keepalive := KeepAlive, - retry_interval := RetryIntv, - max_inflight := MaxInflight, - ssl := #{enable := EnableSsl} = Ssl - } = Conf -) -> - BasicConf = #{ - %% connection opts - server => Server, - %% 30s - connect_timeout => 30, - auto_reconnect => true, - proto_ver => ProtoVer, - %% Opening bridge_mode will form a non-standard mqtt connection message. - %% A load balancing server (such as haproxy) is often set up before the emqx broker server. - %% When the load balancing server enables mqtt connection packet inspection, - %% non-standard mqtt connection packets will be filtered out by LB. - %% So let's disable bridge_mode. - bridge_mode => BridgeMode, - keepalive => ms_to_s(KeepAlive), - clean_start => CleanStart, - retry_interval => RetryIntv, - max_inflight => MaxInflight, - ssl => EnableSsl, - ssl_opts => maps:to_list(maps:remove(enable, Ssl)) - }, - maybe_put_fields([username, password], Conf, BasicConf). - -maybe_put_fields(Fields, Conf, Acc0) -> - lists:foldl( - fun(Key, Acc) -> - case maps:find(Key, Conf) of - error -> Acc; - {ok, Val} -> Acc#{Key => Val} - end - end, - Acc0, - Fields - ). - -ms_to_s(Ms) -> - erlang:ceil(Ms / 1000). - -clientid(Id, _Conf = #{clientid_prefix := Prefix = <<_/binary>>}) -> - iolist_to_binary([Prefix, ":", Id, ":", atom_to_list(node())]); -clientid(Id, _Conf) -> - iolist_to_binary([Id, ":", atom_to_list(node())]). diff --git a/apps/emqx_connector/src/emqx_connector_sup.erl b/apps/emqx_connector/src/emqx_connector_sup.erl index 13516813f..21c0f2677 100644 --- a/apps/emqx_connector/src/emqx_connector_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_sup.erl @@ -33,7 +33,6 @@ init([]) -> period => 20 }, ChildSpecs = [ - child_spec(emqx_connector_mqtt), child_spec(emqx_connector_jwt_sup) ], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl deleted file mode 100644 index df1114483..000000000 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ /dev/null @@ -1,168 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_mqtt_msg). - --export([ - to_binary/1, - from_binary/1, - make_pub_vars/2, - to_remote_msg/2, - to_broker_msg/3, - estimate_size/1 -]). - --export([ - replace_vars_in_str/2, - replace_simple_var/2 -]). - --export_type([msg/0]). - --include_lib("emqx/include/emqx.hrl"). - --include_lib("emqtt/include/emqtt.hrl"). - --type msg() :: emqx_types:message(). --type exp_msg() :: emqx_types:message() | #mqtt_msg{}. --type remote_config() :: #{ - topic := binary(), - qos := original | integer(), - retain := original | boolean(), - payload := binary() -}. --type variables() :: #{ - mountpoint := undefined | binary(), - remote := remote_config() -}. - -make_pub_vars(_, undefined) -> - undefined; -make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> - Conf#{mountpoint => Mountpoint}. - -%% @doc Make export format: -%% 1. Mount topic to a prefix -%% 2. Fix QoS to 1 -%% @end -%% Shame that we have to know the callback module here -%% would be great if we can get rid of #mqtt_msg{} record -%% and use #message{} in all places. --spec to_remote_msg(msg() | map(), variables()) -> - exp_msg(). -to_remote_msg(#message{flags = Flags0} = Msg, Vars) -> - Retain0 = maps:get(retain, Flags0, false), - {Columns, _} = emqx_rule_events:eventmsg_publish(Msg), - MapMsg = maps:put(retain, Retain0, Columns), - to_remote_msg(MapMsg, Vars); -to_remote_msg(MapMsg, #{ - remote := #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Remote, - mountpoint := Mountpoint -}) when is_map(MapMsg) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Remote, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - #mqtt_msg{ - qos = QoS, - retain = Retain, - topic = topic(Mountpoint, Topic), - props = emqx_utils:pub_props_to_packet(PubProps), - payload = Payload - }; -to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> - Msg#message{topic = topic(Mountpoint, Topic)}. - -%% published from remote node over a MQTT connection -to_broker_msg(Msg, Vars, undefined) -> - to_broker_msg(Msg, Vars, #{}); -to_broker_msg( - #{dup := Dup} = MapMsg, - #{ - local := #{ - topic := TopicToken, - qos := QoSToken, - retain := RetainToken - } = Local, - mountpoint := Mountpoint - }, - Props -) -> - Topic = replace_vars_in_str(TopicToken, MapMsg), - Payload = process_payload(Local, MapMsg), - QoS = replace_simple_var(QoSToken, MapMsg), - Retain = replace_simple_var(RetainToken, MapMsg), - PubProps = maps:get(pub_props, MapMsg, #{}), - set_headers( - Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, - emqx_message:set_flags( - #{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload) - ) - ). - -process_payload(From, MapMsg) -> - do_process_payload(maps:get(payload, From, undefined), MapMsg). - -do_process_payload(undefined, Msg) -> - emqx_utils_json:encode(Msg); -do_process_payload(Tks, Msg) -> - replace_vars_in_str(Tks, Msg). - -%% Replace a string contains vars to another string in which the placeholders are replace by the -%% corresponding values. For example, given "a: ${var}", if the var=1, the result string will be: -%% "a: 1". -replace_vars_in_str(Tokens, Data) when is_list(Tokens) -> - emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => full_binary}); -replace_vars_in_str(Val, _Data) -> - Val. - -%% Replace a simple var to its value. For example, given "${var}", if the var=1, then the result -%% value will be an integer 1. -replace_simple_var(Tokens, Data) when is_list(Tokens) -> - [Var] = emqx_plugin_libs_rule:proc_tmpl(Tokens, Data, #{return => rawlist}), - Var; -replace_simple_var(Val, _Data) -> - Val. - -%% @doc Make `binary()' in order to make iodata to be persisted on disk. --spec to_binary(msg()) -> binary(). -to_binary(Msg) -> term_to_binary(Msg). - -%% @doc Unmarshal binary into `msg()'. --spec from_binary(binary()) -> msg(). -from_binary(Bin) -> binary_to_term(Bin). - -%% @doc Estimate the size of a message. -%% Count only the topic length + payload size -%% There is no topic and payload for event message. So count all `Msg` term --spec estimate_size(msg()) -> integer(). -estimate_size(#message{topic = Topic, payload = Payload}) -> - size(Topic) + size(Payload); -estimate_size(#{topic := Topic, payload := Payload}) -> - size(Topic) + size(Payload); -estimate_size(Term) -> - erlang:external_size(Term). - -set_headers(Val, Msg) -> - emqx_message:set_headers(Val, Msg). -topic(undefined, Topic) -> Topic; -topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl deleted file mode 100644 index e49603e51..000000000 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ /dev/null @@ -1,465 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc Bridge works in two layers (1) batching layer (2) transport layer -%% The `bridge' batching layer collects local messages in batches and sends over -%% to remote MQTT node/cluster via `connection' transport layer. -%% In case `REMOTE' is also an EMQX node, `connection' is recommended to be -%% the `gen_rpc' based implementation `emqx_bridge_rpc'. Otherwise `connection' -%% has to be `emqx_connector_mqtt_mod'. -%% -%% ``` -%% +------+ +--------+ -%% | EMQX | | REMOTE | -%% | | | | -%% | (bridge) <==(connection)==> | | -%% | | | | -%% | | | | -%% +------+ +--------+ -%% ''' -%% -%% -%% This module implements 2 kinds of APIs with regards to batching and -%% messaging protocol. (1) A `gen_statem' based local batch collector; -%% (2) APIs for incoming remote batches/messages. -%% -%% Batch collector state diagram -%% -%% [idle] --(0) --> [connecting] --(2)--> [connected] -%% | ^ | -%% | | | -%% '--(1)---'--------(3)------' -%% -%% (0): auto or manual start -%% (1): retry timeout -%% (2): successfully connected to remote node/cluster -%% (3): received {disconnected, Reason} OR -%% failed to send to remote node/cluster. -%% -%% NOTE: A bridge worker may subscribe to multiple (including wildcard) -%% local topics, and the underlying `emqx_bridge_connect' may subscribe to -%% multiple remote topics, however, worker/connections are not designed -%% to support automatic load-balancing, i.e. in case it can not keep up -%% with the amount of messages coming in, administrator should split and -%% balance topics between worker/connections manually. -%% -%% NOTES: -%% * Local messages are all normalised to QoS-1 when exporting to remote - --module(emqx_connector_mqtt_worker). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). --include_lib("emqx/include/logger.hrl"). - -%% APIs --export([ - start_link/2, - stop/1 -]). - -%% management APIs --export([ - connect/1, - status/1, - ping/1, - info/1, - send_to_remote/2, - send_to_remote_async/3 -]). - --export([handle_publish/3]). --export([handle_disconnect/1]). - --export_type([ - config/0, - ack_ref/0 -]). - --type name() :: term(). -% -type qos() :: emqx_types:qos(). --type config() :: map(). --type ack_ref() :: term(). -% -type topic() :: emqx_types:topic(). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - --define(REF(Name), {via, gproc, ?NAME(Name)}). --define(NAME(Name), {n, l, Name}). - -%% @doc Start a bridge worker. Supported configs: -%% mountpoint: The topic mount point for messages sent to remote node/cluster -%% `undefined', `<<>>' or `""' to disable -%% forwards: Local topics to subscribe. -%% -%% Find more connection specific configs in the callback modules -%% of emqx_bridge_connect behaviour. --spec start_link(name(), map()) -> - {ok, pid()} | {error, _Reason}. -start_link(Name, BridgeOpts) -> - ?SLOG(debug, #{ - msg => "client_starting", - name => Name, - options => BridgeOpts - }), - Conf = init_config(Name, BridgeOpts), - Options = mk_client_options(Conf, BridgeOpts), - case emqtt:start_link(Options) of - {ok, Pid} -> - true = gproc:reg_other(?NAME(Name), Pid, Conf), - {ok, Pid}; - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "client_start_failed", - config => emqx_utils:redact(BridgeOpts), - reason => Reason - }), - Error - end. - -init_config(Name, Opts) -> - Mountpoint = maps:get(forward_mountpoint, Opts, undefined), - Subscriptions = maps:get(subscriptions, Opts, undefined), - Forwards = maps:get(forwards, Opts, undefined), - #{ - mountpoint => format_mountpoint(Mountpoint), - subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts), - forwards => pre_process_forwards(Forwards) - }. - -mk_client_options(Conf, BridgeOpts) -> - Server = iolist_to_binary(maps:get(server, BridgeOpts)), - HostPort = emqx_connector_mqtt_schema:parse_server(Server), - Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined), - Subscriptions = maps:get(subscriptions, Conf), - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - CleanStart = - case Subscriptions of - #{remote := _} -> - maps:get(clean_start, BridgeOpts); - undefined -> - %% NOTE - %% We are ignoring the user configuration here because there's currently no reliable way - %% to ensure proper session recovery according to the MQTT spec. - true - end, - Opts = maps:without( - [ - address, - auto_reconnect, - conn_type, - mountpoint, - forwards, - receive_mountpoint, - subscriptions - ], - BridgeOpts - ), - Opts#{ - msg_handler => mk_client_event_handler(Vars, #{server => Server}), - hosts => [HostPort], - clean_start => CleanStart, - force_ping => true, - proto_ver => maps:get(proto_ver, BridgeOpts, v4) - }. - -mk_client_event_handler(Vars, Opts) when Vars /= undefined -> - #{ - publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, - disconnected => {fun ?MODULE:handle_disconnect/1, []} - }; -mk_client_event_handler(undefined, _Opts) -> - undefined. - -connect(Name) -> - #{subscriptions := Subscriptions} = get_config(Name), - case emqtt:connect(get_pid(Name)) of - {ok, Properties} -> - case subscribe_remote_topics(Name, Subscriptions) of - ok -> - {ok, Properties}; - {ok, _, _RCs} -> - {ok, Properties}; - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "client_subscribe_failed", - subscriptions => Subscriptions, - reason => Reason - }), - Error - end; - {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => "client_connect_failed", - reason => Reason, - name => Name - }), - Error - end. -subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> - emqtt:subscribe(ref(Ref), FromTopic, QoS); -subscribe_remote_topics(_Ref, undefined) -> - ok. - -stop(Ref) -> - emqtt:stop(ref(Ref)). - -info(Ref) -> - emqtt:info(ref(Ref)). - -status(Ref) -> - try - case proplists:get_value(socket, info(Ref)) of - Socket when Socket /= undefined -> - connected; - undefined -> - connecting - end - catch - exit:{noproc, _} -> - disconnected - end. - -ping(Ref) -> - emqtt:ping(ref(Ref)). - -send_to_remote(Name, MsgIn) -> - trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end). - -do_send(Name, {true, Msg}) -> - case emqtt:publish(get_pid(Name), Msg) of - ok -> - ok; - {ok, #{reason_code := RC}} when - RC =:= ?RC_SUCCESS; - RC =:= ?RC_NO_MATCHING_SUBSCRIBERS - -> - ok; - {ok, #{reason_code := RC, reason_code_name := Reason}} -> - ?SLOG(warning, #{ - msg => "remote_publish_failed", - message => Msg, - reason_code => RC, - reason_code_name => Reason - }), - {error, Reason}; - {error, Reason} -> - ?SLOG(info, #{ - msg => "client_failed", - reason => Reason - }), - {error, Reason} - end; -do_send(_Name, false) -> - ok. - -send_to_remote_async(Name, MsgIn, Callback) -> - trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end). - -do_send_async(Name, {true, Msg}, Callback) -> - Pid = get_pid(Name), - ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback), - {ok, Pid}; -do_send_async(_Name, false, _Callback) -> - ok. - -ref(Pid) when is_pid(Pid) -> - Pid; -ref(Term) -> - ?REF(Term). - -trycall(Fun) -> - try - Fun() - catch - throw:noproc -> - {error, disconnected}; - exit:{noproc, _} -> - {error, disconnected} - end. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -pre_process_subscriptions(undefined, _, _) -> - undefined; -pre_process_subscriptions( - #{remote := RC, local := LC} = Conf, - BridgeName, - BridgeOpts -) when is_map(Conf) -> - Conf#{ - remote => pre_process_in_remote(RC, BridgeName, BridgeOpts), - local => pre_process_in_out_common(LC) - }; -pre_process_subscriptions(Conf, _, _) when is_map(Conf) -> - %% have no 'local' field in the config - undefined. - -pre_process_forwards(undefined) -> - undefined; -pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) -> - Conf#{remote => pre_process_in_out_common(RC)}; -pre_process_forwards(Conf) when is_map(Conf) -> - %% have no 'remote' field in the config - undefined. - -pre_process_in_out_common(Conf0) -> - Conf1 = pre_process_conf(topic, Conf0), - Conf2 = pre_process_conf(qos, Conf1), - Conf3 = pre_process_conf(payload, Conf2), - pre_process_conf(retain, Conf3). - -pre_process_conf(Key, Conf) -> - case maps:find(Key, Conf) of - error -> - Conf; - {ok, Val} when is_binary(Val) -> - Conf#{Key => emqx_plugin_libs_rule:preproc_tmpl(Val)}; - {ok, Val} -> - Conf#{Key => Val} - end. - -pre_process_in_remote(#{qos := QoSIn} = Conf, BridgeName, BridgeOpts) -> - QoS = downgrade_ingress_qos(QoSIn), - case QoS of - QoSIn -> - ok; - _ -> - ?SLOG(warning, #{ - msg => "downgraded_unsupported_ingress_qos", - qos_configured => QoSIn, - qos_used => QoS, - name => BridgeName, - options => BridgeOpts - }) - end, - Conf#{qos => QoS}. - -downgrade_ingress_qos(2) -> - 1; -downgrade_ingress_qos(QoS) -> - QoS. - -get_pid(Name) -> - case gproc:where(?NAME(Name)) of - Pid when is_pid(Pid) -> - Pid; - undefined -> - throw(noproc) - end. - -get_config(Name) -> - try - gproc:lookup_value(?NAME(Name)) - catch - error:badarg -> - throw(noproc) - end. - -export_msg(Name, Msg) -> - case get_config(Name) of - #{forwards := Forwards = #{}, mountpoint := Mountpoint} -> - {true, export_msg(Mountpoint, Forwards, Msg)}; - #{forwards := undefined} -> - ?SLOG(error, #{ - msg => "forwarding_unavailable", - message => Msg, - reason => "egress is not configured" - }), - false - end. - -export_msg(Mountpoint, Forwards, Msg) -> - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars). - -%% - -handle_publish(#{properties := Props} = MsgIn, Vars, Opts) -> - Msg = import_msg(MsgIn, Opts), - ?SLOG(debug, #{ - msg => "publish_local", - message => Msg, - vars => Vars - }), - case Vars of - #{on_message_received := {Mod, Func, Args}} -> - _ = erlang:apply(Mod, Func, [Msg | Args]); - _ -> - ok - end, - maybe_publish_local(Msg, Vars, Props). - -handle_disconnect(_Reason) -> - ok. - -maybe_publish_local(Msg, Vars, Props) -> - case emqx_utils_maps:deep_get([local, topic], Vars, undefined) of - %% local topic is not set, discard it - undefined -> - ok; - _ -> - emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) - end. - -import_msg( - #{ - dup := Dup, - payload := Payload, - properties := Props, - qos := QoS, - retain := Retain, - topic := Topic - }, - #{server := Server} -) -> - #{ - id => emqx_guid:to_hexstr(emqx_guid:gen()), - server => Server, - payload => Payload, - topic => Topic, - qos => QoS, - dup => Dup, - retain => Retain, - pub_props => printable_maps(Props), - message_received_at => erlang:system_time(millisecond) - }. - -printable_maps(undefined) -> - #{}; -printable_maps(Headers) -> - maps:fold( - fun - ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [ - #{ - key => Key, - value => Value - } - || {Key, Value} <- V0 - ] - }; - (K, V0, AccIn) -> - AccIn#{K => V0} - end, - #{}, - Headers - ). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 840c6cfec..37d7b1696 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -121,7 +121,8 @@ -export_type([ resource_id/0, - resource_data/0 + resource_data/0, + resource_status/0 ]). -optional_callbacks([ diff --git a/changes/ce/perf-10754.en.md b/changes/ce/perf-10754.en.md new file mode 100644 index 000000000..ef32960be --- /dev/null +++ b/changes/ce/perf-10754.en.md @@ -0,0 +1,3 @@ +The MQTT bridge has been enhanced to utilize connection pooling and leverage available parallelism, substantially improving throughput. + +As a consequence, single MQTT bridge now uses a pool of `clientid`s to connect to the remote broker. diff --git a/mix.exs b/mix.exs index 2e2882e15..dab11693c 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,7 @@ defmodule EMQXUmbrella.MixProject do {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.10", override: true}, - {:ecpool, github: "emqx/ecpool", tag: "0.5.3", override: true}, + {:ecpool, github: "emqx/ecpool", tag: "0.5.4", override: true}, {:replayq, github: "emqx/replayq", tag: "0.3.7", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, # maybe forbid to fetch quicer @@ -310,6 +310,7 @@ defmodule EMQXUmbrella.MixProject do :emqx_connector, :emqx_exhook, :emqx_bridge, + :emqx_bridge_mqtt, :emqx_modules, :emqx_management, :emqx_retainer, @@ -372,6 +373,7 @@ defmodule EMQXUmbrella.MixProject do emqx_gateway_exproto: :permanent, emqx_exhook: :permanent, emqx_bridge: :permanent, + emqx_bridge_mqtt: :permanent, emqx_rule_engine: :permanent, emqx_modules: :permanent, emqx_management: :permanent, diff --git a/rebar.config b/rebar.config index 81545a7be..6a44b8074 100644 --- a/rebar.config +++ b/rebar.config @@ -66,7 +66,7 @@ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.10"}}} - , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.3"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} diff --git a/rebar.config.erl b/rebar.config.erl index d7ffa37e4..d265f53cd 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -427,6 +427,7 @@ relx_apps(ReleaseType, Edition) -> emqx_gateway_exproto, emqx_exhook, emqx_bridge, + emqx_bridge_mqtt, emqx_rule_engine, emqx_modules, emqx_management, diff --git a/rel/i18n/emqx_connector_mqtt_schema.hocon b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon similarity index 82% rename from rel/i18n/emqx_connector_mqtt_schema.hocon rename to rel/i18n/emqx_bridge_mqtt_connector_schema.hocon index e37e87e49..7c7bf68c9 100644 --- a/rel/i18n/emqx_connector_mqtt_schema.hocon +++ b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon @@ -1,4 +1,4 @@ -emqx_connector_mqtt_schema { +emqx_bridge_mqtt_connector_schema { bridge_mode.desc: """If enable bridge mode. @@ -32,6 +32,14 @@ is configured, then both the data got from the rule and the MQTT messages that m egress_desc.label: """Egress Configs""" +egress_pool_size.desc: +"""Size of the pool of MQTT clients that will publish messages to the remote broker.
+Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}' +where 'n' is the number of a client inside the pool.""" + +egress_pool_size.label: +"""Pool Size""" + egress_local.desc: """The configs about receiving messages from local broker.""" @@ -75,6 +83,17 @@ ingress_desc.desc: ingress_desc.label: """Ingress Configs""" +ingress_pool_size.desc: +"""Size of the pool of MQTT clients that will ingest messages from the remote broker.
+This value will be respected only if 'remote.topic' is a shared subscription topic or topic-filter +(for example `$share/name1/topic1` or `$share/name2/topic2/#`), otherwise only a single MQTT client will be used. +Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}' +where 'n' is the number of a client inside the pool. +NOTE: Non-shared subscription will not work well when EMQX is clustered.""" + +ingress_pool_size.label: +"""Pool Size""" + ingress_local.desc: """The configs about sending message to the local broker.""" diff --git a/rel/i18n/emqx_connector_mqtt.hocon b/rel/i18n/emqx_connector_mqtt.hocon deleted file mode 100644 index 80303b825..000000000 --- a/rel/i18n/emqx_connector_mqtt.hocon +++ /dev/null @@ -1,21 +0,0 @@ -emqx_connector_mqtt { - -name.desc: -"""Connector name, used as a human-readable description of the connector.""" - -name.label: -"""Connector Name""" - -num_of_bridges.desc: -"""The current number of bridges that are using this connector.""" - -num_of_bridges.label: -"""Num of Bridges""" - -type.desc: -"""The Connector Type.""" - -type.label: -"""Connector Type""" - -}