feat(bridge): add descriptions to the mqtt bridge schema

This commit is contained in:
Shawn 2021-10-25 11:45:54 +08:00
parent 0cbdaa0f40
commit 63f942a1b8
6 changed files with 146 additions and 51 deletions

View File

@ -51,9 +51,9 @@ end_per_testcase(_, _Config) ->
-define(HTTP_BRIDGE(PATH), -define(HTTP_BRIDGE(PATH),
#{ #{
<<"base_url">> => <<"http://localhost:9901">>, <<"base_url">> => <<"http://localhost:9901">>,
<<"egress_channels">> => #{ <<"egress">> => #{
<<"a">> => #{ <<"a">> => #{
<<"subscribe_local_topic">> => <<"emqx_http/#">>, <<"from_local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>, <<"method">> => <<"post">>,
<<"path">> => PATH, <<"path">> => PATH,
<<"body">> => <<"${payload}">>, <<"body">> => <<"${payload}">>,
@ -114,7 +114,7 @@ t_crud_apis(_) ->
, bridge_type := http , bridge_type := http
, is_connected := _ , is_connected := _
, node := _ , node := _
, <<"egress_channels">> := #{ , <<"egress">> := #{
<<"a">> := #{<<"path">> := ?PATH1} <<"a">> := #{<<"path">> := ?PATH1}
} }
}, Bridge), }, Bridge),
@ -127,7 +127,7 @@ t_crud_apis(_) ->
?assertMatch(#{ id := <<"http:test_bridge">> ?assertMatch(#{ id := <<"http:test_bridge">>
, bridge_type := http , bridge_type := http
, is_connected := _ , is_connected := _
, <<"egress_channels">> := #{ , <<"egress">> := #{
<<"a">> := #{<<"path">> := ?PATH2} <<"a">> := #{<<"path">> := ?PATH2}
} }
}, Bridge2), }, Bridge2),

View File

@ -100,8 +100,8 @@ on_start(InstId, Conf) ->
"bridge:" ++ NamePrefix = binary_to_list(InstId), "bridge:" ++ NamePrefix = binary_to_list(InstId),
BasicConf = basic_config(Conf), BasicConf = basic_config(Conf),
InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}}, InitRes = {ok, #{name_prefix => NamePrefix, baisc_conf => BasicConf, channels => []}},
InOutConfigs = taged_map_list(ingress_channels, maps:get(ingress_channels, Conf, #{})) InOutConfigs = taged_map_list(ingress, maps:get(ingress, Conf, #{}))
++ taged_map_list(egress_channels, maps:get(egress_channels, Conf, #{})), ++ taged_map_list(egress, maps:get(egress, Conf, #{})),
lists:foldl(fun lists:foldl(fun
(_InOutConf, {error, Reason}) -> (_InOutConf, {error, Reason}) ->
{error, Reason}; {error, Reason};
@ -120,7 +120,7 @@ on_stop(InstId, #{channels := NameList}) ->
end, NameList). end, NameList).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the %% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `ingress_channels` and `egress_channels` config %% `ingress` and `egress` config
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) -> baisc_conf := BasicConf}) ->
create_channel(Conf, Prefix, BasicConf); create_channel(Conf, Prefix, BasicConf);
@ -136,36 +136,36 @@ on_health_check(_InstId, #{channels := NameList} = State) ->
false -> {error, {some_channel_down, Results}, State} false -> {error, {some_channel_down, Results}, State}
end. end.
create_channel({{ingress_channels, Id}, #{subscribe_remote_topic := RemoteT} = Conf}, create_channel({{ingress, Id}, #{from_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) -> NamePrefix, BasicConf) ->
LocalT = maps:get(local_topic, Conf, undefined), LocalT = maps:get(to_local_topic, Conf, undefined),
ChannId = ingress_channel_id(NamePrefix, Id), ChannId = ingress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating ingress channel", ?SLOG(info, #{msg => "creating ingress channel",
remote_topic => RemoteT, to_remote_topic => RemoteT,
local_topic => LocalT, to_local_topic => LocalT,
channel_id => ChannId}), channel_id => ChannId}),
do_create_channel(BasicConf#{ do_create_channel(BasicConf#{
name => ChannId, name => ChannId,
clientid => clientid(ChannId), clientid => clientid(ChannId),
subscriptions => Conf#{ subscriptions => Conf#{
local_topic => LocalT, to_local_topic => LocalT,
on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]} on_message_received => {fun ?MODULE:on_message_received/2, [ChannId]}
}, },
forwards => undefined}); forwards => undefined});
create_channel({{egress_channels, Id}, #{remote_topic := RemoteT} = Conf}, create_channel({{egress, Id}, #{to_remote_topic := RemoteT} = Conf},
NamePrefix, BasicConf) -> NamePrefix, BasicConf) ->
LocalT = maps:get(subscribe_local_topic, Conf, undefined), LocalT = maps:get(from_local_topic, Conf, undefined),
ChannId = egress_channel_id(NamePrefix, Id), ChannId = egress_channel_id(NamePrefix, Id),
?SLOG(info, #{msg => "creating egress channel", ?SLOG(info, #{msg => "creating egress channel",
remote_topic => RemoteT, to_remote_topic => RemoteT,
local_topic => LocalT, to_local_topic => LocalT,
channel_id => ChannId}), channel_id => ChannId}),
do_create_channel(BasicConf#{ do_create_channel(BasicConf#{
name => ChannId, name => ChannId,
clientid => clientid(ChannId), clientid => clientid(ChannId),
subscriptions => undefined, subscriptions => undefined,
forwards => Conf#{subscribe_local_topic => LocalT}}). forwards => Conf#{from_local_topic => LocalT}}).
remove_channel(ChannId) -> remove_channel(ChannId) ->
?SLOG(info, #{msg => "removing channel", ?SLOG(info, #{msg => "removing channel",
@ -229,9 +229,9 @@ taged_map_list(Tag, Map) ->
[{{Tag, K}, V} || {K, V} <- maps:to_list(Map)]. [{{Tag, K}, V} || {K, V} <- maps:to_list(Map)].
ingress_channel_id(Prefix, Id) -> ingress_channel_id(Prefix, Id) ->
channel_name("ingress_channels", Prefix, Id). channel_name("ingress", Prefix, Id).
egress_channel_id(Prefix, Id) -> egress_channel_id(Prefix, Id) ->
channel_name("egress_channels", Prefix, Id). channel_name("egress", Prefix, Id).
channel_name(Type, Prefix, Id) -> channel_name(Type, Prefix, Id) ->
list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)). list_to_atom(str(Prefix) ++ ":" ++ Type ++ ":" ++ str(Id)).

