From d76f82d3d2456637f26308063e10961aedf797c2 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 8 Dec 2021 18:16:38 +0800
Subject: [PATCH] fix(bridges): add new option 'enable' for bridge configs
---
apps/emqx_bridge/etc/emqx_bridge.conf | 3 +
apps/emqx_bridge/src/emqx_bridge.erl | 32 +++++----
apps/emqx_bridge/src/emqx_bridge_api.erl | 69 +++++++++++--------
apps/emqx_bridge/src/emqx_bridge_app.erl | 23 ++++++-
.../src/emqx_bridge_http_schema.erl | 8 ++-
.../src/emqx_bridge_mqtt_schema.erl | 29 ++++----
apps/emqx_bridge/src/emqx_bridge_schema.erl | 38 ++++++----
.../test/emqx_bridge_api_SUITE.erl | 22 ++----
8 files changed, 133 insertions(+), 91 deletions(-)
diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index c658bc4ce..a3a07aa7f 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -4,6 +4,7 @@
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_ingress_mqtt_bridge {
+# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
@@ -16,6 +17,7 @@
#}
#
#bridges.mqtt.my_egress_mqtt_bridge {
+# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
@@ -28,6 +30,7 @@
#
## HTTP bridges to an HTTP server
#bridges.http.my_http_bridge {
+# enable = true
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s"
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index fdf7de3f0..89455c229 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -18,7 +18,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
--export([post_config_update/5]).
+-export([ post_config_update/5
+ ]).
-export([ load_hook/0
, unload_hook/0
@@ -103,14 +104,13 @@ bridge_type(emqx_connector_http) -> http.
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
- Result = perform_bridge_changes([
+ _ = perform_bridge_changes([
{fun remove/3, Removed},
{fun create/3, Added},
{fun update/3, Updated}
]),
ok = unload_hook(),
- ok = load_hook(NewConf),
- Result.
+ ok = load_hook(NewConf).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
@@ -187,15 +187,11 @@ restart(Type, Name) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
- ResId = resource_id(Type, Name),
- case emqx_resource:create_local(ResId,
- emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
- {ok, already_created} ->
- emqx_resource:get_instance(ResId);
- {ok, Data} ->
- {ok, Data};
- {error, Reason} ->
- {error, Reason}
+ case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
+ parse_confs(Type, Name, Conf)) of
+ {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
+ {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+ {error, Reason} -> {error, Reason}
end.
update(Type, Name, {_OldConf, Conf}) ->
@@ -209,7 +205,10 @@ update(Type, Name, {_OldConf, Conf}) ->
%%
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
- recreate(Type, Name, Conf).
+ case recreate(Type, Name, Conf) of
+ {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+ {error, _} = Err -> Err
+ end.
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
@@ -326,6 +325,11 @@ parse_url(Url) ->
error({invalid_url, Url})
end.
+maybe_disable_bridge(Type, Name, Conf) ->
+ case maps:get(enable, Conf, true) of
+ false -> stop(Type, Name);
+ true -> ok
+ end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 5b2b62d82..f7f249be6 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -26,7 +26,7 @@
%% API callbacks
-export(['/bridges'/2, '/bridges/:id'/2,
- '/nodes/:node/bridges/:id/operation/:operation'/2]).
+ '/bridges/:id/operation/:operation'/2]).
-export([ list_local_bridges/1
, lookup_from_local_node/2
@@ -38,11 +38,12 @@
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
- {BridgeType, BridgeName} -> EXPR
+ {BridgeType, BridgeName} ->
+ EXPR
catch
error:{invalid_bridge_id, Id0} ->
- {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
- ". Bridge Ids must be of format {type}:{name}">>}}
+ {400, error_msg('INVALID_ID', <<"invalid_bridge_id: ", Id0/binary,
+ ". Bridge Ids must be of format {type}:{name}">>)}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
@@ -67,7 +68,7 @@ namespace() -> "bridge".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
+paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
@@ -78,9 +79,6 @@ get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
bridge_info_examples(get)).
-param_path_node() ->
- path_param(node, binary(), atom_to_binary(node(), utf8)).
-
param_path_operation() ->
path_param(operation, enum([start, stop, restart]), <<"start">>).
@@ -129,7 +127,10 @@ info_example(Type, Direction, Method) ->
method_example(Type, Direction, get) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
- SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ SName = case Type of
+ http -> "my_" ++ SType ++ "_bridge";
+ _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
+ end,
#{
id => bin(SType ++ ":" ++ SName),
type => bin(SType),
@@ -138,7 +139,10 @@ method_example(Type, Direction, get) ->
method_example(Type, Direction, post) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
- SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ SName = case Type of
+ http -> "my_" ++ SType ++ "_bridge";
+ _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
+ end,
#{
type => bin(SType),
name => bin(SName)
@@ -247,15 +251,14 @@ schema("/bridges/:id") ->
}
};
-schema("/nodes/:node/bridges/:id/operation/:operation") ->
+schema("/bridges/:id/operation/:operation") ->
#{
- operationId => '/nodes/:node/bridges/:id/operation/:operation',
+ operationId => '/bridges/:id/operation/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
- param_path_node(),
param_path_id(),
param_path_operation()
],
@@ -269,7 +272,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+ {ok, _} ->
+ {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
{error, not_found} ->
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
@@ -296,7 +300,7 @@ list_local_bridges(Node) ->
{error, Error} -> {400, Error}
end;
{error, not_found} ->
- {404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
+ {404, error_msg('NOT_FOUND',<<"bridge not found">>)}
end);
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
@@ -305,7 +309,7 @@ list_local_bridges(Node) ->
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
- {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
+ {500, error_msg('UNKNOWN_ERROR', Reason)}
end).
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
@@ -324,20 +328,25 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
-'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
- #{node := Node, id := Id, operation := Op}}) ->
- OperFun =
- fun (<<"start">>) -> start;
- (<<"stop">>) -> stop;
- (<<"restart">>) -> restart
- end,
- ?TRY_PARSE_ID(Id,
- case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op),
- [BridgeType, BridgeName]) of
- ok -> {200};
- {error, Reason} ->
- {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
- end).
+'/bridges/:id/operation/:operation'(post, #{bindings :=
+ #{id := Id, operation := Op}}) ->
+ ?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of
+ invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)};
+ UpReq ->
+ case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+ UpReq, #{override_to => cluster}) of
+ {ok, _} -> {200};
+ {error, {pre_config_update, _, bridge_not_found}} ->
+ {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
+ {error, Reason} ->
+ {500, error_msg('UNKNOWN_ERROR', Reason)}
+ end
+ end).
+
+operation_to_conf_req(<<"start">>) -> start;
+operation_to_conf_req(<<"stop">>) -> stop;
+operation_to_conf_req(<<"restart">>) -> restart;
+operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl
index 846b6dd00..519368523 100644
--- a/apps/emqx_bridge/src/emqx_bridge_app.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_app.erl
@@ -19,16 +19,35 @@
-export([start/2, stop/1]).
+-export([ pre_config_update/3
+ ]).
+
+-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
+-define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])).
+
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(),
- emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
+ emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
+ emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
{ok, Sup}.
stop(_State) ->
- emqx_conf:remove_handler(emqx_bridge:config_key_path()),
+ emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH),
+ emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
ok = emqx_bridge:unload_hook(),
ok.
+-define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
+pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) ->
+ {error, bridge_not_found};
+pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) ->
+ {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+pre_config_update(_, Conf, _OldConfig) ->
+ {ok, Conf}.
+
%% internal functions
+operation_to_enable(start) -> true;
+operation_to_enable(stop) -> false;
+operation_to_enable(restart) -> true.
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
index 2bef474bd..18fc59318 100644
--- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -79,7 +79,13 @@ fields("get") ->
] ++ fields("post").
basic_config() ->
- proplists:delete(base_url, emqx_connector_http:fields(config)).
+ [ {enable,
+ mk(boolean(),
+ #{ desc =>"Enable or disable this bridge"
+ , default => true
+ })}
+ ]
+ ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
%%======================================================================================
id_field() ->
diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
index d2cf6b1a8..4b6965349 100644
--- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
@@ -11,28 +11,30 @@
roots() -> [].
fields("ingress") ->
- [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
- , emqx_bridge_schema:connector_name()
- ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
+ [ emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())
+ ]
+ ++ emqx_bridge_schema:common_bridge_fields()
+ ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
- [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
- , emqx_bridge_schema:connector_name()
- ] ++ emqx_connector_mqtt_schema:fields("egress");
+ [ emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())
+ ]
+ ++ emqx_bridge_schema:common_bridge_fields()
+ ++ emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
[ type_field()
, name_field()
- ] ++ fields("ingress");
+ ] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[ type_field()
, name_field()
- ] ++ fields("egress");
+ ] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
- fields("ingress");
+ proplists:delete(enable, fields("ingress"));
fields("put_egress") ->
- fields("egress");
+ proplists:delete(enable, fields("egress"));
fields("get_ingress") ->
[ id_field()
@@ -42,13 +44,6 @@ fields("get_egress") ->
] ++ fields("post_egress").
%%======================================================================================
-direction(Dir, Desc) ->
- {direction, mk(Dir,
- #{ nullable => false
- , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
- ++ Desc
- })}.
-
id_field() ->
{id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index ec875d0a4..e06065c7d 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -11,7 +11,8 @@
, post_request/0
]).
--export([ connector_name/0
+-export([ common_bridge_fields/0
+ , direction_field/2
]).
%%======================================================================================
@@ -24,17 +25,6 @@
get_response() ->
http_schema("get").
-connector_name() ->
- {connector,
- mk(binary(),
- #{ nullable => false
- , desc =>"""
-The connector name to be used for this bridge.
-Connectors are configured as 'connectors.{type}.{name}',
-for example 'connectors.http.mybridge'.
-"""
- })}.
-
put_request() ->
http_schema("put").
@@ -49,6 +39,30 @@ http_schema(Method) ->
hoconsc:union([ref(emqx_bridge_http_schema, Method)
| Schemas]).
+common_bridge_fields() ->
+ [ {enable,
+ mk(boolean(),
+ #{ desc =>"Enable or disable this bridge"
+ , default => true
+ })}
+ , {connector,
+ mk(binary(),
+ #{ nullable => false
+ , desc =>"""
+The connector name to be used for this bridge.
+Connectors are configured as 'connectors.{type}.{name}',
+for example 'connectors.http.mybridge'.
+"""
+ })}
+ ].
+
+direction_field(Dir, Desc) ->
+ {direction, mk(Dir,
+ #{ nullable => false
+ , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
+ ++ Desc
+ })}.
+
%%======================================================================================
%% For config files
roots() -> [bridges].
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 52c8a32de..716786a8f 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -234,37 +234,27 @@ t_start_stop_bridges(_) ->
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% stop it
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(start), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
%% restart a stopped bridge
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
@@ -306,3 +296,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
+operation_path(Oper) ->
+ uri(["bridges", ?BRIDGE_ID, "operation", Oper]).