diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 01e5faf07..fdf7de3f0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -37,6 +37,7 @@ , lookup/2 , lookup/3 , list/0 + , list_bridges_by_connector/1 , create/3 , recreate/2 , recreate/3 @@ -160,6 +161,10 @@ list() -> end, Bridges, maps:to_list(NameAndConf)) end, [], maps:to_list(emqx:get_raw_config([bridges], #{}))). +list_bridges_by_connector(ConnectorId) -> + [B || B = #{raw_config := #{<<"connector">> := Id}} <- list(), + ConnectorId =:= Id]. + lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 95bc33a83..2b77d7ac2 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -220,8 +220,8 @@ schema("/connectors/:id") -> case emqx_connector:update(ConnType, ConnName, maps:without([<<"type">>, <<"name">>], Params)) of {ok, #{raw_config := RawConf}} -> - {201, RawConf#{<<"id">> => - emqx_connector:connector_id(ConnType, ConnName)}}; + Id = emqx_connector:connector_id(ConnType, ConnName), + {201, format_resp(Id, RawConf)}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} end end. @@ -229,7 +229,7 @@ schema("/connectors/:id") -> '/connectors/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, case emqx_connector:lookup(ConnType, ConnName) of - {ok, Conf} -> {200, Conf#{<<"id">> => Id}}; + {ok, Conf} -> {200, format_resp(Id, Conf)}; {error, not_found} -> {404, error_msg('NOT_FOUND', <<"connector not found">>)} end); @@ -239,7 +239,8 @@ schema("/connectors/:id") -> case emqx_connector:lookup(ConnType, ConnName) of {ok, _} -> case emqx_connector:update(ConnType, ConnName, Params) of - {ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}}; + {ok, #{raw_config := RawConf}} -> + {200, format_resp(Id, RawConf)}; {error, Error} -> {400, error_msg('BAD_ARG', Error)} end; {error, not_found} -> @@ -263,5 +264,12 @@ error_msg(Code, Msg) when is_binary(Msg) -> error_msg(Code, Msg) -> #{code => Code, message => bin(io_lib:format("~p", [Msg]))}. +format_resp(ConnId, RawConf) -> + NumOfBridges = length(emqx_bridge:list_bridges_by_connector(ConnId)), + RawConf#{ + <<"id">> => ConnId, + <<"num_of_bridges">> => NumOfBridges + }. + bin(S) when is_list(S) -> list_to_binary(S). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 6bc609fa8..079f17716 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -54,11 +54,14 @@ fields("config") -> emqx_connector_mqtt_schema:fields("config"); fields("get") -> - [{id, mk(binary(), + [ {id, mk(binary(), #{ desc => "The connector Id" , example => <<"mqtt:my_mqtt_connector">> - })}] - ++ fields("post"); + })} + , {num_of_bridges, mk(integer(), + #{ desc => "The current number of bridges that are using this connector" + })} + ] ++ fields("post"); fields("put") -> emqx_connector_mqtt_schema:fields("connector"); diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index c386a829f..33d10802b 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -13,6 +13,7 @@ , post_request/0 ]). +%% the config for http bridges do not need connectors -define(CONN_TYPES, [mqtt]). %%====================================================================================== diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index bbac76674..760160df4 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -199,9 +199,9 @@ t_mqtt_conn_bridge_ingress(_) -> , <<"name">> => ?CONNECTR_NAME }), - %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID , <<"server">> := <<"127.0.0.1:1883">> + , <<"num_of_bridges">> := 0 , <<"username">> := User1 , <<"password">> := <<"">> , <<"proto_ver">> := <<"v4">> @@ -216,7 +216,6 @@ t_mqtt_conn_bridge_ingress(_) -> <<"name">> => ?BRIDGE_NAME_INGRESS }), - %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS , <<"type">> := <<"mqtt">> , <<"status">> := <<"connected">> @@ -246,6 +245,12 @@ t_mqtt_conn_bridge_ingress(_) -> false end), + %% get the connector by id, verify the num_of_bridges now is 1 + {ok, 200, Connector1Str} = request(get, uri(["connectors", ?CONNECTR_ID]), []), + ?assertMatch(#{ <<"id">> := ?CONNECTR_ID + , <<"num_of_bridges">> := 1 + }, jsx:decode(Connector1Str)), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", ?BRIDGE_ID_INGRESS]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),