feat: HTTP API for bridge_v2

This commit is contained in:
Stefan Strigler 2023-10-12 17:41:56 +02:00 committed by Zaiming (Stone) Shi
parent a5a060473c
commit 38f5c7dcf4
8 changed files with 989 additions and 10 deletions

View File

@ -0,0 +1,582 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_v2_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-import(hoconsc, [mk/2, array/1, enum/1]).
-import(emqx_utils, [redact/1]).
%% Swagger specs from hocon schema
-export([
api_spec/0,
paths/0,
schema/1,
namespace/0
]).
%% API callbacks
-export([
'/bridges_v2'/2,
'/bridges_v2/:id'/2,
'/bridges_v2/:id/enable/:enable'/2
%%,
%% '/bridges_v2/:id/:operation'/2,
%% '/nodes/:node/bridges_v2/:id/:operation'/2,
%% '/bridges_v2_probe'/2
]).
%% BpAPI
-export([lookup_from_local_node/2]).
-define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
?NOT_FOUND(
<<"Bridge lookup failed: bridge named '", (bin(BRIDGE_NAME))/binary, "' of type ",
(bin(BRIDGE_TYPE))/binary, " does not exist.">>
)
).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}) of
{BridgeType, BridgeName} ->
EXPR
catch
throw:#{reason := Reason} ->
?NOT_FOUND(<<"Invalid bridge ID, ", Reason/binary>>)
end
).
namespace() -> "bridge_v2".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
paths() ->
[
"/bridges_v2",
"/bridges_v2/:id",
"/bridges_v2/:id/enable/:enable"
%%,
%% "/bridges_v2/:id/:operation",
%% "/nodes/:node/bridges_v2/:id/:operation",
%% "/bridges_v2_probe"
].
error_schema(Code, Message) when is_atom(Code) ->
error_schema([Code], Message);
error_schema(Codes, Message) when is_list(Message) ->
error_schema(Codes, list_to_binary(Message));
error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) ->
emqx_dashboard_swagger:error_codes(Codes, Message).
get_response_body_schema() ->
emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_v2_schema:get_response(),
bridge_info_examples(get)
).
bridge_info_examples(Method) ->
maps:merge(
#{},
emqx_enterprise_bridge_examples(Method)
).
bridge_info_array_example(Method) ->
lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))).
-if(?EMQX_RELEASE_EDITION == ee).
emqx_enterprise_bridge_examples(Method) ->
emqx_bridge_v2_enterprise:examples(Method).
-else.
emqx_enterprise_bridgex_examples(_Method) -> #{}.
-endif.
param_path_id() ->
{id,
mk(
binary(),
#{
in => path,
required => true,
example => <<"webhook:webhook_example">>,
desc => ?DESC("desc_param_path_id")
}
)}.
param_path_enable() ->
{enable,
mk(
boolean(),
#{
in => path,
required => true,
desc => ?DESC("desc_param_path_enable"),
example => true
}
)}.
schema("/bridges_v2") ->
#{
'operationId' => '/bridges_v2',
get => #{
tags => [<<"bridges_v2">>],
summary => <<"List bridges">>,
description => ?DESC("desc_api1"),
responses => #{
200 => emqx_dashboard_swagger:schema_with_example(
array(emqx_bridge_v2_schema:get_response()),
bridge_info_array_example(get)
)
}
},
post => #{
tags => [<<"bridges_v2">>],
summary => <<"Create bridge">>,
description => ?DESC("desc_api2"),
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_v2_schema:post_request(),
bridge_info_examples(post)
),
responses => #{
201 => get_response_body_schema(),
400 => error_schema('ALREADY_EXISTS', "Bridge already exists")
}
}
};
schema("/bridges_v2/:id") ->
#{
'operationId' => '/bridges_v2/:id',
get => #{
tags => [<<"bridges_v2">>],
summary => <<"Get bridge">>,
description => ?DESC("desc_api3"),
parameters => [param_path_id()],
responses => #{
200 => get_response_body_schema(),
404 => error_schema('NOT_FOUND', "Bridge not found")
}
},
put => #{
tags => [<<"bridges_v2">>],
summary => <<"Update bridge">>,
description => ?DESC("desc_api4"),
parameters => [param_path_id()],
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
emqx_bridge_v2_schema:put_request(),
bridge_info_examples(put)
),
responses => #{
200 => get_response_body_schema(),
404 => error_schema('NOT_FOUND', "Bridge not found"),
400 => error_schema('BAD_REQUEST', "Update bridge failed")
}
},
delete => #{
tags => [<<"bridges_v2">>],
summary => <<"Delete bridge">>,
description => ?DESC("desc_api5"),
parameters => [param_path_id()],
responses => #{
204 => <<"Bridge deleted">>,
400 => error_schema(
'BAD_REQUEST',
"Cannot delete bridge while active rules are defined for this bridge"
),
404 => error_schema('NOT_FOUND', "Bridge not found"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
};
schema("/bridges_v2/:id/enable/:enable") ->
#{
'operationId' => '/bridges_v2/:id/enable/:enable',
put =>
#{
tags => [<<"bridges_v2">>],
summary => <<"Enable or disable bridge">>,
desc => ?DESC("desc_enable_bridge"),
parameters => [param_path_id(), param_path_enable()],
responses =>
#{
204 => <<"Success">>,
404 => error_schema(
'NOT_FOUND', "Bridge not found or invalid operation"
),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
}.
%% schema("/bridges/:id/:operation") ->
%% #{
%% 'operationId' => '/bridges/:id/:operation',
%% post => #{
%% tags => [<<"bridges">>],
%% summary => <<"Stop, start or restart bridge">>,
%% description => ?DESC("desc_api7"),
%% parameters => [
%% param_path_id(),
%% param_path_operation_cluster()
%% ],
%% responses => #{
%% 204 => <<"Operation success">>,
%% 400 => error_schema(
%% 'BAD_REQUEST', "Problem with configuration of external service"
%% ),
%% 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"),
%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
%% }
%% }
%% };
%% schema("/nodes/:node/bridges/:id/:operation") ->
%% #{
%% 'operationId' => '/nodes/:node/bridges/:id/:operation',
%% post => #{
%% tags => [<<"bridges">>],
%% summary => <<"Stop, start or restart bridge">>,
%% description => ?DESC("desc_api8"),
%% parameters => [
%% param_path_node(),
%% param_path_id(),
%% param_path_operation_on_node()
%% ],
%% responses => #{
%% 204 => <<"Operation success">>,
%% 400 => error_schema(
%% 'BAD_REQUEST',
%% "Problem with configuration of external service or bridge not enabled"
%% ),
%% 404 => error_schema(
%% 'NOT_FOUND', "Bridge or node not found or invalid operation"
%% ),
%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
%% }
%% }
%% };
%% schema("/bridges_probe") ->
%% #{
%% 'operationId' => '/bridges_probe',
%% post => #{
%% tags => [<<"bridges">>],
%% desc => ?DESC("desc_api9"),
%% summary => <<"Test creating bridge">>,
%% 'requestBody' => emqx_dashboard_swagger:schema_with_examples(
%% emqx_bridge_schema:post_request(),
%% bridge_info_examples(post)
%% ),
%% responses => #{
%% 204 => <<"Test bridge OK">>,
%% 400 => error_schema(['TEST_FAILED'], "bridge test failed")
%% }
%% }
%% }.
'/bridges_v2'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
{ok, _} ->
?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
{error, bridge_not_found} ->
Conf = filter_out_request_body(Conf0),
create_bridge(BridgeType, BridgeName, Conf)
end;
'/bridges_v2'(get, _Params) ->
Nodes = mria:running_nodes(),
NodeReplies = emqx_bridge_proto_v5:v2_list_bridges_on_nodes(Nodes),
case is_ok(NodeReplies) of
{ok, NodeBridges} ->
AllBridges = [
[format_resource(Data, Node) || Data <- Bridges]
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges)
],
?OK(zip_bridges(AllBridges));
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end;
'/bridges_v2'(post, _Params) ->
?BAD_REQUEST("Bad Request");
'/bridges_v2'(_, _) ->
?METHOD_NOT_ALLOWED.
'/bridges_v2/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
'/bridges_v2/:id'(put, #{bindings := #{id := Id}, body := Conf0}) ->
Conf1 = filter_out_request_body(Conf0),
?TRY_PARSE_ID(
Id,
case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
{ok, _} ->
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
Conf = deobfuscate(Conf1, RawConf),
update_bridge(BridgeType, BridgeName, Conf);
{error, bridge_not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end
);
'/bridges_v2/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(
Id,
case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
{ok, _} ->
case emqx_bridge_v2:remove(BridgeType, BridgeName) of
{ok, _} ->
?NO_CONTENT;
{error, {active_channels, Channels}} ->
?BAD_REQUEST(
{<<"Cannot delete bridge while there are active channels defined for this bridge">>,
Channels}
);
{error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end;
{error, bridge_not_found} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end
);
'/bridges_v2/:id'(_, _) ->
?METHOD_NOT_ALLOWED.
'/bridges_v2/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID(
Id,
case enable_func(Enable) of
invalid ->
?NOT_FOUND(<<"Invalid operation">>);
OperFunc ->
case emqx_bridge_v2:disable_enable(OperFunc, BridgeType, BridgeName) of
{ok, _} ->
?NO_CONTENT;
{error, {pre_config_update, _, bridge_not_found}} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, {_, _, timeout}} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, timeout} ->
?SERVICE_UNAVAILABLE(<<"request timeout">>);
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end
end
);
'/bridges_v2/:id/enable/:enable'(_, _) ->
?METHOD_NOT_ALLOWED.
%%% API helpers
is_ok(ok) ->
ok;
is_ok(OkResult = {ok, _}) ->
OkResult;
is_ok(Error = {error, _}) ->
Error;
is_ok(ResL) ->
case
lists:filter(
fun
({ok, _}) -> false;
(ok) -> false;
(_) -> true
end,
ResL
)
of
[] -> {ok, [Res || {ok, Res} <- ResL]};
ErrL -> hd(ErrL)
end.
deobfuscate(NewConf, OldConf) ->
maps:fold(
fun(K, V, Acc) ->
case maps:find(K, OldConf) of
error ->
Acc#{K => V};
{ok, OldV} when is_map(V), is_map(OldV) ->
Acc#{K => deobfuscate(V, OldV)};
{ok, OldV} ->
case emqx_utils:is_redacted(K, V) of
true ->
Acc#{K => OldV};
_ ->
Acc#{K => V}
end
end
end,
#{},
NewConf
).
%% bridge helpers
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v5:v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
{ok, [{ok, _} | _] = Results} ->
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, bridge_not_found} | _]} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end.
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl(
fun(#{type := Type, name := Name}, Acc) ->
Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes),
[format_bridge_info(Bridges) | Acc]
end,
[],
BridgesFirstNode
).
pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
lists:foldl(
fun(BridgesOneNode, Acc) ->
case
[
Bridge
|| Bridge = #{type := Type0, name := Name0} <- BridgesOneNode,
Type0 == Type,
Name0 == Name
]
of
[BridgeInfo] ->
[BridgeInfo | Acc];
[] ->
?SLOG(warning, #{
msg => "bridge_inconsistent_in_cluster",
reason => not_found,
type => Type,
name => Name,
bridge => emqx_bridge_resource:bridge_id(Type, Name)
}),
Acc
end
end,
[],
BridgesAllNodes
).
format_bridge_info([FirstBridge | _] = Bridges) ->
Res = maps:remove(node, FirstBridge),
NodeStatus = node_status(Bridges),
redact(Res#{
status => aggregate_status(NodeStatus),
node_status => NodeStatus
}).
node_status(Bridges) ->
[maps:with([node, status, status_reason], B) || B <- Bridges].
aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end,
HeadVal = maps:get(status, Head(AllStatus), connecting),
AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus),
case AllRes of
true -> HeadVal;
false -> inconsistent
end.
lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
{ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error
end.
%% resource
format_resource(
#{
type := Type,
name := Name,
raw_config := RawConf,
resource_data := ResourceData
},
Node
) ->
redact(
maps:merge(
RawConf#{
type => Type,
name => maps:get(<<"name">>, RawConf, Name),
node => Node
},
format_resource_data(ResourceData)
)
).
format_resource_data(ResData) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
format_resource_data(error, undefined, Result) ->
Result;
format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_utils:readable_error_msg(Error)};
format_resource_data(K, V, Result) ->
Result#{K => V}.
create_bridge(BridgeType, BridgeName, Conf) ->
create_or_update_bridge(BridgeType, BridgeName, Conf, 201).
update_bridge(BridgeType, BridgeName, Conf) ->
create_or_update_bridge(BridgeType, BridgeName, Conf, 200).
create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
case emqx_bridge_v2:create(BridgeType, BridgeName, Conf) of
{ok, _} ->
lookup_from_all_nodes(BridgeType, BridgeName, HttpStatusCode);
{error, Reason} when is_map(Reason) ->
?BAD_REQUEST(map_to_json(redact(Reason)))
end.
enable_func(<<"true">>) -> enable;
enable_func(<<"false">>) -> disable;
enable_func(_) -> invalid.
filter_out_request_body(Conf) ->
ExtraConfs = [
<<"id">>,
<<"type">>,
<<"name">>,
<<"status">>,
<<"status_reason">>,
<<"node_status">>,
<<"node">>
],
maps:without(ExtraConfs, Conf).
%% general helpers
bin(S) when is_list(S) ->
list_to_binary(S);
bin(S) when is_atom(S) ->
atom_to_binary(S, utf8);
bin(S) when is_binary(S) ->
S.
map_to_json(M0) ->
%% When dealing with Hocon validation errors, `value' might contain non-serializable
%% values (e.g.: user_lookup_fun), so we try again without that key if serialization
%% fails as a best effort.
M1 = emqx_utils_maps:jsonable_map(M0, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end),
try
emqx_utils_json:encode(M1)
catch
error:_ ->
M2 = maps:without([value, <<"value">>], M1),
emqx_utils_json:encode(M2)
end.

View File

@ -0,0 +1,154 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v5).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges_on_nodes/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
get_metrics_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3,
v2_list_bridges_on_nodes/1,
v2_lookup_from_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.3.1".
-spec list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec get_metrics_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
get_metrics_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec v2_list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
v2_list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge_v2, list, [], ?TIMEOUT).
-spec v2_lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_v2_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -5,13 +5,31 @@
-if(?EMQX_RELEASE_EDITION == ee).
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
api_schemas/1,
examples/1,
fields/1
]).
examples(Method) ->
MergeFun =
fun(Example, Examples) ->
maps:merge(Examples, Example)
end,
Fun =
fun(Module, Examples) ->
ConnectorExamples = erlang:apply(Module, bridge_v2_examples, [Method]),
lists:foldl(MergeFun, Examples, ConnectorExamples)
end,
lists:foldl(Fun, #{}, schema_modules()).
schema_modules() ->
[
emqx_bridge_kafka
].
fields(bridges_v2) ->
kafka_structs().
@ -27,6 +45,18 @@ kafka_structs() ->
)}
].
api_schemas(Method) ->
[
%% We need to map the `type' field of a request (binary) to a
%% connector schema module.
%% TODO: rename this to `kafka_producer' after alias support is added
%% to hocon; keeping this as just `kafka' for backwards compatibility.
api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_bridge_v2")
].
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
-else.
-endif.

