From bcf5f499da570cc7c69a9d4a45526f99d86c067f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 17 Nov 2021 15:20:33 +0800 Subject: [PATCH] feat(connector): add APIs for connector --- apps/emqx_connector/src/emqx_connector.erl | 65 +++++++ .../emqx_connector/src/emqx_connector_api.erl | 174 ++++++++++++++++++ .../src/emqx_connector_schema.erl | 6 +- .../src/emqx_dashboard_swagger.erl | 14 +- 4 files changed, 251 insertions(+), 8 deletions(-) create mode 100644 apps/emqx_connector/src/emqx_connector_api.erl diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index dd0359348..0034fb28b 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -14,3 +14,68 @@ %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_connector). + +-export([config_key_path/0]). + +-export([ parse_connector_id/1 + , connector_id/2 + ]). + +-export([ list/0 + , lookup/1 + , lookup/2 + , update/2 + , update/3 + , delete/1 + , delete/2 + ]). + +config_key_path() -> + [connectors]. + +connector_id(Type0, Name0) -> + Type = bin(Type0), + Name = bin(Name0), + <>. + +parse_connector_id(ConnectorId) -> + case string:split(bin(ConnectorId), ":", all) of + [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; + _ -> error({invalid_connector_id, ConnectorId}) + end. + +list() -> + lists:foldl(fun({Type, NameAndConf}, Connectors) -> + lists:foldl(fun({Name, RawConf}, Acc) -> + [RawConf#{<<"id">> => connector_id(Type, Name)} | Acc] + end, Connectors, maps:to_list(NameAndConf)) + end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))). + +lookup(Id) when is_binary(Id) -> + {Type, Name} = parse_connector_id(Id), + lookup(Type, Name). + +lookup(Type, Name) -> + Id = connector_id(Type, Name), + case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of + not_found -> {error, not_found}; + Conf -> {ok, Conf#{<<"id">> => Id}} + end. + +update(Id, Conf) when is_binary(Id) -> + {Type, Name} = parse_connector_id(Id), + update(Type, Name, Conf). + +update(Type, Name, Conf) -> + emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}). + +delete(Id) when is_binary(Id) -> + {Type, Name} = parse_connector_id(Id), + delete(Type, Name). + +delete(Type, Name) -> + emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}). + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl new file mode 100644 index 000000000..490722e1d --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -0,0 +1,174 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_api). + +-behaviour(minirest_api). + +-include("emqx_connector.hrl"). + +-include_lib("typerefl/include/types.hrl"). + +-import(hoconsc, [mk/2, ref/2, array/1, enum/1]). + +%% Swagger specs from hocon schema +-export([api_spec/0, paths/0, schema/1, namespace/0]). + +%% API callbacks +-export(['/connectors'/2, '/connectors/:id'/2]). + +-define(TRY_PARSE_ID(ID, EXPR), + try emqx_connector:parse_connector_id(Id) of + {ConnType, ConnName} -> EXPR + catch + error:{invalid_bridge_id, Id0} -> + {400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary, + ". Bridge Ids must be of format :">>}} + end). + +namespace() -> "connector". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). + +paths() -> ["/connectors", "/connectors/:id"]. + +error_schema(Code, Message) -> + [ {code, mk(string(), #{example => Code})} + , {message, mk(string(), #{example => Message})} + ]. + +connector_info() -> + hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info") + ]). + +connector_req() -> + hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector") + ]). + +param_path_id() -> + [{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}]. + +schema("/connectors") -> + #{ + operationId => '/connectors', + get => #{ + tags => [<<"connectors">>], + description => <<"List all connectors">>, + summary => <<"List connectors">>, + responses => #{ + 200 => mk(array(connector_info()), #{desc => "List of connectors"}) + } + }, + post => #{ + tags => [<<"connectors">>], + description => <<"Create a new connector by given Id
" + "The Id must be of format :">>, + summary => <<"Create connector">>, + requestBody => connector_info(), + responses => #{ + 201 => connector_info(), + 400 => error_schema('ALREADY_EXISTS', "connector already exists") + } + } + }; + +schema("/connectors/:id") -> + #{ + operationId => '/connectors/:id', + get => #{ + tags => [<<"connectors">>], + description => <<"Get the connector by Id">>, + summary => <<"Get connector">>, + parameters => param_path_id(), + responses => #{ + 200 => connector_info(), + 404 => error_schema('NOT_FOUND', "Connector not found") + } + }, + put => #{ + tags => [<<"connectors">>], + description => <<"Update an existing connector by Id">>, + summary => <<"Update connector">>, + parameters => param_path_id(), + requestBody => connector_req(), + responses => #{ + 200 => <<"Update connector successfully">>, + 400 => error_schema('UPDATE_FAIL', "Update failed"), + 404 => error_schema('NOT_FOUND', "Connector not found") + }}, + delete => #{ + tags => [<<"connectors">>], + description => <<"Delete a connector by Id">>, + summary => <<"Delete connector">>, + parameters => param_path_id(), + responses => #{ + 200 => <<"Delete connector successfully">>, + 400 => error_schema('DELETE_FAIL', "Delete failed") + }} + }. + +'/connectors'(get, _Request) -> + {200, emqx_connector:list()}; + +'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) -> + ?TRY_PARSE_ID(Id, + case emqx_connector:lookup(ConnType, ConnName) of + {ok, _} -> + {400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)}; + {error, not_found} -> + case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of + {ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}}; + {error, Error} -> {400, error_msg('BAD_ARG', Error)} + end + end). + +'/connectors/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, + case emqx_connector:lookup(ConnType, ConnName) of + {ok, Conf} -> {200, Conf#{<<"id">> => Id}}; + {error, not_found} -> + {404, error_msg('NOT_FOUND', <<"connector not found">>)} + end); + +'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) -> + ?TRY_PARSE_ID(Id, + case emqx_connector:lookup(ConnType, ConnName) of + {ok, _} -> + case emqx_connector:update(ConnType, ConnName, Params) of + {ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}}; + {error, Error} -> {400, error_msg('BAD_ARG', Error)} + end; + {error, not_found} -> + {404, error_msg('NOT_FOUND', <<"connector not found">>)} + end); + +'/connectors/:id'(delete, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, + case emqx_connector:lookup(ConnType, ConnName) of + {ok, _} -> + case emqx_connector:delete(ConnType, ConnName) of + {ok, _} -> {200}; + {error, Error} -> {400, error_msg('BAD_ARG', Error)} + end; + {error, not_found} -> + {404, error_msg('NOT_FOUND', <<"connector not found">>)} + end). + +error_msg(Code, Msg) when is_binary(Msg) -> + #{code => Code, message => Msg}; +error_msg(Code, Msg) -> + #{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}. diff --git a/apps/emqx_connector/src/emqx_connector_schema.erl b/apps/emqx_connector/src/emqx_connector_schema.erl index f2b99cf3e..6e353b524 100644 --- a/apps/emqx_connector/src/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/emqx_connector_schema.erl @@ -21,7 +21,11 @@ fields("connectors") -> ]; fields("mqtt_connector") -> - emqx_connector_mqtt_schema:fields("connector"). + emqx_connector_mqtt_schema:fields("connector"); + +fields("mqtt_connector_info") -> + [{id, sc(binary(), #{desc => "The connector Id"})}] + ++ fields("mqtt_connector"). sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index 6317ecb76..83760afa4 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -9,12 +9,12 @@ -export([schema_with_example/2, schema_with_examples/2]). -export([error_codes/1, error_codes/2]). +-export([filter_check_request/2, filter_check_request_and_translate_body/2]). + -ifdef(TEST). --export([ - parse_spec_ref/2, - components/1, - filter_check_request/2, - filter_check_request_and_translate_body/2]). +-export([ parse_spec_ref/2 + , components/1, + ]). -endif. -define(METHODS, [get, post, put, head, delete, patch, options, trace]). @@ -137,9 +137,9 @@ check_only(Schema, Map, Opts) -> Map. support_check_schema(#{check_schema := true, translate_body := true}) -> - #{filter => fun filter_check_request_and_translate_body/2}; + #{filter => fun ?MODULE:filter_check_request_and_translate_body/2}; support_check_schema(#{check_schema := true}) -> - #{filter => fun filter_check_request/2}; + #{filter => fun ?MODULE:filter_check_request/2}; support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) -> #{filter => Filter}; support_check_schema(_) ->