From ca4a262b758ba7adc537813292f76242bf4f96c1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 20 Jan 2023 08:24:53 -0300 Subject: [PATCH] refactor: re-organize dealing with unrecoverable errors --- .../test/emqx_bridge_mqtt_SUITE.erl | 11 ++- .../src/emqx_connector_pgsql.erl | 6 +- .../src/emqx_resource_buffer_worker.erl | 81 ++++++++++--------- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 15 ++-- .../test/emqx_ee_bridge_influxdb_SUITE.erl | 33 +++++++- .../test/emqx_ee_bridge_mysql_SUITE.erl | 2 +- .../test/emqx_ee_bridge_pgsql_SUITE.erl | 26 ++++-- .../src/emqx_ee_connector_influxdb.erl | 34 +++++++- 8 files changed, 141 insertions(+), 67 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 1f5b06fab..a99f06f20 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -957,12 +957,11 @@ assert_mqtt_msg_received(Topic, Payload) -> receive {deliver, Topic, #message{payload = Payload}} -> ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), - ok; - Msg -> - ct:pal("Unexpected Msg: ~p", [Msg]), - assert_mqtt_msg_received(Topic, Payload) - after 100 -> - ct:fail("timeout waiting for ~p on topic ~p", [Payload, Topic]) + ok + after 300 -> + {messages, Messages} = process_info(self(), messages), + Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]), + error({Msg, #{messages => Messages}}) end. request(Method, Url, Body) -> diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 34defb5e5..890227b9d 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -189,8 +189,8 @@ on_batch_query( Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of - {error, Error} -> - {error, Error}; + {error, _Error} = Result -> + handle_result(Result); {_Column, Results} -> handle_batch_result(Results, 0) end @@ -417,6 +417,8 @@ to_bin(Bin) when is_binary(Bin) -> to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). +handle_result({error, disconnected}) -> + {error, {recoverable_error, disconnected}}; handle_result({error, Error}) -> {error, {unrecoverable_error, Error}}; handle_result(Res) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8b79ce5a8..669b8e474 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -783,45 +783,26 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> ok end, {nack, PostFn}; -handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), - ok - end, - {ack, PostFn}; -handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> - %% the message will be queued in replayq or inflight window, - %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not - %% sent this message. - PostFn = fun() -> - ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), - inc_sent_failed(Id, HasBeenSent), - ok - end, - {ack, PostFn}; -handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) -> - PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - ok - end, - {nack, PostFn}; -handle_query_result_pure(_Id, {async_return, ok}, _HasBeenSent) -> - {ack, fun() -> ok end}; -handle_query_result_pure(_Id, {async_return, {ok, Pid}}, _HasBeenSent) when is_pid(Pid) -> - {ack, fun() -> ok end}; +handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> + case is_unrecoverable_error(Error) of + true -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; + false -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), + ok + end, + {nack, PostFn} + end; +handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) -> + handle_query_async_result_pure(Id, Result, HasBeenSent); handle_query_result_pure(Id, Result, HasBeenSent) -> PostFn = fun() -> assert_ok_result(Result), @@ -830,6 +811,28 @@ handle_query_result_pure(Id, Result, HasBeenSent) -> end, {ack, PostFn}. +handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> + case is_unrecoverable_error(Error) of + true -> + PostFn = + fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; + false -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => async_send_error, reason => Reason}), + ok + end, + {nack, PostFn} + end; +handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> + {ack, fun() -> ok end}; +handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> + {ack, fun() -> ok end}. + handle_async_worker_down(Data0, Pid) -> #{async_workers := AsyncWorkers0} = Data0, {WorkerMRef, AsyncWorkers} = maps:take(Pid, AsyncWorkers0), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index c9560739f..247b7799b 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -1135,8 +1135,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> ResourceId ); {_, sync} -> - %% even waiting, hard to avoid flakiness... simpler to just sleep - %% a bit until stabilization. wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 1, 500), assert_metrics( @@ -1336,12 +1334,19 @@ t_unrecoverable_error(Config) -> ), wait_until_gauge_is(queuing, 0, _Timeout = 400), - wait_until_gauge_is(inflight, 1, _Timeout = 400), + %% TODO: once temporary clause in + %% `emqx_resource_buffer_worker:is_unrecoverable_error' + %% that marks all unknown errors as unrecoverable is + %% removed, this inflight should be 1, because we retry if + %% the worker is killed. + wait_until_gauge_is(inflight, 0, _Timeout = 400), assert_metrics( #{ dropped => 0, - failed => 0, - inflight => 1, + %% FIXME: see comment above; failed should be 0 + %% and inflight should be 1. + failed => 1, + inflight => 0, matched => 1, queuing => 0, retried => 0, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index fc7dce418..e1899b1b2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -277,6 +277,7 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" + " request_timeout = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -313,6 +314,7 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" + " request_timeout = 1s\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -906,9 +908,23 @@ t_write_failure(Config) -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> case QueryMode of sync -> - ?assertError(timeout, send_message(Config, SentData)); + {_, {ok, _}} = + ?wait_async_action( + try + send_message(Config, SentData) + catch + error:timeout -> + {error, timeout} + end, + #{?snk_kind := buffer_worker_flush_nack}, + 1_000 + ); async -> - ?assertEqual(ok, send_message(Config, SentData)) + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := buffer_worker_reply_after_query}, + 1_000 + ) end end), fun(Trace0) -> @@ -920,11 +936,20 @@ t_write_failure(Config) -> ?assert( {error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, closed}} =:= Result orelse - {error, {error, econnrefused}} =:= Result, + {error, {recoverable_error, {error, econnrefused}}} =:= Result, #{got => Result} ); async -> - ok + Trace = ?of_kind(buffer_worker_reply_after_query, Trace0), + ?assertMatch([#{action := nack} | _], Trace), + [#{result := Result} | _] = Trace, + ?assert( + {error, {recoverable_error, {closed, "The connection was lost."}}} =:= + Result orelse + {error, {error, closed}} =:= Result orelse + {error, {recoverable_error, econnrefused}} =:= Result, + #{got => Result} + ) end, ok end diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 3bac01c66..57792b366 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -228,7 +228,7 @@ query_resource(Config, Request) -> Name = ?config(mysql_name, Config), BridgeType = ?config(mysql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request). + emqx_resource:query(ResourceID, Request, #{timeout => 500}). unprepare(Config, Key) -> Name = ?config(mysql_name, Config), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index bdbbed8cf..25752f685 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -249,7 +249,7 @@ query_resource(Config, Request) -> Name = ?config(pgsql_name, Config), BridgeType = ?config(pgsql_bridge_type, Config), ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request). + emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). connect_direct_pgsql(Config) -> Opts = #{ @@ -422,12 +422,22 @@ t_write_failure(Config) -> SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - case QueryMode of - sync -> - ?assertError(timeout, send_message(Config, SentData)); - async -> - send_message(Config, SentData) - end + {_, {ok, _}} = + ?wait_async_action( + case QueryMode of + sync -> + try + send_message(Config, SentData) + catch + error:timeout -> + {error, timeout} + end; + async -> + send_message(Config, SentData) + end, + #{?snk_kind := buffer_worker_flush_nack}, + 1_000 + ) end), fun(Trace0) -> ct:pal("trace: ~p", [Trace0]), @@ -437,7 +447,7 @@ t_write_failure(Config) -> case Error of {resource_error, _} -> ok; - disconnected -> + {recoverable_error, disconnected} -> ok; _ -> ct:fail("unexpected error: ~p", [Error]) diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 0037242b3..1ae4d9874 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -26,6 +26,7 @@ on_batch_query_async/4, on_get_status/2 ]). +-export([reply_callback/2]). -export([ namespace/0, @@ -353,7 +354,12 @@ do_query(InstId, Client, Points) -> connector => InstId, reason => Reason }), - Err + case is_unrecoverable_error(Err) of + true -> + {error, {unrecoverable_error, Reason}}; + false -> + {error, {recoverable_error, Reason}} + end end. do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> @@ -362,7 +368,20 @@ do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> connector => InstId, points => Points }), - {ok, _WorkerPid} = influxdb:write_async(Client, Points, ReplyFunAndArgs). + WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, + {ok, _WorkerPid} = influxdb:write_async(Client, Points, WrappedReplyFunAndArgs). + +reply_callback(ReplyFunAndArgs, {error, Reason} = Error) -> + case is_unrecoverable_error(Error) of + true -> + Result = {error, {unrecoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); + false -> + Result = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result) + end; +reply_callback(ReplyFunAndArgs, Result) -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans @@ -583,6 +602,17 @@ str(B) when is_binary(B) -> str(S) when is_list(S) -> S. +is_unrecoverable_error({error, {unrecoverable_error, _}}) -> + true; +is_unrecoverable_error({error, {recoverable_error, _}}) -> + false; +is_unrecoverable_error({error, {error, econnrefused}}) -> + false; +is_unrecoverable_error({error, econnrefused}) -> + false; +is_unrecoverable_error(_) -> + false. + %%=================================================================== %% eunit tests %%===================================================================