diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 67fb5d019..aecb04e03 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -221,6 +221,12 @@ t_mqtt_conn_bridge_ingress(_) -> request(put, uri(["bridges", BridgeIDIngress]), ServerConf) ), + %% non-shared subscription, verify that only one client is subscribed + ?assertEqual( + 1, + length(emqx:subscribers(<>)) + ), + %% we now test if the bridge works as expected RemoteTopic = <>, LocalTopic = <>, @@ -245,6 +251,48 @@ t_mqtt_conn_bridge_ingress(_) -> ok. +t_mqtt_conn_bridge_ingress_shared_subscription(_) -> + PoolSize = 4, + Ns = lists:seq(1, 10), + BridgeName = atom_to_binary(?FUNCTION_NAME), + BridgeID = create_bridge( + ?SERVER_CONF(<<>>)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => BridgeName, + <<"pool_size">> => PoolSize, + <<"ingress">> => #{ + <<"remote">> => #{ + <<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>, + <<"qos">> => 1 + }, + <<"local">> => #{ + <<"topic">> => <>, + <<"qos">> => <<"${qos}">>, + <<"payload">> => <<"${clientid}">>, + <<"retain">> => <<"${retain}">> + } + } + } + ), + + RemoteTopic = <>, + LocalTopic = <>, + ok = emqx:subscribe(LocalTopic), + + _ = emqx_utils:pmap( + fun emqx:publish/1, + [emqx_message:make(RemoteTopic, <<>>) || _ <- Ns] + ), + _ = [assert_mqtt_msg_received(LocalTopic) || _ <- Ns], + + ?assertEqual( + PoolSize, + length(emqx_shared_sub:subscribers(<<"ingress">>, <>)) + ), + + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), + ok. + t_mqtt_egress_bridge_ignores_clean_start(_) -> BridgeName = atom_to_binary(?FUNCTION_NAME), BridgeID = create_bridge( diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 30791afe3..09228254a 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -54,8 +54,8 @@ on_start(ResourceId, Conf) -> }), BasicOpts = mk_worker_opts(ResourceId, Conf), BridgeOpts = BasicOpts#{ - subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId), - forwards => maps:get(egress, Conf, #{}) + ingress => mk_ingress_config(maps:get(ingress, Conf, #{}), Conf, ResourceId), + egress => maps:get(egress, Conf, #{}) }, {ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts), case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of @@ -165,12 +165,12 @@ combine_status(Statuses) -> disconnected end. -make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 -> - Subscriptions; -make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) -> +mk_ingress_config(Ingress, _Conf, _) when map_size(Ingress) == 0 -> + Ingress; +mk_ingress_config(Ingress, #{hookpoint := HookPoint}, ResourceId) -> MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, - Subscriptions#{on_message_received => MFA}; -make_sub_confs(_SubRemoteConf, Conf, ResourceId) -> + Ingress#{on_message_received => MFA}; +mk_ingress_config(_Ingress, Conf, ResourceId) -> error({no_hookpoint_provided, ResourceId, Conf}). mk_worker_opts( diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 7e33a55ca..ede477602 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -60,21 +60,18 @@ ssl := boolean(), ssl_opts := proplists:proplist(), % bridge options - subscriptions := map(), - forwards := map() + ingress := map(), + egress := map() }. -type client_option() :: emqtt:option() | {name, name()} - | {subscriptions, subscriptions() | undefined}. + | {ingress, ingress() | undefined}. --type config() :: #{ - subscriptions := subscriptions() | undefined, - forwards := forwards() | undefined -}. +-type config() :: egress() | undefined. --type subscriptions() :: #{ +-type ingress() :: #{ remote := #{ topic := emqx_topic:topic(), qos => emqx_types:qos() @@ -83,7 +80,7 @@ on_message_received := {module(), atom(), [term()]} }. --type forwards() :: #{ +-type egress() :: #{ local => #{ topic => emqx_topic:topic() }, @@ -103,13 +100,10 @@ -spec init(name(), options()) -> {ok, [client_option()], config()}. init(Name, BridgeOpts) -> - Config = init_config(Name, BridgeOpts), - ClientOpts0 = mk_client_options(Config, BridgeOpts), - ClientOpts = ClientOpts0#{ - name => Name, - subscriptions => maps:get(subscriptions, Config) - }, - {ok, maps:to_list(ClientOpts), Config}. + Ingress = pre_process_ingress(maps:get(ingress, BridgeOpts), Name, BridgeOpts), + Egress = pre_process_egress(maps:get(egress, BridgeOpts)), + ClientOpts = mk_client_options(Name, Ingress, BridgeOpts), + {ok, maps:to_list(ClientOpts), Egress}. %% @doc Start a bridge worker. -spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) -> @@ -134,7 +128,7 @@ connect(ClientOpts0) -> end. mk_emqtt_opts(WorkerId, ClientOpts) -> - {_, ClientId} = lists:keyfind(clientid, 1, ClientOpts), + ClientId = proplists:get_value(clientid, ClientOpts), lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}). mk_clientid(WorkerId, ClientId) -> @@ -143,10 +137,8 @@ mk_clientid(WorkerId, ClientId) -> connect(Pid, Name, WorkerId, ClientOpts) -> case emqtt:connect(Pid) of {ok, _Props} -> - % NOTE - % Subscribe to remote topics only when the first worker is started. - Subscriptions = proplists:get_value(subscriptions, ClientOpts), - case WorkerId =:= 1 andalso subscribe_remote_topics(Pid, Subscriptions) of + Ingress = proplists:get_value(ingress, ClientOpts), + case subscribe_remote_topic(Pid, WorkerId, Ingress) of false -> {ok, Pid}; {ok, _, _RCs} -> @@ -154,7 +146,7 @@ connect(Pid, Name, WorkerId, ClientOpts) -> {error, Reason} = Error -> ?SLOG(error, #{ msg => "client_subscribe_failed", - subscriptions => Subscriptions, + ingress => Ingress, reason => Reason }), _ = catch emqtt:stop(Pid), @@ -170,25 +162,25 @@ connect(Pid, Name, WorkerId, ClientOpts) -> Error end. -subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> - emqtt:subscribe(Pid, RemoteTopic, QoS); -subscribe_remote_topics(_Ref, undefined) -> +subscribe_remote_topic(Pid, WorkerId, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> + case emqx_topic:parse(RemoteTopic) of + {_Filter, #{share := _Name}} -> + % NOTE: this is shared subscription, each worker may subscribe + emqtt:subscribe(Pid, RemoteTopic, QoS); + {_Filter, #{}} when WorkerId =:= 1 -> + % NOTE: this is regular subscription, only the first worker should subscribe + emqtt:subscribe(Pid, RemoteTopic, QoS); + {_Filter, #{}} -> + false + end; +subscribe_remote_topic(_Ref, _, undefined) -> false. -init_config(Name, Opts) -> - Subscriptions = maps:get(subscriptions, Opts, undefined), - Forwards = maps:get(forwards, Opts, undefined), - #{ - subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts), - forwards => pre_process_forwards(Forwards) - }. - -mk_client_options(Config, BridgeOpts) -> +mk_client_options(Name, Ingress, BridgeOpts) -> Server = iolist_to_binary(maps:get(server, BridgeOpts)), HostPort = emqx_connector_mqtt_schema:parse_server(Server), - Subscriptions = maps:get(subscriptions, Config), CleanStart = - case Subscriptions of + case Ingress of #{remote := _} -> maps:get(clean_start, BridgeOpts); undefined -> @@ -214,16 +206,18 @@ mk_client_options(Config, BridgeOpts) -> BridgeOpts ), Opts#{ - msg_handler => mk_client_event_handler(Subscriptions, #{server => Server}), + name => Name, + ingress => Ingress, + msg_handler => mk_client_event_handler(Ingress, #{server => Server}), hosts => [HostPort], clean_start => CleanStart, force_ping => true }. -mk_client_event_handler(Subscriptions = #{}, Opts) -> - OnMessage = maps:get(on_message_received, Subscriptions, undefined), +mk_client_event_handler(Ingress = #{}, Opts) -> + OnMessage = maps:get(on_message_received, Ingress, undefined), LocalPublish = - case Subscriptions of + case Ingress of #{local := Local = #{topic := _}} -> Local; #{} -> @@ -275,26 +269,26 @@ do_send_async(Pid, Msg, Callback) when Msg /= undefined -> do_send_async(_Pid, undefined, _Callback) -> ok. -pre_process_subscriptions( +pre_process_ingress( #{remote := RC, local := LC} = Conf, BridgeName, BridgeOpts ) when is_map(Conf) -> Conf#{ remote => pre_process_in_remote(RC, BridgeName, BridgeOpts), - local => pre_process_in_out_common(LC) + local => pre_process_common(LC) }; -pre_process_subscriptions(Conf, _, _) when is_map(Conf) -> +pre_process_ingress(Conf, _, _) when is_map(Conf) -> %% have no 'local' field in the config undefined. -pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) -> - Conf#{remote => pre_process_in_out_common(RC)}; -pre_process_forwards(Conf) when is_map(Conf) -> +pre_process_egress(#{remote := RC} = Conf) when is_map(Conf) -> + Conf#{remote => pre_process_common(RC)}; +pre_process_egress(Conf) when is_map(Conf) -> %% have no 'remote' field in the config undefined. -pre_process_in_out_common(Conf0) -> +pre_process_common(Conf0) -> Conf1 = pre_process_conf(topic, Conf0), Conf2 = pre_process_conf(qos, Conf1), Conf3 = pre_process_conf(payload, Conf2), @@ -331,9 +325,9 @@ downgrade_ingress_qos(2) -> downgrade_ingress_qos(QoS) -> QoS. -export_msg(Msg, #{forwards := #{remote := Remote}}) -> +export_msg(Msg, #{remote := Remote}) -> to_remote_msg(Msg, Remote); -export_msg(Msg, #{forwards := undefined}) -> +export_msg(Msg, undefined) -> ?SLOG(error, #{ msg => "forwarding_unavailable", message => Msg,