refactor(bridges): rename some config entries for MQTT bridge

This commit is contained in:
Shawn 2021-12-17 21:22:04 +08:00
parent 9b34f6f9a3
commit 0699682f38
10 changed files with 97 additions and 67 deletions

View File

@ -8,9 +8,9 @@
# connector = "mqtt:my_mqtt_connector" # connector = "mqtt:my_mqtt_connector"
# direction = ingress # direction = ingress
# ## topic mappings for this bridge # ## topic mappings for this bridge
# from_remote_topic = "aws/#" # remote_topic = "aws/#"
# subscribe_qos = 1 # subscribe_qos = 1
# to_local_topic = "from_aws/${topic}" # local_topic = "from_aws/${topic}"
# payload = "${payload}" # payload = "${payload}"
# qos = "${qos}" # qos = "${qos}"
# retain = "${retain}" # retain = "${retain}"
@ -21,8 +21,8 @@
# connector = "mqtt:my_mqtt_connector" # connector = "mqtt:my_mqtt_connector"
# direction = egress # direction = egress
# ## topic mappings for this bridge # ## topic mappings for this bridge
# from_local_topic = "emqx/#" # local_topic = "emqx/#"
# to_remote_topic = "from_emqx/${topic}" # remote_topic = "from_emqx/${topic}"
# payload = "${payload}" # payload = "${payload}"
# qos = 1 # qos = 1
# retain = false # retain = false

View File

@ -67,9 +67,16 @@ load_hook(Bridges) ->
end, maps:to_list(Bridge)) end, maps:to_list(Bridge))
end, maps:to_list(Bridges)). end, maps:to_list(Bridges)).
do_load_hook(#{from_local_topic := _}) -> do_load_hook(#{local_topic := _} = Conf) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), case maps:find(direction, Conf) of
ok; error ->
%% this bridge has no direction field, it means that it has only egress bridges
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
{ok, egress} ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
{ok, ingress} ->
ok
end;
do_load_hook(_Conf) -> ok. do_load_hook(_Conf) -> ok.
unload_hook() -> unload_hook() ->
@ -218,7 +225,7 @@ recreate(Type, Name, Conf) ->
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []). emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}}, Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
{ok, Conf1} -> {ok, Conf1} ->
emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1); emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
@ -263,7 +270,7 @@ get_matched_bridges(Topic) ->
end, Acc0, Conf) end, Acc0, Conf)
end, [], Bridges). end, [], Bridges).
get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) -> get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc]; true -> [bridge_id(BType, BName) | Acc];
false -> Acc false -> Acc

View File

@ -161,7 +161,7 @@ info_example_basic(http, _) ->
pool_size => 4, pool_size => 4,
enable_pipelining => true, enable_pipelining => true,
ssl => #{enable => false}, ssl => #{enable => false},
from_local_topic => <<"emqx_http/#">>, local_topic => <<"emqx_http/#">>,
method => post, method => post,
body => <<"${payload}">> body => <<"${payload}">>
}; };
@ -169,9 +169,9 @@ info_example_basic(mqtt, ingress) ->
#{ #{
connector => <<"mqtt:my_mqtt_connector">>, connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress, direction => ingress,
from_remote_topic => <<"aws/#">>, remote_topic => <<"aws/#">>,
subscribe_qos => 1, remote_qos => 1,
to_local_topic => <<"from_aws/${topic}">>, local_topic => <<"from_aws/${topic}">>,
payload => <<"${payload}">>, payload => <<"${payload}">>,
qos => <<"${qos}">>, qos => <<"${qos}">>,
retain => <<"${retain}">> retain => <<"${retain}">>
@ -180,8 +180,8 @@ info_example_basic(mqtt, egress) ->
#{ #{
connector => <<"mqtt:my_mqtt_connector">>, connector => <<"mqtt:my_mqtt_connector">>,
direction => egress, direction => egress,
from_local_topic => <<"emqx/#">>, local_topic => <<"emqx/#">>,
to_remote_topic => <<"from_emqx/${topic}">>, remote_topic => <<"from_emqx/${topic}">>,
payload => <<"${payload}">>, payload => <<"${payload}">>,
qos => 1, qos => 1,
retain => false retain => false

View File

