feat(connector): add APIs for connector
This commit is contained in:
parent
4dac90f4a7
commit
bcf5f499da
|
@ -14,3 +14,68 @@
|
|||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_connector).
|
||||
|
||||
-export([config_key_path/0]).
|
||||
|
||||
-export([ parse_connector_id/1
|
||||
, connector_id/2
|
||||
]).
|
||||
|
||||
-export([ list/0
|
||||
, lookup/1
|
||||
, lookup/2
|
||||
, update/2
|
||||
, update/3
|
||||
, delete/1
|
||||
, delete/2
|
||||
]).
|
||||
|
||||
config_key_path() ->
|
||||
[connectors].
|
||||
|
||||
connector_id(Type0, Name0) ->
|
||||
Type = bin(Type0),
|
||||
Name = bin(Name0),
|
||||
<<Type/binary, ":", Name/binary>>.
|
||||
|
||||
parse_connector_id(ConnectorId) ->
|
||||
case string:split(bin(ConnectorId), ":", all) of
|
||||
[Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};
|
||||
_ -> error({invalid_connector_id, ConnectorId})
|
||||
end.
|
||||
|
||||
list() ->
|
||||
lists:foldl(fun({Type, NameAndConf}, Connectors) ->
|
||||
lists:foldl(fun({Name, RawConf}, Acc) ->
|
||||
[RawConf#{<<"id">> => connector_id(Type, Name)} | Acc]
|
||||
end, Connectors, maps:to_list(NameAndConf))
|
||||
end, [], maps:to_list(emqx:get_raw_config(config_key_path(), #{}))).
|
||||
|
||||
lookup(Id) when is_binary(Id) ->
|
||||
{Type, Name} = parse_connector_id(Id),
|
||||
lookup(Type, Name).
|
||||
|
||||
lookup(Type, Name) ->
|
||||
Id = connector_id(Type, Name),
|
||||
case emqx:get_raw_config(config_key_path() ++ [Type, Name], not_found) of
|
||||
not_found -> {error, not_found};
|
||||
Conf -> {ok, Conf#{<<"id">> => Id}}
|
||||
end.
|
||||
|
||||
update(Id, Conf) when is_binary(Id) ->
|
||||
{Type, Name} = parse_connector_id(Id),
|
||||
update(Type, Name, Conf).
|
||||
|
||||
update(Type, Name, Conf) ->
|
||||
emqx_conf:update(config_key_path() ++ [Type, Name], Conf, #{override_to => cluster}).
|
||||
|
||||
delete(Id) when is_binary(Id) ->
|
||||
{Type, Name} = parse_connector_id(Id),
|
||||
delete(Type, Name).
|
||||
|
||||
delete(Type, Name) ->
|
||||
emqx_conf:remove(config_key_path() ++ [Type, Name], #{override_to => cluster}).
|
||||
|
||||
bin(Bin) when is_binary(Bin) -> Bin;
|
||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_connector_api).
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-include("emqx_connector.hrl").
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
|
||||
-import(hoconsc, [mk/2, ref/2, array/1, enum/1]).
|
||||
|
||||
%% Swagger specs from hocon schema
|
||||
-export([api_spec/0, paths/0, schema/1, namespace/0]).
|
||||
|
||||
%% API callbacks
|
||||
-export(['/connectors'/2, '/connectors/:id'/2]).
|
||||
|
||||
-define(TRY_PARSE_ID(ID, EXPR),
|
||||
try emqx_connector:parse_connector_id(Id) of
|
||||
{ConnType, ConnName} -> EXPR
|
||||
catch
|
||||
error:{invalid_bridge_id, Id0} ->
|
||||
{400, #{code => 'INVALID_ID', message => <<"invalid_bridge_id: ", Id0/binary,
|
||||
". Bridge Ids must be of format <bridge_type>:<name>">>}}
|
||||
end).
|
||||
|
||||
namespace() -> "connector".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
|
||||
|
||||
paths() -> ["/connectors", "/connectors/:id"].
|
||||
|
||||
error_schema(Code, Message) ->
|
||||
[ {code, mk(string(), #{example => Code})}
|
||||
, {message, mk(string(), #{example => Message})}
|
||||
].
|
||||
|
||||
connector_info() ->
|
||||
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector_info")
|
||||
]).
|
||||
|
||||
connector_req() ->
|
||||
hoconsc:union([ ref(emqx_connector_schema, "mqtt_connector")
|
||||
]).
|
||||
|
||||
param_path_id() ->
|
||||
[{id, mk(binary(), #{in => path, example => <<"mqtt:my_mqtt_connector">>})}].
|
||||
|
||||
schema("/connectors") ->
|
||||
#{
|
||||
operationId => '/connectors',
|
||||
get => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"List all connectors">>,
|
||||
summary => <<"List connectors">>,
|
||||
responses => #{
|
||||
200 => mk(array(connector_info()), #{desc => "List of connectors"})
|
||||
}
|
||||
},
|
||||
post => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Create a new connector by given Id <br>"
|
||||
"The Id must be of format <type>:<name>">>,
|
||||
summary => <<"Create connector">>,
|
||||
requestBody => connector_info(),
|
||||
responses => #{
|
||||
201 => connector_info(),
|
||||
400 => error_schema('ALREADY_EXISTS', "connector already exists")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
schema("/connectors/:id") ->
|
||||
#{
|
||||
operationId => '/connectors/:id',
|
||||
get => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Get the connector by Id">>,
|
||||
summary => <<"Get connector">>,
|
||||
parameters => param_path_id(),
|
||||
responses => #{
|
||||
200 => connector_info(),
|
||||
404 => error_schema('NOT_FOUND', "Connector not found")
|
||||
}
|
||||
},
|
||||
put => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Update an existing connector by Id">>,
|
||||
summary => <<"Update connector">>,
|
||||
parameters => param_path_id(),
|
||||
requestBody => connector_req(),
|
||||
responses => #{
|
||||
200 => <<"Update connector successfully">>,
|
||||
400 => error_schema('UPDATE_FAIL', "Update failed"),
|
||||
404 => error_schema('NOT_FOUND', "Connector not found")
|
||||
}},
|
||||
delete => #{
|
||||
tags => [<<"connectors">>],
|
||||
description => <<"Delete a connector by Id">>,
|
||||
summary => <<"Delete connector">>,
|
||||
parameters => param_path_id(),
|
||||
responses => #{
|
||||
200 => <<"Delete connector successfully">>,
|
||||
400 => error_schema('DELETE_FAIL', "Delete failed")
|
||||
}}
|
||||
}.
|
||||
|
||||
'/connectors'(get, _Request) ->
|
||||
{200, emqx_connector:list()};
|
||||
|
||||
'/connectors'(post, #{body := #{<<"id">> := Id} = Params}) ->
|
||||
?TRY_PARSE_ID(Id,
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, _} ->
|
||||
{400, error_msg('ALREADY_EXISTS', <<"connector already exists">>)};
|
||||
{error, not_found} ->
|
||||
case emqx_connector:update(ConnType, ConnName, maps:remove(<<"id">>, Params)) of
|
||||
{ok, #{raw_config := RawConf}} -> {201, RawConf#{<<"id">> => Id}};
|
||||
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
|
||||
end
|
||||
end).
|
||||
|
||||
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
|
||||
?TRY_PARSE_ID(Id,
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, Conf} -> {200, Conf#{<<"id">> => Id}};
|
||||
{error, not_found} ->
|
||||
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
|
||||
end);
|
||||
|
||||
'/connectors/:id'(put, #{bindings := #{id := Id}, body := Params}) ->
|
||||
?TRY_PARSE_ID(Id,
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, _} ->
|
||||
case emqx_connector:update(ConnType, ConnName, Params) of
|
||||
{ok, #{raw_config := RawConf}} -> {200, RawConf#{<<"id">> => Id}};
|
||||
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
|
||||
end;
|
||||
{error, not_found} ->
|
||||
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
|
||||
end);
|
||||
|
||||
'/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
|
||||
?TRY_PARSE_ID(Id,
|
||||
case emqx_connector:lookup(ConnType, ConnName) of
|
||||
{ok, _} ->
|
||||
case emqx_connector:delete(ConnType, ConnName) of
|
||||
{ok, _} -> {200};
|
||||
{error, Error} -> {400, error_msg('BAD_ARG', Error)}
|
||||
end;
|
||||
{error, not_found} ->
|
||||
{404, error_msg('NOT_FOUND', <<"connector not found">>)}
|
||||
end).
|
||||
|
||||
error_msg(Code, Msg) when is_binary(Msg) ->
|
||||
#{code => Code, message => Msg};
|
||||
error_msg(Code, Msg) ->
|
||||
#{code => Code, message => list_to_binary(io_lib:format("~p", [Msg]))}.
|
|
@ -21,7 +21,11 @@ fields("connectors") ->
|
|||
];
|
||||
|
||||
fields("mqtt_connector") ->
|
||||
emqx_connector_mqtt_schema:fields("connector").
|
||||
emqx_connector_mqtt_schema:fields("connector");
|
||||
|
||||
fields("mqtt_connector_info") ->
|
||||
[{id, sc(binary(), #{desc => "The connector Id"})}]
|
||||
++ fields("mqtt_connector").
|
||||
|
||||
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
|
||||
|
||||
|
|
|
@ -9,12 +9,12 @@
|
|||
-export([schema_with_example/2, schema_with_examples/2]).
|
||||
-export([error_codes/1, error_codes/2]).
|
||||
|
||||
-export([filter_check_request/2, filter_check_request_and_translate_body/2]).
|
||||
|
||||
-ifdef(TEST).
|
||||
-export([
|
||||
parse_spec_ref/2,
|
||||
components/1,
|
||||
filter_check_request/2,
|
||||
filter_check_request_and_translate_body/2]).
|
||||
-export([ parse_spec_ref/2
|
||||
, components/1,
|
||||
]).
|
||||
-endif.
|
||||
|
||||
-define(METHODS, [get, post, put, head, delete, patch, options, trace]).
|
||||
|
@ -137,9 +137,9 @@ check_only(Schema, Map, Opts) ->
|
|||
Map.
|
||||
|
||||
support_check_schema(#{check_schema := true, translate_body := true}) ->
|
||||
#{filter => fun filter_check_request_and_translate_body/2};
|
||||
#{filter => fun ?MODULE:filter_check_request_and_translate_body/2};
|
||||
support_check_schema(#{check_schema := true}) ->
|
||||
#{filter => fun filter_check_request/2};
|
||||
#{filter => fun ?MODULE:filter_check_request/2};
|
||||
support_check_schema(#{check_schema := Filter}) when is_function(Filter, 2) ->
|
||||
#{filter => Filter};
|
||||
support_check_schema(_) ->
|
||||
|
|
Loading…
Reference in New Issue