View File

@ -23,7 +23,21 @@
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
-export([
get_response/0,
put_request/0,
post_request/0
]).
-if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_bridge_v2_enterprise, api_schemas, 1) of
true -> emqx_bridge_v2_enterprise:api_schemas(Method);
false -> []
end.
enterprise_fields_actions() ->
%% We *must* do this to ensure the module is really loaded, especially when we use
@ -42,11 +56,58 @@ enterprise_fields_actions() -> [].
-endif.
%%======================================================================================
%% For HTTP APIs
get_response() ->
api_schema("get").
put_request() ->
api_schema("put").
post_request() ->
api_schema("post").
api_schema(Method) ->
Broker = [
{Type, ref(Mod, Method)}
|| {Type, Mod} <- [
% {<<"webhook">>, emqx_bridge_http_schema},
% {<<"mqtt">>, emqx_bridge_mqtt_schema}
]
],
EE = enterprise_api_schemas(Method),
hoconsc:union(bridge_api_union(Broker ++ EE)).
bridge_api_union(Refs) ->
Index = maps:from_list(Refs),
fun
(all_union_members) ->
maps:values(Index);
({value, V}) ->
case V of
#{<<"type">> := T} ->
case maps:get(T, Index, undefined) of
undefined ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
});
Ref ->
[Ref]
end;
_ ->
throw(#{
field_name => type,
reason => <<"unknown bridge type">>
})
end
end.
%%======================================================================================
%% HOCON Schema Callbacks
%%======================================================================================
namespace() -> "bridge_v2".
namespace() -> "bridges_v2".
tags() ->
[<<"Bridge V2">>].