@ -23,12 +23,12 @@ For example, <code> http://localhost:9901/${topic} </code> is allowed, but
is not allowed. is not allowed.
""" """
})} })}
, {from_local_topic, mk(binary(), , {local_topic, mk(binary(),
#{ desc =>""" #{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br> match the local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded. local_topic will be forwarded.
""" """
})} })}
, {method, mk(method(), , {method, mk(method(),

View File

@ -30,7 +30,7 @@
-define(HTTP_BRIDGE(URL), -define(HTTP_BRIDGE(URL),
#{ #{
<<"url">> => URL, <<"url">> => URL,
<<"from_local_topic">> => <<"emqx_http/#">>, <<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>, <<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false}, <<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>, <<"body">> => <<"${payload}">>,

View File

@ -65,7 +65,7 @@ start(Config) ->
case emqtt:connect(Pid) of case emqtt:connect(Pid) of
{ok, _} -> {ok, _} ->
try try
ok = from_remote_topics(Pid, Subscriptions), ok = sub_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}} {ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch catch
throw : Reason -> throw : Reason ->
@ -171,7 +171,7 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]); _ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok _ -> ok
end, end,
case maps:get(to_local_topic, Vars, undefined) of case maps:get(local_topic, Vars, undefined) of
undefined -> ok; undefined -> ok;
_Topic -> _Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars)) emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
@ -186,8 +186,8 @@ make_hdlr(Parent, Vars) ->
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}. }.
from_remote_topics(_ClientPid, undefined) -> ok; sub_remote_topics(_ClientPid, undefined) -> ok;
from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) -> sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok; {ok, _, _} -> ok;
Error -> throw(Error) Error -> throw(Error)

View File

@ -24,6 +24,10 @@
, estimate_size/1 , estimate_size/1
]). ]).
-export([ replace_vars_in_str/2
, replace_simple_var/2
]).
-export_type([msg/0]). -export_type([msg/0]).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -36,7 +40,7 @@
-type variables() :: #{ -type variables() :: #{
mountpoint := undefined | binary(), mountpoint := undefined | binary(),
to_remote_topic := binary(), remote_topic := binary(),
qos := original | integer(), qos := original | integer(),
retain := original | boolean(), retain := original | boolean(),
payload := binary() payload := binary()
@ -59,8 +63,8 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false), Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)), MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
to_remote_msg(MapMsg, Vars); to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken, to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) -> remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),
@ -75,8 +79,8 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
%% published from remote node over a MQTT connection %% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg, to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{to_local_topic := TopicToken, payload := PayloadToken, #{local_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) -> local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
Topic = replace_vars_in_str(TopicToken, MapMsg), Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg), Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg), QoS = replace_simple_var(QoSToken, MapMsg),

View File

