fix: add missing parts

This commit is contained in:
Stefan Strigler 2023-10-04 13:02:27 +02:00 committed by Zaiming (Stone) Shi
parent ed8aa46602
commit 7641c22455
4 changed files with 252 additions and 158 deletions

View File

@ -42,6 +42,30 @@
'/connectors_probe'/2 '/connectors_probe'/2
]). ]).
-export([lookup_from_local_node/2]).
-define(CONNECTOR_NOT_ENABLED,
?BAD_REQUEST(<<"Forbidden operation, connector not enabled">>)
).
-define(CONNECTOR_NOT_FOUND(CONNECTOR_TYPE, CONNECTOR_NAME),
?NOT_FOUND(
<<"Connector lookup failed: connector named '", (bin(CONNECTOR_NAME))/binary, "' of type ",
(bin(CONNECTOR_TYPE))/binary, " does not exist.">>
)
).
%% Don't turn connector_name to atom, it's maybe not a existing atom.
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_connector_resource:parse_connector_id(Id, #{atom_name => false}) of
{ConnectorType, ConnectorName} ->
EXPR
catch
throw:#{reason := Reason} ->
?NOT_FOUND(<<"Invalid connector ID, ", Reason/binary>>)
end
).
namespace() -> "connector". namespace() -> "connector".
api_spec() -> api_spec() ->
@ -412,7 +436,7 @@ schema("/connectors_probe") ->
end; end;
'/connectors'(get, _Params) -> '/connectors'(get, _Params) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
NodeReplies = emqx_connector_proto_v4:list_connectors_on_nodes(Nodes), NodeReplies = emqx_connector_proto_v1:list_connectors_on_nodes(Nodes),
case is_ok(NodeReplies) of case is_ok(NodeReplies) of
{ok, NodeConnectors} -> {ok, NodeConnectors} ->
AllConnectors = [ AllConnectors = [
@ -439,17 +463,11 @@ schema("/connectors_probe") ->
?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName)
end end
); );
'/connectors/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> '/connectors/:id'(delete, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case emqx_connector:lookup(ConnectorType, ConnectorName) of case emqx_connector:lookup(ConnectorType, ConnectorName) of
{ok, _} -> {ok, _} ->
AlsoDeleteActs =
case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of
<<"true">> -> true;
true -> true;
_ -> false
end,
case emqx_connector:remove(ConnectorType, ConnectorName) of case emqx_connector:remove(ConnectorType, ConnectorName) of
{ok, _} -> {ok, _} ->
?NO_CONTENT; ?NO_CONTENT;
@ -508,20 +526,10 @@ maybe_deobfuscate_connector_probe(
maybe_deobfuscate_connector_probe(Params) -> maybe_deobfuscate_connector_probe(Params) ->
Params. Params.
get_metrics_from_all_nodes(ConnectorType, ConnectorName) ->
Nodes = mria:running_nodes(),
Result = do_bpapi_call(all, get_metrics_from_all_nodes, [Nodes, ConnectorType, ConnectorName]),
case Result of
Metrics when is_list(Metrics) ->
{200, format_connector_metrics(lists:zip(Nodes, Metrics))};
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end.
lookup_from_all_nodes(ConnectorType, ConnectorName, SuccCode) -> lookup_from_all_nodes(ConnectorType, ConnectorName, SuccCode) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
case case
is_ok(emqx_connector_proto_v4:lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName)) is_ok(emqx_connector_proto_v1:lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName))
of of
{ok, [{ok, _} | _] = Results} -> {ok, [{ok, _} | _] = Results} ->
{SuccCode, format_connector_info([R || {ok, R} <- Results])}; {SuccCode, format_connector_info([R || {ok, R} <- Results])};
@ -551,9 +559,6 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -
?BAD_REQUEST(map_to_json(redact(Reason))) ?BAD_REQUEST(map_to_json(redact(Reason)))
end. end.
get_metrics_from_local_node(ConnectorType, ConnectorName) ->
format_metrics(emqx_connector:get_metrics(ConnectorType, ConnectorName)).
'/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> '/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
@ -701,20 +706,6 @@ format_connector_info([FirstConnector | _] = Connectors) ->
node_status => NodeStatus node_status => NodeStatus
}). }).
format_connector_metrics(Connectors) ->
FilteredConnectors = lists:filter(
fun
({_Node, Metric}) when is_map(Metric) -> true;
(_) -> false
end,
Connectors
),
NodeMetrics = collect_metrics(FilteredConnectors),
#{
metrics => aggregate_metrics(NodeMetrics),
node_metrics => NodeMetrics
}.
node_status(Connectors) -> node_status(Connectors) ->
[maps:with([node, status, status_reason], B) || B <- Connectors]. [maps:with([node, status, status_reason], B) || B <- Connectors].
@ -727,43 +718,6 @@ aggregate_status(AllStatus) ->
false -> inconsistent false -> inconsistent
end. end.
collect_metrics(Connectors) ->
[#{node => Node, metrics => Metrics} || {Node, Metrics} <- Connectors].
aggregate_metrics(AllMetrics) ->
InitMetrics = ?EMPTY_METRICS,
lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics).
aggregate_metrics(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
)
},
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
)
) ->
?METRICS(
M1 + N1,
M2 + N2,
M3 + N3,
M4 + N4,
M5 + N5,
M6 + N6,
M7 + N7,
M8 + N8,
M9 + N9,
M10 + N10,
M11 + N11,
M12 + N12,
M13 + N13,
M14 + N14,
M15 + N15,
M16 + N16,
M17 + N17
).
format_resource( format_resource(
#{ #{
type := Type, type := Type,
@ -794,88 +748,6 @@ format_resource_data(error, Error, Result) ->
format_resource_data(K, V, Result) -> format_resource_data(K, V, Result) ->
Result#{K => V}. Result#{K => V}.
format_metrics(#{
counters := #{
'dropped' := Dropped,
'dropped.other' := DroppedOther,
'dropped.expired' := DroppedExpired,
'dropped.queue_full' := DroppedQueueFull,
'dropped.resource_not_found' := DroppedResourceNotFound,
'dropped.resource_stopped' := DroppedResourceStopped,
'matched' := Matched,
'retried' := Retried,
'late_reply' := LateReply,
'failed' := SentFailed,
'success' := SentSucc,
'received' := Rcvd
},
gauges := Gauges,
rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax}
}
}) ->
Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0),
?METRICS(
Dropped,
DroppedOther,
DroppedExpired,
DroppedQueueFull,
DroppedResourceNotFound,
DroppedResourceStopped,
Matched,
Queued,
Retried,
LateReply,
SentFailed,
SentInflight,
SentSucc,
Rate,
Rate5m,
RateMax,
Rcvd
);
format_metrics(_Metrics) ->
%% Empty metrics: can happen when a node joins another and a
%% connector is not yet replicated to it, so the counters map is
%% empty.
?METRICS(
_Dropped = 0,
_DroppedOther = 0,
_DroppedExpired = 0,
_DroppedQueueFull = 0,
_DroppedResourceNotFound = 0,
_DroppedResourceStopped = 0,
_Matched = 0,
_Queued = 0,
_Retried = 0,
_LateReply = 0,
_SentFailed = 0,
_SentInflight = 0,
_SentSucc = 0,
_Rate = 0,
_Rate5m = 0,
_RateMax = 0,
_Rcvd = 0
).
fill_defaults(Type, RawConf) ->
PackedConf = pack_connector_conf(Type, RawConf),
FullConf = emqx_config:fill_defaults(emqx_connector_schema, PackedConf, #{}),
unpack_connector_conf(Type, FullConf).
pack_connector_conf(Type, RawConf) ->
#{<<"connectors">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
filter_raw_conf(_TypeBin, RawConf) ->
RawConf.
unpack_connector_conf(Type, PackedConf) ->
TypeBin = bin(Type),
#{<<"connectors">> := Connectors} = PackedConf,
#{<<"foo">> := RawConf} = maps:get(TypeBin, Connectors),
filter_raw_conf(TypeBin, RawConf).
is_ok(ok) -> is_ok(ok) ->
ok; ok;
is_ok(OkResult = {ok, _}) -> is_ok(OkResult = {ok, _}) ->
@ -905,8 +777,6 @@ filter_out_request_body(Conf) ->
<<"status">>, <<"status">>,
<<"status_reason">>, <<"status_reason">>,
<<"node_status">>, <<"node_status">>,
<<"node_metrics">>,
<<"metrics">>,
<<"node">> <<"node">>
], ],
maps:without(ExtraConfs, Conf). maps:without(ExtraConfs, Conf).
@ -985,7 +855,6 @@ maybe_unwrap(RpcMulticallResult) ->
supported_versions(start_connector_to_node) -> [2, 3, 4]; supported_versions(start_connector_to_node) -> [2, 3, 4];
supported_versions(start_connectors_to_all_nodes) -> [2, 3, 4]; supported_versions(start_connectors_to_all_nodes) -> [2, 3, 4];
supported_versions(get_metrics_from_all_nodes) -> [4];
supported_versions(_Call) -> [1, 2, 3, 4]. supported_versions(_Call) -> [1, 2, 3, 4].
redact(Term) -> redact(Term) ->

