fix(mqttbridge): clarify schema descriptions + log messages
Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>
This commit is contained in:
parent
95cc9b9b72
commit
26819a647c
|
@ -76,7 +76,7 @@ start_ingress(ResourceId, Conf) ->
|
||||||
|
|
||||||
start_ingress(ResourceId, Ingress, ClientOpts) ->
|
start_ingress(ResourceId, Ingress, ClientOpts) ->
|
||||||
PoolName = <<ResourceId/binary, ":ingress">>,
|
PoolName = <<ResourceId/binary, ":ingress">>,
|
||||||
PoolSize = choose_ingress_pool_size(Ingress),
|
PoolSize = choose_ingress_pool_size(ResourceId, Ingress),
|
||||||
Options = [
|
Options = [
|
||||||
{name, PoolName},
|
{name, PoolName},
|
||||||
{pool_size, PoolSize},
|
{pool_size, PoolSize},
|
||||||
|
@ -90,7 +90,10 @@ start_ingress(ResourceId, Ingress, ClientOpts) ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}) ->
|
choose_ingress_pool_size(
|
||||||
|
ResourceId,
|
||||||
|
#{remote := #{topic := RemoteTopic}, pool_size := PoolSize}
|
||||||
|
) ->
|
||||||
case emqx_topic:parse(RemoteTopic) of
|
case emqx_topic:parse(RemoteTopic) of
|
||||||
{_Filter, #{share := _Name}} ->
|
{_Filter, #{share := _Name}} ->
|
||||||
% NOTE: this is shared subscription, many workers may subscribe
|
% NOTE: this is shared subscription, many workers may subscribe
|
||||||
|
@ -98,7 +101,8 @@ choose_ingress_pool_size(#{remote := #{topic := RemoteTopic}, pool_size := PoolS
|
||||||
{_Filter, #{}} ->
|
{_Filter, #{}} ->
|
||||||
% NOTE: this is regular subscription, only one worker should subscribe
|
% NOTE: this is regular subscription, only one worker should subscribe
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "ingress_pool_size_ignored",
|
msg => "mqtt_bridge_ingress_pool_size_ignored",
|
||||||
|
connector => ResourceId,
|
||||||
reason =>
|
reason =>
|
||||||
"Remote topic filter is not a shared subscription, "
|
"Remote topic filter is not a shared subscription, "
|
||||||
"ingress pool will start with a single worker",
|
"ingress pool will start with a single worker",
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
info/1
|
info/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([handle_publish/4]).
|
-export([handle_publish/5]).
|
||||||
-export([handle_disconnect/1]).
|
-export([handle_disconnect/1]).
|
||||||
|
|
||||||
-type name() :: term().
|
-type name() :: term().
|
||||||
|
@ -62,7 +62,7 @@ connect(Options) ->
|
||||||
WorkerId = proplists:get_value(ecpool_worker_id, Options),
|
WorkerId = proplists:get_value(ecpool_worker_id, Options),
|
||||||
Ingress = config(proplists:get_value(ingress, Options), Name),
|
Ingress = config(proplists:get_value(ingress, Options), Name),
|
||||||
ClientOpts = proplists:get_value(client_opts, Options),
|
ClientOpts = proplists:get_value(client_opts, Options),
|
||||||
case emqtt:start_link(mk_client_opts(WorkerId, Ingress, ClientOpts)) of
|
case emqtt:start_link(mk_client_opts(Name, WorkerId, Ingress, ClientOpts)) of
|
||||||
{ok, Pid} ->
|
{ok, Pid} ->
|
||||||
connect(Pid, Name, Ingress);
|
connect(Pid, Name, Ingress);
|
||||||
{error, Reason} = Error ->
|
{error, Reason} = Error ->
|
||||||
|
@ -74,16 +74,16 @@ connect(Options) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
mk_client_opts(WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
|
mk_client_opts(Name, WorkerId, Ingress, ClientOpts = #{clientid := ClientId}) ->
|
||||||
ClientOpts#{
|
ClientOpts#{
|
||||||
clientid := mk_clientid(WorkerId, ClientId),
|
clientid := mk_clientid(WorkerId, ClientId),
|
||||||
msg_handler => mk_client_event_handler(Ingress)
|
msg_handler => mk_client_event_handler(Name, Ingress)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
mk_clientid(WorkerId, ClientId) ->
|
mk_clientid(WorkerId, ClientId) ->
|
||||||
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
|
iolist_to_binary([ClientId, $: | integer_to_list(WorkerId)]).
|
||||||
|
|
||||||
mk_client_event_handler(Ingress = #{}) ->
|
mk_client_event_handler(Name, Ingress = #{}) ->
|
||||||
IngressVars = maps:with([server], Ingress),
|
IngressVars = maps:with([server], Ingress),
|
||||||
OnMessage = maps:get(on_message_received, Ingress, undefined),
|
OnMessage = maps:get(on_message_received, Ingress, undefined),
|
||||||
LocalPublish =
|
LocalPublish =
|
||||||
|
@ -94,7 +94,7 @@ mk_client_event_handler(Ingress = #{}) ->
|
||||||
undefined
|
undefined
|
||||||
end,
|
end,
|
||||||
#{
|
#{
|
||||||
publish => {fun ?MODULE:handle_publish/4, [OnMessage, LocalPublish, IngressVars]},
|
publish => {fun ?MODULE:handle_publish/5, [Name, OnMessage, LocalPublish, IngressVars]},
|
||||||
disconnected => {fun ?MODULE:handle_disconnect/1, []}
|
disconnected => {fun ?MODULE:handle_disconnect/1, []}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
@ -110,6 +110,7 @@ connect(Pid, Name, Ingress) ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "ingress_client_subscribe_failed",
|
msg => "ingress_client_subscribe_failed",
|
||||||
ingress => Ingress,
|
ingress => Ingress,
|
||||||
|
name => Name,
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
_ = catch emqtt:stop(Pid),
|
_ = catch emqtt:stop(Pid),
|
||||||
|
@ -182,11 +183,12 @@ status(Pid) ->
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
handle_publish(#{properties := Props} = MsgIn, OnMessage, LocalPublish, IngressVars) ->
|
handle_publish(#{properties := Props} = MsgIn, Name, OnMessage, LocalPublish, IngressVars) ->
|
||||||
Msg = import_msg(MsgIn, IngressVars),
|
Msg = import_msg(MsgIn, IngressVars),
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
msg => "publish_local",
|
msg => "ingress_publish_local",
|
||||||
message => Msg
|
message => Msg,
|
||||||
|
name => Name
|
||||||
}),
|
}),
|
||||||
maybe_on_message_received(Msg, OnMessage),
|
maybe_on_message_received(Msg, OnMessage),
|
||||||
maybe_publish_local(Msg, LocalPublish, Props).
|
maybe_publish_local(Msg, LocalPublish, Props).
|
||||||
|
|
|
@ -34,8 +34,8 @@ egress_desc.label:
|
||||||
|
|
||||||
egress_pool_size.desc:
|
egress_pool_size.desc:
|
||||||
"""Size of the pool of MQTT clients that will publish messages to the remote broker.<br/>
|
"""Size of the pool of MQTT clients that will publish messages to the remote broker.<br/>
|
||||||
Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}'
|
Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:egress:${node}:${n}'
|
||||||
where 'n' is the number of a client inside the pool."""
|
where 'n' is the number of a client inside the pool."""
|
||||||
|
|
||||||
egress_pool_size.label:
|
egress_pool_size.label:
|
||||||
"""Pool Size"""
|
"""Pool Size"""
|
||||||
|
@ -85,10 +85,11 @@ ingress_desc.label:
|
||||||
|
|
||||||
ingress_pool_size.desc:
|
ingress_pool_size.desc:
|
||||||
"""Size of the pool of MQTT clients that will ingest messages from the remote broker.<br/>
|
"""Size of the pool of MQTT clients that will ingest messages from the remote broker.<br/>
|
||||||
This value will be respected only if 'remote.topic' is a shared subscription topic filter,
|
This value will be respected only if 'remote.topic' is a shared subscription topic or topic-filter
|
||||||
otherwise only a single MQTT client will be used.
|
(for example `$share/name1/topic1` or `$share/name2/topic2/#`), otherwise only a single MQTT client will be used.
|
||||||
Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}'
|
Each MQTT client will be assigned 'clientid' of the form '${clientid_prefix}:${bridge_name}:ingress:${node}:${n}'
|
||||||
where 'n' is the number of a client inside the pool."""
|
where 'n' is the number of a client inside the pool.
|
||||||
|
NOTE: Non-shared subscription will not work well when EMQX is clustered."""
|
||||||
|
|
||||||
ingress_pool_size.label:
|
ingress_pool_size.label:
|
||||||
"""Pool Size"""
|
"""Pool Size"""
|
||||||
|
|
Loading…
Reference in New Issue