View File

@ -3,7 +3,6 @@
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
@ -18,6 +17,7 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
@ -47,6 +47,16 @@ connector_examples(_Method) ->
}
].
bridge_v2_examples(Method) ->
[
#{
<<"kafka">> => #{
summary => <<"Kafka Bridge v2">>,
value => values({Method, bridge_v2_producer})
}
}
].
conn_bridge_examples(Method) ->
[
#{
@ -67,11 +77,41 @@ conn_bridge_examples(Method) ->
].
values({get, KafkaType}) ->
values({post, KafkaType});
maps:merge(
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
},
values({post, KafkaType})
);
values({post, KafkaType}) ->
maps:merge(values(common_config), values(KafkaType));
maps:merge(
#{
name => <<"my_bridge">>,
type => <<"kafka">>
},
values({put, KafkaType})
);
values({put, KafkaType}) when KafkaType =:= bridge_v2_producer ->
values(KafkaType);
values({put, KafkaType}) ->
values({post, KafkaType});
maps:merge(values(common_config), values(KafkaType));
values(bridge_v2_producer) ->
maps:merge(
#{
enable => true,
connector => <<"my_connector">>,
resource_opts => #{
health_check_interval => "32s"
}
},
values(producer)
);
values(common_config) ->
#{
authentication => #{
@ -160,7 +200,7 @@ host_opts() ->
namespace() -> "bridge_kafka".
roots() -> ["config_consumer", "config_producer"].
roots() -> ["config_consumer", "config_producer", "config_bridge_v2"].
fields("post_" ++ Type) ->
[type_field(Type), name_field() | fields("config_" ++ Type)];
@ -168,6 +208,8 @@ fields("put_" ++ Type) ->
fields("config_" ++ Type);
fields("get_" ++ Type) ->
emqx_bridge_schema:status_fields() ++ fields("post_" ++ Type);
fields("config_bridge_v2") ->
fields(kafka_producer_action);
fields("config_connector") ->
fields(kafka_connector);
fields("config_producer") ->

