From 29683072a11763fb978e42fc9e3a00fdf4cc2e6e Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 1 Nov 2023 09:18:50 +0100 Subject: [PATCH] fix(emqx_connector): remove `stop` and `restart` operations --- .../emqx_connector/src/emqx_connector_api.erl | 14 +-- .../src/proto/emqx_connector_proto_v1.erl | 66 ++----------- .../test/emqx_connector_api_SUITE.erl | 94 ++++++------------- 3 files changed, 45 insertions(+), 129 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index a21a490af..52466e9da 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -97,7 +97,7 @@ get_response_body_schema() -> param_path_operation_cluster() -> {operation, mk( - enum([start, stop, restart]), + enum([start]), #{ in => path, required => true, @@ -109,7 +109,7 @@ param_path_operation_cluster() -> param_path_operation_on_node() -> {operation, mk( - enum([start, stop, restart]), + enum([start]), #{ in => path, required => true, @@ -266,7 +266,7 @@ schema("/connectors/:id/:operation") -> 'operationId' => '/connectors/:id/:operation', post => #{ tags => [<<"connectors">>], - summary => <<"Stop, start or restart connector">>, + summary => <<"Manually start a connector">>, description => ?DESC("desc_api7"), parameters => [ param_path_id(), @@ -288,7 +288,7 @@ schema("/nodes/:node/connectors/:id/:operation") -> 'operationId' => '/nodes/:node/connectors/:id/:operation', post => #{ tags => [<<"connectors">>], - summary => <<"Stop, start or restart connector">>, + summary => <<"Manually start a connector for a given node">>, description => ?DESC("desc_api8"), parameters => [ param_path_node(), @@ -531,12 +531,8 @@ is_enabled_connector(ConnectorType, ConnectorName) -> throw(not_found) end. -operation_func(all, restart) -> restart_connectors_to_all_nodes; operation_func(all, start) -> start_connectors_to_all_nodes; -operation_func(all, stop) -> stop_connectors_to_all_nodes; -operation_func(_Node, restart) -> restart_connector_to_node; -operation_func(_Node, start) -> start_connector_to_node; -operation_func(_Node, stop) -> stop_connector_to_node. +operation_func(_Node, start) -> start_connector_to_node. enable_func(true) -> enable; enable_func(false) -> disable. diff --git a/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl b/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl index df4d0825b..0cfb831e8 100644 --- a/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl +++ b/apps/emqx_connector/src/proto/emqx_connector_proto_v1.erl @@ -22,13 +22,9 @@ introduced_in/0, list_connectors_on_nodes/1, - restart_connector_to_node/3, - start_connector_to_node/3, - stop_connector_to_node/3, lookup_from_all_nodes/3, - restart_connectors_to_all_nodes/3, - start_connectors_to_all_nodes/3, - stop_connectors_to_all_nodes/3 + start_connector_to_node/3, + start_connectors_to_all_nodes/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -45,13 +41,13 @@ list_connectors_on_nodes(Nodes) -> -type key() :: atom() | binary() | [byte()]. --spec restart_connector_to_node(node(), key(), key()) -> - term(). -restart_connector_to_node(Node, ConnectorType, ConnectorName) -> - rpc:call( - Node, - emqx_connector_resource, - restart, +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) -> + erpc:multicall( + Nodes, + emqx_connector_api, + lookup_from_local_node, [ConnectorType, ConnectorName], ?TIMEOUT ). @@ -67,28 +63,6 @@ start_connector_to_node(Node, ConnectorType, ConnectorName) -> ?TIMEOUT ). --spec stop_connector_to_node(node(), key(), key()) -> - term(). -stop_connector_to_node(Node, ConnectorType, ConnectorName) -> - rpc:call( - Node, - emqx_connector_resource, - stop, - [ConnectorType, ConnectorName], - ?TIMEOUT - ). - --spec restart_connectors_to_all_nodes([node()], key(), key()) -> - emqx_rpc:erpc_multicall(). -restart_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> - erpc:multicall( - Nodes, - emqx_connector_resource, - restart, - [ConnectorType, ConnectorName], - ?TIMEOUT - ). - -spec start_connectors_to_all_nodes([node()], key(), key()) -> emqx_rpc:erpc_multicall(). start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> @@ -99,25 +73,3 @@ start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> [ConnectorType, ConnectorName], ?TIMEOUT ). - --spec stop_connectors_to_all_nodes([node()], key(), key()) -> - emqx_rpc:erpc_multicall(). -stop_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> - erpc:multicall( - Nodes, - emqx_connector_resource, - stop, - [ConnectorType, ConnectorName], - ?TIMEOUT - ). - --spec lookup_from_all_nodes([node()], key(), key()) -> - emqx_rpc:erpc_multicall(). -lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) -> - erpc:multicall( - Nodes, - emqx_connector_api, - lookup_from_local_node, - [ConnectorType, ConnectorName], - ?TIMEOUT - ). diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 5b7879eb4..09f45bed6 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -396,13 +396,13 @@ t_start_connector_unknown_node(Config) -> Config ). -t_start_stop_connectors_node(Config) -> - do_start_stop_connectors(node, Config). +t_start_connector_node(Config) -> + do_start_connector(node, Config). -t_start_stop_connectors_cluster(Config) -> - do_start_stop_connectors(cluster, Config). +t_start_connector_cluster(Config) -> + do_start_connector(cluster, Config). -do_start_stop_connectors(TestType, Config) -> +do_start_connector(TestType, Config) -> %% assert we there's no connectors at first {ok, 200, []} = request_json(get, uri(["connectors"]), Config), @@ -424,6 +424,14 @@ do_start_stop_connectors(TestType, Config) -> ), ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), + + %% Starting a healthy connector shouldn't do any harm + {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config), + ?assertMatch( + {ok, 200, #{<<"status">> := <<"connected">>}}, + request_json(get, uri(["connectors", ConnectorID]), Config) + ), + ExpectedStatus = case ?config(group, Config) of cluster when TestType == node -> @@ -433,7 +441,23 @@ do_start_stop_connectors(TestType, Config) -> end, %% stop it - {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config), + case ?config(group, Config) of + cluster -> + case TestType of + node -> + Node = ?config(node, Config), + ok = rpc:call( + Node, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500 + ); + cluster -> + Nodes = ?config(cluster_nodes, Config), + [{ok, ok}, {ok, ok}] = erpc:multicall( + Nodes, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500 + ) + end; + _ -> + ok = emqx_connector_resource:stop(?CONNECTOR_TYPE, Name) + end, ?assertMatch( {ok, 200, #{<<"status">> := ExpectedStatus}}, request_json(get, uri(["connectors", ConnectorID]), Config) @@ -444,27 +468,8 @@ do_start_stop_connectors(TestType, Config) -> {ok, 200, #{<<"status">> := <<"connected">>}}, request_json(get, uri(["connectors", ConnectorID]), Config) ), - %% start a started connector - {ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config), - ?assertMatch( - {ok, 200, #{<<"status">> := <<"connected">>}}, - request_json(get, uri(["connectors", ConnectorID]), Config) - ), - %% restart an already started connector - {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config), - ?assertMatch( - {ok, 200, #{<<"status">> := <<"connected">>}}, - request_json(get, uri(["connectors", ConnectorID]), Config) - ), - %% stop it again - {ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config), - %% restart a stopped connector - {ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config), - ?assertMatch( - {ok, 200, #{<<"status">> := <<"connected">>}}, - request_json(get, uri(["connectors", ConnectorID]), Config) - ), + %% test invalid op {ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), %% delete the connector @@ -506,43 +511,6 @@ do_start_stop_connectors(TestType, Config) -> ok = gen_tcp:close(Sock), ok. -t_start_stop_inconsistent_connector_node(Config) -> - start_stop_inconsistent_connector(node, Config). - -t_start_stop_inconsistent_connector_cluster(Config) -> - start_stop_inconsistent_connector(cluster, Config). - -start_stop_inconsistent_connector(Type, Config) -> - Node = ?config(node, Config), - - erpc:call(Node, fun() -> - meck:new(emqx_connector_resource, [passthrough, no_link]), - meck:expect( - emqx_connector_resource, - stop, - fun - (_, <<"connector_not_found">>) -> {error, not_found}; - (ConnectorType, Name) -> meck:passthrough([ConnectorType, Name]) - end - ) - end), - - emqx_common_test_helpers:on_exit(fun() -> - erpc:call(Node, fun() -> - meck:unload([emqx_connector_resource]) - end) - end), - - {ok, 201, _Connector} = request( - post, - uri(["connectors"]), - ?KAFKA_CONNECTOR(<<"connector_not_found">>), - Config - ), - {ok, 503, _} = request( - post, {operation, Type, stop, <<"kafka_producer:connector_not_found">>}, Config - ). - t_enable_disable_connectors(Config) -> %% assert we there's no connectors at first {ok, 200, []} = request_json(get, uri(["connectors"]), Config),