fix: cleanup start/stop/restart operations

This commit is contained in:
Stefan Strigler 2023-10-12 15:20:28 +02:00 committed by Zaiming (Stone) Shi
parent 1816b450f0
commit 671b5306cd
1 changed files with 10 additions and 16 deletions

View File

@ -266,7 +266,7 @@ schema("/connectors/:id/:operation") ->
'operationId' => '/connectors/:id/:operation',
post => #{
tags => [<<"connectors">>],
summary => <<"Stop or restart connector">>,
summary => <<"Stop, start or restart connector">>,
description => ?DESC("desc_api7"),
parameters => [
param_path_id(),
@ -288,7 +288,7 @@ schema("/nodes/:node/connectors/:id/:operation") ->
'operationId' => '/nodes/:node/connectors/:id/:operation',
post => #{
tags => [<<"connectors">>],
summary => <<"Stop/restart connector">>,
summary => <<"Stop, start or restart connector">>,
description => ?DESC("desc_api8"),
parameters => [
param_path_node(),
@ -694,9 +694,7 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
?NO_CONTENT;
{error, not_implemented} ->
%% Should only happen if we call `start` on a node that is
%% still on an older bpapi version that doesn't support it.
maybe_try_restart(NodeOrAll, OperFunc, Args);
?NOT_IMPLEMENTED;
{error, timeout} ->
?BAD_REQUEST(<<"Request timeout">>);
{error, {start_pool_failed, Name, Reason}} ->
@ -722,13 +720,6 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName
?BAD_REQUEST(redact(Reason))
end.
maybe_try_restart(all, start_connectors_to_all_nodes, Args) ->
call_operation(all, restart_connectors_to_all_nodes, Args);
maybe_try_restart(Node, start_connector_to_node, Args) ->
call_operation(Node, restart_connector_to_node, Args);
maybe_try_restart(_, _, _) ->
?NOT_IMPLEMENTED.
do_bpapi_call(all, Call, Args) ->
maybe_unwrap(
do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_connector), Call, Args)
@ -741,21 +732,24 @@ do_bpapi_call(Node, Call, Args) ->
{error, {node_not_found, Node}}
end.
do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of
do_bpapi_call_vsn(Version, Call, Args) ->
case is_supported_version(Version, Call) of
true ->
apply(emqx_connector_proto_v1, Call, Args);
false ->
{error, not_implemented}
end.
is_supported_version(Version, Call) ->
lists:member(Version, supported_versions(Call)).
supported_versions(_Call) -> [1].
maybe_unwrap({error, not_implemented}) ->
{error, not_implemented};
maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(_Call) -> [1].
redact(Term) ->
emqx_utils:redact(Term).