@ -52,7 +52,7 @@ In 'cluster_shareload' mode, the incomming load from the remote broker is shared
using shared subscription.<br> using shared subscription.<br>
Note that the 'clientid' is suffixed by the node name, this is to avoid Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'from_remote_topic'. topic filters for 'remote_topic' of ingress connections.
""" """
})} })}
, {server, , {server,
@ -101,24 +101,31 @@ Queue messages in disk files.
] ++ emqx_connector_schema_lib:ssl_fields(); ] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") -> fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[ {from_remote_topic, [ {remote_topic,
sc(binary(), sc(binary(),
#{ nullable => false #{ nullable => false
, desc => "Receive messages from which topic of the remote broker" , desc => "Receive messages from which topic of the remote broker"
})} })}
, {subscribe_qos, , {remote_qos,
sc(qos(), sc(qos(),
#{ default => 1 #{ default => 1
, desc => "The QoS level to be used when subscribing to the remote broker" , desc => "The QoS level to be used when subscribing to the remote broker"
})} })}
, {to_local_topic, , {local_topic,
sc(binary(), sc(binary(),
#{ desc => """ #{ desc => """
Send messages to which topic of the local broker.<br> Send messages to which topic of the local broker.<br>
Template with variables is allowed. Template with variables is allowed.
""" """
})} })}
, {local_qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {hookpoint, , {hookpoint,
sc(binary(), sc(binary(),
#{ desc => """ #{ desc => """
@ -128,12 +135,12 @@ The hookpoint will be triggered when there's any message received from the remot
] ++ common_inout_confs(); ] ++ common_inout_confs();
fields("egress") -> fields("egress") ->
%% the message maybe sent from rules, in this case 'from_local_topic' is not necessary %% the message maybe sent from rules, in this case 'local_topic' is not necessary
[ {from_local_topic, [ {local_topic,
sc(binary(), sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker" #{ desc => "The local topic to be forwarded to the remote broker"
})} })}
, {to_remote_topic, , {remote_topic,
sc(binary(), sc(binary(),
#{ default => <<"${topic}">> #{ default => <<"${topic}">>
, desc => """ , desc => """
@ -141,6 +148,13 @@ Forward to which topic of the remote broker.<br>
Template with variables is allowed. Template with variables is allowed.
""" """
})} })}
, {remote_qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
] ++ common_inout_confs(); ] ++ common_inout_confs();
fields("replayq") -> fields("replayq") ->
@ -187,31 +201,24 @@ topic_mappings() ->
ingress_desc() -> """ ingress_desc() -> """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.<br> send them to the local broker.<br>
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain', Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',
'payload'.<br> 'payload'.<br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and configured, then messages got from the remote broker will be sent to both the 'local_topic' and
the rule. the rule.
""". """.
egress_desc() -> """ egress_desc() -> """
The egress config defines how this bridge forwards messages from the local broker to the remote The egress config defines how this bridge forwards messages from the local broker to the remote
broker.<br> broker.<br>
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br> Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic
is configured, then both the data got from the rule and the MQTT messages that matches is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded. local_topic will be forwarded.
""". """.
common_inout_confs() -> common_inout_confs() ->
[ {qos, [ {retain,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {retain,
sc(hoconsc:union([boolean(), binary()]), sc(hoconsc:union([boolean(), binary()]),
#{ default => <<"${retain}">> #{ default => <<"${retain}">>
, desc => """ , desc => """

View File

@ -226,16 +226,22 @@ open_replayq(Name, QCfg) ->
marshaller => fun ?MODULE:msg_marshaller/1}). marshaller => fun ?MODULE:msg_marshaller/1}).
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
ConnectOpts#{subscriptions => pre_process_in_out(InConf), ConnectOpts#{subscriptions => pre_process_in_out(in, InConf),
forwards => pre_process_in_out(OutConf)}. forwards => pre_process_in_out(out, OutConf)}.
pre_process_in_out(undefined) -> undefined; pre_process_in_out(_, undefined) -> undefined;
pre_process_in_out(Conf) when is_map(Conf) -> pre_process_in_out(in, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(to_local_topic, Conf), Conf1 = pre_process_conf(local_topic, Conf),
Conf2 = pre_process_conf(to_remote_topic, Conf1), Conf2 = pre_process_conf(local_qos, Conf1),
Conf3 = pre_process_conf(payload, Conf2), pre_process_in_out_common(Conf2);
Conf4 = pre_process_conf(qos, Conf3), pre_process_in_out(out, Conf) when is_map(Conf) ->
pre_process_conf(retain, Conf4). Conf1 = pre_process_conf(remote_topic, Conf),
Conf2 = pre_process_conf(remote_qos, Conf1),
pre_process_in_out_common(Conf2).
pre_process_in_out_common(Conf) ->
Conf1 = pre_process_conf(payload, Conf),
pre_process_conf(retain, Conf1).
pre_process_conf(Key, Conf) -> pre_process_conf(Key, Conf) ->
case maps:find(Key, Conf) of case maps:find(Key, Conf) of

View File

@ -46,11 +46,11 @@
#{ #{
<<"connector">> => ID, <<"connector">> => ID,
<<"direction">> => <<"ingress">>, <<"direction">> => <<"ingress">>,
<<"from_remote_topic">> => <<"remote_topic/#">>, <<"remote_topic">> => <<"remote_topic/#">>,
<<"to_local_topic">> => <<"local_topic/${topic}">>, <<"remote_qos">> => 2,
<<"subscribe_qos">> => 1, <<"local_topic">> => <<"local_topic/${topic}">>,
<<"local_qos">> => <<"${qos}">>,
<<"payload">> => <<"${payload}">>, <<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">> <<"retain">> => <<"${retain}">>
}). }).
@ -58,10 +58,10 @@
#{ #{
<<"connector">> => ID, <<"connector">> => ID,
<<"direction">> => <<"egress">>, <<"direction">> => <<"egress">>,
<<"from_local_topic">> => <<"local_topic/#">>, <<"local_topic">> => <<"local_topic/#">>,
<<"to_remote_topic">> => <<"remote_topic/${topic}">>, <<"remote_topic">> => <<"remote_topic/${topic}">>,
<<"payload">> => <<"${payload}">>, <<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>, <<"remote_qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">> <<"retain">> => <<"${retain}">>
}). }).
@ -125,6 +125,8 @@ t_mqtt_crud_apis(_) ->
%ct:pal("---connector: ~p", [Connector]), %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1 , <<"username">> := User1
, <<"password">> := <<"">> , <<"password">> := <<"">>
@ -157,6 +159,8 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it %% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []), {ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID ?assertMatch([#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2 , <<"username">> := User2
, <<"password">> := <<"">> , <<"password">> := <<"">>
@ -167,6 +171,8 @@ t_mqtt_crud_apis(_) ->
%% get the connector by id %% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">> , <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2 , <<"username">> := User2
, <<"password">> := <<"">> , <<"password">> := <<"">>