refactor: re-organize dealing with unrecoverable errors

This commit is contained in:
Thales Macedo Garitezi 2023-01-20 08:24:53 -03:00
parent 6fa6c679bb
commit ca4a262b75
8 changed files with 141 additions and 67 deletions

View File

@ -957,12 +957,11 @@ assert_mqtt_msg_received(Topic, Payload) ->
receive receive
{deliver, Topic, #message{payload = Payload}} -> {deliver, Topic, #message{payload = Payload}} ->
ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
ok; ok
Msg -> after 300 ->
ct:pal("Unexpected Msg: ~p", [Msg]), {messages, Messages} = process_info(self(), messages),
assert_mqtt_msg_received(Topic, Payload) Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]),
after 100 -> error({Msg, #{messages => Messages}})
ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic])
end. end.
request(Method, Url, Body) -> request(Method, Url, Body) ->

View File

@ -189,8 +189,8 @@ on_batch_query(
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts), St = maps:get(BinKey, Sts),
case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
{error, Error} -> {error, _Error} = Result ->
{error, Error}; handle_result(Result);
{_Column, Results} -> {_Column, Results} ->
handle_batch_result(Results, 0) handle_batch_result(Results, 0)
end end
@ -417,6 +417,8 @@ to_bin(Bin) when is_binary(Bin) ->
to_bin(Atom) when is_atom(Atom) -> to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom). erlang:atom_to_binary(Atom).
handle_result({error, disconnected}) ->
{error, {recoverable_error, disconnected}};
handle_result({error, Error}) -> handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}}; {error, {unrecoverable_error, Error}};
handle_result(Res) -> handle_result(Res) ->

View File

@ -783,45 +783,26 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
ok ok
end, end,
{nack, PostFn}; {nack, PostFn};
handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) -> handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
PostFn = fun() -> case is_unrecoverable_error(Error) of
true ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent), inc_sent_failed(Id, HasBeenSent),
ok ok
end, end,
{ack, PostFn}; {ack, PostFn};
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> false ->
%% the message will be queued in replayq or inflight window, PostFn =
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not fun() ->
%% sent this message.
PostFn = fun() ->
?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}),
ok
end,
{nack, PostFn};
handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}),
ok ok
end, end,
{nack, PostFn}; {nack, PostFn}
handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) -> end;
PostFn = fun() -> handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), handle_query_async_result_pure(Id, Result, HasBeenSent);
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
ok
end,
{nack, PostFn};
handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) ->
{ack, fun() -> ok end};
handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end};
handle_query_result_pure(Id, Result, HasBeenSent) -> handle_query_result_pure(Id, Result, HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
assert_ok_result(Result), assert_ok_result(Result),
@ -830,6 +811,28 @@ handle_query_result_pure(Id, Result, HasBeenSent) ->
end, end,
{ack, PostFn}. {ack, PostFn}.
handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) ->
case is_unrecoverable_error(Error) of
true ->
PostFn =
fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
false ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}),
ok
end,
{nack, PostFn}
end;
handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) ->
{ack, fun() -> ok end};
handle_query_async_result_pure(_Id, ok, _HasBeenSent) ->
{ack, fun() -> ok end}.
handle_async_worker_down(Data0, Pid) -> handle_async_worker_down(Data0, Pid) ->
#{async_workers := AsyncWorkers0} = Data0, #{async_workers := AsyncWorkers0} = Data0,
{WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0),

View File