View File

@ -347,7 +347,11 @@ schema("/connectors_probe") ->
?OK(zip_connectors(AllConnectors));
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end.
end;
'/connectors'(post, _Params) ->
?BAD_REQUEST(<<"Bad Request">>);
'/connectors'(_, _) ->
?METHOD_NOT_ALLOWED.
'/connectors/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(ConnectorType, ConnectorName, 200));
@ -385,7 +389,9 @@ schema("/connectors_probe") ->
{error, not_found} ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName)
end
).
);
'/connectors/:id'(_, _) ->
?METHOD_NOT_ALLOWED.
'/connectors_probe'(post, Request) ->
RequestMeta = #{module => ?MODULE, method => post, path => "/connectors_probe"},
@ -411,7 +417,9 @@ schema("/connectors_probe") ->
end;
BadRequest ->
redact(BadRequest)
end.
end;
'/connectors_probe'(_, _) ->
?METHOD_NOT_ALLOWED.
maybe_deobfuscate_connector_probe(
#{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Params

View File

@ -28,6 +28,8 @@
-define(NOT_FOUND(REASON), {404, ?ERROR_MSG('NOT_FOUND', REASON)}).
-define(METHOD_NOT_ALLOWED, 405).
-define(INTERNAL_ERROR(REASON), {500, ?ERROR_MSG('INTERNAL_ERROR', REASON)}).
-define(NOT_IMPLEMENTED, 501).

View File

@ -0,0 +1,100 @@
emqx_bridge_v2_api {
desc_api1.desc:
"""List all created bridges."""
desc_api1.label:
"""List All Bridges"""
desc_api2.desc:
"""Create a new bridge by type and name."""
desc_api2.label:
"""Create Bridge"""
desc_api3.desc:
"""Get a bridge by id."""
desc_api3.label:
"""Get Bridge"""
desc_api4.desc:
"""Update a bridge by id."""
desc_api4.label:
"""Update Bridge"""
desc_api5.desc:
"""Delete a bridge by id."""
desc_api5.label:
"""Delete Bridge"""
desc_api6.desc:
"""Reset a bridge metrics by id."""
desc_api6.label:
"""Reset Bridge Metrics"""
desc_api7.desc:
"""Stop/restart bridges on all nodes in the cluster."""
desc_api7.label:
"""Cluster Bridge Operate"""
desc_api8.desc:
"""Stop/restart bridges on a specific node."""
desc_api8.label:
"""Node Bridge Operate"""
desc_api9.desc:
"""Test creating a new bridge by given id.</br>
The id must be of format '{type}:{name}'."""
desc_api9.label:
"""Test Bridge Creation"""
desc_bridge_metrics.desc:
"""Get bridge metrics by id."""
desc_bridge_metrics.label:
"""Get Bridge Metrics"""
desc_enable_bridge.desc:
"""Enable or Disable bridges on all nodes in the cluster."""
desc_enable_bridge.label:
"""Cluster Bridge Enable"""
desc_param_path_enable.desc:
"""Whether to enable this bridge."""
desc_param_path_enable.label:
"""Enable bridge"""
desc_param_path_id.desc:
"""The bridge id. Must be of format {type}:{name}."""
desc_param_path_id.label:
"""Bridge ID"""
desc_param_path_node.desc:
"""The node name, e.g. 'emqx@127.0.0.1'."""
desc_param_path_node.label:
"""The node name"""
desc_param_path_operation_cluster.desc:
"""Operations can be one of: 'stop' or 'restart'."""
desc_param_path_operation_cluster.label:
"""Cluster Operation"""
desc_param_path_operation_on_node.desc:
"""Operations can be one of: 'stop' or 'restart'."""
desc_param_path_operation_on_node.label:
"""Node Operation """
}