View File

@ -65,7 +65,7 @@ start(Config) ->
case emqtt:connect(Pid) of case emqtt:connect(Pid) of
{ok, _} -> {ok, _} ->
try try
ok = subscribe_remote_topics(Pid, Subscriptions), ok = from_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}} {ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch catch
throw : Reason -> throw : Reason ->
@ -167,7 +167,7 @@ handle_publish(Msg, #{on_message_received := {OnMsgRcvdFunc, Args}} = Vars) ->
message => Msg, vars => Vars}), message => Msg, vars => Vars}),
emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1), emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
_ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]), _ = erlang:apply(OnMsgRcvdFunc, [Msg | Args]),
case maps:get(local_topic, Vars, undefined) of case maps:get(to_local_topic, Vars, undefined) of
undefined -> ok; undefined -> ok;
_Topic -> _Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
@ -182,8 +182,8 @@ make_hdlr(Parent, Vars) ->
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}. }.
subscribe_remote_topics(_ClientPid, undefined) -> ok; from_remote_topics(_ClientPid, undefined) -> ok;
subscribe_remote_topics(ClientPid, #{subscribe_remote_topic := FromTopic, subscribe_qos := QoS}) -> from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok; {ok, _, _} -> ok;
Error -> throw(Error) Error -> throw(Error)

View File

@ -36,7 +36,7 @@
-type variables() :: #{ -type variables() :: #{
mountpoint := undefined | binary(), mountpoint := undefined | binary(),
remote_topic := binary(), to_remote_topic := binary(),
qos := original | integer(), qos := original | integer(),
retain := original | boolean(), retain := original | boolean(),
payload := binary() payload := binary()
@ -59,7 +59,7 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false), Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken, to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg),
@ -75,7 +75,7 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{local_topic := TopicToken, payload := PayloadToken, #{to_local_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg),

View File

@ -67,36 +67,131 @@ fields("config") ->
})} })}
, {keepalive, mk_duration("keepalive", #{default => "300s"})} , {keepalive, mk_duration("keepalive", #{default => "300s"})}
, {retry_interval, mk_duration("retry interval", #{default => "30s"})} , {retry_interval, mk_duration("retry interval", #{default => "30s"})}
, {max_inflight, sc(integer(), #{default => 32})} , {max_inflight,
, {replayq, sc(ref("replayq"))} sc(integer(),
, {ingress_channels, sc(hoconsc:map(id, ref("ingress_channels")), #{default => []})} #{ default => 32
, {egress_channels, sc(hoconsc:map(id, ref("egress_channels")), #{default => []})} , desc => "Max inflight messages (sent but ACK has not received) of the MQTT protocol"
})}
, {replayq,
sc(ref("replayq"),
#{ desc => """
Queue messages in disk files.
"""
})}
, {ingress,
sc(ref("ingress"),
#{ default => #{}
, desc => """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.<br>
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
'payload'.<br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
the rule.
"""
})}
, {egress,
sc(hoconsc:map(id, ref("egress")),
#{ default => #{}
, desc => """
The egress config defines how this bridge forwards messages from the local broker to the remote
broker.<br>
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
"""
})}
] ++ emqx_connector_schema_lib:ssl_fields(); ] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress_channels") -> fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary %% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary
[ {subscribe_remote_topic, sc(binary(), #{nullable => false})} [ {from_remote_topic,
, {local_topic, sc(binary())} sc(binary(),
, {subscribe_qos, sc(qos(), #{default => 1})} #{ nullable => false
, desc => "Receive messages from which topic of the remote broker"
})}
, {subscribe_qos,
sc(qos(),
#{ default => 1
, desc => "The QoS level to be used when subscribing to the remote broker"
})}
, {to_local_topic,
sc(binary(),
#{ desc => """
Send messages to which topic of the local broker.<br>
Template with variables is allowed.
"""
})}
] ++ common_inout_confs(); ] ++ common_inout_confs();
fields("egress_channels") -> fields("egress") ->
%% the message maybe sent from rules, in this case 'subscribe_local_topic' is not necessary %% the message maybe sent from rules, in this case 'from_local_topic' is not necessary
[ {subscribe_local_topic, sc(binary())} [ {from_local_topic,
, {remote_topic, sc(binary(), #{default => <<"${topic}">>})} sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker"
})}
, {to_remote_topic,
sc(binary(),
#{ default => <<"${topic}">>
, desc => """
Forward to which topic of the remote broker.<br>
Template with variables is allowed.
"""
})}
] ++ common_inout_confs(); ] ++ common_inout_confs();
fields("replayq") -> fields("replayq") ->
[ {dir, hoconsc:union([boolean(), string()])} [ {dir,
, {seg_bytes, sc(emqx_schema:bytesize(), #{default => "100MB"})} sc(hoconsc:union([boolean(), string()]),
, {offload, sc(boolean(), #{default => false})} #{ desc => """
, {max_total_bytes, sc(emqx_schema:bytesize(), #{default => "1024MB"})} The dir where the replayq file saved.<br>
Set to 'false' disables the replayq feature.
"""
})}
, {seg_bytes,
sc(emqx_schema:bytesize(),
#{ default => "100MB"
, desc => """
The size in bytes of a single segment.<br>
A segment is mapping to a file in the replayq dir. If the current segment is full, a new segment
(file) will be opened to write.
"""
})}
, {offload,
sc(boolean(),
#{ default => false
, desc => """
In offload mode, the disk queue is only used to offload queue tail segments.<br>
The messages are cached in the memory first, then it write to the replayq files after the size of
the memory cache reaches 'seg_bytes'.
"""
})}
]. ].
common_inout_confs() -> common_inout_confs() ->
[ {qos, sc(qos(), #{default => <<"${qos}">>})} [ {qos,
, {retain, sc(hoconsc:union([boolean(), binary()]), #{default => <<"${retain}">>})} sc(qos(),
, {payload, sc(binary(), #{default => <<"${payload}">>})} #{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {retain,
sc(hoconsc:union([boolean(), binary()]),
#{ default => <<"${retain}">>
, desc => """
The retain flag of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {payload,
sc(binary(),
#{ default => <<"${payload}">>
, desc => """
The payload of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
]. ].
qos() -> qos() ->

View File

@ -235,8 +235,8 @@ pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts)
pre_process_in_out(undefined) -> undefined; pre_process_in_out(undefined) -> undefined;
pre_process_in_out(Conf) when is_map(Conf) -> pre_process_in_out(Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(local_topic, Conf), Conf1 = pre_process_conf(to_local_topic, Conf),
Conf2 = pre_process_conf(remote_topic, Conf1), Conf2 = pre_process_conf(to_remote_topic, Conf1),
Conf3 = pre_process_conf(payload, Conf2), Conf3 = pre_process_conf(payload, Conf2),
Conf4 = pre_process_conf(qos, Conf3), Conf4 = pre_process_conf(qos, Conf3),
pre_process_conf(retain, Conf4). pre_process_conf(retain, Conf4).
@ -347,7 +347,7 @@ do_connect(#{connect_opts := ConnectOpts = #{forwards := Forwards},
name := Name} = State) -> name := Name} = State) ->
case Forwards of case Forwards of
undefined -> ok; undefined -> ok;
#{subscribe_local_topic := Topic} -> subscribe_local_topic(Topic, Name) #{from_local_topic := Topic} -> from_local_topic(Topic, Name)
end, end,
case emqx_connector_mqtt_mod:start(ConnectOpts) of case emqx_connector_mqtt_mod:start(ConnectOpts) of
{ok, Conn} -> {ok, Conn} ->
@ -473,9 +473,9 @@ drop_acked_batches(Q, [#{send_ack_ref := Refs,
All All
end. end.
subscribe_local_topic(undefined, _Name) -> from_local_topic(undefined, _Name) ->
ok; ok;
subscribe_local_topic(Topic, Name) -> from_local_topic(Topic, Name) ->
do_subscribe(Topic, Name). do_subscribe(Topic, Name).
topic(T) -> iolist_to_binary(T). topic(T) -> iolist_to_binary(T).