From ed8aa466028fc0a35dfb28757256d020e4be20cc Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 3 Oct 2023 18:02:23 +0200 Subject: [PATCH] refactor: copy bridge api code over to emqx_connector --- .../emqx_connector/src/emqx_connector_api.erl | 1026 +++++++++++++++++ .../src/schema/emqx_connector_ee_schema.erl | 47 +- .../src/schema/emqx_connector_schema.erl | 55 + 3 files changed, 1120 insertions(+), 8 deletions(-) create mode 100644 apps/emqx_connector/src/emqx_connector_api.erl diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl new file mode 100644 index 000000000..3e48a3973 --- /dev/null +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -0,0 +1,1026 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_connector_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). + +-import(hoconsc, [mk/2, array/1, enum/1]). + +%% Swagger specs from hocon schema +-export([ + api_spec/0, + paths/0, + schema/1, + namespace/0 +]). + +%% API callbacks +-export([ + '/connectors'/2, + '/connectors/:id'/2, + '/connectors/:id/enable/:enable'/2, + '/connectors/:id/:operation'/2, + '/nodes/:node/connectors/:id/:operation'/2, + '/connectors_probe'/2 +]). + +namespace() -> "connector". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). + +paths() -> + [ + "/connectors", + "/connectors/:id", + "/connectors/:id/enable/:enable", + "/connectors/:id/:operation", + "/nodes/:node/connectors/:id/:operation", + "/connectors_probe" + ]. + +error_schema(Code, Message) when is_atom(Code) -> + error_schema([Code], Message); +error_schema(Codes, Message) when is_list(Message) -> + error_schema(Codes, list_to_binary(Message)); +error_schema(Codes, Message) when is_list(Codes) andalso is_binary(Message) -> + emqx_dashboard_swagger:error_codes(Codes, Message). + +get_response_body_schema() -> + emqx_dashboard_swagger:schema_with_examples( + emqx_bridge_schema:get_response(), + connector_info_examples(get) + ). + +param_path_operation_cluster() -> + {operation, + mk( + enum([start, stop, restart]), + #{ + in => path, + required => true, + example => <<"start">>, + desc => ?DESC("desc_param_path_operation_cluster") + } + )}. + +param_path_operation_on_node() -> + {operation, + mk( + enum([start, stop, restart]), + #{ + in => path, + required => true, + example => <<"start">>, + desc => ?DESC("desc_param_path_operation_on_node") + } + )}. + +param_path_node() -> + {node, + mk( + binary(), + #{ + in => path, + required => true, + example => <<"emqx@127.0.0.1">>, + desc => ?DESC("desc_param_path_node") + } + )}. + +param_path_id() -> + {id, + mk( + binary(), + #{ + in => path, + required => true, + example => <<"webhook:webhook_example">>, + desc => ?DESC("desc_param_path_id") + } + )}. + +param_path_enable() -> + {enable, + mk( + boolean(), + #{ + in => path, + required => true, + desc => ?DESC("desc_param_path_enable"), + example => true + } + )}. + +connector_info_array_example(Method) -> + lists:map(fun(#{value := Config}) -> Config end, maps:values(connector_info_examples(Method))). + +connector_info_examples(Method) -> + maps:merge( + #{ + <<"webhook_example">> => #{ + summary => <<"WebHook">>, + value => info_example(webhook, Method) + }, + <<"mqtt_example">> => #{ + summary => <<"MQTT Connector">>, + value => info_example(mqtt, Method) + } + }, + emqx_enterprise_connector_examples(Method) + ). + +-if(?EMQX_RELEASE_EDITION == ee). +emqx_enterprise_connector_examples(Method) -> + emqx_connector_ee_schema:examples(Method). +-else. +emqx_enterprise_connector_examples(_Method) -> #{}. +-endif. + +info_example(Type, Method) -> + maps:merge( + info_example_basic(Type), + method_example(Type, Method) + ). + +method_example(Type, Method) when Method == get; Method == post -> + SType = atom_to_list(Type), + SName = SType ++ "_example", + #{ + type => bin(SType), + name => bin(SName) + }; +method_example(_Type, put) -> + #{}. + +info_example_basic(webhook) -> + #{ + enable => true, + url => <<"http://localhost:9901/messages/${topic}">>, + request_timeout => <<"15s">>, + connect_timeout => <<"15s">>, + max_retries => 3, + pool_type => <<"random">>, + pool_size => 4, + enable_pipelining => 100, + ssl => #{enable => false}, + local_topic => <<"emqx_webhook/#">>, + method => post, + body => <<"${payload}">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => 15000, + query_mode => async, + inflight_window => 100, + max_buffer_bytes => 100 * 1024 * 1024 + } + }; +info_example_basic(mqtt) -> + (mqtt_main_example())#{ + egress => mqtt_egress_example(), + ingress => mqtt_ingress_example() + }. + +mqtt_main_example() -> + #{ + enable => true, + server => <<"127.0.0.1:1883">>, + proto_ver => <<"v4">>, + username => <<"foo">>, + password => <<"******">>, + clean_start => true, + keepalive => <<"300s">>, + retry_interval => <<"15s">>, + max_inflight => 100, + resource_opts => #{ + health_check_interval => <<"15s">>, + query_mode => sync, + max_buffer_bytes => 100 * 1024 * 1024 + }, + ssl => #{ + enable => false + } + }. +mqtt_egress_example() -> + #{ + local => #{ + topic => <<"emqx/#">> + }, + remote => #{ + topic => <<"from_emqx/${topic}">>, + qos => <<"${qos}">>, + payload => <<"${payload}">>, + retain => false + } + }. +mqtt_ingress_example() -> + #{ + remote => #{ + topic => <<"aws/#">>, + qos => 1 + }, + local => #{ + topic => <<"from_aws/${topic}">>, + qos => <<"${qos}">>, + payload => <<"${payload}">>, + retain => <<"${retain}">> + } + }. + +schema("/connectors") -> + #{ + 'operationId' => '/connectors', + get => #{ + tags => [<<"connectors">>], + summary => <<"List connectors">>, + description => ?DESC("desc_api1"), + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_example( + array(emqx_connector_schema:get_response()), + connector_info_array_example(get) + ) + } + }, + post => #{ + tags => [<<"connectors">>], + summary => <<"Create connector">>, + description => ?DESC("desc_api2"), + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:post_request(), + connector_info_examples(post) + ), + responses => #{ + 201 => get_response_body_schema(), + 400 => error_schema('ALREADY_EXISTS', "Connector already exists") + } + } + }; +schema("/connectors/:id") -> + #{ + 'operationId' => '/connectors/:id', + get => #{ + tags => [<<"connectors">>], + summary => <<"Get connector">>, + description => ?DESC("desc_api3"), + parameters => [param_path_id()], + responses => #{ + 200 => get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Connector not found") + } + }, + put => #{ + tags => [<<"connectors">>], + summary => <<"Update connector">>, + description => ?DESC("desc_api4"), + parameters => [param_path_id()], + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:put_request(), + connector_info_examples(put) + ), + responses => #{ + 200 => get_response_body_schema(), + 404 => error_schema('NOT_FOUND', "Connector not found"), + 400 => error_schema('BAD_REQUEST', "Update connector failed") + } + }, + delete => #{ + tags => [<<"connectors">>], + summary => <<"Delete connector">>, + description => ?DESC("desc_api5"), + parameters => [param_path_id()], + responses => #{ + 204 => <<"Connector deleted">>, + 400 => error_schema( + 'BAD_REQUEST', + "Cannot delete connector while active rules are defined for this connector" + ), + 404 => error_schema('NOT_FOUND', "Connector not found"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/connectors/:id/enable/:enable") -> + #{ + 'operationId' => '/connectors/:id/enable/:enable', + put => + #{ + tags => [<<"connectors">>], + summary => <<"Enable or disable connector">>, + desc => ?DESC("desc_enable_connector"), + parameters => [param_path_id(), param_path_enable()], + responses => + #{ + 204 => <<"Success">>, + 404 => error_schema( + 'NOT_FOUND', "Connector not found or invalid operation" + ), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/connectors/:id/:operation") -> + #{ + 'operationId' => '/connectors/:id/:operation', + post => #{ + tags => [<<"connectors">>], + summary => <<"Stop or restart connector">>, + description => ?DESC("desc_api7"), + parameters => [ + param_path_id(), + param_path_operation_cluster() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', "Problem with configuration of external service" + ), + 404 => error_schema('NOT_FOUND', "Connector not found or invalid operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/nodes/:node/connectors/:id/:operation") -> + #{ + 'operationId' => '/nodes/:node/connectors/:id/:operation', + post => #{ + tags => [<<"connectors">>], + summary => <<"Stop/restart connector">>, + description => ?DESC("desc_api8"), + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation_on_node() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', + "Problem with configuration of external service or connector not enabled" + ), + 404 => error_schema( + 'NOT_FOUND', "Connector or node not found or invalid operation" + ), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/connectors_probe") -> + #{ + 'operationId' => '/connectors_probe', + post => #{ + tags => [<<"connectors">>], + desc => ?DESC("desc_api9"), + summary => <<"Test creating connector">>, + 'requestBody' => emqx_dashboard_swagger:schema_with_examples( + emqx_connector_schema:post_request(), + connector_info_examples(post) + ), + responses => #{ + 204 => <<"Test connector OK">>, + 400 => error_schema(['TEST_FAILED'], "connector test failed") + } + } + }. + +'/connectors'(post, #{body := #{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Conf0}) -> + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, _} -> + ?BAD_REQUEST('ALREADY_EXISTS', <<"connector already exists">>); + {error, not_found} -> + Conf = filter_out_request_body(Conf0), + create_connector(ConnectorType, ConnectorName, Conf) + end; +'/connectors'(get, _Params) -> + Nodes = mria:running_nodes(), + NodeReplies = emqx_connector_proto_v4:list_connectors_on_nodes(Nodes), + case is_ok(NodeReplies) of + {ok, NodeConnectors} -> + AllConnectors = [ + [format_resource(Data, Node) || Data <- Connectors] + || {Node, Connectors} <- lists:zip(Nodes, NodeConnectors) + ], + ?OK(zip_connectors(AllConnectors)); + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end. + +'/connectors/:id'(get, #{bindings := #{id := Id}}) -> + ?TRY_PARSE_ID(Id, lookup_from_all_nodes(ConnectorType, ConnectorName, 200)); +'/connectors/:id'(put, #{bindings := #{id := Id}, body := Conf0}) -> + Conf1 = filter_out_request_body(Conf0), + ?TRY_PARSE_ID( + Id, + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, _} -> + RawConf = emqx:get_raw_config([connectors, ConnectorType, ConnectorName], #{}), + Conf = deobfuscate(Conf1, RawConf), + update_connector(ConnectorType, ConnectorName, Conf); + {error, not_found} -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) + end + ); +'/connectors/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) -> + ?TRY_PARSE_ID( + Id, + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, _} -> + AlsoDeleteActs = + case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of + <<"true">> -> true; + true -> true; + _ -> false + end, + case emqx_connector:remove(ConnectorType, ConnectorName) of + {ok, _} -> + ?NO_CONTENT; + {error, {active_channels, Channels}} -> + ?BAD_REQUEST( + {<<"Cannot delete connector while there are active channels defined for this connector">>, + Channels} + ); + {error, timeout} -> + ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end; + {error, not_found} -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) + end + ). + +'/connectors_probe'(post, Request) -> + RequestMeta = #{module => ?MODULE, method => post, path => "/connectors_probe"}, + case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of + {ok, #{body := #{<<"type">> := ConnType} = Params}} -> + Params1 = maybe_deobfuscate_connector_probe(Params), + case + emqx_connector_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) + of + ok -> + ?NO_CONTENT; + {error, #{kind := validation_error} = Reason0} -> + Reason = redact(Reason0), + ?BAD_REQUEST('TEST_FAILED', map_to_json(Reason)); + {error, Reason0} when not is_tuple(Reason0); element(1, Reason0) =/= 'exit' -> + Reason1 = + case Reason0 of + {unhealthy_target, Message} -> Message; + _ -> Reason0 + end, + Reason = redact(Reason1), + ?BAD_REQUEST('TEST_FAILED', Reason) + end; + BadRequest -> + redact(BadRequest) + end. + +maybe_deobfuscate_connector_probe( + #{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Params +) -> + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, _} -> + RawConf = emqx:get_raw_config([connectors, ConnectorType, ConnectorName], #{}), + deobfuscate(Params, RawConf); + _ -> + %% A connector may be probed before it's created, so not finding it here is fine + Params + end; +maybe_deobfuscate_connector_probe(Params) -> + Params. + +get_metrics_from_all_nodes(ConnectorType, ConnectorName) -> + Nodes = mria:running_nodes(), + Result = do_bpapi_call(all, get_metrics_from_all_nodes, [Nodes, ConnectorType, ConnectorName]), + case Result of + Metrics when is_list(Metrics) -> + {200, format_connector_metrics(lists:zip(Nodes, Metrics))}; + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end. + +lookup_from_all_nodes(ConnectorType, ConnectorName, SuccCode) -> + Nodes = mria:running_nodes(), + case + is_ok(emqx_connector_proto_v4:lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName)) + of + {ok, [{ok, _} | _] = Results} -> + {SuccCode, format_connector_info([R || {ok, R} <- Results])}; + {ok, [{error, not_found} | _]} -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName); + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end. + +lookup_from_local_node(ConnectorType, ConnectorName) -> + case emqx_connector:lookup(ConnectorType, ConnectorName) of + {ok, Res} -> {ok, format_resource(Res, node())}; + Error -> Error + end. + +create_connector(ConnectorType, ConnectorName, Conf) -> + create_or_update_connector(ConnectorType, ConnectorName, Conf, 201). + +update_connector(ConnectorType, ConnectorName, Conf) -> + create_or_update_connector(ConnectorType, ConnectorName, Conf, 200). + +create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) -> + case emqx_connector:create(ConnectorType, ConnectorName, Conf) of + {ok, _} -> + lookup_from_all_nodes(ConnectorType, ConnectorName, HttpStatusCode); + {error, Reason} when is_map(Reason) -> + ?BAD_REQUEST(map_to_json(redact(Reason))) + end. + +get_metrics_from_local_node(ConnectorType, ConnectorName) -> + format_metrics(emqx_connector:get_metrics(ConnectorType, ConnectorName)). + +'/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> + ?TRY_PARSE_ID( + Id, + case enable_func(Enable) of + invalid -> + ?NOT_FOUND(<<"Invalid operation">>); + OperFunc -> + case emqx_connector:disable_enable(OperFunc, ConnectorType, ConnectorName) of + {ok, _} -> + ?NO_CONTENT; + {error, {pre_config_update, _, connector_not_found}} -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName); + {error, {_, _, timeout}} -> + ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, timeout} -> + ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, Reason} -> + ?INTERNAL_ERROR(Reason) + end + end + ). + +'/connectors/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op} +}) -> + ?TRY_PARSE_ID( + Id, + case operation_to_all_func(Op) of + invalid -> + ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); + OperFunc -> + try is_enabled_connector(ConnectorType, ConnectorName) of + false -> + ?CONNECTOR_NOT_ENABLED; + true -> + Nodes = mria:running_nodes(), + call_operation(all, OperFunc, [Nodes, ConnectorType, ConnectorName]) + catch + throw:not_found -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) + end + end + ). + +'/nodes/:node/connectors/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op, node := Node} +}) -> + ?TRY_PARSE_ID( + Id, + case node_operation_func(Op) of + invalid -> + ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); + OperFunc -> + try is_enabled_connector(ConnectorType, ConnectorName) of + false -> + ?CONNECTOR_NOT_ENABLED; + true -> + case emqx_utils:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + call_operation(TargetNode, OperFunc, [ + TargetNode, ConnectorType, ConnectorName + ]); + {error, _} -> + ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) + end + catch + throw:not_found -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) + end + end + ). + +is_enabled_connector(ConnectorType, ConnectorName) -> + try emqx:get_config([connectors, ConnectorType, binary_to_existing_atom(ConnectorName)]) of + ConfMap -> + maps:get(enable, ConfMap, false) + catch + error:{config_not_found, _} -> + throw(not_found); + error:badarg -> + %% catch non-existing atom, + %% none-existing atom means it is not available in config PT storage. + throw(not_found) + end. + +node_operation_func(<<"restart">>) -> restart_connector_to_node; +node_operation_func(<<"start">>) -> start_connector_to_node; +node_operation_func(<<"stop">>) -> stop_connector_to_node; +node_operation_func(_) -> invalid. + +operation_to_all_func(<<"restart">>) -> restart_connectors_to_all_nodes; +operation_to_all_func(<<"start">>) -> start_connectors_to_all_nodes; +operation_to_all_func(<<"stop">>) -> stop_connectors_to_all_nodes; +operation_to_all_func(_) -> invalid. + +enable_func(<<"true">>) -> enable; +enable_func(<<"false">>) -> disable; +enable_func(_) -> invalid. + +zip_connectors([ConnectorsFirstNode | _] = ConnectorsAllNodes) -> + lists:foldl( + fun(#{type := Type, name := Name}, Acc) -> + Connectors = pick_connectors_by_id(Type, Name, ConnectorsAllNodes), + [format_connector_info(Connectors) | Acc] + end, + [], + ConnectorsFirstNode + ). + +pick_connectors_by_id(Type, Name, ConnectorsAllNodes) -> + lists:foldl( + fun(ConnectorsOneNode, Acc) -> + case + [ + Connector + || Connector = #{type := Type0, name := Name0} <- ConnectorsOneNode, + Type0 == Type, + Name0 == Name + ] + of + [ConnectorInfo] -> + [ConnectorInfo | Acc]; + [] -> + ?SLOG(warning, #{ + msg => "connector_inconsistent_in_cluster", + reason => not_found, + type => Type, + name => Name, + connector => emqx_connector_resource:connector_id(Type, Name) + }), + Acc + end + end, + [], + ConnectorsAllNodes + ). + +format_connector_info([FirstConnector | _] = Connectors) -> + Res = maps:remove(node, FirstConnector), + NodeStatus = node_status(Connectors), + redact(Res#{ + status => aggregate_status(NodeStatus), + node_status => NodeStatus + }). + +format_connector_metrics(Connectors) -> + FilteredConnectors = lists:filter( + fun + ({_Node, Metric}) when is_map(Metric) -> true; + (_) -> false + end, + Connectors + ), + NodeMetrics = collect_metrics(FilteredConnectors), + #{ + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics + }. + +node_status(Connectors) -> + [maps:with([node, status, status_reason], B) || B <- Connectors]. + +aggregate_status(AllStatus) -> + Head = fun([A | _]) -> A end, + HeadVal = maps:get(status, Head(AllStatus), connecting), + AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus), + case AllRes of + true -> HeadVal; + false -> inconsistent + end. + +collect_metrics(Connectors) -> + [#{node => Node, metrics => Metrics} || {Node, Metrics} <- Connectors]. + +aggregate_metrics(AllMetrics) -> + InitMetrics = ?EMPTY_METRICS, + lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics). + +aggregate_metrics( + #{ + metrics := ?metrics( + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 + ) + }, + ?metrics( + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 + ) +) -> + ?METRICS( + M1 + N1, + M2 + N2, + M3 + N3, + M4 + N4, + M5 + N5, + M6 + N6, + M7 + N7, + M8 + N8, + M9 + N9, + M10 + N10, + M11 + N11, + M12 + N12, + M13 + N13, + M14 + N14, + M15 + N15, + M16 + N16, + M17 + N17 + ). + +format_resource( + #{ + type := Type, + name := ConnectorName, + raw_config := RawConf, + resource_data := ResourceData + }, + Node +) -> + redact( + maps:merge( + RawConf#{ + type => Type, + name => maps:get(<<"name">>, RawConf, ConnectorName), + node => Node + }, + format_resource_data(ResourceData) + ) + ). + +format_resource_data(ResData) -> + maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)). + +format_resource_data(error, undefined, Result) -> + Result; +format_resource_data(error, Error, Result) -> + Result#{status_reason => emqx_utils:readable_error_msg(Error)}; +format_resource_data(K, V, Result) -> + Result#{K => V}. + +format_metrics(#{ + counters := #{ + 'dropped' := Dropped, + 'dropped.other' := DroppedOther, + 'dropped.expired' := DroppedExpired, + 'dropped.queue_full' := DroppedQueueFull, + 'dropped.resource_not_found' := DroppedResourceNotFound, + 'dropped.resource_stopped' := DroppedResourceStopped, + 'matched' := Matched, + 'retried' := Retried, + 'late_reply' := LateReply, + 'failed' := SentFailed, + 'success' := SentSucc, + 'received' := Rcvd + }, + gauges := Gauges, + rate := #{ + matched := #{current := Rate, last5m := Rate5m, max := RateMax} + } +}) -> + Queued = maps:get('queuing', Gauges, 0), + SentInflight = maps:get('inflight', Gauges, 0), + ?METRICS( + Dropped, + DroppedOther, + DroppedExpired, + DroppedQueueFull, + DroppedResourceNotFound, + DroppedResourceStopped, + Matched, + Queued, + Retried, + LateReply, + SentFailed, + SentInflight, + SentSucc, + Rate, + Rate5m, + RateMax, + Rcvd + ); +format_metrics(_Metrics) -> + %% Empty metrics: can happen when a node joins another and a + %% connector is not yet replicated to it, so the counters map is + %% empty. + ?METRICS( + _Dropped = 0, + _DroppedOther = 0, + _DroppedExpired = 0, + _DroppedQueueFull = 0, + _DroppedResourceNotFound = 0, + _DroppedResourceStopped = 0, + _Matched = 0, + _Queued = 0, + _Retried = 0, + _LateReply = 0, + _SentFailed = 0, + _SentInflight = 0, + _SentSucc = 0, + _Rate = 0, + _Rate5m = 0, + _RateMax = 0, + _Rcvd = 0 + ). + +fill_defaults(Type, RawConf) -> + PackedConf = pack_connector_conf(Type, RawConf), + FullConf = emqx_config:fill_defaults(emqx_connector_schema, PackedConf, #{}), + unpack_connector_conf(Type, FullConf). + +pack_connector_conf(Type, RawConf) -> + #{<<"connectors">> => #{bin(Type) => #{<<"foo">> => RawConf}}}. + +filter_raw_conf(_TypeBin, RawConf) -> + RawConf. + +unpack_connector_conf(Type, PackedConf) -> + TypeBin = bin(Type), + #{<<"connectors">> := Connectors} = PackedConf, + #{<<"foo">> := RawConf} = maps:get(TypeBin, Connectors), + filter_raw_conf(TypeBin, RawConf). + +is_ok(ok) -> + ok; +is_ok(OkResult = {ok, _}) -> + OkResult; +is_ok(Error = {error, _}) -> + Error; +is_ok(ResL) -> + case + lists:filter( + fun + ({ok, _}) -> false; + (ok) -> false; + (_) -> true + end, + ResL + ) + of + [] -> {ok, [Res || {ok, Res} <- ResL]}; + ErrL -> hd(ErrL) + end. + +filter_out_request_body(Conf) -> + ExtraConfs = [ + <<"id">>, + <<"type">>, + <<"name">>, + <<"status">>, + <<"status_reason">>, + <<"node_status">>, + <<"node_metrics">>, + <<"metrics">>, + <<"node">> + ], + maps:without(ExtraConfs, Conf). + +bin(S) when is_list(S) -> + list_to_binary(S); +bin(S) when is_atom(S) -> + atom_to_binary(S, utf8); +bin(S) when is_binary(S) -> + S. + +call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName]) -> + case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of + Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> + ?NO_CONTENT; + {error, not_implemented} -> + %% Should only happen if we call `start` on a node that is + %% still on an older bpapi version that doesn't support it. + maybe_try_restart(NodeOrAll, OperFunc, Args); + {error, timeout} -> + ?BAD_REQUEST(<<"Request timeout">>); + {error, {start_pool_failed, Name, Reason}} -> + Msg = bin( + io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)]) + ), + ?BAD_REQUEST(Msg); + {error, not_found} -> + ConnectorId = emqx_connector_resource:connector_id(ConnectorType, ConnectorName), + ?SLOG(warning, #{ + msg => "connector_inconsistent_in_cluster_for_call_operation", + reason => not_found, + type => ConnectorType, + name => ConnectorName, + connector => ConnectorId + }), + ?SERVICE_UNAVAILABLE(<<"Connector not found on remote node: ", ConnectorId/binary>>); + {error, {node_not_found, Node}} -> + ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); + {error, {unhealthy_target, Message}} -> + ?BAD_REQUEST(Message); + {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> + ?BAD_REQUEST(redact(Reason)) + end. + +maybe_try_restart(all, start_connectors_to_all_nodes, Args) -> + call_operation(all, restart_connectors_to_all_nodes, Args); +maybe_try_restart(Node, start_connector_to_node, Args) -> + call_operation(Node, restart_connector_to_node, Args); +maybe_try_restart(_, _, _) -> + ?NOT_IMPLEMENTED. + +do_bpapi_call(all, Call, Args) -> + maybe_unwrap( + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_connector), Call, Args) + ); +do_bpapi_call(Node, Call, Args) -> + case lists:member(Node, mria:running_nodes()) of + true -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_connector), Call, Args); + false -> + {error, {node_not_found, Node}} + end. + +do_bpapi_call_vsn(SupportedVersion, Call, Args) -> + case lists:member(SupportedVersion, supported_versions(Call)) of + true -> + apply(emqx_connector_proto_v4, Call, Args); + false -> + {error, not_implemented} + end. + +maybe_unwrap({error, not_implemented}) -> + {error, not_implemented}; +maybe_unwrap(RpcMulticallResult) -> + emqx_rpc:unwrap_erpc(RpcMulticallResult). + +supported_versions(start_connector_to_node) -> [2, 3, 4]; +supported_versions(start_connectors_to_all_nodes) -> [2, 3, 4]; +supported_versions(get_metrics_from_all_nodes) -> [4]; +supported_versions(_Call) -> [1, 2, 3, 4]. + +redact(Term) -> + emqx_utils:redact(Term). + +deobfuscate(NewConf, OldConf) -> + maps:fold( + fun(K, V, Acc) -> + case maps:find(K, OldConf) of + error -> + Acc#{K => V}; + {ok, OldV} when is_map(V), is_map(OldV) -> + Acc#{K => deobfuscate(V, OldV)}; + {ok, OldV} -> + case emqx_utils:is_redacted(K, V) of + true -> + Acc#{K => OldV}; + _ -> + Acc#{K => V} + end + end + end, + #{}, + NewConf + ). + +map_to_json(M0) -> + %% When dealing with Hocon validation errors, `value' might contain non-serializable + %% values (e.g.: user_lookup_fun), so we try again without that key if serialization + %% fails as a best effort. + M1 = emqx_utils_maps:jsonable_map(M0, fun(K, V) -> {K, emqx_utils_maps:binary_string(V)} end), + try + emqx_utils_json:encode(M1) + catch + error:_ -> + M2 = maps:without([value, <<"value">>], M1), + emqx_utils_json:encode(M2) + end. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 0bfd7e602..d812203d5 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -13,9 +13,20 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - fields/1 + api_schemas/1, + fields/1, + examples/1 ]). +resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); +resource_type(kafka) -> emqx_bridge_kafka_impl_producer. + +%% For connectors that need to override connector configurations. +connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> + connector_impl_module(binary_to_atom(ConnectorType, utf8)); +connector_impl_module(_ConnectorType) -> + undefined. + fields(connectors) -> kafka_structs(). @@ -31,14 +42,34 @@ kafka_structs() -> )} ]. -resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); -resource_type(kafka) -> emqx_bridge_kafka_impl_producer. +examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, + Fun = + fun(Module, Examples) -> + ConnectorExamples = erlang:apply(Module, conn_bridge_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) + end, + lists:foldl(Fun, #{}, schema_modules()). -%% For connectors that need to override connector configurations. -connector_impl_module(ConnectorType) when is_binary(ConnectorType) -> - connector_impl_module(binary_to_atom(ConnectorType, utf8)); -connector_impl_module(_ConnectorType) -> - undefined. +schema_modules() -> + [ + emqx_bridge_kafka + ]. + +api_schemas(Method) -> + [ + %% We need to map the `type' field of a request (binary) to a + %% connector schema module. + %% TODO: rename this to `kafka_producer' after alias support is added + %% to hocon; keeping this as just `kafka' for backwards compatibility. + api_ref(emqx_bridge_kafka, <<"kafka">>, Method ++ "_producer") + ]. + +api_ref(Module, Type, Method) -> + {Type, ref(Module, Method)}. -else. diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index af466d8b7..a80ada754 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -26,6 +26,14 @@ -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). -if(?EMQX_RELEASE_EDITION == ee). +enterprise_api_schemas(Method) -> + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_connector_ee_schema:module_info(), + case erlang:function_exported(emqx_connector_ee_schema, api_schemas, 1) of + true -> emqx_connector_ee_schema:api_schemas(Method); + false -> [] + end. enterprise_fields_connectors() -> %% We *must* do this to ensure the module is really loaded, especially when we use @@ -199,6 +207,53 @@ transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) -> %% HOCON Schema Callbacks %%====================================================================================== +%% For HTTP APIs +get_response() -> + api_schema("get"). + +put_request() -> + api_schema("put"). + +post_request() -> + api_schema("post"). + +api_schema(Method) -> + Broker = [ + {Type, ref(Mod, Method)} + || {Type, Mod} <- [ + {<<"webhook">>, emqx_bridge_http_schema}, + {<<"mqtt">>, emqx_bridge_mqtt_schema} + ] + ], + EE = enterprise_api_schemas(Method), + hoconsc:union(bridge_api_union(Broker ++ EE)). + +bridge_api_union(Refs) -> + Index = maps:from_list(Refs), + fun + (all_union_members) -> + maps:values(Index); + ({value, V}) -> + case V of + #{<<"type">> := T} -> + case maps:get(T, Index, undefined) of + undefined -> + throw(#{ + field_name => type, + reason => <<"unknown bridge type">> + }); + Ref -> + [Ref] + end; + _ -> + throw(#{ + field_name => type, + reason => <<"unknown bridge type">> + }) + end + end. + +%% general config namespace() -> "connector". tags() ->