View File

@ -0,0 +1,123 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_connector_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_connectors_on_nodes/1,
restart_connector_to_node/3,
start_connector_to_node/3,
stop_connector_to_node/3,
lookup_from_all_nodes/3,
restart_connectors_to_all_nodes/3,
start_connectors_to_all_nodes/3,
stop_connectors_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.3.1".
-spec list_connectors_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_connectors_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_connector, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_connector_to_node(node(), key(), key()) ->
term().
restart_connector_to_node(Node, ConnectorType, ConnectorName) ->
rpc:call(
Node,
emqx_connector_resource,
restart,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec start_connector_to_node(node(), key(), key()) ->
term().
start_connector_to_node(Node, ConnectorType, ConnectorName) ->
rpc:call(
Node,
emqx_connector_resource,
start,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec stop_connector_to_node(node(), key(), key()) ->
term().
stop_connector_to_node(Node, ConnectorType, ConnectorName) ->
rpc:call(
Node,
emqx_connector_resource,
stop,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec restart_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_resource,
restart,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec start_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_resource,
start,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec stop_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_resource,
stop,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_api,
lookup_from_local_node,
[ConnectorType, ConnectorName],
?TIMEOUT
).

View File

@ -25,6 +25,8 @@
-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]).
-export([get_response/0, put_request/0, post_request/0]).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
enterprise_api_schemas(Method) -> enterprise_api_schemas(Method) ->
%% We *must* do this to ensure the module is really loaded, especially when we use %% We *must* do this to ensure the module is really loaded, especially when we use

