From 9b34f6f9a39d8dc96ad2aa624e17d8451aeddcfe Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 17 Dec 2021 16:45:42 +0800
Subject: [PATCH 1/5] fix(connector): add type and name in repsonse of GET
/connectors
---
apps/emqx_connector/src/emqx_connector_api.erl | 8 +++++++-
.../src/mqtt/emqx_connector_mqtt_schema.erl | 2 +-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 2b77d7ac2..9db9f2a93 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -209,7 +209,7 @@ schema("/connectors/:id") ->
end.
'/connectors'(get, _Request) ->
- {200, emqx_connector:list()};
+ {200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()),
@@ -264,10 +264,16 @@ error_msg(Code, Msg) when is_binary(Msg) ->
error_msg(Code, Msg) ->
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
+format_resp(#{<<"id">> := Id} = RawConf) ->
+ format_resp(Id, RawConf).
+
format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
+ {Type, Name} = emqx_connector:parse_connector_id(ConnId),
RawConf#{
<<"id">> => ConnId,
+ <<"type">> => Type,
+ <<"name">> => Name,
<<"num_of_bridges">> => NumOfBridges
}.
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 2338129d1..a29acb8f8 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -8,7 +8,7 @@
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
-%% cluster_shareload under the License is cluster_shareload on an "AS IS" BASIS,
+%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
From 0699682f38b1241e390d9ccc1cfd0187dc24d9ba Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 17 Dec 2021 21:22:04 +0800
Subject: [PATCH 2/5] refactor(bridges): rename some config entries for MQTT
bridge
---
apps/emqx_bridge/etc/emqx_bridge.conf | 8 +--
apps/emqx_bridge/src/emqx_bridge.erl | 17 +++++--
apps/emqx_bridge/src/emqx_bridge_api.erl | 12 ++---
.../src/emqx_bridge_http_schema.erl | 8 +--
.../test/emqx_bridge_api_SUITE.erl | 2 +-
.../src/mqtt/emqx_connector_mqtt_mod.erl | 8 +--
.../src/mqtt/emqx_connector_mqtt_msg.erl | 14 +++--
.../src/mqtt/emqx_connector_mqtt_schema.erl | 51 +++++++++++--------
.../src/mqtt/emqx_connector_mqtt_worker.erl | 24 +++++----
.../test/emqx_connector_api_SUITE.erl | 20 +++++---
10 files changed, 97 insertions(+), 67 deletions(-)
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index a3a07aa7f..fdd4005bf 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -8,9 +8,9 @@
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
-# from_remote_topic = "aws/#"
+# remote_topic = "aws/#"
# subscribe_qos = 1
-# to_local_topic = "from_aws/${topic}"
+# local_topic = "from_aws/${topic}"
# payload = "${payload}"
# qos = "${qos}"
# retain = "${retain}"
@@ -21,8 +21,8 @@
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
-# from_local_topic = "emqx/#"
-# to_remote_topic = "from_emqx/${topic}"
+# local_topic = "emqx/#"
+# remote_topic = "from_emqx/${topic}"
# payload = "${payload}"
# qos = 1
# retain = false
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 89455c229..50c39007a 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -67,9 +67,16 @@ load_hook(Bridges) ->
end, maps:to_list(Bridge))
end, maps:to_list(Bridges)).
-do_load_hook(#{from_local_topic := _}) ->
- emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
- ok;
+do_load_hook(#{local_topic := _} = Conf) ->
+ case maps:find(direction, Conf) of
+ error ->
+ %% this bridge has no direction field, it means that it has only egress bridges
+ emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
+ {ok, egress} ->
+ emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []});
+ {ok, ingress} ->
+ ok
+ end;
do_load_hook(_Conf) -> ok.
unload_hook() ->
@@ -218,7 +225,7 @@ recreate(Type, Name, Conf) ->
emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), []).
create_dry_run(Type, Conf) ->
- Conf0 = Conf#{<<"ingress">> => #{<<"from_remote_topic">> => <<"t">>}},
+ Conf0 = Conf#{<<"ingress">> => #{<<"remote_topic">> => <<"t">>}},
case emqx_resource:check_config(emqx_bridge:resource_type(Type), Conf0) of
{ok, Conf1} ->
emqx_resource:create_dry_run_local(emqx_bridge:resource_type(Type), Conf1);
@@ -263,7 +270,7 @@ get_matched_bridges(Topic) ->
end, Acc0, Conf)
end, [], Bridges).
-get_matched_bridge_id(#{from_local_topic := Filter}, Topic, BType, BName, Acc) ->
+get_matched_bridge_id(#{local_topic := Filter}, Topic, BType, BName, Acc) ->
case emqx_topic:match(Topic, Filter) of
true -> [bridge_id(BType, BName) | Acc];
false -> Acc
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index f7f249be6..b0657216f 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -161,7 +161,7 @@ info_example_basic(http, _) ->
pool_size => 4,
enable_pipelining => true,
ssl => #{enable => false},
- from_local_topic => <<"emqx_http/#">>,
+ local_topic => <<"emqx_http/#">>,
method => post,
body => <<"${payload}">>
};
@@ -169,9 +169,9 @@ info_example_basic(mqtt, ingress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => ingress,
- from_remote_topic => <<"aws/#">>,
- subscribe_qos => 1,
- to_local_topic => <<"from_aws/${topic}">>,
+ remote_topic => <<"aws/#">>,
+ remote_qos => 1,
+ local_topic => <<"from_aws/${topic}">>,
payload => <<"${payload}">>,
qos => <<"${qos}">>,
retain => <<"${retain}">>
@@ -180,8 +180,8 @@ info_example_basic(mqtt, egress) ->
#{
connector => <<"mqtt:my_mqtt_connector">>,
direction => egress,
- from_local_topic => <<"emqx/#">>,
- to_remote_topic => <<"from_emqx/${topic}">>,
+ local_topic => <<"emqx/#">>,
+ remote_topic => <<"from_emqx/${topic}">>,
payload => <<"${payload}">>,
qos => 1,
retain => false
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
index 18fc59318..f7644af1d 100644
--- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -23,12 +23,12 @@ For example, http://localhost:9901/${topic}
is allowed, but
is not allowed.
"""
})}
- , {from_local_topic, mk(binary(),
+ , {local_topic, mk(binary(),
#{ desc =>"""
The MQTT topic filter to be forwarded to the HTTP server. All MQTT PUBLISH messages which topic
-match the from_local_topic will be forwarded.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic is configured, then both the data got from the rule and the MQTT messages that matches
-from_local_topic will be forwarded.
+match the local_topic will be forwarded.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic is configured, then both the data got from the rule and the MQTT messages that matches
+local_topic will be forwarded.
"""
})}
, {method, mk(method(),
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 716786a8f..65baf7051 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -30,7 +30,7 @@
-define(HTTP_BRIDGE(URL),
#{
<<"url">> => URL,
- <<"from_local_topic">> => <<"emqx_http/#">>,
+ <<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
<<"ssl">> => #{<<"enable">> => false},
<<"body">> => <<"${payload}">>,
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
index 4cc240d9d..7fb260130 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
@@ -65,7 +65,7 @@ start(Config) ->
case emqtt:connect(Pid) of
{ok, _} ->
try
- ok = from_remote_topics(Pid, Subscriptions),
+ ok = sub_remote_topics(Pid, Subscriptions),
{ok, #{client_pid => Pid, subscriptions => Subscriptions}}
catch
throw : Reason ->
@@ -171,7 +171,7 @@ handle_publish(Msg, Vars) ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
_ -> ok
end,
- case maps:get(to_local_topic, Vars, undefined) of
+ case maps:get(local_topic, Vars, undefined) of
undefined -> ok;
_Topic ->
emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars))
@@ -186,8 +186,8 @@ make_hdlr(Parent, Vars) ->
disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]}
}.
-from_remote_topics(_ClientPid, undefined) -> ok;
-from_remote_topics(ClientPid, #{from_remote_topic := FromTopic, subscribe_qos := QoS}) ->
+sub_remote_topics(_ClientPid, undefined) -> ok;
+sub_remote_topics(ClientPid, #{remote_topic := FromTopic, remote_qos := QoS}) ->
case emqtt:subscribe(ClientPid, FromTopic, QoS) of
{ok, _, _} -> ok;
Error -> throw(Error)
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
index 7b49f21fe..4a6eb71f4 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
@@ -24,6 +24,10 @@
, estimate_size/1
]).
+-export([ replace_vars_in_str/2
+ , replace_simple_var/2
+ ]).
+
-export_type([msg/0]).
-include_lib("emqx/include/emqx.hrl").
@@ -36,7 +40,7 @@
-type variables() :: #{
mountpoint := undefined | binary(),
- to_remote_topic := binary(),
+ remote_topic := binary(),
qos := original | integer(),
retain := original | boolean(),
payload := binary()
@@ -59,8 +63,8 @@ to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
to_remote_msg(MapMsg, Vars);
-to_remote_msg(MapMsg, #{to_remote_topic := TopicToken, payload := PayloadToken,
- qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
+to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
+ remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
@@ -75,8 +79,8 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
%% published from remote node over a MQTT connection
to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
- #{to_local_topic := TopicToken, payload := PayloadToken,
- qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
+ #{local_topic := TopicToken, payload := PayloadToken,
+ local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = replace_vars_in_str(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index a29acb8f8..6fabb6b5d 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -52,7 +52,7 @@ In 'cluster_shareload' mode, the incomming load from the remote broker is shared
using shared subscription.
Note that the 'clientid' is suffixed by the node name, this is to avoid
clientid conflicts between different nodes. And we can only use shared subscription
-topic filters for 'from_remote_topic'.
+topic filters for 'remote_topic' of ingress connections.
"""
})}
, {server,
@@ -101,24 +101,31 @@ Queue messages in disk files.
] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
- %% the message maybe subscribed by rules, in this case 'to_local_topic' is not necessary
- [ {from_remote_topic,
+ %% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
+ [ {remote_topic,
sc(binary(),
#{ nullable => false
, desc => "Receive messages from which topic of the remote broker"
})}
- , {subscribe_qos,
+ , {remote_qos,
sc(qos(),
#{ default => 1
, desc => "The QoS level to be used when subscribing to the remote broker"
})}
- , {to_local_topic,
+ , {local_topic,
sc(binary(),
#{ desc => """
Send messages to which topic of the local broker.
Template with variables is allowed.
"""
})}
+ , {local_qos,
+ sc(qos(),
+ #{ default => <<"${qos}">>
+ , desc => """
+The QoS of the MQTT message to be sent.
+Template with variables is allowed."""
+ })}
, {hookpoint,
sc(binary(),
#{ desc => """
@@ -128,12 +135,12 @@ The hookpoint will be triggered when there's any message received from the remot
] ++ common_inout_confs();
fields("egress") ->
- %% the message maybe sent from rules, in this case 'from_local_topic' is not necessary
- [ {from_local_topic,
+ %% the message maybe sent from rules, in this case 'local_topic' is not necessary
+ [ {local_topic,
sc(binary(),
#{ desc => "The local topic to be forwarded to the remote broker"
})}
- , {to_remote_topic,
+ , {remote_topic,
sc(binary(),
#{ default => <<"${topic}">>
, desc => """
@@ -141,6 +148,13 @@ Forward to which topic of the remote broker.
Template with variables is allowed.
"""
})}
+ , {remote_qos,
+ sc(qos(),
+ #{ default => <<"${qos}">>
+ , desc => """
+The QoS of the MQTT message to be sent.
+Template with variables is allowed."""
+ })}
] ++ common_inout_confs();
fields("replayq") ->
@@ -187,31 +201,24 @@ topic_mappings() ->
ingress_desc() -> """
The ingress config defines how this bridge receive messages from the remote MQTT broker, and then
send them to the local broker.
-Template with variables is allowed in 'to_local_topic', 'subscribe_qos', 'qos', 'retain',
+Template with variables is allowed in 'local_topic', 'remote_qos', 'qos', 'retain',
'payload'.
-NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also to_local_topic is
-configured, then messages got from the remote broker will be sent to both the 'to_local_topic' and
+NOTE: if this bridge is used as the input of a rule (emqx rule engine), and also local_topic is
+configured, then messages got from the remote broker will be sent to both the 'local_topic' and
the rule.
""".
egress_desc() -> """
The egress config defines how this bridge forwards messages from the local broker to the remote
broker.
-Template with variables is allowed in 'to_remote_topic', 'qos', 'retain', 'payload'.
-NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also from_local_topic
+Template with variables is allowed in 'remote_topic', 'qos', 'retain', 'payload'.
+NOTE: if this bridge is used as the output of a rule (emqx rule engine), and also local_topic
is configured, then both the data got from the rule and the MQTT messages that matches
-from_local_topic will be forwarded.
+local_topic will be forwarded.
""".
common_inout_confs() ->
- [ {qos,
- sc(qos(),
- #{ default => <<"${qos}">>
- , desc => """
-The QoS of the MQTT message to be sent.
-Template with variables is allowed."""
- })}
- , {retain,
+ [ {retain,
sc(hoconsc:union([boolean(), binary()]),
#{ default => <<"${retain}">>
, desc => """
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
index 95424fe3a..5f6f4b69f 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl
@@ -226,16 +226,22 @@ open_replayq(Name, QCfg) ->
marshaller => fun ?MODULE:msg_marshaller/1}).
pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) ->
- ConnectOpts#{subscriptions => pre_process_in_out(InConf),
- forwards => pre_process_in_out(OutConf)}.
+ ConnectOpts#{subscriptions => pre_process_in_out(in, InConf),
+ forwards => pre_process_in_out(out, OutConf)}.
-pre_process_in_out(undefined) -> undefined;
-pre_process_in_out(Conf) when is_map(Conf) ->
- Conf1 = pre_process_conf(to_local_topic, Conf),
- Conf2 = pre_process_conf(to_remote_topic, Conf1),
- Conf3 = pre_process_conf(payload, Conf2),
- Conf4 = pre_process_conf(qos, Conf3),
- pre_process_conf(retain, Conf4).
+pre_process_in_out(_, undefined) -> undefined;
+pre_process_in_out(in, Conf) when is_map(Conf) ->
+ Conf1 = pre_process_conf(local_topic, Conf),
+ Conf2 = pre_process_conf(local_qos, Conf1),
+ pre_process_in_out_common(Conf2);
+pre_process_in_out(out, Conf) when is_map(Conf) ->
+ Conf1 = pre_process_conf(remote_topic, Conf),
+ Conf2 = pre_process_conf(remote_qos, Conf1),
+ pre_process_in_out_common(Conf2).
+
+pre_process_in_out_common(Conf) ->
+ Conf1 = pre_process_conf(payload, Conf),
+ pre_process_conf(retain, Conf1).
pre_process_conf(Key, Conf) ->
case maps:find(Key, Conf) of
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index 760160df4..699579d7f 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -46,11 +46,11 @@
#{
<<"connector">> => ID,
<<"direction">> => <<"ingress">>,
- <<"from_remote_topic">> => <<"remote_topic/#">>,
- <<"to_local_topic">> => <<"local_topic/${topic}">>,
- <<"subscribe_qos">> => 1,
+ <<"remote_topic">> => <<"remote_topic/#">>,
+ <<"remote_qos">> => 2,
+ <<"local_topic">> => <<"local_topic/${topic}">>,
+ <<"local_qos">> => <<"${qos}">>,
<<"payload">> => <<"${payload}">>,
- <<"qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
@@ -58,10 +58,10 @@
#{
<<"connector">> => ID,
<<"direction">> => <<"egress">>,
- <<"from_local_topic">> => <<"local_topic/#">>,
- <<"to_remote_topic">> => <<"remote_topic/${topic}">>,
+ <<"local_topic">> => <<"local_topic/#">>,
+ <<"remote_topic">> => <<"remote_topic/${topic}">>,
<<"payload">> => <<"${payload}">>,
- <<"qos">> => <<"${qos}">>,
+ <<"remote_qos">> => <<"${qos}">>,
<<"retain">> => <<"${retain}">>
}).
@@ -125,6 +125,8 @@ t_mqtt_crud_apis(_) ->
%ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User1
, <<"password">> := <<"">>
@@ -157,6 +159,8 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
?assertMatch([#{ <<"id">> := ?CONNECTR_ID
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
@@ -167,6 +171,8 @@ t_mqtt_crud_apis(_) ->
%% get the connector by id
{ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
From 7bcd38c1019ae582a005812cc47c20f0bbf8712a Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 17 Dec 2021 21:41:25 +0800
Subject: [PATCH 3/5] fix(bridge): update qos fields for swagger examples
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index b0657216f..6f2d5c7ad 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -172,8 +172,8 @@ info_example_basic(mqtt, ingress) ->
remote_topic => <<"aws/#">>,
remote_qos => 1,
local_topic => <<"from_aws/${topic}">>,
+ local_qos => <<"${qos}">>,
payload => <<"${payload}">>,
- qos => <<"${qos}">>,
retain => <<"${retain}">>
};
info_example_basic(mqtt, egress) ->
@@ -182,8 +182,8 @@ info_example_basic(mqtt, egress) ->
direction => egress,
local_topic => <<"emqx/#">>,
remote_topic => <<"from_emqx/${topic}">>,
+ remote_qos => <<"${qos}">>,
payload => <<"${payload}">>,
- qos => 1,
retain => false
}.
From 673a545aa2c02c400427633badcd3483b7bf2ecf Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Fri, 17 Dec 2021 22:15:56 +0800
Subject: [PATCH 4/5] fix(dialyzer): bad type specs
---
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
index 4a6eb71f4..eb483dcc5 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
@@ -41,7 +41,7 @@
-type variables() :: #{
mountpoint := undefined | binary(),
remote_topic := binary(),
- qos := original | integer(),
+ remote_qos := original | integer(),
retain := original | boolean(),
payload := binary()
}.
From 494c08f849084172ac9d0d81ee1587063a128252 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Sat, 18 Dec 2021 05:54:22 +0800
Subject: [PATCH 5/5] refactor(rule): add name field to request body of POST
/rules
---
apps/emqx_rule_engine/include/rule_engine.hrl | 1 +
.../emqx_rule_engine/src/emqx_rule_api_schema.erl | 15 +++++++++------
apps/emqx_rule_engine/src/emqx_rule_engine.erl | 1 +
.../emqx_rule_engine/src/emqx_rule_engine_api.erl | 12 ++++++------
.../src/emqx_rule_engine_schema.erl | 10 +++++++++-
5 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl
index b7ec37d8e..4884f329e 100644
--- a/apps/emqx_rule_engine/include/rule_engine.hrl
+++ b/apps/emqx_rule_engine/include/rule_engine.hrl
@@ -44,6 +44,7 @@
-type rule() ::
#{ id := rule_id()
+ , name := binary()
, sql := binary()
, outputs := [output()]
, enabled := boolean()
diff --git a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
index 1fe75447e..b3b7afe4e 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_api_schema.erl
@@ -38,14 +38,11 @@ roots() ->
].
fields("rule_creation") ->
- [ {"id", sc(binary(),
- #{ desc => "The Id of the rule", nullable => false
- , example => "my_rule_id"
- })}
- ] ++ emqx_rule_engine_schema:fields("rules");
+ emqx_rule_engine_schema:fields("rules");
fields("rule_info") ->
- [ {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
+ [ rule_id()
+ , {"metrics", sc(ref("metrics"), #{desc => "The metrics of the rule"})}
, {"node_metrics", sc(ref("node_metrics"), #{desc => "The metrics of the rule"})}
, {"from", sc(hoconsc:array(binary()),
#{desc => "The topics of the rule", example => "t/#"})}
@@ -182,5 +179,11 @@ qos() ->
{"qos", sc(hoconsc:union([typerefl:integer(0), typerefl:integer(1), typerefl:integer(2)]),
#{desc => "The Message QoS"})}.
+rule_id() ->
+ {"id", sc(binary(),
+ #{ desc => "The Id of the rule", nullable => false
+ , example => "293fb66f"
+ })}.
+
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index 974c6b8a4..35be28610 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -221,6 +221,7 @@ do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) ->
{ok, Select} ->
Rule = #{
id => RuleId,
+ name => maps:get(name, Params, <<"">>),
created_at => erlang:system_time(millisecond),
enabled => maps:get(enabled, Params, true),
sql => Sql,
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index 75238fb71..7cfeb5d7e 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -59,9 +59,6 @@ error_schema(Code, Message) ->
rule_creation_schema() ->
ref(emqx_rule_api_schema, "rule_creation").
-rule_update_schema() ->
- ref(emqx_rule_engine_schema, "rules").
-
rule_test_schema() ->
ref(emqx_rule_api_schema, "rule_test").
@@ -120,7 +117,7 @@ schema("/rules/:id") ->
description => <<"Update a rule by given Id to all nodes in the cluster">>,
summary => <<"Update a Rule">>,
parameters => param_path_id(),
- requestBody => rule_update_schema(),
+ requestBody => rule_creation_schema(),
responses => #{
400 => error_schema('BAD_ARGS', "Invalid Parameters"),
200 => rule_info_schema()
@@ -167,7 +164,8 @@ param_path_id() ->
Records = emqx_rule_engine:get_rules_ordered_by_ts(),
{200, format_rule_resp(Records)};
-'/rules'(post, #{body := #{<<"id">> := Id} = Params}) ->
+'/rules'(post, #{body := Params}) ->
+ Id = maps:get(<<"id">>, Params, list_to_binary(emqx_misc:gen_id(8))),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
case emqx_rule_engine:get_rule(Id) of
{ok, _Rule} ->
@@ -230,7 +228,8 @@ err_msg(Msg) ->
format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];
-format_rule_resp(#{ id := Id, created_at := CreatedAt,
+format_rule_resp(#{ id := Id, name := Name,
+ created_at := CreatedAt,
from := Topics,
outputs := Output,
sql := SQL,
@@ -238,6 +237,7 @@ format_rule_resp(#{ id := Id, created_at := CreatedAt,
description := Descr}) ->
NodeMetrics = get_rule_metrics(Id),
#{id => Id,
+ name => Name,
from => Topics,
outputs => format_output(Output),
sql => SQL,
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
index 36cf48da0..ba516bfa7 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_schema.erl
@@ -39,7 +39,8 @@ fields("rule_engine") ->
];
fields("rules") ->
- [ {"sql", sc(binary(),
+ [ rule_name()
+ , {"sql", sc(binary(),
#{ desc => """
SQL query to transform the messages.
Example: SELECT * FROM \"test/topic\" WHERE payload.x = 1
@@ -177,6 +178,13 @@ of the rule, then the string \"undefined\" is used.
})}
].
+rule_name() ->
+ {"name", sc(binary(),
+ #{ desc => "The name of the rule"
+ , default => ""
+ , example => "foo"
+ })}.
+
outputs() ->
[ binary()
, ref("builtin_output_republish")