diff --git a/apps/emqx_bridge/etc/emqx_bridge.conf b/apps/emqx_bridge/etc/emqx_bridge.conf
index c658bc4ce..a3a07aa7f 100644
--- a/apps/emqx_bridge/etc/emqx_bridge.conf
+++ b/apps/emqx_bridge/etc/emqx_bridge.conf
@@ -4,6 +4,7 @@
## MQTT bridges to/from another MQTT broker
#bridges.mqtt.my_ingress_mqtt_bridge {
+# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = ingress
# ## topic mappings for this bridge
@@ -16,6 +17,7 @@
#}
#
#bridges.mqtt.my_egress_mqtt_bridge {
+# enable = true
# connector = "mqtt:my_mqtt_connector"
# direction = egress
# ## topic mappings for this bridge
@@ -28,6 +30,7 @@
#
## HTTP bridges to an HTTP server
#bridges.http.my_http_bridge {
+# enable = true
# ## NOTE: we cannot use placehodler variables in the `scheme://host:port` part of the url
# url = "http://localhost:9901/messages/${topic}"
# request_timeout = "30s"
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index 01e5faf07..89455c229 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -18,7 +18,8 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
--export([post_config_update/5]).
+-export([ post_config_update/5
+ ]).
-export([ load_hook/0
, unload_hook/0
@@ -37,6 +38,7 @@
, lookup/2
, lookup/3
, list/0
+ , list_bridges_by_connector/1
, create/3
, recreate/2
, recreate/3
@@ -102,14 +104,13 @@ bridge_type(emqx_connector_http) -> http.
post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
#{added := Added, removed := Removed, changed := Updated}
= diff_confs(NewConf, OldConf),
- Result = perform_bridge_changes([
+ _ = perform_bridge_changes([
{fun remove/3, Removed},
{fun create/3, Added},
{fun update/3, Updated}
]),
ok = unload_hook(),
- ok = load_hook(NewConf),
- Result.
+ ok = load_hook(NewConf).
perform_bridge_changes(Tasks) ->
perform_bridge_changes(Tasks, ok).
@@ -160,6 +161,10 @@ list() ->
end, Bridges, maps:to_list(NameAndConf))
end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))).
+list_bridges_by_connector(ConnectorId) ->
+ [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(),
+ ConnectorId =:= Id].
+
lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf).
@@ -182,15 +187,11 @@ restart(Type, Name) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
- ResId = resource_id(Type, Name),
- case emqx_resource:create_local(ResId,
- emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf)) of
- {ok, already_created} ->
- emqx_resource:get_instance(ResId);
- {ok, Data} ->
- {ok, Data};
- {error, Reason} ->
- {error, Reason}
+ case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
+ parse_confs(Type, Name, Conf)) of
+ {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
+ {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+ {error, Reason} -> {error, Reason}
end.
update(Type, Name, {_OldConf, Conf}) ->
@@ -204,7 +205,10 @@ update(Type, Name, {_OldConf, Conf}) ->
%%
?SLOG(info, #{msg => "update bridge", type => Type, name => Name,
config => Conf}),
- recreate(Type, Name, Conf).
+ case recreate(Type, Name, Conf) of
+ {ok, _} -> maybe_disable_bridge(Type, Name, Conf);
+ {error, _} = Err -> Err
+ end.
recreate(Type, Name) ->
recreate(Type, Name, emqx:get_raw_config([bridges, Type, Name])).
@@ -321,6 +325,11 @@ parse_url(Url) ->
error({invalid_url, Url})
end.
+maybe_disable_bridge(Type, Name, Conf) ->
+ case maps:get(enable, Conf, true) of
+ false -> stop(Type, Name);
+ true -> ok
+ end.
bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str);
diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl
index 5b2b62d82..f7f249be6 100644
--- a/apps/emqx_bridge/src/emqx_bridge_api.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_api.erl
@@ -26,7 +26,7 @@
%% API callbacks
-export(['/bridges'/2, '/bridges/:id'/2,
- '/nodes/:node/bridges/:id/operation/:operation'/2]).
+ '/bridges/:id/operation/:operation'/2]).
-export([ list_local_bridges/1
, lookup_from_local_node/2
@@ -38,11 +38,12 @@
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge:parse_bridge_id(Id) of
- {BridgeType, BridgeName} -> EXPR
+ {BridgeType, BridgeName} ->
+ EXPR
catch
error:{invalid_bridge_id, Id0} ->
- {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
- ". Bridge Ids must be of format {type}:{name}">>}}
+ {400, error_msg('INVALID_ID', <<"invalid_bridge_id: ", Id0/binary,
+ ". Bridge Ids must be of format {type}:{name}">>)}
end).
-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX),
@@ -67,7 +68,7 @@ namespace() -> "bridge".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
-paths() -> ["/bridges", "/bridges/:id", "/nodes/:node/bridges/:id/operation/:operation"].
+paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"].
error_schema(Code, Message) ->
[ {code, mk(string(), #{example => Code})}
@@ -78,9 +79,6 @@ get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
bridge_info_examples(get)).
-param_path_node() ->
- path_param(node, binary(), atom_to_binary(node(), utf8)).
-
param_path_operation() ->
path_param(operation, enum([start, stop, restart]), <<"start">>).
@@ -129,7 +127,10 @@ info_example(Type, Direction, Method) ->
method_example(Type, Direction, get) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
- SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ SName = case Type of
+ http -> "my_" ++ SType ++ "_bridge";
+ _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
+ end,
#{
id => bin(SType ++ ":" ++ SName),
type => bin(SType),
@@ -138,7 +139,10 @@ method_example(Type, Direction, get) ->
method_example(Type, Direction, post) ->
SType = atom_to_list(Type),
SDir = atom_to_list(Direction),
- SName = "my_" ++ SDir ++ "_" ++ SType ++ "_bridge",
+ SName = case Type of
+ http -> "my_" ++ SType ++ "_bridge";
+ _ -> "my_" ++ SDir ++ "_" ++ SType ++ "_bridge"
+ end,
#{
type => bin(SType),
name => bin(SName)
@@ -247,15 +251,14 @@ schema("/bridges/:id") ->
}
};
-schema("/nodes/:node/bridges/:id/operation/:operation") ->
+schema("/bridges/:id/operation/:operation") ->
#{
- operationId => '/nodes/:node/bridges/:id/operation/:operation',
+ operationId => '/bridges/:id/operation/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Start/Stop/Restart Bridge">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
- param_path_node(),
param_path_id(),
param_path_operation()
],
@@ -269,7 +272,8 @@ schema("/nodes/:node/bridges/:id/operation/:operation") ->
'/bridges'(post, #{body := #{<<"type">> := BridgeType} = Conf}) ->
BridgeName = maps:get(<<"name">>, Conf, emqx_misc:gen_id()),
case emqx_bridge:lookup(BridgeType, BridgeName) of
- {ok, _} -> {400, #{code => 'ALREADY_EXISTS', message => <<"bridge already exists">>}};
+ {ok, _} ->
+ {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)};
{error, not_found} ->
case ensure_bridge_created(BridgeType, BridgeName, Conf) of
ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201);
@@ -296,7 +300,7 @@ list_local_bridges(Node) ->
{error, Error} -> {400, Error}
end;
{error, not_found} ->
- {404, #{code => 'NOT_FOUND', message => <<"bridge not found">>}}
+ {404, error_msg('NOT_FOUND',<<"bridge not found">>)}
end);
'/bridges/:id'(delete, #{bindings := #{id := Id}}) ->
@@ -305,7 +309,7 @@ list_local_bridges(Node) ->
#{override_to => cluster}) of
{ok, _} -> {204};
{error, Reason} ->
- {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
+ {500, error_msg('UNKNOWN_ERROR', Reason)}
end).
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
@@ -324,20 +328,25 @@ lookup_from_local_node(BridgeType, BridgeName) ->
Error -> Error
end.
-'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
- #{node := Node, id := Id, operation := Op}}) ->
- OperFun =
- fun (<<"start">>) -> start;
- (<<"stop">>) -> stop;
- (<<"restart">>) -> restart
- end,
- ?TRY_PARSE_ID(Id,
- case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op),
- [BridgeType, BridgeName]) of
- ok -> {200};
- {error, Reason} ->
- {500, #{code => 102, message => emqx_resource_api:stringify(Reason)}}
- end).
+'/bridges/:id/operation/:operation'(post, #{bindings :=
+ #{id := Id, operation := Op}}) ->
+ ?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of
+ invalid -> {404, error_msg('BAD_ARG', <<"invalid operation">>)};
+ UpReq ->
+ case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
+ UpReq, #{override_to => cluster}) of
+ {ok, _} -> {200};
+ {error, {pre_config_update, _, bridge_not_found}} ->
+ {404, error_msg('NOT_FOUND', <<"bridge not found">>)};
+ {error, Reason} ->
+ {500, error_msg('UNKNOWN_ERROR', Reason)}
+ end
+ end).
+
+operation_to_conf_req(<<"start">>) -> start;
+operation_to_conf_req(<<"stop">>) -> stop;
+operation_to_conf_req(<<"restart">>) -> restart;
+operation_to_conf_req(_) -> invalid.
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
Conf1 = maps:without([<<"type">>, <<"name">>], Conf),
diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl
index 846b6dd00..519368523 100644
--- a/apps/emqx_bridge/src/emqx_bridge_app.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_app.erl
@@ -19,16 +19,35 @@
-export([start/2, stop/1]).
+-export([ pre_config_update/3
+ ]).
+
+-define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())).
+-define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])).
+
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load(),
ok = emqx_bridge:load_hook(),
- emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
+ emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
+ emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
{ok, Sup}.
stop(_State) ->
- emqx_conf:remove_handler(emqx_bridge:config_key_path()),
+ emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH),
+ emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH),
ok = emqx_bridge:unload_hook(),
ok.
+-define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
+pre_config_update(_, Oper, undefined) ?IS_OPER(Oper) ->
+ {error, bridge_not_found};
+pre_config_update(_, Oper, OldConfig) ?IS_OPER(Oper) ->
+ {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
+pre_config_update(_, Conf, _OldConfig) ->
+ {ok, Conf}.
+
%% internal functions
+operation_to_enable(start) -> true;
+operation_to_enable(stop) -> false;
+operation_to_enable(restart) -> true.
diff --git a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
index 2bef474bd..18fc59318 100644
--- a/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_http_schema.erl
@@ -79,7 +79,13 @@ fields("get") ->
] ++ fields("post").
basic_config() ->
- proplists:delete(base_url, emqx_connector_http:fields(config)).
+ [ {enable,
+ mk(boolean(),
+ #{ desc =>"Enable or disable this bridge"
+ , default => true
+ })}
+ ]
+ ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
%%======================================================================================
id_field() ->
diff --git a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
index d2cf6b1a8..4b6965349 100644
--- a/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_mqtt_schema.erl
@@ -11,28 +11,30 @@
roots() -> [].
fields("ingress") ->
- [ direction(ingress, emqx_connector_mqtt_schema:ingress_desc())
- , emqx_bridge_schema:connector_name()
- ] ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
+ [ emqx_bridge_schema:direction_field(ingress, emqx_connector_mqtt_schema:ingress_desc())
+ ]
+ ++ emqx_bridge_schema:common_bridge_fields()
+ ++ proplists:delete(hookpoint, emqx_connector_mqtt_schema:fields("ingress"));
fields("egress") ->
- [ direction(egress, emqx_connector_mqtt_schema:egress_desc())
- , emqx_bridge_schema:connector_name()
- ] ++ emqx_connector_mqtt_schema:fields("egress");
+ [ emqx_bridge_schema:direction_field(egress, emqx_connector_mqtt_schema:egress_desc())
+ ]
+ ++ emqx_bridge_schema:common_bridge_fields()
+ ++ emqx_connector_mqtt_schema:fields("egress");
fields("post_ingress") ->
[ type_field()
, name_field()
- ] ++ fields("ingress");
+ ] ++ proplists:delete(enable, fields("ingress"));
fields("post_egress") ->
[ type_field()
, name_field()
- ] ++ fields("egress");
+ ] ++ proplists:delete(enable, fields("egress"));
fields("put_ingress") ->
- fields("ingress");
+ proplists:delete(enable, fields("ingress"));
fields("put_egress") ->
- fields("egress");
+ proplists:delete(enable, fields("egress"));
fields("get_ingress") ->
[ id_field()
@@ -42,13 +44,6 @@ fields("get_egress") ->
] ++ fields("post_egress").
%%======================================================================================
-direction(Dir, Desc) ->
- {direction, mk(Dir,
- #{ nullable => false
- , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
- ++ Desc
- })}.
-
id_field() ->
{id, mk(binary(), #{desc => "The Bridge Id", example => "mqtt:my_mqtt_bridge"})}.
diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl
index ec875d0a4..e06065c7d 100644
--- a/apps/emqx_bridge/src/emqx_bridge_schema.erl
+++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl
@@ -11,7 +11,8 @@
, post_request/0
]).
--export([ connector_name/0
+-export([ common_bridge_fields/0
+ , direction_field/2
]).
%%======================================================================================
@@ -24,17 +25,6 @@
get_response() ->
http_schema("get").
-connector_name() ->
- {connector,
- mk(binary(),
- #{ nullable => false
- , desc =>"""
-The connector name to be used for this bridge.
-Connectors are configured as 'connectors.{type}.{name}',
-for example 'connectors.http.mybridge'.
-"""
- })}.
-
put_request() ->
http_schema("put").
@@ -49,6 +39,30 @@ http_schema(Method) ->
hoconsc:union([ref(emqx_bridge_http_schema, Method)
| Schemas]).
+common_bridge_fields() ->
+ [ {enable,
+ mk(boolean(),
+ #{ desc =>"Enable or disable this bridge"
+ , default => true
+ })}
+ , {connector,
+ mk(binary(),
+ #{ nullable => false
+ , desc =>"""
+The connector name to be used for this bridge.
+Connectors are configured as 'connectors.{type}.{name}',
+for example 'connectors.http.mybridge'.
+"""
+ })}
+ ].
+
+direction_field(Dir, Desc) ->
+ {direction, mk(Dir,
+ #{ nullable => false
+ , desc => "The direction of the bridge. Can be one of 'ingress' or 'egress'.
"
+ ++ Desc
+ })}.
+
%%======================================================================================
%% For config files
roots() -> [bridges].
diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
index 52c8a32de..716786a8f 100644
--- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
+++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl
@@ -234,37 +234,27 @@ t_start_stop_bridges(_) ->
, <<"url">> := URL1
}, jsx:decode(Bridge)),
%% stop it
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
{ok, 200, Bridge2} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"disconnected">>
}, jsx:decode(Bridge2)),
%% start again
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "start"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(start), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% restart an already started bridge
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge3} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
}, jsx:decode(Bridge3)),
%% stop it again
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "stop"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(stop), <<"">>),
%% restart a stopped bridge
- {ok, 200, <<>>} = request(post,
- uri(["nodes", node(), "bridges", ?BRIDGE_ID, "operation", "restart"]),
- <<"">>),
+ {ok, 200, <<>>} = request(post, operation_path(restart), <<"">>),
{ok, 200, Bridge4} = request(get, uri(["bridges", ?BRIDGE_ID]), []),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID
, <<"status">> := <<"connected">>
@@ -306,3 +296,5 @@ auth_header_() ->
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
+operation_path(Oper) ->
+ uri(["bridges", ?BRIDGE_ID, "operation", Oper]).
diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl
index 95bc33a83..2b77d7ac2 100644
--- a/apps/emqx_connector/src/emqx_connector_api.erl
+++ b/apps/emqx_connector/src/emqx_connector_api.erl
@@ -220,8 +220,8 @@ schema("/connectors/:id") ->
case emqx_connector:update(ConnType, ConnName,
maps:without([<<"type">>, <<"name">>], Params)) of
{ok, #{raw_config := RawConf}} ->
- {201, RawConf#{<<"id">> =>
- emqx_connector:connector_id(ConnType, ConnName)}};
+ Id = emqx_connector:connector_id(ConnType, ConnName),
+ {201, format_resp(Id, RawConf)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end
end.
@@ -229,7 +229,7 @@ schema("/connectors/:id") ->
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id,
case emqx_connector:lookup(ConnType, ConnName) of
- {ok, Conf} -> {200, Conf#{<<"id">> => Id}};
+ {ok, Conf} -> {200, format_resp(Id, Conf)};
{error, not_found} ->
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
end);
@@ -239,7 +239,8 @@ schema("/connectors/:id") ->
case emqx_connector:lookup(ConnType, ConnName) of
{ok, _} ->
case emqx_connector:update(ConnType, ConnName, Params) of
- {ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}};
+ {ok, #{raw_config := RawConf}} ->
+ {200, format_resp(Id, RawConf)};
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
end;
{error, not_found} ->
@@ -263,5 +264,12 @@ error_msg(Code, Msg) when is_binary(Msg) ->
error_msg(Code, Msg) ->
#{code => Code, message => bin(io_lib:format("~p", [Msg]))}.
+format_resp(ConnId, RawConf) ->
+ NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)),
+ RawConf#{
+ <<"id">> => ConnId,
+ <<"num_of_bridges">> => NumOfBridges
+ }.
+
bin(S) when is_list(S) ->
list_to_binary(S).
diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl
index 6bc609fa8..079f17716 100644
--- a/apps/emqx_connector/src/emqx_connector_mqtt.erl
+++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl
@@ -54,11 +54,14 @@ fields("config") ->
emqx_connector_mqtt_schema:fields("config");
fields("get") ->
- [{id, mk(binary(),
+ [ {id, mk(binary(),
#{ desc => "The connector Id"
, example => <<"mqtt:my_mqtt_connector">>
- })}]
- ++ fields("post");
+ })}
+ , {num_of_bridges, mk(integer(),
+ #{ desc => "The current number of bridges that are using this connector"
+ })}
+ ] ++ fields("post");
fields("put") ->
emqx_connector_mqtt_schema:fields("connector");
diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl
index c386a829f..33d10802b 100644
--- a/apps/emqx_connector/src/emqx_connector_schema.erl
+++ b/apps/emqx_connector/src/emqx_connector_schema.erl
@@ -13,6 +13,7 @@
, post_request/0
]).
+%% the config for http bridges do not need connectors
-define(CONN_TYPES, [mqtt]).
%%======================================================================================
diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
index bbac76674..760160df4 100644
--- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
+++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl
@@ -199,9 +199,9 @@ t_mqtt_conn_bridge_ingress(_) ->
, <<"name">> => ?CONNECTR_NAME
}),
- %ct:pal("---connector: ~p", [Connector]),
?assertMatch(#{ <<"id">> := ?CONNECTR_ID
, <<"server">> := <<"127.0.0.1:1883">>
+ , <<"num_of_bridges">> := 0
, <<"username">> := User1
, <<"password">> := <<"">>
, <<"proto_ver">> := <<"v4">>
@@ -216,7 +216,6 @@ t_mqtt_conn_bridge_ingress(_) ->
<<"name">> => ?BRIDGE_NAME_INGRESS
}),
- %ct:pal("---bridge: ~p", [Bridge]),
?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS
, <<"type">> := <<"mqtt">>
, <<"status">> := <<"connected">>
@@ -246,6 +245,12 @@ t_mqtt_conn_bridge_ingress(_) ->
false
end),
+ %% get the connector by id, verify the num_of_bridges now is 1
+ {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []),
+ ?assertMatch(#{ <<"id">> := ?CONNECTR_ID
+ , <<"num_of_bridges">> := 1
+ }, jsx:decode(Connector1Str)),
+
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),