diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 135d57dbe..ec57f2c85 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.3.0"}, + {vsn, "0.3.1"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_api.erl b/apps/emqx_connector/src/emqx_connector_api.erl index 97f68b7ef..4c7a0476e 100644 --- a/apps/emqx_connector/src/emqx_connector_api.erl +++ b/apps/emqx_connector/src/emqx_connector_api.erl @@ -685,6 +685,10 @@ is_ok(OkResult = {ok, _}) -> OkResult; is_ok(Error = {error, _}) -> Error; +is_ok(timeout) -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach either + %% `?status_connected' or `?status_disconnected' within `start_timeout'. + timeout; is_ok(ResL) -> case lists:filter( @@ -723,6 +727,14 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, ConnectorType, ConnectorName case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> ?NO_CONTENT; + timeout -> + %% Returned by `emqx_resource_manager:start' when the connector fails to reach + %% either `?status_connected' or `?status_disconnected' within + %% `start_timeout'. + ?BAD_REQUEST(<< + "Timeout while waiting for connector to reach connected status." + " Please try again." + >>); {error, not_implemented} -> ?NOT_IMPLEMENTED; {error, timeout} -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index 2b1ada37b..0f57e9034 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -536,14 +536,20 @@ do_start_connector(TestType, Config) -> request_json( post, uri(["connectors"]), - ?KAFKA_CONNECTOR(BadName, BadServer), + (?KAFKA_CONNECTOR(BadName, BadServer))#{ + <<"resource_opts">> => #{ + <<"start_timeout">> => <<"10ms">> + } + }, Config ) ), BadConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, BadName), + %% Checks that an `emqx_resource_manager:start' timeout when waiting for the resource to + %% be connected doesn't return a 500 error. ?assertMatch( %% request from product: return 400 on such errors - {ok, SC, _} when SC == 500 orelse SC == 400, + {ok, 400, _}, request(post, {operation, TestType, start, BadConnectorID}, Config) ), ok = gen_tcp:close(Sock), diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 913cc5e8c..39b8ec8d1 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.29"}, + {vsn, "0.1.30"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index d650a2afb..a09a6e9f8 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -274,7 +274,7 @@ restart(ResId, Opts) when is_binary(ResId) -> end. %% @doc Start the resource --spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. +-spec start(resource_id(), creation_opts()) -> ok | timeout | {error, Reason :: term()}. start(ResId, Opts) -> StartTimeout = maps:get(start_timeout, Opts, ?T_OPERATION), case safe_call(ResId, start, StartTimeout) of diff --git a/changes/ce/fix-13148.en.md b/changes/ce/fix-13148.en.md new file mode 100644 index 000000000..15002e132 --- /dev/null +++ b/changes/ce/fix-13148.en.md @@ -0,0 +1 @@ +Fixed an issue where a 500 HTTP status code could be returned by `/connectors/:connector-id/start` when there is a timeout waiting for the resource to be connected.