fix(emqx_connector): remove `stop` and `restart` operations

This commit is contained in:
Stefan Strigler 2023-11-01 09:18:50 +01:00
parent eb723489d7
commit 29683072a1
3 changed files with 45 additions and 129 deletions

View File

@ -97,7 +97,7 @@ get_response_body_schema() ->
param_path_operation_cluster() -> param_path_operation_cluster() ->
{operation, {operation,
mk( mk(
enum([start, stop, restart]), enum([start]),
#{ #{
in => path, in => path,
required => true, required => true,
@ -109,7 +109,7 @@ param_path_operation_cluster() ->
param_path_operation_on_node() -> param_path_operation_on_node() ->
{operation, {operation,
mk( mk(
enum([start, stop, restart]), enum([start]),
#{ #{
in => path, in => path,
required => true, required => true,
@ -266,7 +266,7 @@ schema("/connectors/:id/:operation") ->
'operationId' => '/connectors/:id/:operation', 'operationId' => '/connectors/:id/:operation',
post => #{ post => #{
tags => [<<"connectors">>], tags => [<<"connectors">>],
summary => <<"Stop, start or restart connector">>, summary => <<"Manually start a connector">>,
description => ?DESC("desc_api7"), description => ?DESC("desc_api7"),
parameters => [ parameters => [
param_path_id(), param_path_id(),
@ -288,7 +288,7 @@ schema("/nodes/:node/connectors/:id/:operation") ->
'operationId' => '/nodes/:node/connectors/:id/:operation', 'operationId' => '/nodes/:node/connectors/:id/:operation',
post => #{ post => #{
tags => [<<"connectors">>], tags => [<<"connectors">>],
summary => <<"Stop, start or restart connector">>, summary => <<"Manually start a connector for a given node">>,
description => ?DESC("desc_api8"), description => ?DESC("desc_api8"),
parameters => [ parameters => [
param_path_node(), param_path_node(),
@ -531,12 +531,8 @@ is_enabled_connector(ConnectorType, ConnectorName) ->
throw(not_found) throw(not_found)
end. end.
operation_func(all, restart) -> restart_connectors_to_all_nodes;
operation_func(all, start) -> start_connectors_to_all_nodes; operation_func(all, start) -> start_connectors_to_all_nodes;
operation_func(all, stop) -> stop_connectors_to_all_nodes; operation_func(_Node, start) -> start_connector_to_node.
operation_func(_Node, restart) -> restart_connector_to_node;
operation_func(_Node, start) -> start_connector_to_node;
operation_func(_Node, stop) -> stop_connector_to_node.
enable_func(true) -> enable; enable_func(true) -> enable;
enable_func(false) -> disable. enable_func(false) -> disable.

View File

@ -22,13 +22,9 @@
introduced_in/0, introduced_in/0,
list_connectors_on_nodes/1, list_connectors_on_nodes/1,
restart_connector_to_node/3,
start_connector_to_node/3,
stop_connector_to_node/3,
lookup_from_all_nodes/3, lookup_from_all_nodes/3,
restart_connectors_to_all_nodes/3, start_connector_to_node/3,
start_connectors_to_all_nodes/3, start_connectors_to_all_nodes/3
stop_connectors_to_all_nodes/3
]). ]).
-include_lib("emqx/include/bpapi.hrl"). -include_lib("emqx/include/bpapi.hrl").
@ -45,13 +41,13 @@ list_connectors_on_nodes(Nodes) ->
-type key() :: atom() | binary() | [byte()]. -type key() :: atom() | binary() | [byte()].
-spec restart_connector_to_node(node(), key(), key()) -> -spec lookup_from_all_nodes([node()], key(), key()) ->
term(). emqx_rpc:erpc_multicall().
restart_connector_to_node(Node, ConnectorType, ConnectorName) -> lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) ->
rpc:call( erpc:multicall(
Node, Nodes,
emqx_connector_resource, emqx_connector_api,
restart, lookup_from_local_node,
[ConnectorType, ConnectorName], [ConnectorType, ConnectorName],
?TIMEOUT ?TIMEOUT
). ).
@ -67,28 +63,6 @@ start_connector_to_node(Node, ConnectorType, ConnectorName) ->
?TIMEOUT ?TIMEOUT
). ).
-spec stop_connector_to_node(node(), key(), key()) ->
term().
stop_connector_to_node(Node, ConnectorType, ConnectorName) ->
rpc:call(
Node,
emqx_connector_resource,
stop,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec restart_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_resource,
restart,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec start_connectors_to_all_nodes([node()], key(), key()) -> -spec start_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall(). emqx_rpc:erpc_multicall().
start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) -> start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
@ -99,25 +73,3 @@ start_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
[ConnectorType, ConnectorName], [ConnectorType, ConnectorName],
?TIMEOUT ?TIMEOUT
). ).
-spec stop_connectors_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_connectors_to_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_resource,
stop,
[ConnectorType, ConnectorName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, ConnectorType, ConnectorName) ->
erpc:multicall(
Nodes,
emqx_connector_api,
lookup_from_local_node,
[ConnectorType, ConnectorName],
?TIMEOUT
).

View File

@ -396,13 +396,13 @@ t_start_connector_unknown_node(Config) ->
Config Config
). ).
t_start_stop_connectors_node(Config) -> t_start_connector_node(Config) ->
do_start_stop_connectors(node, Config). do_start_connector(node, Config).
t_start_stop_connectors_cluster(Config) -> t_start_connector_cluster(Config) ->
do_start_stop_connectors(cluster, Config). do_start_connector(cluster, Config).
do_start_stop_connectors(TestType, Config) -> do_start_connector(TestType, Config) ->
%% assert we there's no connectors at first %% assert we there's no connectors at first
{ok, 200, []} = request_json(get, uri(["connectors"]), Config), {ok, 200, []} = request_json(get, uri(["connectors"]), Config),
@ -424,6 +424,14 @@ do_start_stop_connectors(TestType, Config) ->
), ),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name), ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, Name),
%% Starting a healthy connector shouldn't do any harm
{ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config),
?assertMatch(
{ok, 200, #{<<"status">> := <<"connected">>}},
request_json(get, uri(["connectors", ConnectorID]), Config)
),
ExpectedStatus = ExpectedStatus =
case ?config(group, Config) of case ?config(group, Config) of
cluster when TestType == node -> cluster when TestType == node ->
@ -433,7 +441,23 @@ do_start_stop_connectors(TestType, Config) ->
end, end,
%% stop it %% stop it
{ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config), case ?config(group, Config) of
cluster ->
case TestType of
node ->
Node = ?config(node, Config),
ok = rpc:call(
Node, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500
);
cluster ->
Nodes = ?config(cluster_nodes, Config),
[{ok, ok}, {ok, ok}] = erpc:multicall(
Nodes, emqx_connector_resource, stop, [?CONNECTOR_TYPE, Name], 500
)
end;
_ ->
ok = emqx_connector_resource:stop(?CONNECTOR_TYPE, Name)
end,
?assertMatch( ?assertMatch(
{ok, 200, #{<<"status">> := ExpectedStatus}}, {ok, 200, #{<<"status">> := ExpectedStatus}},
request_json(get, uri(["connectors", ConnectorID]), Config) request_json(get, uri(["connectors", ConnectorID]), Config)
@ -444,27 +468,8 @@ do_start_stop_connectors(TestType, Config) ->
{ok, 200, #{<<"status">> := <<"connected">>}}, {ok, 200, #{<<"status">> := <<"connected">>}},
request_json(get, uri(["connectors", ConnectorID]), Config) request_json(get, uri(["connectors", ConnectorID]), Config)
), ),
%% start a started connector
{ok, 204, <<>>} = request(post, {operation, TestType, start, ConnectorID}, Config),
?assertMatch(
{ok, 200, #{<<"status">> := <<"connected">>}},
request_json(get, uri(["connectors", ConnectorID]), Config)
),
%% restart an already started connector
{ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config),
?assertMatch(
{ok, 200, #{<<"status">> := <<"connected">>}},
request_json(get, uri(["connectors", ConnectorID]), Config)
),
%% stop it again
{ok, 204, <<>>} = request(post, {operation, TestType, stop, ConnectorID}, Config),
%% restart a stopped connector
{ok, 204, <<>>} = request(post, {operation, TestType, restart, ConnectorID}, Config),
?assertMatch(
{ok, 200, #{<<"status">> := <<"connected">>}},
request_json(get, uri(["connectors", ConnectorID]), Config)
),
%% test invalid op
{ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config), {ok, 400, _} = request(post, {operation, TestType, invalidop, ConnectorID}, Config),
%% delete the connector %% delete the connector
@ -506,43 +511,6 @@ do_start_stop_connectors(TestType, Config) ->
ok = gen_tcp:close(Sock), ok = gen_tcp:close(Sock),
ok. ok.
t_start_stop_inconsistent_connector_node(Config) ->
start_stop_inconsistent_connector(node, Config).
t_start_stop_inconsistent_connector_cluster(Config) ->
start_stop_inconsistent_connector(cluster, Config).
start_stop_inconsistent_connector(Type, Config) ->
Node = ?config(node, Config),
erpc:call(Node, fun() ->
meck:new(emqx_connector_resource, [passthrough, no_link]),
meck:expect(
emqx_connector_resource,
stop,
fun
(_, <<"connector_not_found">>) -> {error, not_found};
(ConnectorType, Name) -> meck:passthrough([ConnectorType, Name])
end
)
end),
emqx_common_test_helpers:on_exit(fun() ->
erpc:call(Node, fun() ->
meck:unload([emqx_connector_resource])
end)
end),
{ok, 201, _Connector} = request(
post,
uri(["connectors"]),
?KAFKA_CONNECTOR(<<"connector_not_found">>),
Config
),
{ok, 503, _} = request(
post, {operation, Type, stop, <<"kafka_producer:connector_not_found">>}, Config
).
t_enable_disable_connectors(Config) -> t_enable_disable_connectors(Config) ->
%% assert we there's no connectors at first %% assert we there's no connectors at first
{ok, 200, []} = request_json(get, uri(["connectors"]), Config), {ok, 200, []} = request_json(get, uri(["connectors"]), Config),