fix(connector api): handle `timeout` when waiting for connector status
Fixes https://emqx.atlassian.net/browse/EMQX-12251
This commit is contained in:
parent
40080f5e7d
commit
802361dbd0
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_connector, [
|
{application, emqx_connector, [
|
||||||
{description, "EMQX Data Integration Connectors"},
|
{description, "EMQX Data Integration Connectors"},
|
||||||
{vsn, "0.3.0"},
|
{vsn, "0.3.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_connector_app, []}},
|
{mod, {emqx_connector_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) ->
|
||||||
OkResult;
|
OkResult;
|
||||||
is_ok(Error = {error, _}) ->
|
is_ok(Error = {error, _}) ->
|
||||||
Error;
|
Error;
|
||||||
|
is_ok(timeout) ->
|
||||||
|
%% Returned by `emqx_resource_manager:start' when the connector fails to reach either
|
||||||
|
%% `?status_connected' or `?status_disconnected' within `start_timeout'.
|
||||||
|
timeout;
|
||||||
is_ok(ResL) ->
|
is_ok(ResL) ->
|
||||||
case
|
case
|
||||||
lists:filter(
|
lists:filter(
|
||||||
|
@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName
|
||||||
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
|
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
|
||||||
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
|
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
|
||||||
?NO_CONTENT;
|
?NO_CONTENT;
|
||||||
|
timeout ->
|
||||||
|
%% Returned by `emqx_resource_manager:start' when the connector fails to reach
|
||||||
|
%% either `?status_connected' or `?status_disconnected' within
|
||||||
|
%% `start_timeout'.
|
||||||
|
?BAD_REQUEST(<<
|
||||||
|
"Timeout while waiting for connector to reach connected status."
|
||||||
|
" Please try again."
|
||||||
|
>>);
|
||||||
{error, not_implemented} ->
|
{error, not_implemented} ->
|
||||||
?NOT_IMPLEMENTED;
|
?NOT_IMPLEMENTED;
|
||||||
{error, timeout} ->
|
{error, timeout} ->
|
||||||
|
|
|
@ -536,14 +536,20 @@ do_start_connector(TestType, Config) ->
|
||||||
request_json(
|
request_json(
|
||||||
post,
|
post,
|
||||||
uri(["connectors"]),
|
uri(["connectors"]),
|
||||||
?KAFKA_CONNECTOR(BadName, BadServer),
|
(?KAFKA_CONNECTOR(BadName, BadServer))#{
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
<<"start_timeout">> => <<"10ms">>
|
||||||
|
}
|
||||||
|
},
|
||||||
Config
|
Config
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName),
|
BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName),
|
||||||
|
%% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to
|
||||||
|
%% be connected doesn't return a 500 error.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
%% request from product: return 400 on such errors
|
%% request from product: return 400 on such errors
|
||||||
{ok, SC, _} when SC == 500 orelse SC == 400,
|
{ok, 400, _},
|
||||||
request(post, {operation, TestType, start, BadConnectorID}, Config)
|
request(post, {operation, TestType, start, BadConnectorID}, Config)
|
||||||
),
|
),
|
||||||
ok = gen_tcp:close(Sock),
|
ok = gen_tcp:close(Sock),
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_resource, [
|
{application, emqx_resource, [
|
||||||
{description, "Manager for all external resources"},
|
{description, "Manager for all external resources"},
|
||||||
{vsn, "0.1.29"},
|
{vsn, "0.1.30"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_resource_app, []}},
|
{mod, {emqx_resource_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -274,7 +274,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Start the resource
|
%% @doc Start the resource
|
||||||
-spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}.
|
-spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}.
|
||||||
start(ResId, Opts) ->
|
start(ResId, Opts) ->
|
||||||
StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION),
|
StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION),
|
||||||
case safe_call(ResId, start, StartTimeout) of
|
case safe_call(ResId, start, StartTimeout) of
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected.
|
Loading…
Reference in New Issue