diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 2a8b7ba23..ae1e74269 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -69,7 +69,7 @@ namespace() -> "connector". api_spec() -> - emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). + emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> [ @@ -347,11 +347,7 @@ schema("/connectors_probe") -> ?OK(zip_connectors(AllConnectors)); {error, Reason} -> ?INTERNAL_ERROR(Reason) - end; -'/connectors'(post, _Params) -> - ?BAD_REQUEST(<<"Bad Request">>); -'/connectors'(_, _) -> - ?METHOD_NOT_ALLOWED. + end. '/connectors/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(ConnectorType, ConnectorName, 200)); @@ -389,9 +385,7 @@ schema("/connectors_probe") -> {error, not_found} -> ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) end - ); -'/connectors/:id'(_, _) -> - ?METHOD_NOT_ALLOWED. + ). '/connectors_probe'(post, Request) -> RequestMeta = #{module => ?MODULE, method => post, path => "/connectors_probe"}, @@ -417,9 +411,7 @@ schema("/connectors_probe") -> end; BadRequest -> redact(BadRequest) - end; -'/connectors_probe'(_, _) -> - ?METHOD_NOT_ALLOWED. + end. maybe_deobfuscate_connector_probe( #{<<"type">> := ConnectorType, <<"name">> := ConnectorName} = Params @@ -471,22 +463,17 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) - '/connectors/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> ?TRY_PARSE_ID( Id, - case enable_func(Enable) of - invalid -> - ?NOT_FOUND(<<"Invalid operation">>); - OperFunc -> - case emqx_connector:disable_enable(OperFunc, ConnectorType, ConnectorName) of - {ok, _} -> - ?NO_CONTENT; - {error, {pre_config_update, _, connector_not_found}} -> - ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName); - {error, {_, _, timeout}} -> - ?SERVICE_UNAVAILABLE(<<"request timeout">>); - {error, timeout} -> - ?SERVICE_UNAVAILABLE(<<"request timeout">>); - {error, Reason} -> - ?INTERNAL_ERROR(Reason) - end + case emqx_connector:disable_enable(enable_func(Enable), ConnectorType, ConnectorName) of + {ok, _} -> + ?NO_CONTENT; + {error, {pre_config_update, _, connector_not_found}} -> + ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName); + {error, {_, _, timeout}} -> + ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, timeout} -> + ?SERVICE_UNAVAILABLE(<<"request timeout">>); + {error, Reason} -> + ?INTERNAL_ERROR(Reason) end ). @@ -496,20 +483,10 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) - }) -> ?TRY_PARSE_ID( Id, - case operation_to_all_func(Op) of - invalid -> - ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); - OperFunc -> - try is_enabled_connector(ConnectorType, ConnectorName) of - false -> - ?CONNECTOR_NOT_ENABLED; - true -> - Nodes = mria:running_nodes(), - call_operation(all, OperFunc, [Nodes, ConnectorType, ConnectorName]) - catch - throw:not_found -> - ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) - end + begin + OperFunc = operation_func(all, Op), + Nodes = mria:running_nodes(), + call_operation_if_enabled(all, OperFunc, [Nodes, ConnectorType, ConnectorName]) end ). @@ -519,29 +496,28 @@ create_or_update_connector(ConnectorType, ConnectorName, Conf, HttpStatusCode) - }) -> ?TRY_PARSE_ID( Id, - case node_operation_func(Op) of - invalid -> - ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); - OperFunc -> - try is_enabled_connector(ConnectorType, ConnectorName) of - false -> - ?CONNECTOR_NOT_ENABLED; - true -> - case emqx_utils:safe_to_existing_atom(Node, utf8) of - {ok, TargetNode} -> - call_operation(TargetNode, OperFunc, [ - TargetNode, ConnectorType, ConnectorName - ]); - {error, _} -> - ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) - end - catch - throw:not_found -> - ?CONNECTOR_NOT_FOUND(ConnectorType, ConnectorName) - end + case emqx_utils:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + OperFunc = operation_func(TargetNode, Op), + call_operation_if_enabled(TargetNode, OperFunc, [ + TargetNode, ConnectorType, ConnectorName + ]); + {error, _} -> + ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) end ). +call_operation_if_enabled(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) -> + try is_enabled_connector(BridgeType, BridgeName) of + false -> + ?CONNECTOR_NOT_ENABLED; + true -> + call_operation(NodeOrAll, OperFunc, [Nodes, BridgeType, BridgeName]) + catch + throw:not_found -> + ?CONNECTOR_NOT_FOUND(BridgeType, BridgeName) + end. + is_enabled_connector(ConnectorType, ConnectorName) -> try emqx:get_config([connectors, ConnectorType, binary_to_existing_atom(ConnectorName)]) of ConfMap -> @@ -555,19 +531,15 @@ is_enabled_connector(ConnectorType, ConnectorName) -> throw(not_found) end. -node_operation_func(<<"restart">>) -> restart_connector_to_node; -node_operation_func(<<"start">>) -> start_connector_to_node; -node_operation_func(<<"stop">>) -> stop_connector_to_node; -node_operation_func(_) -> invalid. +operation_func(all, restart) -> restart_connectors_to_all_nodes; +operation_func(all, start) -> start_connectors_to_all_nodes; +operation_func(all, stop) -> stop_connectors_to_all_nodes; +operation_func(_Node, restart) -> restart_connector_to_node; +operation_func(_Node, start) -> start_connector_to_node; +operation_func(_Node, stop) -> stop_connector_to_node. -operation_to_all_func(<<"restart">>) -> restart_connectors_to_all_nodes; -operation_to_all_func(<<"start">>) -> start_connectors_to_all_nodes; -operation_to_all_func(<<"stop">>) -> stop_connectors_to_all_nodes; -operation_to_all_func(_) -> invalid. - -enable_func(<<"true">>) -> enable; -enable_func(<<"false">>) -> disable; -enable_func(_) -> invalid. +enable_func(true) -> enable; +enable_func(false) -> disable. zip_connectors([ConnectorsFirstNode | _] = ConnectorsAllNodes) -> lists:foldl( diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 560c5cc01..50c83b6fc 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -244,16 +244,14 @@ bridge_api_union(Refs) -> undefined -> throw(#{ field_name => type, + value => T, reason => <<"unknown bridge type">> }); Ref -> [Ref] end; _ -> - throw(#{ - field_name => type, - reason => <<"unknown bridge type">> - }) + maps:values(Index) end end. diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 3490372c0..17fc6a553 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -26,7 +26,6 @@ -define(CONNECTOR_NAME, (atom_to_binary(?FUNCTION_NAME))). -define(CONNECTOR(NAME, TYPE), #{ - <<"enable">> => true, %<<"ssl">> => #{<<"enable">> => false}, <<"type">> => TYPE, <<"name">> => NAME @@ -35,10 +34,11 @@ -define(CONNECTOR_TYPE_STR, "kafka"). -define(CONNECTOR_TYPE, <>). -define(KAFKA_BOOTSTRAP_HOST, <<"127.0.0.1:9092">>). --define(KAFKA_CONNECTOR(Name, BootstrapHosts), ?CONNECTOR(Name, ?CONNECTOR_TYPE)#{ +-define(KAFKA_CONNECTOR_BASE(BootstrapHosts), #{ <<"authentication">> => <<"none">>, <<"bootstrap_hosts">> => BootstrapHosts, <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, <<"metadata_request_timeout">> => <<"5s">>, <<"min_metadata_refresh_interval">> => <<"3s">>, <<"socket_opts">> => @@ -49,6 +49,9 @@ <<"tcp_keepalive">> => <<"none">> } }). +-define(KAFKA_CONNECTOR_BASE, ?KAFKA_CONNECTOR_BASE(?KAFKA_BOOTSTRAP_HOST)). +-define(KAFKA_CONNECTOR(Name, BootstrapHosts), + ?CONNECTOR(Name, ?CONNECTOR_TYPE)?KAFKA_CONNECTOR_BASE(BootstrapHosts)). -define(KAFKA_CONNECTOR(Name), ?KAFKA_CONNECTOR(Name, ?KAFKA_BOOTSTRAP_HOST)). %% -define(CONNECTOR_TYPE_MQTT, <<"mqtt">>). @@ -284,14 +287,13 @@ t_connectors_lifecycle(Config) -> <<"type">> := ?CONNECTOR_TYPE, <<"name">> := ConnectorName, <<"bootstrap_hosts">> := <<"foobla:1234">>, - <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _] }}, request_json( put, uri(["connectors", ConnectorID]), - ?KAFKA_CONNECTOR(?CONNECTOR_NAME, <<"foobla:1234">>), + ?KAFKA_CONNECTOR_BASE(<<"foobla:1234">>), Config ) ), @@ -323,9 +325,9 @@ t_connectors_lifecycle(Config) -> ), ?assertMatch( - {ok, 404, #{ - <<"code">> := <<"NOT_FOUND">>, - <<"message">> := <<"Invalid operation", _/binary>> + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := _ }}, request_json(post, uri(["connectors", ConnectorID, "brababbel"]), Config) ), @@ -343,7 +345,7 @@ t_connectors_lifecycle(Config) -> request_json( put, uri(["connectors", ConnectorID]), - ?KAFKA_CONNECTOR(?CONNECTOR_NAME), + ?KAFKA_CONNECTOR_BASE, Config ) ), @@ -453,7 +455,7 @@ do_start_stop_connectors(TestType, Config) -> request_json(get, uri(["connectors", ConnectorID]), Config) ), - {ok, 404, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), + {ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), %% delete the connector {ok, 204, <<>>} = request(delete, uri(["connectors", ConnectorID]), Config), @@ -574,7 +576,7 @@ t_enable_disable_connectors(Config) -> {ok, 204, <<>>} = request(put, enable_path(false, ConnectorID), Config), %% bad param - {ok, 404, _} = request(put, enable_path(foo, ConnectorID), Config), + {ok, 400, _} = request(put, enable_path(foo, ConnectorID), Config), {ok, 404, _} = request(put, enable_path(true, "foo"), Config), {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), Config), @@ -619,9 +621,9 @@ t_with_redact_update(Config) -> ), %% update with redacted config - ConnectorConf = emqx_utils:redact(Template), + ConnectorUpdatedConf = maps:without([<<"name">>, <<"type">>], emqx_utils:redact(Template)), ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), - {ok, 200, _} = request(put, uri(["connectors", ConnectorID]), ConnectorConf, Config), + {ok, 200, _} = request(put, uri(["connectors", ConnectorID]), ConnectorUpdatedConf, Config), ?assertEqual( Password, get_raw_config([connectors, ?CONNECTOR_TYPE, Name, authentication, password], Config)