Merge pull request #6473 from terry-xiaoyu/fix_connector_apis

fix(connector): add type and name in repsonse of GET /connectors
This commit is contained in:
Shawn 2021-12-18 09:35:45 +08:00 committed by GitHub
commit b2e7a6a249
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 134 additions and 85 deletions

View File

@ -8,9 +8,9 @@
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
# from_remote_topic = "aws/#"
# remote_topic = "aws/#"
# subscribe_qos = 1
# to_local_topic = "from_aws/${topic}"
# local_topic = "from_aws/${topic}"
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
@ -21,8 +21,8 @@
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
# from_local_topic = "emqx/#"
# to_remote_topic = "from_emqx/${topic}"
# local_topic = "emqx/#"
# remote_topic = "from_emqx/${topic}"
# payload = "${payload}"
# qos = 1
# retain = false

View File

@ -67,9 +67,16 @@ load_hook(Bridges) ->
end, maps:to_list(Bridge))
end, maps:to_list(Bridges)).
do_load_hook(#{from_local_topic := _}) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
ok;
do_load_hook(#{local_topic := _} = Conf) ->
case maps:find(direction, Conf) of
error ->
%% this bridge has no direction field, it means that it has only egress bridges
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
{ok, egress} ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
{ok, ingress} ->
ok
end;
do_load_hook(_Conf) -> ok.
unload_hook() ->
@ -218,7 +225,7 @@ recreate(Type, Name, Conf) ->
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
create_dry_run(Type, Conf) ->
Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}},
Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
{ok, Conf1} ->
emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
@ -263,7 +270,7 @@ get_matched_bridges(Topic) ->
end, Acc0, Conf)
end, [], Bridges).
get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) ->
get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc];
false -> Acc

View File

@ -161,7 +161,7 @@ info_example_basic(http, _) ->
pool_size => 4,
enable_pipelining => true,
ssl => #{enable => false},
from_local_topic => <<"emqx_http/#">>,
local_topic => <<"emqx_http/#">>,
method => post,
body => <<"${payload}">>
};
@ -169,21 +169,21 @@ info_example_basic(mqtt, ingress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress,
from_remote_topic => <<"aws/#">>,
subscribe_qos => 1,
to_local_topic => <<"from_aws/${topic}">>,
remote_topic => <<"aws/#">>,
remote_qos => 1,
local_topic => <<"from_aws/${topic}">>,
local_qos => <<"${qos}">>,
payload => <<"${payload}">>,
qos => <<"${qos}">>,
retain => <<"${retain}">>
};
info_example_basic(mqtt, egress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => egress,
from_local_topic => <<"emqx/#">>,
to_remote_topic => <<"from_emqx/${topic}">>,
local_topic => <<"emqx/#">>,
remote_topic => <<"from_emqx/${topic}">>,
remote_qos => <<"${qos}">>,
payload => <<"${payload}">>,
qos => 1,
retain => false
}.

View File

@ -23,12 +23,12 @@ For example, <code> http://localhost:9901/${topic} </code> is allowed, but
is not allowed.
"""
})}
, {from_local_topic, mk(binary(),
, {local_topic, mk(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
match the from_local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
match the local_topic will be forwarded.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches
local_topic will be forwarded.
"""
})}
, {method, mk(method(),

View File

@ -30,7 +30,7 @@
-define(HTTP_BRIDGE(URL),
#{
<<"url">> => URL,
<<"from_local_topic">> => <<"emqx_http/#">>,
<<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,

View File

@ -209,7 +209,7 @@ schema("/connectors/:id") ->
end.
'/connectors'(get, _Request) ->
{200, emqx_connector:list()};
{200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()),
@ -264,10 +264,16 @@ error_msg(Code, Msg) when is_binary(Msg) ->
error_msg(Code, Msg) ->
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
format_resp(#{<<"id">> := Id} = RawConf) ->
format_resp(Id, RawConf).
format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
{Type, Name} = emqx_connector:parse_connector_id(ConnId),
RawConf#{
<<"id">> => ConnId,
<<"type">> => Type,
<<"name">> => Name,
<<"num_of_bridges">> => NumOfBridges
}.

View File

@ -65,7 +65,7 @@ start(Config) ->
case emqtt:connect(Pid) of
{ok, _} ->
try
ok = from_remote_topics(Pid, Subscriptions),
ok = sub_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch
throw : Reason ->
@ -171,7 +171,7 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok
end,
case maps:get(to_local_topic, Vars, undefined) of
case maps:get(local_topic, Vars, undefined) of
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
@ -186,8 +186,8 @@ make_hdlr(Parent, Vars) ->
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
from_remote_topics(_ClientPid, undefined) -> ok;
from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) ->
sub_remote_topics(_ClientPid, undefined) -> ok;
sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)

View File

@ -24,6 +24,10 @@
, estimate_size/1
]).
-export([ replace_vars_in_str/2
, replace_simple_var/2
]).
-export_type([msg/0]).
-include_lib("emqx/include/emqx.hrl").
@ -36,8 +40,8 @@
-type variables() :: #{
mountpoint := undefined | binary(),
to_remote_topic := binary(),
qos := original | integer(),
remote_topic := binary(),
remote_qos := original | integer(),
retain := original | boolean(),
payload := binary()
}.
@ -59,8 +63,8 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
@ -75,8 +79,8 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
%% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
#{to_local_topic := TopicToken, payload := PayloadToken,
qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
#{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),

