From bff41296aa140c7e4f2fcb34f35e11c571cba217 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 16:56:24 +0800
Subject: [PATCH 01/13] fix(ecpool): update ecpool to 0.5.2
---
rebar.config | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/rebar.config b/rebar.config
index 7b08d38a0..61a3f4510 100644
--- a/rebar.config
+++ b/rebar.config
@@ -56,7 +56,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.11.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.2.9"}}}
- , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
+ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
, {replayq, "0.3.3"}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}
From ea1aaa9806a9f59aeef7641c8bf5209e61175c93 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 16:58:02 +0800
Subject: [PATCH 02/13] fix(bridge): remove clientid config from MQTT bridges
Don't allow the user provide the clientid for connecting the remote broker.
We generate the clientid using the bridge id and node name.
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 4 ++--
apps/emqx_connector/src/emqx_connector_api.erl | 4 ++--
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 6 +-----
3 files changed, 5 insertions(+), 9 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 0f291ac1a..1358cccdc 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -431,8 +431,8 @@ rpc_multicall(Func, Args) ->
end.
filter_out_request_body(Conf) ->
- ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>, <<"node_metrics">>,
- <<"metrics">>, <<"node">>],
+ ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>,
+ <<"node_metrics">>, <<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf).
rpc_call(Node, Fun, Args) ->
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 5d6bddb6a..c20632658 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -218,7 +218,7 @@ schema("/connectors/:id") ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
{error, not_found} ->
case emqx_connector:update(ConnType, ConnName,
- maps:without([<<"type">>, <<"name">>], Params)) of
+ filter_out_request_body(Params)) of
{ok, #{raw_config := RawConf}} ->
Id = emqx_connector:connector_id(ConnType, ConnName),
{201, format_resp(Id, RawConf)};
@@ -279,7 +279,7 @@ format_resp(ConnId, RawConf) ->
}.
filter_out_request_body(Conf) ->
- ExtraConfs = [<<"num_of_bridges">>, <<"type">>, <<"name">>],
+ ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>],
maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) ->
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 6fabb6b5d..bee1861bd 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -39,7 +39,7 @@ fields("config") ->
fields("connector") ->
[ {mode,
- sc(hoconsc:enum([cluster_singleton, cluster_shareload]),
+ sc(hoconsc:enum([cluster_shareload]),
#{ default => cluster_shareload
, desc => """
The mode of the MQTT Bridge. Can be one of 'cluster_singleton' or 'cluster_shareload'
@@ -76,10 +76,6 @@ topic filters for 'remote_topic' of ingress connections.
#{ default => "emqx"
, desc => "The password of the MQTT protocol"
})}
- , {clientid,
- sc(binary(),
- #{ desc => "The clientid of the MQTT protocol"
- })}
, {clean_start,
sc(boolean(),
#{ default => true
From c23436166b861e106d37b98d42d42916d0c5a086 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 17:27:02 +0800
Subject: [PATCH 03/13] fix(bridge): HTTP connector should failed on non-200
status codes
---
apps/emqx_connector/src/emqx_connector_http.erl | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index ac0847a91..2f72c869e 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -211,8 +211,20 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
request => NRequest, reason => Reason,
connector => InstId}),
emqx_resource:query_failed(AfterQuery);
- _ ->
- emqx_resource:query_success(AfterQuery)
+ {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
+ emqx_resource:query_success(AfterQuery);
+ {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
+ emqx_resource:query_success(AfterQuery);
+ {ok, StatusCode, _} ->
+ ?SLOG(error, #{msg => "http connector do reqeust, received error response",
+ request => NRequest, connector => InstId,
+ status_code => StatusCode}),
+ emqx_resource:query_failed(AfterQuery);
+ {ok, StatusCode, _, _} ->
+ ?SLOG(error, #{msg => "http connector do reqeust, received error response",
+ request => NRequest, connector => InstId,
+ status_code => StatusCode}),
+ emqx_resource:query_failed(AfterQuery)
end,
Result.
From aefcd6275bae5e6744f129979bd76d7e4895a9e2 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 17:40:56 +0800
Subject: [PATCH 04/13] fix(bridges): ingress MQTT bridges didn't increase
counters on msg received
---
.../src/emqx_connector_mqtt.erl | 24 ++++++++------
.../src/mqtt/emqx_connector_mqtt_mod.erl | 1 -
.../src/mqtt/emqx_connector_mqtt_msg.erl | 32 +++++++++++++++++--
3 files changed, 45 insertions(+), 12 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index a9647b2c1..c216e905c 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -29,7 +29,7 @@
, bridges/0
]).
--export([on_message_received/2]).
+-export([on_message_received/3]).
%% callbacks of behaviour emqx_resource
-export([ on_start/2
@@ -105,14 +105,17 @@ drop_bridge(Name) ->
case supervisor:terminate_child(?MODULE, Name) of
ok ->
supervisor:delete_child(?MODULE, Name);
+ {error, not_found} ->
+ ok;
{error, Error} ->
{error, Error}
end.
%% ===================================================================
-%% When use this bridge as a data source, ?MODULE:on_message_received/2 will be called
+%% When use this bridge as a data source, ?MODULE:on_message_received will be called
%% if the bridge received msgs from the remote broker.
-on_message_received(Msg, HookPoint) ->
+on_message_received(Msg, HookPoint, InstId) ->
+ _ = emqx_resource:query(InstId, {message_received, Msg}),
emqx:run_hook(HookPoint, [Msg]).
%% ===================================================================
@@ -123,8 +126,8 @@ on_start(InstId, Conf) ->
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{
name => InstanceId,
- clientid => clientid(maps:get(clientid, Conf, InstId)),
- subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined)),
+ clientid => clientid(InstId),
+ subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), InstId),
forwards => make_forward_confs(maps:get(egress, Conf, undefined))
},
case ?MODULE:create_bridge(BridgeConf) of
@@ -149,6 +152,9 @@ on_stop(_InstId, #{name := InstanceId}) ->
connector => InstanceId, reason => Reason})
end.
+on_query(_InstId, {message_received, _Msg}, AfterQuery, _State) ->
+ emqx_resource:query_success(AfterQuery);
+
on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
@@ -166,15 +172,15 @@ ensure_mqtt_worker_started(InstanceId) ->
{error, Reason} -> {error, Reason}
end.
-make_sub_confs(EmptyMap) when map_size(EmptyMap) == 0 ->
+make_sub_confs(EmptyMap, _) when map_size(EmptyMap) == 0 ->
undefined;
-make_sub_confs(undefined) ->
+make_sub_confs(undefined, _) ->
undefined;
-make_sub_confs(SubRemoteConf) ->
+make_sub_confs(SubRemoteConf, InstId) ->
case maps:take(hookpoint, SubRemoteConf) of
error -> SubRemoteConf;
{HookPoint, SubConf} ->
- MFA = {?MODULE, on_message_received, [HookPoint]},
+ MFA = {?MODULE, on_message_received, [HookPoint, InstId]},
SubConf#{on_message_received => MFA}
end.
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
index 3ab410391..d7abcda84 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl
@@ -168,7 +168,6 @@ handle_publish(Msg, undefined) ->
handle_publish(Msg, Vars) ->
?SLOG(debug, #{msg => "publish_to_local_broker",
message => Msg, vars => Vars}),
- emqx_metrics:inc('bridge.mqtt.message_received_from_remote', 1),
case Vars of
#{on_message_received := {Mod, Func, Args}} ->
_ = erlang:apply(Mod, Func, [Msg | Args]);
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
index 1357037ee..a0dd9eec1 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl
@@ -61,7 +61,7 @@ make_pub_vars(Mountpoint, Conf) when is_map(Conf) ->
-> exp_msg().
to_remote_msg(#message{flags = Flags0} = Msg, Vars) ->
Retain0 = maps:get(retain, Flags0, false),
- MapMsg = maps:put(retain, Retain0, emqx_message:to_map(Msg)),
+ MapMsg = maps:put(retain, Retain0, emqx_rule_events:eventmsg_publish(Msg)),
to_remote_msg(MapMsg, Vars);
to_remote_msg(MapMsg, #{remote_topic := TopicToken, payload := PayloadToken,
remote_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) when is_map(MapMsg) ->
@@ -78,9 +78,10 @@ to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
Msg#message{topic = topic(Mountpoint, Topic)}.
%% published from remote node over a MQTT connection
-to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
+to_broker_msg(#{dup := Dup, properties := Props} = MapMsg0,
#{local_topic := TopicToken, payload := PayloadToken,
local_qos := QoSToken, retain := RetainToken, mountpoint := Mountpoint}) ->
+ MapMsg = format_msg_received(MapMsg0),
Topic = replace_vars_in_str(TopicToken, MapMsg),
Payload = process_payload(PayloadToken, MapMsg),
QoS = replace_simple_var(QoSToken, MapMsg),
@@ -89,6 +90,33 @@ to_broker_msg(#{dup := Dup, properties := Props} = MapMsg,
emqx_message:set_flags(#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload))).
+format_msg_received(#{dup := Dup, payload := Payload, properties := Props,
+ qos := QoS, retain := Retain, topic := Topic}) ->
+ #{event => '$bridges/mqtt',
+ id => emqx_guid:to_hexstr(emqx_guid:gen()),
+ payload => Payload,
+ topic => Topic,
+ qos => QoS,
+ flags => #{dup => Dup, retain => Retain},
+ pub_props => printable_maps(Props),
+ timestamp => erlang:system_time(millisecond),
+ node => node()
+ }.
+
+printable_maps(undefined) -> #{};
+printable_maps(Headers) ->
+ maps:fold(
+ fun ('User-Property', V0, AccIn) when is_list(V0) ->
+ AccIn#{
+ 'User-Property' => maps:from_list(V0),
+ 'User-Property-Pairs' => [#{
+ key => Key,
+ value => Value
+ } || {Key, Value} <- V0]
+ };
+ (K, V0, AccIn) -> AccIn#{K => V0}
+ end, #{}, Headers).
+
process_payload([], Msg) ->
emqx_json:encode(Msg);
process_payload(Tks, Msg) ->
From 14089a572e5f9d7929d7ee409b192853c24eb90d Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 18:09:56 +0800
Subject: [PATCH 05/13] fix(bridge): changes timeouts from 30s to 15s
---
apps/emqx_bridge/etc/emqx_bridge.conf | 4 ++--
apps/emqx_bridge/src/emqx_bridge_api.erl | 4 ++--
apps/emqx_bridge/src/emqx_bridge_http_schema.erl | 2 +-
apps/emqx_connector/src/emqx_connector_api.erl | 4 ++--
apps/emqx_connector/src/emqx_connector_http.erl | 2 +-
apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 4 ++--
6 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index 04f4709b8..de931ae12 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -34,8 +34,8 @@
# direction = egress
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
-# request_timeout = "30s"
-# connect_timeout = "30s"
+# request_timeout = "15s"
+# connect_timeout = "15s"
# max_retries = 3
# retry_interval = "10s"
# pool_type = "random"
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 1358cccdc..4def97327 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -158,8 +158,8 @@ method_example(_Type, _Direction, put) ->
info_example_basic(http, _) ->
#{
url => <<"http://localhost:9901/messages/${topic}">>,
- request_timeout => <<"30s">>,
- connect_timeout => <<"30s">>,
+ request_timeout => <<"15s">>,
+ connect_timeout => <<"15s">>,
max_retries => 3,
retry_interval => <<"10s">>,
pool_type => <<"random">>,
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
index 540a6a070..a5937509c 100644
--- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -59,7 +59,7 @@ Template with variables is allowed.
"""
})}
, {request_timeout, mk(emqx_schema:duration_ms(),
- #{ default => <<"30s">>
+ #{ default => <<"15s">>
, desc =>"""
How long will the HTTP request timeout.
"""
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index c20632658..35e0c8ecb 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -107,14 +107,14 @@ info_example_basic(mqtt) ->
#{
mode => cluster_shareload,
server => <<"127.0.0.1:1883">>,
- reconnect_interval => <<"30s">>,
+ reconnect_interval => <<"15s">>,
proto_ver => <<"v4">>,
username => <<"foo">>,
password => <<"bar">>,
clientid => <<"foo">>,
clean_start => true,
keepalive => <<"300s">>,
- retry_interval => <<"30s">>,
+ retry_interval => <<"15s">>,
max_inflight => 100,
ssl => #{
enable => false
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 2f72c869e..509e293cf 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -75,7 +75,7 @@ For example: http://localhost:9901/
})}
, {connect_timeout,
sc(emqx_schema:duration_ms(),
- #{ default => "30s"
+ #{ default => "15s"
, desc => "The timeout when connecting to the HTTP server"
})}
, {max_retries,
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index bee1861bd..303617a29 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -60,7 +60,7 @@ topic filters for 'remote_topic' of ingress connections.
#{ default => "127.0.0.1:1883"
, desc => "The host and port of the remote MQTT broker"
})}
- , {reconnect_interval, mk_duration("reconnect interval", #{default => "30s"})}
+ , {reconnect_interval, mk_duration("reconnect interval", #{default => "15s"})}
, {proto_ver,
sc(hoconsc:enum([v3, v4, v5]),
#{ default => v4
@@ -82,7 +82,7 @@ topic filters for 'remote_topic' of ingress connections.
, desc => "The clean-start or the clean-session of the MQTT protocol"
})}
, {keepalive, mk_duration("keepalive", #{default => "300s"})}
- , {retry_interval, mk_duration("retry interval", #{default => "30s"})}
+ , {retry_interval, mk_duration("retry interval", #{default => "15s"})}
, {max_inflight,
sc(integer(),
#{ default => 32
From 110ae62b24407d7bb282c1aec0e884d5e0ccdc78 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 29 Dec 2021 18:41:50 +0800
Subject: [PATCH 06/13] fix(bridge): don't concat names into ids
---
apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +-
apps/emqx_connector/src/emqx_connector_api.erl | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 4def97327..900ad99ee 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -276,7 +276,7 @@ schema("/bridges/:id/operation/:operation") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf0}) ->
Conf = filter_out_request_body(Conf0),
- BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
+ BridgeName = emqx_misc:gen_id(),
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 35e0c8ecb..82b63476d 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -212,7 +212,7 @@ schema("/connectors/:id") ->
{200, [format_resp(Conn) || Conn <- emqx_connector:list()]};
'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) ->
- ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()),
+ ConnName = emqx_misc:gen_id(),
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
From d11cf6ad64531dd3620dcc1ba017d0433a0173d2 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 12:31:13 +0800
Subject: [PATCH 07/13] fix(bridges): store connector name and bridge name to
config files
---
apps/emqx_bridge/src/emqx_bridge.erl | 5 +-
apps/emqx_bridge/src/emqx_bridge_api.erl | 9 +-
.../src/emqx_bridge_http_schema.erl | 4 +
apps/emqx_bridge/src/emqx_bridge_schema.erl | 6 +-
.../test/emqx_bridge_api_SUITE.erl | 116 +++++-------
.../emqx_connector/src/emqx_connector_api.erl | 9 +-
.../src/emqx_connector_http.erl | 2 +-
.../src/emqx_connector_schema.erl | 9 +
.../src/mqtt/emqx_connector_mqtt_schema.erl | 2 +-
.../test/emqx_connector_api_SUITE.erl | 177 ++++++++----------
.../src/emqx_resource_instance.erl | 5 +-
11 files changed, 168 insertions(+), 176 deletions(-)
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index a6681d3f1..d46ce217e 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -222,7 +222,10 @@ update(Type, Name, {OldConf, Conf}) ->
true ->
%% we don't need to recreate the bridge if this config change is only to
%% toggole the config 'bridge.{type}.{name}.enable'
- ok
+ case maps:get(enable, Conf, true) of
+ false -> stop(Type, Name);
+ true -> start(Type, Name)
+ end
end.
recreate(Type, Name) ->
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 900ad99ee..a5b9aa984 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -356,9 +356,8 @@ operation_to_conf_req(<<"restart">>) -> restart;
operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
- Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
- Conf1, #{override_to => cluster}) of
+ Conf, #{override_to => cluster}) of
{ok, _} -> ok;
{error, Reason} ->
{error, error_msg('BAD_ARG', Reason)}
@@ -411,12 +410,12 @@ aggregate_metrics(AllMetrics) ->
format_resp(#{id := Id, raw_config := RawConf,
resource_data := #{status := Status, metrics := Metrics}}) ->
- {Type, Name} = emqx_bridge:parse_bridge_id(Id),
+ {Type, BridgeName} = emqx_bridge:parse_bridge_id(Id),
IsConnected = fun(started) -> connected; (_) -> disconnected end,
RawConf#{
id => Id,
type => Type,
- name => Name,
+ name => maps:get(<<"name">>, RawConf, BridgeName),
node => node(),
status => IsConnected(Status),
metrics => Metrics
@@ -431,7 +430,7 @@ rpc_multicall(Func, Args) ->
end.
filter_out_request_body(Conf) ->
- ExtraConfs = [<<"id">>, <<"status">>, <<"node_status">>,
+ ExtraConfs = [<<"id">>, <<"type">>, <<"status">>, <<"node_status">>,
<<"node_metrics">>, <<"metrics">>, <<"node">>],
maps:without(ExtraConfs, Conf).
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
index a5937509c..494911d21 100644
--- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -84,6 +84,10 @@ basic_config() ->
#{ desc => "Enable or disable this bridge"
, default => true
})}
+ , {name,
+ mk(binary(),
+ #{ desc => "Bridge name, used as a human-readable description of the bridge."
+ })}
, {direction,
mk(egress,
#{ desc => "The direction of this bridge, MUST be egress"
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index 82fc79ebf..00d461098 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -43,9 +43,13 @@ http_schema(Method) ->
common_bridge_fields() ->
[ {enable,
mk(boolean(),
- #{ desc =>"Enable or disable this bridge"
+ #{ desc => "Enable or disable this bridge"
, default => true
})}
+ , {name,
+ mk(binary(),
+ #{ desc => "Bridge name, used as a human-readable description of the bridge."
+ })}
, {connector,
mk(binary(),
#{ nullable => false
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 7724d467c..807ad32f6 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -23,12 +23,13 @@
-define(CONF_DEFAULT, <<"bridges: {}">>).
-define(BRIDGE_TYPE, <<"http">>).
-define(BRIDGE_NAME, <<"test_bridge">>).
--define(BRIDGE_ID, <<"http:test_bridge">>).
-define(URL(PORT, PATH), list_to_binary(
io_lib:format("http://localhost:~s/~s",
[integer_to_list(PORT), PATH]))).
--define(HTTP_BRIDGE(URL),
+-define(HTTP_BRIDGE(URL, TYPE, NAME),
#{
+ <<"type">> => TYPE,
+ <<"name">> => NAME,
<<"url">> => URL,
<<"local_topic">> => <<"emqx_http/#">>,
<<"method">> => <<"post">>,
@@ -145,32 +146,18 @@ t_http_crud_apis(_) ->
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{
- <<"type">> => ?BRIDGE_TYPE,
- <<"name">> => ?BRIDGE_NAME
- }),
+ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
%ct:pal("---bridge: ~p", [Bridge]),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
- , <<"type">> := ?BRIDGE_TYPE
- , <<"name">> := ?BRIDGE_NAME
- , <<"status">> := _
- , <<"node_status">> := [_|_]
- , <<"metrics">> := _
- , <<"node_metrics">> := [_|_]
- , <<"url">> := URL1
- }, jsx:decode(Bridge)),
-
- %% create a again returns an error
- {ok, 400, RetMsg} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{
- <<"type">> => ?BRIDGE_TYPE,
- <<"name">> => ?BRIDGE_NAME
- }),
- ?assertMatch(
- #{ <<"code">> := _
- , <<"message">> := <<"bridge already exists">>
- }, jsx:decode(RetMsg)),
+ #{ <<"id">> := BridgeID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
+ , <<"status">> := _
+ , <<"node_status">> := [_|_]
+ , <<"metrics">> := _
+ , <<"node_metrics">> := [_|_]
+ , <<"url">> := URL1
+ } = jsx:decode(Bridge),
%% send an message to emqx and the message should be forwarded to the HTTP server
Body = <<"my msg">>,
@@ -188,9 +175,9 @@ t_http_crud_apis(_) ->
end),
%% update the request-path of the bridge
URL2 = ?URL(Port, "path2"),
- {ok, 200, Bridge2} = request(put, uri(["bridges", ?BRIDGE_ID]),
- ?HTTP_BRIDGE(URL2)),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, Bridge2} = request(put, uri(["bridges", BridgeID]),
+ ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
@@ -202,7 +189,7 @@ t_http_crud_apis(_) ->
%% list all bridges again, assert Bridge2 is in it
{ok, 200, Bridge2Str} = request(get, uri(["bridges"]), []),
- ?assertMatch([#{ <<"id">> := ?BRIDGE_ID
+ ?assertMatch([#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
@@ -213,8 +200,8 @@ t_http_crud_apis(_) ->
}], jsx:decode(Bridge2Str)),
%% get the bridge by id
- {ok, 200, Bridge3Str} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, Bridge3Str} = request(get, uri(["bridges", BridgeID]), []),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"type">> := ?BRIDGE_TYPE
, <<"name">> := ?BRIDGE_NAME
, <<"status">> := _
@@ -238,12 +225,12 @@ t_http_crud_apis(_) ->
end),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% update a deleted bridge returns an error
- {ok, 404, ErrMsg2} = request(put, uri(["bridges", ?BRIDGE_ID]),
- ?HTTP_BRIDGE(URL2)),
+ {ok, 404, ErrMsg2} = request(put, uri(["bridges", BridgeID]),
+ ?HTTP_BRIDGE(URL2, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
?assertMatch(
#{ <<"code">> := _
, <<"message">> := <<"bridge not found">>
@@ -251,52 +238,51 @@ t_http_crud_apis(_) ->
ok.
t_start_stop_bridges(_) ->
+ %% assert we there's no bridges at first
+ {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
+
Port = start_http_server(fun handle_fun_200_ok/2),
URL1 = ?URL(Port, "abc"),
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?HTTP_BRIDGE(URL1)#{
- <<"type">> => ?BRIDGE_TYPE,
- <<"name">> => ?BRIDGE_NAME
- }),
+ ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, ?BRIDGE_NAME)),
%ct:pal("the bridge ==== ~p", [Bridge]),
- ?assertMatch(
- #{ <<"id">> := ?BRIDGE_ID
- , <<"type">> := ?BRIDGE_TYPE
- , <<"name">> := ?BRIDGE_NAME
- , <<"status">> := _
- , <<"node_status">> := [_|_]
- , <<"metrics">> := _
- , <<"node_metrics">> := [_|_]
- , <<"url">> := URL1
- }, jsx:decode(Bridge)),
+ #{ <<"id">> := BridgeID
+ , <<"type">> := ?BRIDGE_TYPE
+ , <<"name">> := ?BRIDGE_NAME
+ , <<"status">> := _
+ , <<"node_status">> := [_|_]
+ , <<"metrics">> := _
+ , <<"node_metrics">> := [_|_]
+ , <<"url">> := URL1
+ } = jsx:decode(Bridge),
%% stop it
- {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
- {ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
+ {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
- {ok, 200, <<>>} = request(post, operation_path(start), <<"">>),
- {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
+ {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
- {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
- {ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
+ {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
- {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
%% restart a stopped bridge
- {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
- {ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID
+ {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
+ {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
+ ?assertMatch(#{ <<"id">> := BridgeID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge4)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []).
%%--------------------------------------------------------------------
@@ -332,5 +318,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
-operation_path(Oper) ->
- uri(["bridges", ?BRIDGE_ID, "operation", Oper]).
+operation_path(Oper, BridgeID) ->
+ uri(["bridges", BridgeID, "operation", Oper]).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 82b63476d..4989cf17e 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -155,8 +155,7 @@ schema("/connectors") ->
},
post => #{
tags => [<<"connectors">>],
- description => <<"Create a new connector by given Id
"
- "The ID must be of format '{type}:{name}'">>,
+ description => <<"Create a new connector">>,
summary => <<"Create connector">>,
requestBody => post_request_body_schema(),
responses => #{
@@ -270,16 +269,16 @@ format_resp(#{<<"id">> := Id} = RawConf) ->
format_resp(ConnId, RawConf) ->
NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
- {Type, Name} = emqx_connector:parse_connector_id(ConnId),
+ {Type, ConnName} = emqx_connector:parse_connector_id(ConnId),
RawConf#{
<<"id">> => ConnId,
<<"type">> => Type,
- <<"name">> => Name,
+ <<"name">> => maps:get(<<"name">>, RawConf, ConnName),
<<"num_of_bridges">> => NumOfBridges
}.
filter_out_request_body(Conf) ->
- ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>, <<"name">>],
+ ExtraConfs = [<<"clientid">>, <<"num_of_bridges">>, <<"type">>],
maps:without(ExtraConfs, Conf).
bin(S) when is_list(S) ->
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 509e293cf..77d498c6b 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -112,7 +112,7 @@ If the request is provided, the caller can send HTTP requests via
emqx_resource:query(ResourceId, {send_message, BridgeId, Message})
"""
})}
- ] ++ emqx_connector_schema_lib:ssl_fields();
+ ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
fields("request") ->
[ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})}
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index 33d10802b..ed663ee60 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -8,6 +8,8 @@
-export([roots/0, fields/1]).
+-export([common_fields/0]).
+
-export([ get_response/0
, put_request/0
, post_request/0
@@ -49,3 +51,10 @@ fields("connectors") ->
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])).
+
+common_fields() ->
+ [ {name,
+ mk(binary(),
+ #{ desc => "Connector name, used as a human-readable description of the connector."
+ })}
+ ].
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 303617a29..44add053c 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -94,7 +94,7 @@ topic filters for 'remote_topic' of ingress connections.
Queue messages in disk files.
"""
})}
- ] ++ emqx_connector_schema_lib:ssl_fields();
+ ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index 307852546..1a96a3596 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -26,11 +26,8 @@
-define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>).
-define(CONNECTR_TYPE, <<"mqtt">>).
-define(CONNECTR_NAME, <<"test_connector">>).
--define(CONNECTR_ID, <<"mqtt:test_connector">>).
-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>).
-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>).
--define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>).
--define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>).
-define(MQTT_CONNECOTR(Username),
#{
<<"server">> => <<"127.0.0.1:1883">>,
@@ -123,32 +120,21 @@ t_mqtt_crud_apis(_) ->
, <<"name">> => ?CONNECTR_NAME
}),
- %ct:pal("---connector: ~p", [Connector]),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
- , <<"type">> := ?CONNECTR_TYPE
- , <<"name">> := ?CONNECTR_NAME
- , <<"server">> := <<"127.0.0.1:1883">>
- , <<"username">> := User1
- , <<"password">> := <<"">>
- , <<"proto_ver">> := <<"v4">>
- , <<"ssl">> := #{<<"enable">> := false}
- }, jsx:decode(Connector)),
-
- %% create a again returns an error
- {ok, 400, RetMsg} = request(post, uri(["connectors"]),
- ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE
- , <<"name">> => ?CONNECTR_NAME
- }),
- ?assertMatch(
- #{ <<"code">> := _
- , <<"message">> := <<"connector already exists">>
- }, jsx:decode(RetMsg)),
+ #{ <<"id">> := ConnctorID
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?CONNECTR_NAME
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User1
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ } = jsx:decode(Connector),
%% update the request-path of the connector
User2 = <<"user2">>,
- {ok, 200, Connector2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ {ok, 200, Connector2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ ?assertMatch(#{ <<"id">> := ConnctorID
, <<"server">> := <<"127.0.0.1:1883">>
, <<"username">> := User2
, <<"password">> := <<"">>
@@ -158,7 +144,7 @@ t_mqtt_crud_apis(_) ->
%% list all connectors again, assert Connector2 is in it
{ok, 200, Connector2Str} = request(get, uri(["connectors"]), []),
- ?assertMatch([#{ <<"id">> := ?CONNECTR_ID
+ ?assertMatch([#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
@@ -169,8 +155,8 @@ t_mqtt_crud_apis(_) ->
}], jsx:decode(Connector2Str)),
%% get the connector by id
- {ok, 200, Connector3Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ {ok, 200, Connector3Str} = request(get, uri(["connectors", ConnctorID]), []),
+ ?assertMatch(#{ <<"id">> := ConnctorID
, <<"type">> := ?CONNECTR_TYPE
, <<"name">> := ?CONNECTR_NAME
, <<"server">> := <<"127.0.0.1:1883">>
@@ -181,11 +167,11 @@ t_mqtt_crud_apis(_) ->
}, jsx:decode(Connector3Str)),
%% delete the connector
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
%% update a deleted connector returns an error
- {ok, 404, ErrMsg2} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ {ok, 404, ErrMsg2} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR(User2)),
?assertMatch(
#{ <<"code">> := _
@@ -205,28 +191,28 @@ t_mqtt_conn_bridge_ingress(_) ->
, <<"name">> => ?CONNECTR_NAME
}),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
- , <<"server">> := <<"127.0.0.1:1883">>
- , <<"num_of_bridges">> := 0
- , <<"username">> := User1
- , <<"password">> := <<"">>
- , <<"proto_ver">> := <<"v4">>
- , <<"ssl">> := #{<<"enable">> := false}
- }, jsx:decode(Connector)),
+ #{ <<"id">> := ConnctorID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"num_of_bridges">> := 0
+ , <<"username">> := User1
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{
+ ?MQTT_BRIDGE_INGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
- , <<"type">> := <<"mqtt">>
- , <<"status">> := <<"connected">>
- , <<"connector">> := ?CONNECTR_ID
- }, jsx:decode(Bridge)),
+ #{ <<"id">> := BridgeIDIngress
+ , <<"type">> := <<"mqtt">>
+ , <<"status">> := <<"connected">>
+ , <<"connector">> := ConnctorID
+ } = jsx:decode(Bridge),
%% we now test if the bridge works as expected
@@ -252,17 +238,17 @@ t_mqtt_conn_bridge_ingress(_) ->
end),
%% get the connector by id, verify the num_of_bridges now is 1
- {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ {ok, 200, Connector1Str} = request(get, uri(["connectors", ConnctorID]), []),
+ ?assertMatch(#{ <<"id">> := ConnctorID
, <<"num_of_bridges">> := 1
}, jsx:decode(Connector1Str)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok.
@@ -279,29 +265,28 @@ t_mqtt_conn_bridge_egress(_) ->
}),
%ct:pal("---connector: ~p", [Connector]),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
- , <<"server">> := <<"127.0.0.1:1883">>
- , <<"username">> := User1
- , <<"password">> := <<"">>
- , <<"proto_ver">> := <<"v4">>
- , <<"ssl">> := #{<<"enable">> := false}
- }, jsx:decode(Connector)),
+ #{ <<"id">> := ConnctorID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ , <<"username">> := User1
+ , <<"password">> := <<"">>
+ , <<"proto_ver">> := <<"v4">>
+ , <<"ssl">> := #{<<"enable">> := false}
+ } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
- %ct:pal("---bridge: ~p", [Bridge]),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
- , <<"type">> := ?CONNECTR_TYPE
- , <<"name">> := ?BRIDGE_NAME_EGRESS
- , <<"status">> := <<"connected">>
- , <<"connector">> := ?CONNECTR_ID
- }, jsx:decode(Bridge)),
+ #{ <<"id">> := BridgeIDEgress
+ , <<"type">> := ?CONNECTR_TYPE
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
+ , <<"status">> := <<"connected">>
+ , <<"connector">> := ConnctorID
+ } = jsx:decode(Bridge),
%% we now test if the bridge works as expected
LocalTopic = <<"local_topic/1">>,
@@ -326,19 +311,19 @@ t_mqtt_conn_bridge_egress(_) ->
end),
%% verify the metrics of the bridge
- {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+ ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)
, <<"node_metrics">> :=
[#{<<"node">> := _, <<"metrics">> := ?metrics(1, 1, 0, _, _, _)}]
}, jsx:decode(BridgeStr)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
ok.
@@ -358,37 +343,37 @@ t_mqtt_conn_update(_) ->
}),
%ct:pal("---connector: ~p", [Connector]),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
- , <<"server">> := <<"127.0.0.1:1883">>
- }, jsx:decode(Connector)),
+ #{ <<"id">> := ConnctorID
+ , <<"server">> := <<"127.0.0.1:1883">>
+ } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
- , <<"type">> := <<"mqtt">>
- , <<"name">> := ?BRIDGE_NAME_EGRESS
- , <<"status">> := <<"connected">>
- , <<"connector">> := ?CONNECTR_ID
- }, jsx:decode(Bridge)),
+ #{ <<"id">> := BridgeIDEgress
+ , <<"type">> := <<"mqtt">>
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
+ , <<"status">> := <<"connected">>
+ , <<"connector">> := ConnctorID
+ } = jsx:decode(Bridge),
%% then we try to update 'server' of the connector, to an unavailable IP address
%% the update should fail because of 'unreachable' or 'connrefused'
- {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ {ok, 400, _ErrorMsg} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:2603">>)),
%% we fix the 'server' parameter to a normal one, it should work
- {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1 : 1883">>)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_update2(_) ->
@@ -404,36 +389,36 @@ t_mqtt_conn_update2(_) ->
, <<"name">> => ?CONNECTR_NAME
}),
- ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
- , <<"server">> := <<"127.0.0.1:2603">>
- }, jsx:decode(Connector)),
+ #{ <<"id">> := ConnctorID
+ , <<"server">> := <<"127.0.0.1:2603">>
+ } = jsx:decode(Connector),
%% ... and a MQTT bridge, using POST
%% we bind this bridge to the connector created just now
{ok, 201, Bridge} = request(post, uri(["bridges"]),
- ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{
+ ?MQTT_BRIDGE_EGRESS(ConnctorID)#{
<<"type">> => ?CONNECTR_TYPE,
<<"name">> => ?BRIDGE_NAME_EGRESS
}),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
- , <<"type">> := <<"mqtt">>
- , <<"name">> := ?BRIDGE_NAME_EGRESS
- , <<"status">> := <<"disconnected">>
- , <<"connector">> := ?CONNECTR_ID
- }, jsx:decode(Bridge)),
+ #{ <<"id">> := BridgeIDEgress
+ , <<"type">> := <<"mqtt">>
+ , <<"name">> := ?BRIDGE_NAME_EGRESS
+ , <<"status">> := <<"disconnected">>
+ , <<"connector">> := ConnctorID
+ } = jsx:decode(Bridge),
%% we fix the 'server' parameter to a normal one, it should work
- {ok, 200, _} = request(put, uri(["connectors", ?CONNECTR_ID]),
+ {ok, 200, _} = request(put, uri(["connectors", ConnctorID]),
?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)),
- {ok, 200, BridgeStr} = request(get, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
- ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS
+ {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
+ ?assertMatch(#{ <<"id">> := BridgeIDEgress
, <<"status">> := <<"connected">>
}, jsx:decode(BridgeStr)),
%% delete the bridge
- {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_EGRESS]), []),
+ {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% delete the connector
- {ok, 204, <<>>} = request(delete, uri(["connectors", ?CONNECTR_ID]), []),
+ {ok, 204, <<>>} = request(delete, uri(["connectors", ConnctorID]), []),
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []).
t_mqtt_conn_testing(_) ->
diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl
index 497affa5e..ebc812805 100644
--- a/apps/emqx_resource/src/emqx_resource_instance.erl
+++ b/apps/emqx_resource/src/emqx_resource_instance.erl
@@ -216,7 +216,10 @@ do_remove(Mod, InstId, ResourceState) ->
do_restart(InstId) ->
case lookup(InstId) of
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
- _ = emqx_resource:call_stop(InstId, Mod, ResourceState),
+ _ = case ResourceState of
+ undefine -> ok;
+ _ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
+ end,
case emqx_resource:call_start(InstId, Mod, Config) of
{ok, NewResourceState} ->
ets:insert(emqx_resource_instance,
From 9d733c2ec51bcca1bdfa47c2c911ee009fe27dd3 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 14:40:28 +0800
Subject: [PATCH 08/13] fix(resource): typos on restart a resource
---
apps/emqx_connector/src/emqx_connector_http.erl | 2 +-
apps/emqx_connector/src/emqx_connector_schema.erl | 9 ---------
.../src/mqtt/emqx_connector_mqtt_schema.erl | 7 ++++++-
apps/emqx_resource/src/emqx_resource_instance.erl | 2 +-
4 files changed, 8 insertions(+), 12 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 77d498c6b..509e293cf 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -112,7 +112,7 @@ If the request is provided, the caller can send HTTP requests via
emqx_resource:query(ResourceId, {send_message, BridgeId, Message})
"""
})}
- ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
+ ] ++ emqx_connector_schema_lib:ssl_fields();
fields("request") ->
[ {method, hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{nullable => true})}
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index ed663ee60..33d10802b 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -8,8 +8,6 @@
-export([roots/0, fields/1]).
--export([common_fields/0]).
-
-export([ get_response/0
, put_request/0
, post_request/0
@@ -51,10 +49,3 @@ fields("connectors") ->
schema_mod(Type) ->
list_to_atom(lists:concat(["emqx_connector_", Type])).
-
-common_fields() ->
- [ {name,
- mk(binary(),
- #{ desc => "Connector name, used as a human-readable description of the connector."
- })}
- ].
diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
index 44add053c..b3484f5d9 100644
--- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
+++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl
@@ -55,6 +55,11 @@ clientid conflicts between different nodes. And we can only use shared subscript
topic filters for 'remote_topic' of ingress connections.
"""
})}
+ , {name,
+ sc(binary(),
+ #{ nullable => true
+ , desc => "Connector name, used as a human-readable description of the connector."
+ })}
, {server,
sc(emqx_schema:ip_port(),
#{ default => "127.0.0.1:1883"
@@ -94,7 +99,7 @@ topic filters for 'remote_topic' of ingress connections.
Queue messages in disk files.
"""
})}
- ] ++ emqx_connector_schema:common_fields() ++ emqx_connector_schema_lib:ssl_fields();
+ ] ++ emqx_connector_schema_lib:ssl_fields();
fields("ingress") ->
%% the message maybe subscribed by rules, in this case 'local_topic' is not necessary
diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl
index ebc812805..c23d55511 100644
--- a/apps/emqx_resource/src/emqx_resource_instance.erl
+++ b/apps/emqx_resource/src/emqx_resource_instance.erl
@@ -217,7 +217,7 @@ do_restart(InstId) ->
case lookup(InstId) of
{ok, #{mod := Mod, state := ResourceState, config := Config} = Data} ->
_ = case ResourceState of
- undefine -> ok;
+ undefined -> ok;
_ -> emqx_resource:call_stop(InstId, Mod, ResourceState)
end,
case emqx_resource:call_start(InstId, Mod, Config) of
From e2d899ad6e0f0ad6117c31c89151ab8e6adacf83 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 15:30:41 +0800
Subject: [PATCH 09/13] fix(bridge): HTTP reqeust crash if using GET an DELETE
method
---
apps/emqx_connector/src/emqx_connector_http.erl | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl
index 509e293cf..8b366070d 100644
--- a/apps/emqx_connector/src/emqx_connector_http.erl
+++ b/apps/emqx_connector/src/emqx_connector_http.erl
@@ -201,7 +201,7 @@ on_query(InstId, {KeyOrNum, Method, Request, Timeout}, AfterQuery,
#{pool_name := PoolName, base_path := BasePath} = State) ->
?TRACE("QUERY", "http_connector_received",
#{request => Request, connector => InstId, state => State}),
- NRequest = update_path(BasePath, Request),
+ NRequest = formalize_request(Method, BasePath, Request),
case Result = ehttpc:request(case KeyOrNum of
undefined -> PoolName;
_ -> {PoolName, KeyOrNum}
@@ -310,10 +310,14 @@ check_ssl_opts(URLFrom, Conf) ->
{_, _} -> false
end.
-update_path(BasePath, {Path, Headers}) ->
- {filename:join(BasePath, Path), Headers};
-update_path(BasePath, {Path, Headers, Body}) ->
- {filename:join(BasePath, Path), Headers, Body}.
+formalize_request(Method, BasePath, {Path, Headers, _Body})
+ when Method =:= get; Method =:= delete ->
+ formalize_request(Method, BasePath, {Path, Headers});
+formalize_request(_Method, BasePath, {Path, Headers, Body}) ->
+ {filename:join(BasePath, Path), Headers, Body};
+
+formalize_request(_Method, BasePath, {Path, Headers}) ->
+ {filename:join(BasePath, Path), Headers}.
bin(Bin) when is_binary(Bin) ->
Bin;
From ea2d4674dfbb445495827fc372659ed92b590c06 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 15:45:13 +0800
Subject: [PATCH 10/13] fix(resource): metrics were cleared after updating the
resource
---
apps/emqx_resource/src/emqx_resource_instance.erl | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl
index c23d55511..745b8b684 100644
--- a/apps/emqx_resource/src/emqx_resource_instance.erl
+++ b/apps/emqx_resource/src/emqx_resource_instance.erl
@@ -149,7 +149,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) ->
TestInstId = iolist_to_binary(emqx_misc:gen_id(16)),
case do_create_dry_run(TestInstId, ResourceType, Config) of
ok ->
- do_remove(ResourceType, InstId, ResourceState),
+ do_remove(ResourceType, InstId, ResourceState, false),
do_create(InstId, ResourceType, Config, #{force_create => true});
Error ->
Error
@@ -208,10 +208,15 @@ do_remove(InstId) ->
end.
do_remove(Mod, InstId, ResourceState) ->
+ do_remove(Mod, InstId, ResourceState, true).
+
+do_remove(Mod, InstId, ResourceState, ClearMetrics) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
ets:delete(emqx_resource_instance, InstId),
- ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId),
- ok.
+ case ClearMetrics of
+ true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
+ false -> ok
+ end.
do_restart(InstId) ->
case lookup(InstId) of
From a42ab3d9dabf596fc6963947d34ef31e25297caf Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 19:14:09 +0800
Subject: [PATCH 11/13] fix(rule): use emqx_conf:update/3 to make changes to
all nodes
---
apps/emqx_rule_engine/src/emqx_rule_engine_api.erl | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
index cbfda16db..d9138ffd1 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl
@@ -172,7 +172,7 @@ param_path_id() ->
{ok, _Rule} ->
{400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}};
not_found ->
- case emqx:update_config(ConfPath, Params, #{}) of
+ case emqx_conf:update(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{201, format_rule_resp(Rule)};
@@ -200,7 +200,7 @@ param_path_id() ->
'/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) ->
Params = filter_out_request_body(Params0),
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
- case emqx:update_config(ConfPath, Params, #{}) of
+ case emqx_conf:update(ConfPath, Params, #{}) of
{ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} ->
[Rule] = get_one_rule(AllRules, Id),
{200, format_rule_resp(Rule)};
From 626a4c471391af4a766ef41016ce719b60cd8cd1 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 21:53:32 +0800
Subject: [PATCH 12/13] fix(machine): some apps not restarted after joining
into the cluster
---
apps/emqx_machine/src/emqx_machine_boot.erl | 29 ++++++++++++++-------
1 file changed, 20 insertions(+), 9 deletions(-)
diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl
index 24541990b..cffb914d8 100644
--- a/apps/emqx_machine/src/emqx_machine_boot.erl
+++ b/apps/emqx_machine/src/emqx_machine_boot.erl
@@ -96,7 +96,6 @@ reboot_apps() ->
, emqx_resource
, emqx_rule_engine
, emqx_bridge
- , emqx_bridge_mqtt
, emqx_plugin_libs
, emqx_management
, emqx_retainer
@@ -112,17 +111,17 @@ sorted_reboot_apps() ->
app_deps(App) ->
case application:get_key(App, applications) of
- undefined -> [];
+ undefined -> undefined;
{ok, List} -> lists:filter(fun(A) -> lists:member(A, reboot_apps()) end, List)
end.
sorted_reboot_apps(Apps) ->
G = digraph:new(),
try
- lists:foreach(fun({App, Deps}) -> add_app(G, App, Deps) end, Apps),
+ NoDepApps = add_apps_to_digraph(G, Apps),
case digraph_utils:topsort(G) of
Sorted when is_list(Sorted) ->
- Sorted;
+ Sorted ++ (NoDepApps -- Sorted);
false ->
Loops = find_loops(G),
error({circular_application_dependency, Loops})
@@ -131,17 +130,29 @@ sorted_reboot_apps(Apps) ->
digraph:delete(G)
end.
-add_app(G, App, undefined) ->
+add_apps_to_digraph(G, Apps) ->
+ lists:foldl(fun
+ ({App, undefined}, Acc) ->
+ ?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
+ Acc;
+ ({App, []}, Acc) ->
+ Acc ++ [App]; %% use '++' to keep the original order
+ ({App, Deps}, Acc) ->
+ add_app_deps_to_digraph(G, App, Deps),
+ Acc
+ end, [], Apps).
+
+add_app_deps_to_digraph(G, App, undefined) ->
?SLOG(debug, #{msg => "app_is_not_loaded", app => App}),
%% not loaded
- add_app(G, App, []);
-add_app(_G, _App, []) ->
+ add_app_deps_to_digraph(G, App, []);
+add_app_deps_to_digraph(_G, _App, []) ->
ok;
-add_app(G, App, [Dep | Deps]) ->
+add_app_deps_to_digraph(G, App, [Dep | Deps]) ->
digraph:add_vertex(G, App),
digraph:add_vertex(G, Dep),
digraph:add_edge(G, Dep, App), %% dep -> app as dependency
- add_app(G, App, Deps).
+ add_app_deps_to_digraph(G, App, Deps).
find_loops(G) ->
lists:filtermap(
From 94a596556095be2dd588035a3f81ef005dd97126 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Thu, 30 Dec 2021 23:46:50 +0800
Subject: [PATCH 13/13] fix(rule): dead lock when update configs for rules
---
apps/emqx_machine/test/emqx_machine_SUITE.erl | 2 +-
apps/emqx_rule_engine/src/emqx_rule_engine.erl | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/apps/emqx_machine/test/emqx_machine_SUITE.erl b/apps/emqx_machine/test/emqx_machine_SUITE.erl
index 03d9e6ba9..a760d2f5f 100644
--- a/apps/emqx_machine/test/emqx_machine_SUITE.erl
+++ b/apps/emqx_machine/test/emqx_machine_SUITE.erl
@@ -43,7 +43,7 @@ init_per_suite(Config) ->
%%
application:unload(emqx_authz),
- emqx_common_test_helpers:start_apps([]),
+ emqx_common_test_helpers:start_apps([emqx_conf]),
Config.
end_per_suite(_Config) ->
diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
index 5316ca5ef..6a579cbb0 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl
@@ -187,11 +187,11 @@ init([]) ->
{ok, #{}}.
handle_call({insert_rule, Rule}, _From, State) ->
- _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_insert_rule, [Rule]),
+ do_insert_rule(Rule),
{reply, ok, State};
handle_call({delete_rule, Rule}, _From, State) ->
- _ = emqx_plugin_libs_rule:cluster_call(?MODULE, do_delete_rule, [Rule]),
+ do_delete_rule(Rule),
{reply, ok, State};
handle_call(Req, _From, State) ->