diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl index 13be52a0c..60a512011 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_connector.erl @@ -76,7 +76,7 @@ start_ingress(ResourceId, Conf) -> start_ingress(ResourceId, Ingress, ClientOpts) -> PoolName = <>, - PoolSize = choose_ingress_pool_size(Ingress), + PoolSize = choose_ingress_pool_size(ResourceId, Ingress), Options = [ {name, PoolName}, {pool_size, PoolSize}, @@ -90,7 +90,10 @@ start_ingress(ResourceId, Ingress, ClientOpts) -> {error, Reason} end. -choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}) -> +choose_ingress_pool_size( + ResourceId, + #{remote := #{topic := RemoteTopic}, pool_size := PoolSize} +) -> case emqx_topic:parse(RemoteTopic) of {_Filter, #{share := _Name}} -> % NOTE: this is shared subscription, many workers may subscribe @@ -98,7 +101,8 @@ choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolS {_Filter, #{}} -> % NOTE: this is regular subscription, only one worker should subscribe ?SLOG(warning, #{ - msg => "ingress_pool_size_ignored", + msg => "mqtt_bridge_ingress_pool_size_ignored", + connector => ResourceId, reason => "Remote topic filter is not a shared subscription, " "ingress pool will start with a single worker", diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl index 78bbf7753..91ec27e74 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_ingress.erl @@ -29,7 +29,7 @@ info/1 ]). --export([handle_publish/4]). +-export([handle_publish/5]). -export([handle_disconnect/1]). -type name() :: term(). @@ -62,7 +62,7 @@ connect(Options) -> WorkerId = proplists:get_value(ecpool_worker_id, Options), Ingress = config(proplists:get_value(ingress, Options), Name), ClientOpts = proplists:get_value(client_opts, Options), - case emqtt:start_link(mk_client_opts(WorkerId, Ingress, ClientOpts)) of + case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of {ok, Pid} -> connect(Pid, Name, Ingress); {error, Reason} = Error -> @@ -74,16 +74,16 @@ connect(Options) -> Error end. -mk_client_opts(WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> +mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) -> ClientOpts#{ clientid := mk_clientid(WorkerId, ClientId), - msg_handler => mk_client_event_handler(Ingress) + msg_handler => mk_client_event_handler(Name, Ingress) }. mk_clientid(WorkerId, ClientId) -> iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]). -mk_client_event_handler(Ingress = #{}) -> +mk_client_event_handler(Name, Ingress = #{}) -> IngressVars = maps:with([server], Ingress), OnMessage = maps:get(on_message_received, Ingress, undefined), LocalPublish = @@ -94,7 +94,7 @@ mk_client_event_handler(Ingress = #{}) -> undefined end, #{ - publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, IngressVars]}, + publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]}, disconnected => {fun ?MODULE:handle_disconnect/1, []} }. @@ -110,6 +110,7 @@ connect(Pid, Name, Ingress) -> ?SLOG(error, #{ msg => "ingress_client_subscribe_failed", ingress => Ingress, + name => Name, reason => Reason }), _ = catch emqtt:stop(Pid), @@ -182,11 +183,12 @@ status(Pid) -> %% -handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, IngressVars) -> +handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) -> Msg = import_msg(MsgIn, IngressVars), ?SLOG(debug, #{ - msg => "publish_local", - message => Msg + msg => "ingress_publish_local", + message => Msg, + name => Name }), maybe_on_message_received(Msg, OnMessage), maybe_publish_local(Msg, LocalPublish, Props). diff --git a/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon index ed7a59dcd..7c7bf68c9 100644 --- a/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon +++ b/rel/i18n/emqx_bridge_mqtt_connector_schema.hocon @@ -34,8 +34,8 @@ egress_desc.label: egress_pool_size.desc: """Size of the pool of MQTT clients that will publish messages to the remote broker.
- Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}' - where 'n' is the number of a client inside the pool.""" +Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}' +where 'n' is the number of a client inside the pool.""" egress_pool_size.label: """Pool Size""" @@ -85,10 +85,11 @@ ingress_desc.label: ingress_pool_size.desc: """Size of the pool of MQTT clients that will ingest messages from the remote broker.
- This value will be respected only if 'remote.topic' is a shared subscription topic filter, - otherwise only a single MQTT client will be used. - Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}' - where 'n' is the number of a client inside the pool.""" +This value will be respected only if 'remote.topic' is a shared subscription topic or topic-filter +(for example `$share/name1/topic1` or `$share/name2/topic2/#`), otherwise only a single MQTT client will be used. +Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}' +where 'n' is the number of a client inside the pool. +NOTE: Non-shared subscription will not work well when EMQX is clustered.""" ingress_pool_size.label: """Pool Size"""