View File

@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
@ -52,7 +52,7 @@ In 'cluster_shareload' mode, the incomming load from the remote broker is shared
using shared subscription.<br>
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
topic filters for 'from_remote_topic'.
topic filters for 'remote_topic' of ingress connections.
"""
})}
, {server,
@ -101,24 +101,31 @@ Queue messages in disk files.
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary
[ {from_remote_topic,
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
[ {remote_topic,
sc(binary(),
#{ nullable => false
, desc => "Receive messages from which topic of the remote broker"
})}
, {subscribe_qos,
, {remote_qos,
sc(qos(),
#{ default => 1
, desc => "The QoS level to be used when subscribing to the remote broker"
})}
, {to_local_topic,
, {local_topic,
sc(binary(),
#{ desc => """
Send messages to which topic of the local broker.<br>
Template with variables is allowed.
"""
})}
, {local_qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {hookpoint,
sc(binary(),
#{ desc => """
@ -128,12 +135,12 @@ The hookpoint will be triggered when there's any message received from the remot
] ++ common_inout_confs();
fields("egress") ->
%% the message maybe sent from rules, in this case 'from_local_topic' is not necessary
[ {from_local_topic,
%% the message maybe sent from rules, in this case 'local_topic' is not necessary
[ {local_topic,
sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker"
})}
, {to_remote_topic,
, {remote_topic,
sc(binary(),
#{ default => <<"${topic}">>
, desc => """
@ -141,6 +148,13 @@ Forward to which topic of the remote broker.<br>
Template with variables is allowed.
"""
})}
, {remote_qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
] ++ common_inout_confs();
fields("replayq") ->
@ -187,31 +201,24 @@ topic_mappings() ->
ingress_desc() -> """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.<br>
Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',
'payload'.<br>
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is
configured, then messages got from the remote broker will be sent to both the 'local_topic' and
the rule.
""".
egress_desc() -> """
The egress config defines how this bridge forwards messages from the local broker to the remote
broker.<br>
Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.<br>
NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic
is configured, then both the data got from the rule and the MQTT messages that matches
from_local_topic will be forwarded.
local_topic will be forwarded.
""".
common_inout_confs() ->
[ {qos,
sc(qos(),
#{ default => <<"${qos}">>
, desc => """
The QoS of the MQTT message to be sent.<br>
Template with variables is allowed."""
})}
, {retain,
[ {retain,
sc(hoconsc:union([boolean(), binary()]),
#{ default => <<"${retain}">>
, desc => """

View File

@ -226,16 +226,22 @@ open_replayq(Name, QCfg) ->
marshaller => fun ?MODULE:msg_marshaller/1}).
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
ConnectOpts#{subscriptions => pre_process_in_out(InConf),
forwards => pre_process_in_out(OutConf)}.
ConnectOpts#{subscriptions => pre_process_in_out(in, InConf),
forwards => pre_process_in_out(out, OutConf)}.
pre_process_in_out(undefined) -> undefined;
pre_process_in_out(Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(to_local_topic, Conf),
Conf2 = pre_process_conf(to_remote_topic, Conf1),
Conf3 = pre_process_conf(payload, Conf2),
Conf4 = pre_process_conf(qos, Conf3),
pre_process_conf(retain, Conf4).
pre_process_in_out(_, undefined) -> undefined;
pre_process_in_out(in, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(local_topic, Conf),
Conf2 = pre_process_conf(local_qos, Conf1),
pre_process_in_out_common(Conf2);
pre_process_in_out(out, Conf) when is_map(Conf) ->
Conf1 = pre_process_conf(remote_topic, Conf),
Conf2 = pre_process_conf(remote_qos, Conf1),
pre_process_in_out_common(Conf2).
pre_process_in_out_common(Conf) ->
Conf1 = pre_process_conf(payload, Conf),
pre_process_conf(retain, Conf1).
pre_process_conf(Key, Conf) ->
case maps:find(Key, Conf) of

View File

@ -46,11 +46,11 @@
#{
<<"connector">> => ID,
<<"direction">> => <<"ingress">>,
<<"from_remote_topic">> => <<"remote_topic/#">>,
<<"to_local_topic">> => <<"local_topic/${topic}">>,
<<"subscribe_qos">> => 1,
<<"remote_topic">> => <<"remote_topic/#">>,
<<"remote_qos">> => 2,
<<"local_topic">> => <<"local_topic/${topic}">>,
<<"local_qos">> => <<"${qos}">>,
<<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
@ -58,10 +58,10 @@
#{
<<"connector">> => ID,
<<"direction">> => <<"egress">>,
<<"from_local_topic">> => <<"local_topic/#">>,
<<"to_remote_topic">> => <<"remote_topic/${topic}">>,
<<"local_topic">> => <<"local_topic/#">>,
<<"remote_topic">> => <<"remote_topic/${topic}">>,
<<"payload">> => <<"${payload}">>,
<<"qos">> => <<"${qos}">>,
<<"remote_qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
@ -125,6 +125,8 @@ t_mqtt_crud_apis(_) ->
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1
, <<"password">> := <<"">>
@ -157,6 +159,8 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
@ -167,6 +171,8 @@ t_mqtt_crud_apis(_) ->
%% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>

View File

@ -44,6 +44,7 @@
-type rule() ::
#{ id := rule_id()
, name := binary()
, sql := binary()
, outputs := [output()]
, enabled := boolean()

View File

@ -38,14 +38,11 @@ roots() ->
].
fields("rule_creation") ->
[ {"id", sc(binary(),
#{ desc => "The Id of the rule", nullable => false
, example => "my_rule_id"
})}
] ++ emqx_rule_engine_schema:fields("rules");
emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
[ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
[ rule_id()
, {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
, {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
, {"from", sc(hoconsc:array(binary()),
#{desc => "The topics of the rule", example => "t/#"})}
@ -182,5 +179,11 @@ qos() ->
{"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
#{desc => "The Message QoS"})}.
rule_id() ->
{"id", sc(binary(),
#{ desc => "The Id of the rule", nullable => false
, example => "293fb66f"
})}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).

View File

@ -221,6 +221,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
{ok, Select} ->
Rule = #{
id => RuleId,
name => maps:get(name, Params, <<"">>),
created_at => erlang:system_time(millisecond),
enabled => maps:get(enabled, Params, true),
sql => Sql,

View File

@ -59,9 +59,6 @@ error_schema(Code, Message) ->
rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").
rule_update_schema() ->
ref(emqx_rule_engine_schema, "rules").
rule_test_schema() ->
ref(emqx_rule_api_schema, "rule_test").
@ -120,7 +117,7 @@ schema("/rules/:id") ->
description => <<"Update a rule by given Id to all nodes in the cluster">>,
summary => <<"Update a Rule">>,
parameters => param_path_id(),
requestBody => rule_update_schema(),
requestBody => rule_creation_schema(),
responses => #{
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
200 => rule_info_schema()
@ -167,7 +164,8 @@ param_path_id() ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
'/rules'(post, #{body := #{<<"id">> := Id} = Params}) ->
'/rules'(post, #{body := Params}) ->
Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
@ -230,7 +228,8 @@ err_msg(Msg) ->
format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];
format_rule_resp(#{ id := Id, created_at := CreatedAt,
format_rule_resp(#{ id := Id, name := Name,
created_at := CreatedAt,
from := Topics,
outputs := Output,
sql := SQL,
@ -238,6 +237,7 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt,
description := Descr}) ->
NodeMetrics = get_rule_metrics(Id),
#{id => Id,
name => Name,
from => Topics,
outputs => format_output(Output),
sql => SQL,

View File

@ -39,7 +39,8 @@ fields("rule_engine") ->
];
fields("rules") ->
[ {"sql", sc(binary(),
[ rule_name()
, {"sql", sc(binary(),
#{ desc => """
SQL query to transform the messages.<br>
Example: <code>SELECT * FROM \"test/topic\" WHERE payload.x = 1</code><br>
@ -177,6 +178,13 @@ of the rule, then the string \"undefined\" is used.
})}
].
rule_name() ->
{"name", sc(binary(),
#{ desc => "The name of the rule"
, default => ""
, example => "foo"
})}.
outputs() ->
[ binary()
, ref("builtin_output_republish")