feat(mqttconn): subscribe each worker if shared subcription

Also rename `subscriptions` -> `ingress` and `forwards` -> `egress` for
consistency with the config schema.
This commit is contained in:
Andrew Mayorov 2023-05-29 13:57:55 +03:00
parent 81e78516aa
commit 4e6269bedb
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 98 additions and 56 deletions

View File

@ -221,6 +221,12 @@ t_mqtt_conn_bridge_ingress(_) ->
request(put, uri(["bridges", BridgeIDIngress]), ServerConf)
),
%% non-shared subscription, verify that only one client is subscribed
?assertEqual(
1,
length(emqx:subscribers(<<?INGRESS_REMOTE_TOPIC, "/#">>))
),
%% we now test if the bridge works as expected
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
@ -245,6 +251,48 @@ t_mqtt_conn_bridge_ingress(_) ->
ok.
t_mqtt_conn_bridge_ingress_shared_subscription(_) ->
PoolSize = 4,
Ns = lists:seq(1, 10),
BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge(
?SERVER_CONF(<<>>)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => BridgeName,
<<"pool_size">> => PoolSize,
<<"ingress">> => #{
<<"remote">> => #{
<<"topic">> => <<"$share/ingress/", ?INGRESS_REMOTE_TOPIC, "/#">>,
<<"qos">> => 1
},
<<"local">> => #{
<<"topic">> => <<?INGRESS_LOCAL_TOPIC, "/${topic}">>,
<<"qos">> => <<"${qos}">>,
<<"payload">> => <<"${clientid}">>,
<<"retain">> => <<"${retain}">>
}
}
}
),
RemoteTopic = <<?INGRESS_REMOTE_TOPIC, "/1">>,
LocalTopic = <<?INGRESS_LOCAL_TOPIC, "/", RemoteTopic/binary>>,
ok = emqx:subscribe(LocalTopic),
_ = emqx_utils:pmap(
fun emqx:publish/1,
[emqx_message:make(RemoteTopic, <<>>) || _ <- Ns]
),
_ = [assert_mqtt_msg_received(LocalTopic) || _ <- Ns],
?assertEqual(
PoolSize,
length(emqx_shared_sub:subscribers(<<"ingress">>, <<?INGRESS_REMOTE_TOPIC, "/#">>))
),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
ok.
t_mqtt_egress_bridge_ignores_clean_start(_) ->
BridgeName = atom_to_binary(?FUNCTION_NAME),
BridgeID = create_bridge(

View File

@ -54,8 +54,8 @@ on_start(ResourceId, Conf) ->
}),
BasicOpts = mk_worker_opts(ResourceId, Conf),
BridgeOpts = BasicOpts#{
subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId),
forwards => maps:get(egress, Conf, #{})
ingress => mk_ingress_config(maps:get(ingress, Conf, #{}), Conf, ResourceId),
egress => maps:get(egress, Conf, #{})
},
{ok, ClientOpts, WorkerConf} = emqx_connector_mqtt_worker:init(ResourceId, BridgeOpts),
case emqx_resource_pool:start(ResourceId, emqx_connector_mqtt_worker, ClientOpts) of
@ -165,12 +165,12 @@ combine_status(Statuses) ->
disconnected
end.
make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 ->
Subscriptions;
make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) ->
mk_ingress_config(Ingress, _Conf, _) when map_size(Ingress) == 0 ->
Ingress;
mk_ingress_config(Ingress, #{hookpoint := HookPoint}, ResourceId) ->
MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]},
Subscriptions#{on_message_received => MFA};
make_sub_confs(_SubRemoteConf, Conf, ResourceId) ->
Ingress#{on_message_received => MFA};
mk_ingress_config(_Ingress, Conf, ResourceId) ->
error({no_hookpoint_provided, ResourceId, Conf}).
mk_worker_opts(

View File

@ -60,21 +60,18 @@
ssl := boolean(),
ssl_opts := proplists:proplist(),
% bridge options
subscriptions := map(),
forwards := map()
ingress := map(),
egress := map()
}.
-type client_option() ::
emqtt:option()
| {name, name()}
| {subscriptions, subscriptions() | undefined}.
| {ingress, ingress() | undefined}.
-type config() :: #{
subscriptions := subscriptions() | undefined,
forwards := forwards() | undefined
}.
-type config() :: egress() | undefined.
-type subscriptions() :: #{
-type ingress() :: #{
remote := #{
topic := emqx_topic:topic(),
qos => emqx_types:qos()
@ -83,7 +80,7 @@
on_message_received := {module(), atom(), [term()]}
}.
-type forwards() :: #{
-type egress() :: #{
local => #{
topic => emqx_topic:topic()
},
@ -103,13 +100,10 @@
-spec init(name(), options()) ->
{ok, [client_option()], config()}.
init(Name, BridgeOpts) ->
Config = init_config(Name, BridgeOpts),
ClientOpts0 = mk_client_options(Config, BridgeOpts),
ClientOpts = ClientOpts0#{
name => Name,
subscriptions => maps:get(subscriptions, Config)
},
{ok, maps:to_list(ClientOpts), Config}.
Ingress = pre_process_ingress(maps:get(ingress, BridgeOpts), Name, BridgeOpts),
Egress = pre_process_egress(maps:get(egress, BridgeOpts)),
ClientOpts = mk_client_options(Name, Ingress, BridgeOpts),
{ok, maps:to_list(ClientOpts), Egress}.
%% @doc Start a bridge worker.
-spec connect([client_option() | {ecpool_worker_id, pos_integer()}]) ->
@ -134,7 +128,7 @@ connect(ClientOpts0) ->
end.
mk_emqtt_opts(WorkerId, ClientOpts) ->
{_, ClientId} = lists:keyfind(clientid, 1, ClientOpts),
ClientId = proplists:get_value(clientid, ClientOpts),
lists:keystore(clientid, 1, ClientOpts, {clientid, mk_clientid(WorkerId, ClientId)}).
mk_clientid(WorkerId, ClientId) ->
@ -143,10 +137,8 @@ mk_clientid(WorkerId, ClientId) ->
connect(Pid, Name, WorkerId, ClientOpts) ->
case emqtt:connect(Pid) of
{ok, _Props} ->
% NOTE
% Subscribe to remote topics only when the first worker is started.
Subscriptions = proplists:get_value(subscriptions, ClientOpts),
case WorkerId =:= 1 andalso subscribe_remote_topics(Pid, Subscriptions) of
Ingress = proplists:get_value(ingress, ClientOpts),
case subscribe_remote_topic(Pid, WorkerId, Ingress) of
false ->
{ok, Pid};
{ok, _, _RCs} ->
@ -154,7 +146,7 @@ connect(Pid, Name, WorkerId, ClientOpts) ->
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_subscribe_failed",
subscriptions => Subscriptions,
ingress => Ingress,
reason => Reason
}),
_ = catch emqtt:stop(Pid),
@ -170,25 +162,25 @@ connect(Pid, Name, WorkerId, ClientOpts) ->
Error
end.
subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
emqtt:subscribe(Pid, RemoteTopic, QoS);
subscribe_remote_topics(_Ref, undefined) ->
subscribe_remote_topic(Pid, WorkerId, #{remote := #{topic := RemoteTopic, qos := QoS}}) ->
case emqx_topic:parse(RemoteTopic) of
{_Filter, #{share := _Name}} ->
% NOTE: this is shared subscription, each worker may subscribe
emqtt:subscribe(Pid, RemoteTopic, QoS);
{_Filter, #{}} when WorkerId =:= 1 ->
% NOTE: this is regular subscription, only the first worker should subscribe
emqtt:subscribe(Pid, RemoteTopic, QoS);
{_Filter, #{}} ->
false
end;
subscribe_remote_topic(_Ref, _, undefined) ->
false.
init_config(Name, Opts) ->
Subscriptions = maps:get(subscriptions, Opts, undefined),
Forwards = maps:get(forwards, Opts, undefined),
#{
subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts),
forwards => pre_process_forwards(Forwards)
}.
mk_client_options(Config, BridgeOpts) ->
mk_client_options(Name, Ingress, BridgeOpts) ->
Server = iolist_to_binary(maps:get(server, BridgeOpts)),
HostPort = emqx_connector_mqtt_schema:parse_server(Server),
Subscriptions = maps:get(subscriptions, Config),
CleanStart =
case Subscriptions of
case Ingress of
#{remote := _} ->
maps:get(clean_start, BridgeOpts);
undefined ->
@ -214,16 +206,18 @@ mk_client_options(Config, BridgeOpts) ->
BridgeOpts
),
Opts#{
msg_handler => mk_client_event_handler(Subscriptions, #{server => Server}),
name => Name,
ingress => Ingress,
msg_handler => mk_client_event_handler(Ingress, #{server => Server}),
hosts => [HostPort],
clean_start => CleanStart,
force_ping => true
}.
mk_client_event_handler(Subscriptions = #{}, Opts) ->
OnMessage = maps:get(on_message_received, Subscriptions, undefined),
mk_client_event_handler(Ingress = #{}, Opts) ->
OnMessage = maps:get(on_message_received, Ingress, undefined),
LocalPublish =
case Subscriptions of
case Ingress of
#{local := Local = #{topic := _}} ->
Local;
#{} ->
@ -275,26 +269,26 @@ do_send_async(Pid, Msg, Callback) when Msg /= undefined ->
do_send_async(_Pid, undefined, _Callback) ->
ok.
pre_process_subscriptions(
pre_process_ingress(
#{remote := RC, local := LC} = Conf,
BridgeName,
BridgeOpts
) when is_map(Conf) ->
Conf#{
remote => pre_process_in_remote(RC, BridgeName, BridgeOpts),
local => pre_process_in_out_common(LC)
local => pre_process_common(LC)
};
pre_process_subscriptions(Conf, _, _) when is_map(Conf) ->
pre_process_ingress(Conf, _, _) when is_map(Conf) ->
%% have no 'local' field in the config
undefined.
pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_in_out_common(RC)};
pre_process_forwards(Conf) when is_map(Conf) ->
pre_process_egress(#{remote := RC} = Conf) when is_map(Conf) ->
Conf#{remote => pre_process_common(RC)};
pre_process_egress(Conf) when is_map(Conf) ->
%% have no 'remote' field in the config
undefined.
pre_process_in_out_common(Conf0) ->
pre_process_common(Conf0) ->
Conf1 = pre_process_conf(topic, Conf0),
Conf2 = pre_process_conf(qos, Conf1),
Conf3 = pre_process_conf(payload, Conf2),
@ -331,9 +325,9 @@ downgrade_ingress_qos(2) ->
downgrade_ingress_qos(QoS) ->
QoS.
export_msg(Msg, #{forwards := #{remote := Remote}}) ->
export_msg(Msg, #{remote := Remote}) ->
to_remote_msg(Msg, Remote);
export_msg(Msg, #{forwards := undefined}) ->
export_msg(Msg, undefined) ->
?SLOG(error, #{
msg => "forwarding_unavailable",
message => Msg,