Merge pull request #9832 from sstrigler/EMQX-8774-failure-to-handle-timeout-error-in-resource-worker

EMQX 8774 failure to handle timeout error in resource worker
This commit is contained in:
Zaiming (Stone) Shi 2023-01-27 14:36:44 +01:00 committed by GitHub
commit 52b75ada04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 41 additions and 37 deletions

View File

@ -899,7 +899,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
), ),
Payload1 = <<"hello2">>, Payload1 = <<"hello2">>,
Payload2 = <<"hello3">>, Payload2 = <<"hello3">>,
%% we need to to it in other processes because it'll block due to %% We need to do it in other processes because it'll block due to
%% the long timeout %% the long timeout
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end), spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload1)) end),
spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end), spawn(fun() -> emqx:publish(emqx_message:make(LocalTopic, Payload2)) end),

View File

@ -20,7 +20,6 @@
-module(emqx_resource_buffer_worker). -module(emqx_resource_buffer_worker).
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include("emqx_resource_utils.hrl").
-include("emqx_resource_errors.hrl"). -include("emqx_resource_errors.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
@ -266,15 +265,17 @@ code_change(_OldVsn, State, _Extra) ->
%%============================================================================== %%==============================================================================
-define(PICK(ID, KEY, PID, EXPR), -define(PICK(ID, KEY, PID, EXPR),
try gproc_pool:pick_worker(ID, KEY) of try
case gproc_pool:pick_worker(ID, KEY) of
PID when is_pid(PID) -> PID when is_pid(PID) ->
EXPR; EXPR;
_ -> _ ->
?RESOURCE_ERROR(worker_not_created, "resource not created") ?RESOURCE_ERROR(worker_not_created, "resource not created")
end
catch catch
error:badarg -> error:badarg ->
?RESOURCE_ERROR(worker_not_created, "resource not created"); ?RESOURCE_ERROR(worker_not_created, "resource not created");
exit:{timeout, _} -> error:timeout ->
?RESOURCE_ERROR(timeout, "call resource timeout") ?RESOURCE_ERROR(timeout, "call resource timeout")
end end
). ).

View File

@ -19,8 +19,6 @@
-compile(export_all). -compile(export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_resource.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -772,7 +770,10 @@ t_healthy_timeout(_) ->
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
#{health_check_interval => 200} #{health_check_interval => 200}
), ),
?assertError(timeout, emqx_resource:query(?ID, get_state, #{timeout => 1_000})), ?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, get_state, #{timeout => 1_000})
),
?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)), ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
@ -1583,8 +1584,8 @@ do_t_expiration_before_sending(QueryMode) ->
spawn_link(fun() -> spawn_link(fun() ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS}) emqx_resource:query(?ID, {inc_counter, 99}, #{timeout => TimeoutMS})
); );
async -> async ->
@ -1690,8 +1691,8 @@ do_t_expiration_before_sending_partial_batch(QueryMode) ->
spawn_link(fun() -> spawn_link(fun() ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS}) emqx_resource:query(?ID, {inc_counter, 199}, #{timeout => TimeoutMS})
); );
async -> async ->
@ -2043,8 +2044,8 @@ do_t_expiration_retry(IsBatch) ->
ResumeInterval * 2 ResumeInterval * 2
), ),
spawn_link(fun() -> spawn_link(fun() ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query( emqx_resource:query(
?ID, ?ID,
{inc_counter, 1}, {inc_counter, 1},
@ -2127,8 +2128,8 @@ t_expiration_retry_batch_multiple_times(_Config) ->
), ),
TimeoutMS = 100, TimeoutMS = 100,
spawn_link(fun() -> spawn_link(fun() ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query( emqx_resource:query(
?ID, ?ID,
{inc_counter, 1}, {inc_counter, 1},
@ -2137,8 +2138,8 @@ t_expiration_retry_batch_multiple_times(_Config) ->
) )
end), end),
spawn_link(fun() -> spawn_link(fun() ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
emqx_resource:query( emqx_resource:query(
?ID, ?ID,
{inc_counter, 2}, {inc_counter, 2},

View File

@ -0,0 +1 @@
Improve error log when bridge in 'sync' mode timed out to get response.

View File

@ -0,0 +1 @@
优化桥接同步资源调用超时情况下的一个错误日志。

View File

@ -910,12 +910,10 @@ t_write_failure(Config) ->
sync -> sync ->
{_, {ok, _}} = {_, {ok, _}} =
?wait_async_action( ?wait_async_action(
try ?assertMatch(
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData) send_message(Config, SentData)
catch ),
error:timeout ->
{error, timeout}
end,
#{?snk_kind := buffer_worker_flush_nack}, #{?snk_kind := buffer_worker_flush_nack},
1_000 1_000
); );
@ -947,7 +945,8 @@ t_write_failure(Config) ->
{error, {recoverable_error, {closed, "The connection was lost."}}} =:= {error, {recoverable_error, {closed, "The connection was lost."}}} =:=
Result orelse Result orelse
{error, {error, closed}} =:= Result orelse {error, {error, closed}} =:= Result orelse
{error, {recoverable_error, econnrefused}} =:= Result, {error, {recoverable_error, econnrefused}} =:= Result orelse
{error, {recoverable_error, noproc}} =:= Result,
#{got => Result} #{got => Result}
) )
end, end,

View File

@ -406,7 +406,10 @@ t_write_failure(Config) ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError(timeout, send_message(Config, SentData)); ?assertMatch(
{error, {resource_error, #{reason := timeout}}},
send_message(Config, SentData)
);
async -> async ->
send_message(Config, SentData) send_message(Config, SentData)
end end
@ -439,8 +442,8 @@ t_write_timeout(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000, Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertError( ?assertMatch(
timeout, {error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData, [], Timeout}) query_resource(Config, {send_message, SentData, [], Timeout})
) )
end), end),

View File

@ -426,12 +426,7 @@ t_write_failure(Config) ->
?wait_async_action( ?wait_async_action(
case QueryMode of case QueryMode of
sync -> sync ->
try ?assertMatch({error, _}, send_message(Config, SentData));
send_message(Config, SentData)
catch
error:timeout ->
{error, timeout}
end;
async -> async ->
send_message(Config, SentData) send_message(Config, SentData)
end, end,
@ -467,7 +462,10 @@ t_write_timeout(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 1000, Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertError(timeout, query_resource(Config, {send_message, SentData, [], Timeout})) ?assertMatch(
{error, {resource_error, #{reason := timeout}}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
end), end),
ok. ok.