@ -1135,8 +1135,6 @@ do_econnrefused_or_timeout_test(Config, Error) ->
ResourceId ResourceId
); );
{_, sync} -> {_, sync} ->
%% even waiting, hard to avoid flakiness... simpler to just sleep
%% a bit until stabilization.
wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(queuing, 0, 500),
wait_until_gauge_is(inflight, 1, 500), wait_until_gauge_is(inflight, 1, 500),
assert_metrics( assert_metrics(
@ -1336,12 +1334,19 @@ t_unrecoverable_error(Config) ->
), ),
wait_until_gauge_is(queuing, 0, _Timeout = 400), wait_until_gauge_is(queuing, 0, _Timeout = 400),
wait_until_gauge_is(inflight, 1, _Timeout = 400), %% TODO: once temporary clause in
%% `emqx_resource_buffer_worker:is_unrecoverable_error'
%% that marks all unknown errors as unrecoverable is
%% removed, this inflight should be 1, because we retry if
%% the worker is killed.
wait_until_gauge_is(inflight, 0, _Timeout = 400),
assert_metrics( assert_metrics(
#{ #{
dropped => 0, dropped => 0,
failed => 0, %% FIXME: see comment above; failed should be 0
inflight => 1, %% and inflight should be 1.
failed => 1,
inflight => 0,
matched => 1, matched => 1,
queuing => 0, queuing => 0,
retried => 0, retried => 0,

View File

@ -277,6 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"
@ -313,6 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
" precision = ns\n" " precision = ns\n"
" write_syntax = \"~s\"\n" " write_syntax = \"~s\"\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 1s\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" }\n" " }\n"
@ -906,9 +908,23 @@ t_write_failure(Config) ->
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError(timeout, send_message(Config, SentData)); {_, {ok, _}} =
?wait_async_action(
try
send_message(Config, SentData)
catch
error:timeout ->
{error, timeout}
end,
#{?snk_kind := buffer_worker_flush_nack},
1_000
);
async -> async ->
?assertEqual(ok, send_message(Config, SentData)) ?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := buffer_worker_reply_after_query},
1_000
)
end end
end), end),
fun(Trace0) -> fun(Trace0) ->
@ -920,11 +936,20 @@ t_write_failure(Config) ->
?assert( ?assert(
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
{error, {error, closed}} =:= Result orelse {error, {error, closed}} =:= Result orelse
{error, {error, econnrefused}} =:= Result, {error, {recoverable_error, {error, econnrefused}}} =:= Result,
#{got => Result} #{got => Result}
); );
async -> async ->
ok Trace = ?of_kind(buffer_worker_reply_after_query, Trace0),
?assertMatch([#{action := nack} | _], Trace),
[#{result := Result} | _] = Trace,
?assert(
{error, {recoverable_error, {closed, "The connection was lost."}}} =:=
Result orelse
{error, {error, closed}} =:= Result orelse
{error, {recoverable_error, econnrefused}} =:= Result,
#{got => Result}
)
end, end,
ok ok
end end

View File

@ -228,7 +228,7 @@ query_resource(Config, Request) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),
BridgeType = ?config(mysql_bridge_type, Config), BridgeType = ?config(mysql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request). emqx_resource:query(ResourceID, Request, #{timeout => 500}).
unprepare(Config, Key) -> unprepare(Config, Key) ->
Name = ?config(mysql_name, Config), Name = ?config(mysql_name, Config),

View File

@ -249,7 +249,7 @@ query_resource(Config, Request) ->
Name = ?config(pgsql_name, Config), Name = ?config(pgsql_name, Config),
BridgeType = ?config(pgsql_bridge_type, Config), BridgeType = ?config(pgsql_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request). emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
connect_direct_pgsql(Config) -> connect_direct_pgsql(Config) ->
Opts = #{ Opts = #{
@ -422,12 +422,22 @@ t_write_failure(Config) ->
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace( ?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
{_, {ok, _}} =
?wait_async_action(
case QueryMode of case QueryMode of
sync -> sync ->
?assertError(timeout, send_message(Config, SentData)); try
send_message(Config, SentData)
catch
error:timeout ->
{error, timeout}
end;
async -> async ->
send_message(Config, SentData) send_message(Config, SentData)
end end,
#{?snk_kind := buffer_worker_flush_nack},
1_000
)
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]), ct:pal("trace: ~p", [Trace0]),
@ -437,7 +447,7 @@ t_write_failure(Config) ->
case Error of case Error of
{resource_error, _} -> {resource_error, _} ->
ok; ok;
disconnected -> {recoverable_error, disconnected} ->
ok; ok;
_ -> _ ->
ct:fail("unexpected error: ~p", [Error]) ct:fail("unexpected error: ~p", [Error])

View File

@ -26,6 +26,7 @@
on_batch_query_async/4, on_batch_query_async/4,
on_get_status/2 on_get_status/2
]). ]).
-export([reply_callback/2]).
-export([ -export([
namespace/0, namespace/0,
@ -353,7 +354,12 @@ do_query(InstId, Client, Points) ->
connector => InstId, connector => InstId,
reason => Reason reason => Reason
}), }),
Err case is_unrecoverable_error(Err) of
true ->
{error, {unrecoverable_error, Reason}};
false ->
{error, {recoverable_error, Reason}}
end
end. end.
do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
@ -362,7 +368,20 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
connector => InstId, connector => InstId,
points => Points points => Points
}), }),
{ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs). WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]},
{ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs).
reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
case is_unrecoverable_error(Error) of
true ->
Result = {error, {unrecoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
false ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans
@ -583,6 +602,17 @@ str(B) when is_binary(B) ->
str(S) when is_list(S) -> str(S) when is_list(S) ->
S. S.
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
true;
is_unrecoverable_error({error, {recoverable_error, _}}) ->
false;
is_unrecoverable_error({error, {error, econnrefused}}) ->
false;
is_unrecoverable_error({error, econnrefused}) ->
false;
is_unrecoverable_error(_) ->
false.
%%=================================================================== %%===================================================================
%% eunit tests %% eunit tests
%%=================================================================== %%===================================================================