diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 3e48a3973..90baa2aed 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -42,6 +42,30 @@ '/connectors_probe'/2 ]). +-export([lookup_from_local_node/2]). + +-define(CONNECTOR_NOT_ENABLED, + ?BAD_REQUEST(<<"Forbidden operation, connector not enabled">>) +). + +-define(CONNECTOR_NOT_FOUND(CONNECTOR_TYPE, CONNECTOR_NAME), + ?NOT_FOUND( + <<"Connector lookup failed: connector named '", (bin(CONNECTOR_NAME))/binary, "' of type ", + (bin(CONNECTOR_TYPE))/binary, " does not exist.">> + ) +). + +%% Don't turn connector_name to atom, it's maybe not a existing atom. +-define(TRY_PARSE_ID(ID, EXPR), + try emqx_connector_resource:parse_connector_id(Id, #{atom_name => false}) of + {ConnectorType, ConnectorName} -> + EXPR + catch + throw:#{reason := Reason} -> + ?NOT_FOUND(<<"Invalid connector ID, ", Reason/binary>>) + end +). + namespace() -> "connector". api_spec() -> @@ -412,7 +436,7 @@ schema("/connectors_probe") -> end; '/connectors'(get, _Params) -> Nodes = mria:running_nodes(), - NodeReplies = emqx_connector_proto_v4:list_connectors_on_nodes(Nodes), + NodeReplies = emqx_connector_proto_v1:list_connectors_on_nodes(Nodes), case is_ok(NodeReplies) of {ok, NodeConnectors} -> AllConnectors = [ @@ -439,17 +463,11 @@ schema("/connectors_probe") -> ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) end ); -'/connectors/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> +'/connectors/:id'(delete, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID( Id, case emqx_connector:lookup(ConnectorType, ConnectorName) of {ok, _} -> - AlsoDeleteActs = - case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of - <<"true">> -> true; - true -> true; - _ -> false - end, case emqx_connector:remove(ConnectorType, ConnectorName) of {ok, _} -> ?NO_CONTENT; @@ -508,20 +526,10 @@ maybe_deobfuscate_connector_probe( maybe_deobfuscate_connector_probe(Params) -> Params. -get_metrics_from_all_nodes(ConnectorType, ConnectorName) -> - Nodes = mria:running_nodes(), - Result = do_bpapi_call(all, get_metrics_from_all_nodes, [Nodes, ConnectorType, ConnectorName]), - case Result of - Metrics when is_list(Metrics) -> - {200, format_connector_metrics(lists:zip(Nodes, Metrics))}; - {error, Reason} -> - ?INTERNAL_ERROR(Reason) - end. - lookup_from_all_nodes(ConnectorType, ConnectorName, SuccCode) -> Nodes = mria:running_nodes(), case - is_ok(emqx_connector_proto_v4:lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName)) + is_ok(emqx_connector_proto_v1:lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName)) of {ok, [{ok, _} | _] = Results} -> {SuccCode, format_connector_info([R || {ok, R} <- Results])}; @@ -551,9 +559,6 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) - ?BAD_REQUEST(map_to_json(redact(Reason))) end. -get_metrics_from_local_node(ConnectorType, ConnectorName) -> - format_metrics(emqx_connector:get_metrics(ConnectorType, ConnectorName)). - '/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> ?TRY_PARSE_ID( Id, @@ -701,20 +706,6 @@ format_connector_info([FirstConnector | _] = Connectors) -> node_status => NodeStatus }). -format_connector_metrics(Connectors) -> - FilteredConnectors = lists:filter( - fun - ({_Node, Metric}) when is_map(Metric) -> true; - (_) -> false - end, - Connectors - ), - NodeMetrics = collect_metrics(FilteredConnectors), - #{ - metrics => aggregate_metrics(NodeMetrics), - node_metrics => NodeMetrics - }. - node_status(Connectors) -> [maps:with([node, status, status_reason], B) || B <- Connectors]. @@ -727,43 +718,6 @@ aggregate_status(AllStatus) -> false -> inconsistent end. -collect_metrics(Connectors) -> - [#{node => Node, metrics => Metrics} || {Node, Metrics} <- Connectors]. - -aggregate_metrics(AllMetrics) -> - InitMetrics = ?EMPTY_METRICS, - lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics). - -aggregate_metrics( - #{ - metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 - ) - }, - ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 - ) -) -> - ?METRICS( - M1 + N1, - M2 + N2, - M3 + N3, - M4 + N4, - M5 + N5, - M6 + N6, - M7 + N7, - M8 + N8, - M9 + N9, - M10 + N10, - M11 + N11, - M12 + N12, - M13 + N13, - M14 + N14, - M15 + N15, - M16 + N16, - M17 + N17 - ). - format_resource( #{ type := Type, @@ -794,88 +748,6 @@ format_resource_data(error, Error, Result) -> format_resource_data(K, V, Result) -> Result#{K => V}. -format_metrics(#{ - counters := #{ - 'dropped' := Dropped, - 'dropped.other' := DroppedOther, - 'dropped.expired' := DroppedExpired, - 'dropped.queue_full' := DroppedQueueFull, - 'dropped.resource_not_found' := DroppedResourceNotFound, - 'dropped.resource_stopped' := DroppedResourceStopped, - 'matched' := Matched, - 'retried' := Retried, - 'late_reply' := LateReply, - 'failed' := SentFailed, - 'success' := SentSucc, - 'received' := Rcvd - }, - gauges := Gauges, - rate := #{ - matched := #{current := Rate, last5m := Rate5m, max := RateMax} - } -}) -> - Queued = maps:get('queuing', Gauges, 0), - SentInflight = maps:get('inflight', Gauges, 0), - ?METRICS( - Dropped, - DroppedOther, - DroppedExpired, - DroppedQueueFull, - DroppedResourceNotFound, - DroppedResourceStopped, - Matched, - Queued, - Retried, - LateReply, - SentFailed, - SentInflight, - SentSucc, - Rate, - Rate5m, - RateMax, - Rcvd - ); -format_metrics(_Metrics) -> - %% Empty metrics: can happen when a node joins another and a - %% connector is not yet replicated to it, so the counters map is - %% empty. - ?METRICS( - _Dropped = 0, - _DroppedOther = 0, - _DroppedExpired = 0, - _DroppedQueueFull = 0, - _DroppedResourceNotFound = 0, - _DroppedResourceStopped = 0, - _Matched = 0, - _Queued = 0, - _Retried = 0, - _LateReply = 0, - _SentFailed = 0, - _SentInflight = 0, - _SentSucc = 0, - _Rate = 0, - _Rate5m = 0, - _RateMax = 0, - _Rcvd = 0 - ). - -fill_defaults(Type, RawConf) -> - PackedConf = pack_connector_conf(Type, RawConf), - FullConf = emqx_config:fill_defaults(emqx_connector_schema, PackedConf, #{}), - unpack_connector_conf(Type, FullConf). - -pack_connector_conf(Type, RawConf) -> - #{<<"connectors">> => #{bin(Type) => #{<<"foo">> => RawConf}}}. - -filter_raw_conf(_TypeBin, RawConf) -> - RawConf. - -unpack_connector_conf(Type, PackedConf) -> - TypeBin = bin(Type), - #{<<"connectors">> := Connectors} = PackedConf, - #{<<"foo">> := RawConf} = maps:get(TypeBin, Connectors), - filter_raw_conf(TypeBin, RawConf). - is_ok(ok) -> ok; is_ok(OkResult = {ok, _}) -> @@ -905,8 +777,6 @@ filter_out_request_body(Conf) -> <<"status">>, <<"status_reason">>, <<"node_status">>, - <<"node_metrics">>, - <<"metrics">>, <<"node">> ], maps:without(ExtraConfs, Conf). @@ -985,7 +855,6 @@ maybe_unwrap(RpcMulticallResult) -> supported_versions(start_connector_to_node) -> [2, 3, 4]; supported_versions(start_connectors_to_all_nodes) -> [2, 3, 4]; -supported_versions(get_metrics_from_all_nodes) -> [4]; supported_versions(_Call) -> [1, 2, 3, 4]. redact(Term) -> diff --git a/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl b/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl new file mode 100644 index 000000000..df4d0825b --- /dev/null +++ b/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl @@ -0,0 +1,123 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_connector_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_connectors_on_nodes/1, + restart_connector_to_node/3, + start_connector_to_node/3, + stop_connector_to_node/3, + lookup_from_all_nodes/3, + restart_connectors_to_all_nodes/3, + start_connectors_to_all_nodes/3, + stop_connectors_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.3.1". + +-spec list_connectors_on_nodes([node()]) -> + emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). +list_connectors_on_nodes(Nodes) -> + erpc:multicall(Nodes, emqx_connector, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_connector_to_node(node(), key(), key()) -> + term(). +restart_connector_to_node(Node, ConnectorType, ConnectorName) -> + rpc:call( + Node, + emqx_connector_resource, + restart, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec start_connector_to_node(node(), key(), key()) -> + term(). +start_connector_to_node(Node, ConnectorType, ConnectorName) -> + rpc:call( + Node, + emqx_connector_resource, + start, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec stop_connector_to_node(node(), key(), key()) -> + term(). +stop_connector_to_node(Node, ConnectorType, ConnectorName) -> + rpc:call( + Node, + emqx_connector_resource, + stop, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec restart_connectors_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> + erpc:multicall( + Nodes, + emqx_connector_resource, + restart, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec start_connectors_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> + erpc:multicall( + Nodes, + emqx_connector_resource, + start, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec stop_connectors_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> + erpc:multicall( + Nodes, + emqx_connector_resource, + stop, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) -> + erpc:multicall( + Nodes, + emqx_connector_api, + lookup_from_local_node, + [ConnectorType, ConnectorName], + ?TIMEOUT + ). diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index a80ada754..f5f2afa64 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -25,6 +25,8 @@ -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). +-export([get_response/0, put_request/0, post_request/0]). + -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use diff --git a/rel/i18n/emqx_connector_api.hocon b/rel/i18n/emqx_connector_api.hocon new file mode 100644 index 000000000..b9c5dcd2e --- /dev/null +++ b/rel/i18n/emqx_connector_api.hocon @@ -0,0 +1,100 @@ +emqx_connector_api { + +desc_api1.desc: +"""List all created connectors""" + +desc_api1.label: +"""List All Connectors""" + +desc_api2.desc: +"""Create a new connector by type and name""" + +desc_api2.label: +"""Create Connector""" + +desc_api3.desc: +"""Get a connector by Id""" + +desc_api3.label: +"""Get Connector""" + +desc_api4.desc: +"""Update a connector by Id""" + +desc_api4.label: +"""Update Connector""" + +desc_api5.desc: +"""Delete a connector by Id""" + +desc_api5.label: +"""Delete Connector""" + +desc_api6.desc: +"""Reset a connector metrics by Id""" + +desc_api6.label: +"""Reset Connector Metrics""" + +desc_api7.desc: +"""Stop/Restart connectors on all nodes in the cluster.""" + +desc_api7.label: +"""Cluster Connector Operate""" + +desc_api8.desc: +"""Stop/Restart connectors on a specific node.""" + +desc_api8.label: +"""Node Connector Operate""" + +desc_api9.desc: +"""Test creating a new connector by given ID
+The ID must be of format '{type}:{name}'""" + +desc_api9.label: +"""Test Connector Creation""" + +desc_connector_metrics.desc: +"""Get connector metrics by Id""" + +desc_connector_metrics.label: +"""Get Connector Metrics""" + +desc_enable_connector.desc: +"""Enable or Disable connectors on all nodes in the cluster.""" + +desc_enable_connector.label: +"""Cluster Connector Enable""" + +desc_param_path_enable.desc: +"""Whether to enable this connector""" + +desc_param_path_enable.label: +"""Enable connector""" + +desc_param_path_id.desc: +"""The connector Id. Must be of format {type}:{name}""" + +desc_param_path_id.label: +"""Connector ID""" + +desc_param_path_node.desc: +"""The node name, e.g. emqx@127.0.0.1""" + +desc_param_path_node.label: +"""The node name""" + +desc_param_path_operation_cluster.desc: +"""Operations can be one of: stop, restart""" + +desc_param_path_operation_cluster.label: +"""Cluster Operation""" + +desc_param_path_operation_on_node.desc: +"""Operations can be one of: stop, restart""" + +desc_param_path_operation_on_node.label: +"""Node Operation """ + +}