diff --git a/apps/emqx_authz/src/emqx_authz_http.erl b/apps/emqx_authz/src/emqx_authz_http.erl index a300291d1..852a667c8 100644 --- a/apps/emqx_authz/src/emqx_authz_http.erl +++ b/apps/emqx_authz/src/emqx_authz_http.erl @@ -82,7 +82,7 @@ authorize( } = Config ) -> Request = generate_request(PubSub, Topic, Client, Config), - try emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of + case emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of {ok, 204, _Headers} -> {matched, allow}; {ok, 200, Headers, Body} -> @@ -112,16 +112,6 @@ authorize( reason => Reason }), ignore - catch - error:timeout -> - Reason = timeout, - ?tp(authz_http_request_failure, #{error => Reason}), - ?SLOG(error, #{ - msg => "http_server_query_failed", - resource => ResourceID, - reason => Reason - }), - ignore end. log_nomtach_msg(Status, Headers, Body) -> diff --git a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl index e5a72f680..e91da9829 100644 --- a/apps/emqx_authz/test/emqx_authz_http_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_http_SUITE.erl @@ -172,7 +172,7 @@ t_response_handling(_Config) -> [ #{ ?snk_kind := authz_http_request_failure, - error := timeout + error := {recoverable_error, econnrefused} } ], ?of_kind(authz_http_request_failure, Trace) diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index c144c48e9..066e053d4 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -192,7 +192,7 @@ on_batch_query( {Key, _} -> case maps:get(Key, Inserts, undefined) of undefined -> - {error, batch_select_not_implemented}; + {error, {unrecoverable_error, batch_select_not_implemented}}; InsertSQL -> Tokens = maps:get(Key, ParamsTokens), on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State) @@ -200,7 +200,7 @@ on_batch_query( Request -> LogMeta = #{connector => InstId, first_request => Request, state => State}, ?SLOG(error, LogMeta#{msg => "invalid request"}), - {error, invalid_request} + {error, {unrecoverable_error, invalid_request}} end. mysql_function(sql) -> @@ -267,7 +267,7 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) -> maybe_prepare_sql(SQLOrKey, Prepares, PoolName) -> case maps:is_key(SQLOrKey, Prepares) of true -> prepare_sql(Prepares, PoolName); - false -> {error, prepared_statement_invalid} + false -> {error, {unrecoverable_error, prepared_statement_invalid}} end. prepare_sql(Prepares, PoolName) when is_map(Prepares) -> @@ -465,12 +465,12 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), {error, {recoverable_error, Reason}}; - {error, Reason} = Result -> + {error, Reason} -> ?SLOG( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), - Result; + {error, {unrecoverable_error, Reason}}; Result -> ?tp( mysql_connector_query_return, @@ -483,5 +483,5 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) -> error, LogMeta#{msg => "mysql_connector_invalid_params", params => Data} ), - {error, {invalid_params, Data}} + {error, {unrecoverable_error, {invalid_params, Data}}} end. diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 9965ff3b4..34defb5e5 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -153,7 +153,8 @@ on_query( }), Type = pgsql_query_type(TypeOrKey), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), - on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data). + Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data), + handle_result(Res). pgsql_query_type(sql) -> query; @@ -182,23 +183,17 @@ on_batch_query( msg => "batch prepare not implemented" }, ?SLOG(error, Log), - {error, batch_prepare_not_implemented}; + {error, {unrecoverable_error, batch_prepare_not_implemented}}; TokenList -> {_, Datas} = lists:unzip(BatchReq), Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], St = maps:get(BinKey, Sts), - {_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2), - %% this local function only suits for the result of batch insert - TransResult = fun - Trans([{ok, Count} | T], Acc) -> - Trans(T, Acc + Count); - Trans([{error, _} = Error | _], _Acc) -> - Error; - Trans([], Acc) -> - {ok, Acc} - end, - - TransResult(Results, 0) + case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of + {error, Error} -> + {error, Error}; + {_Column, Results} -> + handle_batch_result(Results, 0) + end end; _ -> Log = #{ @@ -208,7 +203,7 @@ on_batch_query( msg => "invalid request" }, ?SLOG(error, Log), - {error, invalid_request} + {error, {unrecoverable_error, invalid_request}} end. proc_sql_params(query, SQLOrKey, Params, _State) -> @@ -225,24 +220,38 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens}) end. on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> - Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover), - case Result of - {error, Reason} -> + try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of + {error, Reason} = Result -> + ?tp( + pgsql_connector_query_return, + #{error => Reason} + ), ?SLOG(error, #{ msg => "postgresql connector do sql query failed", connector => InstId, type => Type, sql => NameOrSQL, reason => Reason - }); - _ -> + }), + Result; + Result -> ?tp( pgsql_connector_query_return, #{result => Result} ), - ok - end, - Result. + Result + catch + error:function_clause:Stacktrace -> + ?SLOG(error, #{ + msg => "postgresql connector do sql query failed", + connector => InstId, + type => Type, + sql => NameOrSQL, + reason => function_clause, + stacktrace => Stacktrace + }), + {error, {unrecoverable_error, invalid_request}} + end. on_get_status(_InstId, #{poolname := Pool} = State) -> case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of @@ -407,3 +416,15 @@ to_bin(Bin) when is_binary(Bin) -> Bin; to_bin(Atom) when is_atom(Atom) -> erlang:atom_to_binary(Atom). + +handle_result({error, Error}) -> + {error, {unrecoverable_error, Error}}; +handle_result(Res) -> + Res. + +handle_batch_result([{ok, Count} | Rest], Acc) -> + handle_batch_result(Rest, Acc + Count); +handle_batch_result([{error, Error} | _Rest], _Acc) -> + {error, {unrecoverable_error, Error}}; +handle_batch_result([], Acc) -> + {ok, Acc}. diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 7fdf9d28d..4bb46bca3 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -207,11 +207,23 @@ do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) -> connector => InstId, query => Query, reason => Reason - }); + }), + case is_unrecoverable_error(Reason) of + true -> + {error, {unrecoverable_error, Reason}}; + false -> + Result + end; _ -> - ok - end, - Result. + Result + end. + +is_unrecoverable_error(Results) when is_list(Results) -> + lists:any(fun is_unrecoverable_error/1, Results); +is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) -> + true; +is_unrecoverable_error(_) -> + false. extract_eredis_cluster_workers(PoolName) -> lists:flatten([ diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index a1d8fe9d5..87d2b8e21 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -128,8 +128,12 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]}) ), ?assertMatch( - {error, [{ok, <<"PONG">>}, {error, _}]}, - emqx_resource:query(PoolName, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]}) + {error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}}, + emqx_resource:query( + PoolName, + {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]}, + #{timeout => 500} + ) ), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index aff66c287..0fd21bfcd 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -79,8 +79,7 @@ query/2, query/3, %% query the instance without batching and queuing messages. - simple_sync_query/2, - simple_async_query/3 + simple_sync_query/2 ]). %% Direct calls to the callback module @@ -278,10 +277,6 @@ query(ResId, Request, Opts) -> simple_sync_query(ResId, Request) -> emqx_resource_worker:simple_sync_query(ResId, Request). --spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term(). -simple_async_query(ResId, Request, ReplyFun) -> - emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun). - -spec start(resource_id()) -> ok | {error, Reason :: term()}. start(ResId) -> start(ResId, #{}). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 51a95424a..6b739a44c 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -38,8 +38,7 @@ ]). -export([ - simple_sync_query/2, - simple_async_query/3 + simple_sync_query/2 ]). -export([ @@ -53,7 +52,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/7, batch_reply_after_query/7]). +-export([reply_after_query/8, batch_reply_after_query/8]). -export([clear_disk_queue_dir/2]). @@ -121,7 +120,7 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - QueryOpts = #{perform_inflight_capacity_check => false}, + QueryOpts = #{simple_query => true}, emqx_resource_metrics:matched_inc(Id), Ref = make_message_ref(), Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), @@ -129,23 +128,6 @@ simple_sync_query(Id, Request) -> _ = handle_query_result(Id, Result, HasBeenSent), Result. --spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). -simple_async_query(Id, Request, ReplyFun) -> - %% Note: since calling this function implies in bypassing the - %% buffer workers, and each buffer worker index is used when - %% collecting gauge metrics, we use this dummy index. If this - %% call ends up calling buffering functions, that's a bug and - %% would mess up the metrics anyway. `undefined' is ignored by - %% `emqx_resource_metrics:*_shift/3'. - Index = undefined, - QueryOpts = #{perform_inflight_capacity_check => false}, - emqx_resource_metrics:matched_inc(Id), - Ref = make_message_ref(), - Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts), - HasBeenSent = false, - _ = handle_query_result(Id, Result, HasBeenSent), - Result. - -spec block(pid()) -> ok. block(ServerRef) -> gen_statem:cast(ServerRef, block). @@ -334,15 +316,15 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> resume_interval := ResumeT } = Data0, ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), - QueryOpts = #{}, + QueryOpts = #{simple_query => false}, Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), ReplyResult = case QueryOrBatch of ?QUERY(From, CoreReq, HasBeenSent) -> Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), - reply_caller_defer_metrics(Id, Reply); + reply_caller_defer_metrics(Id, Reply, QueryOpts); [?QUERY(_, _, _) | _] = Batch -> - batch_reply_caller_defer_metrics(Id, Result, Batch) + batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) end, case ReplyResult of %% Send failed because resource is down @@ -478,10 +460,10 @@ do_flush( } = Data0, %% unwrap when not batching (i.e., batch size == 1) [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, - QueryOpts = #{inflight_tid => InflightTID}, + QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), - case reply_caller(Id, Reply) of + case reply_caller(Id, Reply, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. nack -> @@ -517,7 +499,15 @@ do_flush( %% calls the corresponding callback function. Also, we %% must ensure the async worker is being monitored for %% such requests. - is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), + IsUnrecoverableError = is_unrecoverable_error(Result), + case is_async_return(Result) of + true when IsUnrecoverableError -> + ack_inflight(InflightTID, Ref, Id, Index); + true -> + ok; + false -> + ack_inflight(InflightTID, Ref, Id, Index) + end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -548,9 +538,9 @@ do_flush(Data0, #{ batch_size := BatchSize, inflight_tid := InflightTID } = Data0, - QueryOpts = #{inflight_tid => InflightTID}, + QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), - case batch_reply_caller(Id, Result, Batch) of + case batch_reply_caller(Id, Result, Batch, QueryOpts) of %% Failed; remove the request from the queue, as we cannot pop %% from it again, but we'll retry it using the inflight table. nack -> @@ -586,7 +576,15 @@ do_flush(Data0, #{ %% calls the corresponding callback function. Also, we %% must ensure the async worker is being monitored for %% such requests. - is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), + IsUnrecoverableError = is_unrecoverable_error(Result), + case is_async_return(Result) of + true when IsUnrecoverableError -> + ack_inflight(InflightTID, Ref, Id, Index); + true -> + ok; + false -> + ack_inflight(InflightTID, Ref, Id, Index) + end, {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), store_async_worker_reference(InflightTID, Ref, WorkerMRef), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), @@ -609,16 +607,16 @@ do_flush(Data0, #{ end end. -batch_reply_caller(Id, BatchResult, Batch) -> - {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch), +batch_reply_caller(Id, BatchResult, Batch, QueryOpts) -> + {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts), PostFn(), ShouldBlock. -batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> +batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> {ShouldAck, PostFns} = lists:foldl( fun(Reply, {_ShouldAck, PostFns}) -> - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), + {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), {ShouldAck, [PostFn | PostFns]} end, {ack, []}, @@ -629,37 +627,53 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, {ShouldAck, PostFn}. -reply_caller(Id, Reply) -> - {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), +reply_caller(Id, Reply, QueryOpts) -> + {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts), PostFn(), ShouldAck. %% Should only reply to the caller when the decision is final (not %% retriable). See comment on `handle_query_result_pure'. -reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) -> +reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) -> handle_query_result_pure(Id, Result, HasBeenSent); -reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when +reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when is_function(ReplyFun) -> + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), + IsUnrecoverableError = is_unrecoverable_error(Result), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), - case {ShouldAck, Result} of - {nack, _} -> + case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of + {ack, {async_return, _}, true, _} -> + apply(ReplyFun, Args ++ [Result]), ok; - {ack, {async_return, _}} -> + {ack, {async_return, _}, false, _} -> ok; - {ack, _} -> + {_, _, _, true} -> + apply(ReplyFun, Args ++ [Result]), + ok; + {nack, _, _, _} -> + ok; + {ack, _, _, _} -> apply(ReplyFun, Args ++ [Result]), ok end, {ShouldAck, PostFn}; -reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) -> +reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) -> + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), + IsUnrecoverableError = is_unrecoverable_error(Result), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), - case {ShouldAck, Result} of - {nack, _} -> + case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of + {ack, {async_return, _}, true, _} -> + gen_statem:reply(From, Result), ok; - {ack, {async_return, _}} -> + {ack, {async_return, _}, false, _} -> ok; - {ack, _} -> + {_, _, _, true} -> + gen_statem:reply(From, Result), + ok; + {nack, _, _, _} -> + ok; + {ack, _, _, _} -> gen_statem:reply(From, Result), ok end, @@ -679,7 +693,6 @@ handle_query_result(Id, Result, HasBeenSent) -> handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{msg => resource_exception, info => Msg}), - %% inc_sent_failed(Id, HasBeenSent), ok end, {nack, PostFn}; @@ -704,12 +717,16 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), - %% emqx_resource_metrics:dropped_other_inc(Id), ok end, {nack, PostFn}; -%% TODO: invert this logic: we should differentiate errors that are -%% irrecoverable; all others are deemed recoverable. +handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not @@ -725,6 +742,13 @@ handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) -> ok end, {nack, PostFn}; +handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) -> + PostFn = fun() -> + ?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}), + inc_sent_failed(Id, HasBeenSent), + ok + end, + {ack, PostFn}; handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) -> PostFn = fun() -> ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), @@ -799,8 +823,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt ?APPLY_RESOURCE( call_query_async, begin - ReplyFun = fun ?MODULE:reply_after_query/7, - Args = [self(), Id, Index, InflightTID, Ref, Query], + ReplyFun = fun ?MODULE:reply_after_query/8, + Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts], IsRetriable = false, WorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), @@ -824,8 +848,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt ?APPLY_RESOURCE( call_batch_query_async, begin - ReplyFun = fun ?MODULE:batch_reply_after_query/7, - ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, + ReplyFun = fun ?MODULE:batch_reply_after_query/8, + ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], IsRetriable = false, WorkerMRef = undefined, @@ -837,11 +861,15 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt Batch ). -reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) -> +reply_after_query( + Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result +) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - {Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)), + {Action, PostFn} = reply_caller_defer_metrics( + Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts + ), case Action of nack -> %% Keep retrying. @@ -867,11 +895,11 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee ok end. -batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> +batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch), + {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts), case Action of nack -> %% Keep retrying. @@ -1118,12 +1146,18 @@ do_cancel_inflight_item(Data, Ref) -> IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), Result = {error, interrupted}, - _ = batch_reply_caller(Id, Result, Batch), + QueryOpts = #{simple_query => false}, + _ = batch_reply_caller(Id, Result, Batch, QueryOpts), ?tp(resource_worker_cancelled_inflight, #{ref => Ref}), ok. %%============================================================================== +inc_sent_failed(Id, _HasBeenSent = true) -> + emqx_resource_metrics:retried_failed_inc(Id); +inc_sent_failed(Id, _HasBeenSent) -> + emqx_resource_metrics:failed_inc(Id). + inc_sent_success(Id, _HasBeenSent = true) -> emqx_resource_metrics:retried_success_inc(Id); inc_sent_success(Id, _HasBeenSent) -> @@ -1204,10 +1238,14 @@ mark_as_sent(?QUERY(From, Req, _)) -> HasBeenSent = true, ?QUERY(From, Req, HasBeenSent). -is_async(ResourceId) -> - case emqx_resource_manager:ets_lookup(ResourceId) of - {ok, _Group, #{query_mode := QM, callback_mode := CM}} -> - call_mode(QM, CM) =:= async; - _ -> - false - end. +is_unrecoverable_error({error, {unrecoverable_error, _}}) -> + true; +is_unrecoverable_error({async_return, Result}) -> + is_unrecoverable_error(Result); +is_unrecoverable_error(_) -> + false. + +is_async_return({async_return, _}) -> + true; +is_async_return(_) -> + false. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index bb87a9f37..0372c21ea 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -778,15 +778,25 @@ t_bad_timestamp(Config) -> {async, false} -> ?assertEqual(ok, Return), ?assertMatch( - [#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}], + [ + #{ + error := [ + {error, {bad_timestamp, [<<"bad_timestamp">>]}} + ] + } + ], ?of_kind(influxdb_connector_send_query_error, Trace) ); {sync, false} -> ?assertEqual( - {error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return + {error, + {unrecoverable_error, [ + {error, {bad_timestamp, [<<"bad_timestamp">>]}} + ]}}, + Return ); {sync, true} -> - ?assertEqual({error, points_trans_failed}, Return) + ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return) end, ok end @@ -894,11 +904,19 @@ t_write_failure(Config) -> }, ?check_trace( emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - send_message(Config, SentData) - end), - fun(Result, _Trace) -> case QueryMode of sync -> + ?assertError(timeout, send_message(Config, SentData)); + async -> + ?assertEqual(ok, send_message(Config, SentData)) + end + end), + fun(Trace0) -> + case QueryMode of + sync -> + Trace = ?of_kind(resource_worker_flush_nack, Trace0), + ?assertMatch([_ | _], Trace), + [#{result := Result} | _] = Trace, ?assert( {error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, closed}} =:= Result orelse @@ -906,7 +924,7 @@ t_write_failure(Config) -> #{got => Result} ); async -> - ?assertEqual(ok, Result) + ok end, ok end @@ -938,11 +956,7 @@ t_missing_field(Config) -> begin emqx:publish(Msg0), emqx:publish(Msg1), - NEvents = - case IsBatch of - true -> 1; - false -> 2 - end, + NEvents = 1, {ok, _} = snabbkaffe:block_until( ?match_n_events(NEvents, #{ @@ -964,7 +978,7 @@ t_missing_field(Config) -> ); false -> ?assertMatch( - [#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _], + [#{error := [{error, no_fields}]} | _], ?of_kind(influxdb_connector_send_query_error, Trace) ) end, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 812c4ee85..ce38c357d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_ee_bridge_mysql_SUITE). @@ -170,6 +170,7 @@ mysql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" + " request_timeout = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -397,20 +398,32 @@ t_write_failure(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), {ok, _} = create_bridge(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - send_message(Config, SentData) + case QueryMode of + sync -> + ?assertError(timeout, send_message(Config, SentData)); + async -> + send_message(Config, SentData) + end end), - fun - ({error, {resource_error, _}}, _Trace) -> - ok; - ({error, {recoverable_error, disconnected}}, _Trace) -> - ok; - (_, _Trace) -> - ?assert(false) + fun(Trace0) -> + ct:pal("trace: ~p", [Trace0]), + Trace = ?of_kind(resource_worker_flush_nack, Trace0), + ?assertMatch([#{result := {error, _}} | _], Trace), + [#{result := {error, Error}} | _] = Trace, + case Error of + {resource_error, _} -> + ok; + {recoverable_error, disconnected} -> + ok; + _ -> + ct:fail("unexpected error: ~p", [Error]) + end end ), ok. @@ -424,10 +437,10 @@ t_write_timeout(Config) -> {ok, _} = create_bridge(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 10, + Timeout = 1000, emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, _}}, + ?assertError( + timeout, query_resource(Config, {send_message, SentData, [], Timeout}) ) end), @@ -443,7 +456,7 @@ t_simple_sql_query(Config) -> BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, case IsBatch of - true -> ?assertEqual({error, batch_select_not_implemented}, Result); + true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result); false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) end, ok. @@ -459,10 +472,16 @@ t_missing_data(Config) -> case IsBatch of true -> ?assertMatch( - {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result + {error, + {unrecoverable_error, + {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}, + Result ); false -> - ?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result) + ?assertMatch( + {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}, + Result + ) end, ok. @@ -476,8 +495,10 @@ t_bad_sql_parameter(Config) -> BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, case IsBatch of - true -> ?assertEqual({error, invalid_request}, Result); - false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result) + true -> + ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); + false -> + ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result) end, ok. @@ -491,8 +512,8 @@ t_unprepared_statement_query(Config) -> BatchSize = ?config(batch_size, Config), IsBatch = BatchSize > 1, case IsBatch of - true -> ?assertEqual({error, invalid_request}, Result); - false -> ?assertEqual({error, prepared_statement_invalid}, Result) + true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); + false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result) end, ok. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl index c2ff6fa8f..f39ecc1dc 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_pgsql_SUITE.erl @@ -191,6 +191,7 @@ pgsql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" + " request_timeout = 500ms\n" " batch_size = ~b\n" " query_mode = ~s\n" " }\n" @@ -415,20 +416,32 @@ t_write_failure(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), {ok, _} = create_bridge(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, ?check_trace( emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> - send_message(Config, SentData) + case QueryMode of + sync -> + ?assertError(timeout, send_message(Config, SentData)); + async -> + send_message(Config, SentData) + end end), - fun - ({error, {resource_error, _}}, _Trace) -> - ok; - ({error, {recoverable_error, disconnected}}, _Trace) -> - ok; - (_, _Trace) -> - ?assert(false) + fun(Trace0) -> + ct:pal("trace: ~p", [Trace0]), + Trace = ?of_kind(resource_worker_flush_nack, Trace0), + ?assertMatch([#{result := {error, _}} | _], Trace), + [#{result := {error, Error}} | _] = Trace, + case Error of + {resource_error, _} -> + ok; + disconnected -> + ok; + _ -> + ct:fail("unexpected error: ~p", [Error]) + end end ), ok. @@ -442,12 +455,9 @@ t_write_timeout(Config) -> {ok, _} = create_bridge(Config), Val = integer_to_binary(erlang:unique_integer()), SentData = #{payload => Val, timestamp => 1668602148000}, - Timeout = 10, + Timeout = 1000, emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> - ?assertMatch( - {error, {resource_error, _}}, - query_resource(Config, {send_message, SentData, [], Timeout}) - ) + ?assertError(timeout, query_resource(Config, {send_message, SentData, [], Timeout})) end), ok. @@ -459,7 +469,7 @@ t_simple_sql_query(Config) -> Request = {sql, <<"SELECT count(1) AS T">>}, Result = query_resource(Config, Request), case ?config(enable_batch, Config) of - true -> ?assertEqual({error, batch_prepare_not_implemented}, Result); + true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result); false -> ?assertMatch({ok, _, [{1}]}, Result) end, ok. @@ -471,7 +481,8 @@ t_missing_data(Config) -> ), Result = send_message(Config, #{}), ?assertMatch( - {error, {error, error, <<"23502">>, not_null_violation, _, _}}, Result + {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}, + Result ), ok. @@ -484,10 +495,10 @@ t_bad_sql_parameter(Config) -> Result = query_resource(Config, Request), case ?config(enable_batch, Config) of true -> - ?assertEqual({error, invalid_request}, Result); + ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result); false -> ?assertMatch( - {error, {resource_error, _}}, Result + {error, {unrecoverable_error, _}}, Result ) end, ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index f966f56c6..0d21b381e 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -56,13 +56,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c #{points => Points, batch => false, mode => sync} ), do_query(InstId, Client, Points); - {error, ErrorPoints} = Err -> + {error, ErrorPoints} -> ?tp( influxdb_connector_send_query_error, #{batch => false, mode => sync, error => ErrorPoints} ), log_error_points(InstId, ErrorPoints), - Err + {error, {unrecoverable_error, ErrorPoints}} end. %% Once a Batched Data trans to points failed. @@ -80,7 +80,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client influxdb_connector_send_query_error, #{batch => true, mode => sync, error => Reason} ), - {error, Reason} + {error, {unrecoverable_error, Reason}} end. on_query_async( @@ -123,7 +123,7 @@ on_batch_query_async( influxdb_connector_send_query_error, #{batch => true, mode => async, error => Reason} ), - {error, Reason} + {error, {unrecoverable_error, Reason}} end. on_get_status(_InstId, #{client := Client}) ->