diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a5c24be9f..95b9c8759 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -394,12 +394,14 @@ aggregate_metrics(AllMetrics) -> end, InitMetrics, AllMetrics). format_resp(#{id := Id, raw_config := RawConf, - resource_data := #{mod := Mod, status := Status, metrics := Metrics}}) -> + resource_data := #{status := Status, metrics := Metrics}}) -> + {Type, Name} = emqx_bridge:parse_bridge_id(Id), IsConnected = fun(started) -> connected; (_) -> disconnected end, RawConf#{ id => Id, + type => Type, + name => Name, node => node(), - type => emqx_bridge:bridge_type(Mod), status => IsConnected(Status), metrics => Metrics }. diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 1510f439e..fe28dde9d 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -30,6 +30,8 @@ %% API callbacks -export(['/connectors_test'/2, '/connectors'/2, '/connectors/:id'/2]). +-define(CONN_TYPES, [mqtt]). + -define(TRY_PARSE_ID(ID, EXPR), try emqx_connector:parse_connector_id(Id) of {ConnType, ConnName} -> @@ -55,43 +57,54 @@ error_schema(Code, Message) -> put_request_body_schema() -> emqx_dashboard_swagger:schema_with_examples( - connector_info(put_req), connector_info_examples()). + emqx_connector_schema:put_request(), connector_info_examples(put)). post_request_body_schema() -> emqx_dashboard_swagger:schema_with_examples( - connector_info(post_req), connector_info_examples()). + emqx_connector_schema:post_request(), connector_info_examples(post)). get_response_body_schema() -> emqx_dashboard_swagger:schema_with_examples( - connector_info(), connector_info_examples()). + emqx_connector_schema:get_response(), connector_info_examples(get)). -connector_info() -> - connector_info(resp). +connector_info_array_example(Method) -> + [Config || #{value := Config} <- maps:values(connector_info_examples(Method))]. -connector_info(resp) -> - hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info") - ]); -connector_info(put_req) -> - hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector") - ]); -connector_info(post_req) -> - hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector") - ]). +connector_info_examples(Method) -> + lists:foldl(fun(Type, Acc) -> + SType = atom_to_list(Type), + maps:merge(Acc, #{ + Type => #{ + summary => bin(string:uppercase(SType) ++ " Connector"), + value => info_example(Type, Method) + } + }) + end, #{}, ?CONN_TYPES). -connector_info_array_example() -> - [Config || #{value := Config} <- maps:values(connector_info_examples())]. +info_example(Type, Method) -> + maps:merge(info_example_basic(Type), + method_example(Type, Method)). -connector_info_examples() -> +method_example(Type, get) -> + SType = atom_to_list(Type), + SName = "my_" ++ SType ++ "_connector", #{ - mqtt => #{ - summary => <<"MQTT Bridge">>, - value => mqtt_info_example() - } - }. - -mqtt_info_example() -> + id => bin(SType ++ ":" ++ SName), + type => bin(SType), + name => bin(SName) + }; +method_example(Type, post) -> + SType = atom_to_list(Type), + SName = "my_" ++ SType ++ "_connector", + #{ + type => bin(SType), + name => bin(SName) + }; +method_example(_Type, put) -> + #{}. + +info_example_basic(mqtt) -> #{ - type => <<"mqtt">>, server => <<"127.0.0.1:1883">>, reconnect_interval => <<"30s">>, proto_ver => <<"v4">>, @@ -136,8 +149,8 @@ schema("/connectors") -> summary => <<"List connectors">>, responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( - array(connector_info()), - connector_info_array_example()) + array(emqx_connector_schema:get_response()), + connector_info_array_example(get)) } }, post => #{ @@ -198,17 +211,20 @@ schema("/connectors/:id") -> '/connectors'(get, _Request) -> {200, emqx_connector:list()}; -'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) -> - ?TRY_PARSE_ID(Id, - case emqx_connector:lookup(ConnType, ConnName) of - {ok, _} -> - {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; - {error, not_found} -> - case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of - {ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}}; - {error, Error} -> {400, error_msg('BAD_ARG', Error)} - end - end). +'/connectors'(post, #{body := #{<<"type">> := ConnType} = Params}) -> + ConnName = maps:get(<<"name">>, Params, emqx_misc:gen_id()), + case emqx_connector:lookup(ConnType, ConnName) of + {ok, _} -> + {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; + {error, not_found} -> + case emqx_connector:update(ConnType, ConnName, + maps:without([<<"type">>, <<"name">>], Params)) of + {ok, #{raw_config := RawConf}} -> + {201, RawConf#{<<"id">> => + emqx_connector:connector_id(ConnType, ConnName)}}; + {error, Error} -> {400, error_msg('BAD_ARG', Error)} + end + end. '/connectors/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, @@ -246,3 +262,10 @@ error_msg(Code, Msg) when is_binary(Msg) -> #{code => Code, message => Msg}; error_msg(Code, Msg) -> #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}. + +bin(S) when is_atom(S) -> + atom_to_binary(S, utf8); +bin(S) when is_list(S) -> + list_to_binary(S); +bin(S) when is_binary(S) -> + S. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 9d11d7ac0..2cce0d195 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -40,6 +40,8 @@ -behaviour(hocon_schema). +-import(hoconsc, [mk/2]). + -export([ roots/0 , fields/1]). @@ -49,7 +51,25 @@ roots() -> fields("config"). fields("config") -> - emqx_connector_mqtt_schema:fields("config"). + emqx_connector_mqtt_schema:fields("config"); + +fields("get") -> + [{id, mk(binary(), + #{ desc => "The connector Id" + , example => <<"mqtt:my_mqtt_connector">> + })}] + ++ fields("post"); + +fields("put") -> + emqx_connector_mqtt_schema:fields("connector"); + +fields("post") -> + [ {type, mk(mqtt, #{desc => "The Connector Type"})} + , {name, mk(binary(), + #{ desc => "The Connector Name" + , example => <<"my_mqtt_connector">> + })} + ] ++ fields("put"). %% =================================================================== %% supervisor APIs diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index 518d4e62d..c386a829f 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -4,8 +4,33 @@ -include_lib("typerefl/include/types.hrl"). +-import(hoconsc, [mk/2, ref/2]). + -export([roots/0, fields/1]). +-export([ get_response/0 + , put_request/0 + , post_request/0 + ]). + +-define(CONN_TYPES, [mqtt]). + +%%====================================================================================== +%% For HTTP APIs + +get_response() -> + http_schema("get"). + +put_request() -> + http_schema("put"). + +post_request() -> + http_schema("post"). + +http_schema(Method) -> + Schemas = [ref(schema_mod(Type), Method) || Type <- ?CONN_TYPES], + hoconsc:union(Schemas). + %%====================================================================================== %% Hocon Schema Definitions @@ -14,23 +39,12 @@ roots() -> ["connectors"]. fields(connectors) -> fields("connectors"); fields("connectors") -> [ {mqtt, - sc(hoconsc:map(name, - hoconsc:union([ ref("mqtt_connector") + mk(hoconsc:map(name, + hoconsc:union([ ref(emqx_connector_mqtt_schema, "connector") ])), #{ desc => "MQTT bridges" })} - ]; + ]. -fields("mqtt_connector") -> - [ {type, sc(mqtt, #{desc => "The Connector Type"})} - %, {name, sc(binary(), #{desc => "The Connector Name"})} - ] - ++ emqx_connector_mqtt_schema:fields("connector"); - -fields("mqtt_connector_info") -> - [{id, sc(binary(), #{desc => "The connector Id", example => "mqtt:foo"})}] - ++ fields("mqtt_connector"). - -sc(Type, Meta) -> hoconsc:mk(Type, Meta). - -ref(Field) -> hoconsc:ref(?MODULE, Field). +schema_mod(Type) -> + list_to_atom(lists:concat(["emqx_connector_", Type])). diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 96f530563..bbac76674 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -24,7 +24,11 @@ -define(CONF_DEFAULT, <<"connectors: {}">>). -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). +-define(CONNECTR_TYPE, <<"mqtt">>). +-define(CONNECTR_NAME, <<"test_connector">>). -define(CONNECTR_ID, <<"mqtt:test_connector">>). +-define(BRIDGE_NAME_INGRESS, <<"ingress_test_bridge">>). +-define(BRIDGE_NAME_EGRESS, <<"egress_test_bridge">>). -define(BRIDGE_ID_INGRESS, <<"mqtt:ingress_test_bridge">>). -define(BRIDGE_ID_EGRESS, <<"mqtt:egress_test_bridge">>). -define(MQTT_CONNECOTR(Username), @@ -63,8 +67,8 @@ -define(metrics(MATCH, SUCC, FAILED, SPEED, SPEED5M, SPEEDMAX), #{<<"matched">> := MATCH, <<"success">> := SUCC, - <<"failed">> := FAILED, <<"speed">> := SPEED, - <<"speed_last5m">> := SPEED5M, <<"speed_max">> := SPEEDMAX}). + <<"failed">> := FAILED, <<"rate">> := SPEED, + <<"rate_last5m">> := SPEED5M, <<"rate_max">> := SPEEDMAX}). all() -> emqx_common_test_helpers:all(?MODULE). @@ -115,7 +119,9 @@ t_mqtt_crud_apis(_) -> %% POST /connectors/ will create a connector User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -128,7 +134,9 @@ t_mqtt_crud_apis(_) -> %% create a again returns an error {ok, 400, RetMsg} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), ?assertMatch( #{ <<"code">> := _ , <<"message">> := <<"connector already exists">> @@ -187,7 +195,9 @@ t_mqtt_conn_bridge_ingress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -201,11 +211,14 @@ t_mqtt_conn_bridge_ingress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_INGRESS}), + ?MQTT_BRIDGE_INGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_INGRESS + }), %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_INGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := <<"mqtt">> , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -250,7 +263,9 @@ t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR(User1)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR(User1)#{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -264,11 +279,15 @@ t_mqtt_conn_bridge_egress(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), %ct:pal("---bridge: ~p", [Bridge]), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := ?CONNECTR_TYPE + , <<"name">> := ?BRIDGE_NAME_EGRESS , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -322,7 +341,10 @@ t_mqtt_conn_update(_) -> %% then we add a mqtt connector, using POST {ok, 201, Connector} = request(post, uri(["connectors"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"id">> => ?CONNECTR_ID}), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>) + #{ <<"type">> => ?CONNECTR_TYPE + , <<"name">> => ?CONNECTR_NAME + }), %ct:pal("---connector: ~p", [Connector]), ?assertMatch(#{ <<"id">> := ?CONNECTR_ID @@ -332,9 +354,13 @@ t_mqtt_conn_update(_) -> %% ... and a MQTT bridge, using POST %% we bind this bridge to the connector created just now {ok, 201, Bridge} = request(post, uri(["bridges"]), - ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{<<"id">> => ?BRIDGE_ID_EGRESS}), + ?MQTT_BRIDGE_EGRESS(?CONNECTR_ID)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), ?assertMatch(#{ <<"id">> := ?BRIDGE_ID_EGRESS - , <<"bridge_type">> := <<"mqtt">> + , <<"type">> := <<"mqtt">> + , <<"name">> := ?BRIDGE_NAME_EGRESS , <<"status">> := <<"connected">> , <<"connector">> := ?CONNECTR_ID }, jsx:decode(Bridge)), @@ -358,9 +384,15 @@ t_mqtt_conn_testing(_) -> %% APIs for testing the connectivity %% then we add a mqtt connector, using POST {ok, 200, <<>>} = request(post, uri(["connectors_test"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{<<"bridge_type">> => <<"mqtt">>}), + ?MQTT_CONNECOTR2(<<"127.0.0.1:1883">>)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }), {ok, 400, _} = request(post, uri(["connectors_test"]), - ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{<<"bridge_type">> => <<"mqtt">>}). + ?MQTT_CONNECOTR2(<<"127.0.0.1:2883">>)#{ + <<"type">> => ?CONNECTR_TYPE, + <<"name">> => ?BRIDGE_NAME_EGRESS + }). %%-------------------------------------------------------------------- %% HTTP Request