View File

@ -0,0 +1,100 @@
emqx_connector_api {
desc_api1.desc:
"""List all created connectors"""
desc_api1.label:
"""List All Connectors"""
desc_api2.desc:
"""Create a new connector by type and name"""
desc_api2.label:
"""Create Connector"""
desc_api3.desc:
"""Get a connector by Id"""
desc_api3.label:
"""Get Connector"""
desc_api4.desc:
"""Update a connector by Id"""
desc_api4.label:
"""Update Connector"""
desc_api5.desc:
"""Delete a connector by Id"""
desc_api5.label:
"""Delete Connector"""
desc_api6.desc:
"""Reset a connector metrics by Id"""
desc_api6.label:
"""Reset Connector Metrics"""
desc_api7.desc:
"""Stop/Restart connectors on all nodes in the cluster."""
desc_api7.label:
"""Cluster Connector Operate"""
desc_api8.desc:
"""Stop/Restart connectors on a specific node."""
desc_api8.label:
"""Node Connector Operate"""
desc_api9.desc:
"""Test creating a new connector by given ID </br>
The ID must be of format '{type}:{name}'"""
desc_api9.label:
"""Test Connector Creation"""
desc_connector_metrics.desc:
"""Get connector metrics by Id"""
desc_connector_metrics.label:
"""Get Connector Metrics"""
desc_enable_connector.desc:
"""Enable or Disable connectors on all nodes in the cluster."""
desc_enable_connector.label:
"""Cluster Connector Enable"""
desc_param_path_enable.desc:
"""Whether to enable this connector"""
desc_param_path_enable.label:
"""Enable connector"""
desc_param_path_id.desc:
"""The connector Id. Must be of format {type}:{name}"""
desc_param_path_id.label:
"""Connector ID"""
desc_param_path_node.desc:
"""The node name, e.g. emqx@127.0.0.1"""
desc_param_path_node.label:
"""The node name"""
desc_param_path_operation_cluster.desc:
"""Operations can be one of: stop, restart"""
desc_param_path_operation_cluster.label:
"""Cluster Operation"""
desc_param_path_operation_on_node.desc:
"""Operations can be one of: stop, restart"""
desc_param_path_operation_on_node.label:
"""Node Operation """
}