fix(buffer_worker): allow signalling unrecoverable errors

This commit is contained in:
Thales Macedo Garitezi 2023-01-17 16:22:41 -03:00
parent 4ed7bff33f
commit 087b667263
12 changed files with 279 additions and 173 deletions

View File

@ -82,7 +82,7 @@ authorize(
} = Config } = Config
) -> ) ->
Request = generate_request(PubSub, Topic, Client, Config), Request = generate_request(PubSub, Topic, Client, Config),
try emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of case emqx_resource:simple_sync_query(ResourceID, {Method, Request, RequestTimeout}) of
{ok, 204, _Headers} -> {ok, 204, _Headers} ->
{matched, allow}; {matched, allow};
{ok, 200, Headers, Body} -> {ok, 200, Headers, Body} ->
@ -112,16 +112,6 @@ authorize(
reason => Reason reason => Reason
}), }),
ignore ignore
catch
error:timeout ->
Reason = timeout,
?tp(authz_http_request_failure, #{error => Reason}),
?SLOG(error, #{
msg => "http_server_query_failed",
resource => ResourceID,
reason => Reason
}),
ignore
end. end.
log_nomtach_msg(Status, Headers, Body) -> log_nomtach_msg(Status, Headers, Body) ->

View File

@ -172,7 +172,7 @@ t_response_handling(_Config) ->
[ [
#{ #{
?snk_kind := authz_http_request_failure, ?snk_kind := authz_http_request_failure,
error := timeout error := {recoverable_error, econnrefused}
} }
], ],
?of_kind(authz_http_request_failure, Trace) ?of_kind(authz_http_request_failure, Trace)

View File

@ -192,7 +192,7 @@ on_batch_query(
{Key, _} -> {Key, _} ->
case maps:get(Key, Inserts, undefined) of case maps:get(Key, Inserts, undefined) of
undefined -> undefined ->
{error, batch_select_not_implemented}; {error, {unrecoverable_error, batch_select_not_implemented}};
InsertSQL -> InsertSQL ->
Tokens = maps:get(Key, ParamsTokens), Tokens = maps:get(Key, ParamsTokens),
on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State) on_batch_insert(InstId, BatchReq, InsertSQL, Tokens, State)
@ -200,7 +200,7 @@ on_batch_query(
Request -> Request ->
LogMeta = #{connector => InstId, first_request => Request, state => State}, LogMeta = #{connector => InstId, first_request => Request, state => State},
?SLOG(error, LogMeta#{msg => "invalid request"}), ?SLOG(error, LogMeta#{msg => "invalid request"}),
{error, invalid_request} {error, {unrecoverable_error, invalid_request}}
end. end.
mysql_function(sql) -> mysql_function(sql) ->
@ -267,7 +267,7 @@ init_prepare(State = #{prepare_statement := Prepares, poolname := PoolName}) ->
maybe_prepare_sql(SQLOrKey, Prepares, PoolName) -> maybe_prepare_sql(SQLOrKey, Prepares, PoolName) ->
case maps:is_key(SQLOrKey, Prepares) of case maps:is_key(SQLOrKey, Prepares) of
true -> prepare_sql(Prepares, PoolName); true -> prepare_sql(Prepares, PoolName);
false -> {error, prepared_statement_invalid} false -> {error, {unrecoverable_error, prepared_statement_invalid}}
end. end.
prepare_sql(Prepares, PoolName) when is_map(Prepares) -> prepare_sql(Prepares, PoolName) when is_map(Prepares) ->
@ -465,12 +465,12 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
), ),
{error, {recoverable_error, Reason}}; {error, {recoverable_error, Reason}};
{error, Reason} = Result -> {error, Reason} ->
?SLOG( ?SLOG(
error, error,
LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason}
), ),
Result; {error, {unrecoverable_error, Reason}};
Result -> Result ->
?tp( ?tp(
mysql_connector_query_return, mysql_connector_query_return,
@ -483,5 +483,5 @@ do_sql_query(SQLFunc, Conn, SQLOrKey, Data, Timeout, LogMeta) ->
error, error,
LogMeta#{msg => "mysql_connector_invalid_params", params => Data} LogMeta#{msg => "mysql_connector_invalid_params", params => Data}
), ),
{error, {invalid_params, Data}} {error, {unrecoverable_error, {invalid_params, Data}}}
end. end.

View File

@ -153,7 +153,8 @@ on_query(
}), }),
Type = pgsql_query_type(TypeOrKey), Type = pgsql_query_type(TypeOrKey),
{NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State), {NameOrSQL2, Data} = proc_sql_params(TypeOrKey, NameOrSQL, Params, State),
on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data). Res = on_sql_query(InstId, PoolName, Type, NameOrSQL2, Data),
handle_result(Res).
pgsql_query_type(sql) -> pgsql_query_type(sql) ->
query; query;
@ -182,23 +183,17 @@ on_batch_query(
msg => "batch prepare not implemented" msg => "batch prepare not implemented"
}, },
?SLOG(error, Log), ?SLOG(error, Log),
{error, batch_prepare_not_implemented}; {error, {unrecoverable_error, batch_prepare_not_implemented}};
TokenList -> TokenList ->
{_, Datas} = lists:unzip(BatchReq), {_, Datas} = lists:unzip(BatchReq),
Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas], Datas2 = [emqx_plugin_libs_rule:proc_sql(TokenList, Data) || Data <- Datas],
St = maps:get(BinKey, Sts), St = maps:get(BinKey, Sts),
{_Column, Results} = on_sql_query(InstId, PoolName, execute_batch, St, Datas2), case on_sql_query(InstId, PoolName, execute_batch, St, Datas2) of
%% this local function only suits for the result of batch insert {error, Error} ->
TransResult = fun {error, Error};
Trans([{ok, Count} | T], Acc) -> {_Column, Results} ->
Trans(T, Acc + Count); handle_batch_result(Results, 0)
Trans([{error, _} = Error | _], _Acc) -> end
Error;
Trans([], Acc) ->
{ok, Acc}
end,
TransResult(Results, 0)
end; end;
_ -> _ ->
Log = #{ Log = #{
@ -208,7 +203,7 @@ on_batch_query(
msg => "invalid request" msg => "invalid request"
}, },
?SLOG(error, Log), ?SLOG(error, Log),
{error, invalid_request} {error, {unrecoverable_error, invalid_request}}
end. end.
proc_sql_params(query, SQLOrKey, Params, _State) -> proc_sql_params(query, SQLOrKey, Params, _State) ->
@ -225,24 +220,38 @@ proc_sql_params(TypeOrKey, SQLOrData, Params, #{params_tokens := ParamsTokens})
end. end.
on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) -> on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
Result = ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover), try ecpool:pick_and_do(PoolName, {?MODULE, Type, [NameOrSQL, Data]}, no_handover) of
case Result of {error, Reason} = Result ->
{error, Reason} -> ?tp(
pgsql_connector_query_return,
#{error => Reason}
),
?SLOG(error, #{ ?SLOG(error, #{
msg => "postgresql connector do sql query failed", msg => "postgresql connector do sql query failed",
connector => InstId, connector => InstId,
type => Type, type => Type,
sql => NameOrSQL, sql => NameOrSQL,
reason => Reason reason => Reason
}); }),
_ -> Result;
Result ->
?tp( ?tp(
pgsql_connector_query_return, pgsql_connector_query_return,
#{result => Result} #{result => Result}
), ),
ok Result
end, catch
Result. error:function_clause:Stacktrace ->
?SLOG(error, #{
msg => "postgresql connector do sql query failed",
connector => InstId,
type => Type,
sql => NameOrSQL,
reason => function_clause,
stacktrace => Stacktrace
}),
{error, {unrecoverable_error, invalid_request}}
end.
on_get_status(_InstId, #{poolname := Pool} = State) -> on_get_status(_InstId, #{poolname := Pool} = State) ->
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
@ -407,3 +416,15 @@ to_bin(Bin) when is_binary(Bin) ->
Bin; Bin;
to_bin(Atom) when is_atom(Atom) -> to_bin(Atom) when is_atom(Atom) ->
erlang:atom_to_binary(Atom). erlang:atom_to_binary(Atom).
handle_result({error, Error}) ->
{error, {unrecoverable_error, Error}};
handle_result(Res) ->
Res.
handle_batch_result([{ok, Count} | Rest], Acc) ->
handle_batch_result(Rest, Acc + Count);
handle_batch_result([{error, Error} | _Rest], _Acc) ->
{error, {unrecoverable_error, Error}};
handle_batch_result([], Acc) ->
{ok, Acc}.

View File

@ -207,11 +207,23 @@ do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) ->
connector => InstId, connector => InstId,
query => Query, query => Query,
reason => Reason reason => Reason
}); }),
case is_unrecoverable_error(Reason) of
true ->
{error, {unrecoverable_error, Reason}};
false ->
Result
end;
_ -> _ ->
ok Result
end, end.
Result.
is_unrecoverable_error(Results) when is_list(Results) ->
lists:any(fun is_unrecoverable_error/1, Results);
is_unrecoverable_error({error, <<"ERR unknown command ", _/binary>>}) ->
true;
is_unrecoverable_error(_) ->
false.
extract_eredis_cluster_workers(PoolName) -> extract_eredis_cluster_workers(PoolName) ->
lists:flatten([ lists:flatten([

View File

@ -128,8 +128,12 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]}) emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]})
), ),
?assertMatch( ?assertMatch(
{error, [{ok, <<"PONG">>}, {error, _}]}, {error, {unrecoverable_error, [{ok, <<"PONG">>}, {error, _}]}},
emqx_resource:query(PoolName, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]}) emqx_resource:query(
PoolName,
{cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]},
#{timeout => 500}
)
), ),
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail % Resource will be listed still, but state will be changed and healthcheck will fail

View File

@ -79,8 +79,7 @@
query/2, query/2,
query/3, query/3,
%% query the instance without batching and queuing messages. %% query the instance without batching and queuing messages.
simple_sync_query/2, simple_sync_query/2
simple_async_query/3
]). ]).
%% Direct calls to the callback module %% Direct calls to the callback module
@ -278,10 +277,6 @@ query(ResId, Request, Opts) ->
simple_sync_query(ResId, Request) -> simple_sync_query(ResId, Request) ->
emqx_resource_worker:simple_sync_query(ResId, Request). emqx_resource_worker:simple_sync_query(ResId, Request).
-spec simple_async_query(resource_id(), Request :: term(), reply_fun()) -> Result :: term().
simple_async_query(ResId, Request, ReplyFun) ->
emqx_resource_worker:simple_async_query(ResId, Request, ReplyFun).
-spec start(resource_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) -> start(ResId) ->
start(ResId, #{}). start(ResId, #{}).

View File

@ -38,8 +38,7 @@
]). ]).
-export([ -export([
simple_sync_query/2, simple_sync_query/2
simple_async_query/3
]). ]).
-export([ -export([
@ -53,7 +52,7 @@
-export([queue_item_marshaller/1, estimate_size/1]). -export([queue_item_marshaller/1, estimate_size/1]).
-export([reply_after_query/7, batch_reply_after_query/7]). -export([reply_after_query/8, batch_reply_after_query/8]).
-export([clear_disk_queue_dir/2]). -export([clear_disk_queue_dir/2]).
@ -121,7 +120,7 @@ simple_sync_query(Id, Request) ->
%% would mess up the metrics anyway. `undefined' is ignored by %% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'. %% `emqx_resource_metrics:*_shift/3'.
Index = undefined, Index = undefined,
QueryOpts = #{perform_inflight_capacity_check => false}, QueryOpts = #{simple_query => true},
emqx_resource_metrics:matched_inc(Id), emqx_resource_metrics:matched_inc(Id),
Ref = make_message_ref(), Ref = make_message_ref(),
Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts), Result = call_query(sync, Id, Index, Ref, ?QUERY(self(), Request, false), QueryOpts),
@ -129,23 +128,6 @@ simple_sync_query(Id, Request) ->
_ = handle_query_result(Id, Result, HasBeenSent), _ = handle_query_result(Id, Result, HasBeenSent),
Result. Result.
-spec simple_async_query(id(), request(), reply_fun()) -> Result :: term().
simple_async_query(Id, Request, ReplyFun) ->
%% Note: since calling this function implies in bypassing the
%% buffer workers, and each buffer worker index is used when
%% collecting gauge metrics, we use this dummy index. If this
%% call ends up calling buffering functions, that's a bug and
%% would mess up the metrics anyway. `undefined' is ignored by
%% `emqx_resource_metrics:*_shift/3'.
Index = undefined,
QueryOpts = #{perform_inflight_capacity_check => false},
emqx_resource_metrics:matched_inc(Id),
Ref = make_message_ref(),
Result = call_query(async, Id, Index, Ref, ?QUERY(ReplyFun, Request, false), QueryOpts),
HasBeenSent = false,
_ = handle_query_result(Id, Result, HasBeenSent),
Result.
-spec block(pid()) -> ok. -spec block(pid()) -> ok.
block(ServerRef) -> block(ServerRef) ->
gen_statem:cast(ServerRef, block). gen_statem:cast(ServerRef, block).
@ -334,15 +316,15 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
resume_interval := ResumeT resume_interval := ResumeT
} = Data0, } = Data0,
?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}), ?tp(resource_worker_retry_inflight, #{query_or_batch => QueryOrBatch, ref => Ref}),
QueryOpts = #{}, QueryOpts = #{simple_query => false},
Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts), Result = call_query(sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
ReplyResult = ReplyResult =
case QueryOrBatch of case QueryOrBatch of
?QUERY(From, CoreReq, HasBeenSent) -> ?QUERY(From, CoreReq, HasBeenSent) ->
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
reply_caller_defer_metrics(Id, Reply); reply_caller_defer_metrics(Id, Reply, QueryOpts);
[?QUERY(_, _, _) | _] = Batch -> [?QUERY(_, _, _) | _] = Batch ->
batch_reply_caller_defer_metrics(Id, Result, Batch) batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
end, end,
case ReplyResult of case ReplyResult of
%% Send failed because resource is down %% Send failed because resource is down
@ -478,10 +460,10 @@ do_flush(
} = Data0, } = Data0,
%% unwrap when not batching (i.e., batch size == 1) %% unwrap when not batching (i.e., batch size == 1)
[?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch,
QueryOpts = #{inflight_tid => InflightTID}, QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Request, QueryOpts), Result = call_query(configured, Id, Index, Ref, Request, QueryOpts),
Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), Reply = ?REPLY(From, CoreReq, HasBeenSent, Result),
case reply_caller(Id, Reply) of case reply_caller(Id, Reply, QueryOpts) of
%% Failed; remove the request from the queue, as we cannot pop %% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table. %% from it again, but we'll retry it using the inflight table.
nack -> nack ->
@ -517,7 +499,15 @@ do_flush(
%% calls the corresponding callback function. Also, we %% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for %% must ensure the async worker is being monitored for
%% such requests. %% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), IsUnrecoverableError = is_unrecoverable_error(Result),
case is_async_return(Result) of
true when IsUnrecoverableError ->
ack_inflight(InflightTID, Ref, Id, Index);
true ->
ok;
false ->
ack_inflight(InflightTID, Ref, Id, Index)
end,
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -548,9 +538,9 @@ do_flush(Data0, #{
batch_size := BatchSize, batch_size := BatchSize,
inflight_tid := InflightTID inflight_tid := InflightTID
} = Data0, } = Data0,
QueryOpts = #{inflight_tid => InflightTID}, QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts), Result = call_query(configured, Id, Index, Ref, Batch, QueryOpts),
case batch_reply_caller(Id, Result, Batch) of case batch_reply_caller(Id, Result, Batch, QueryOpts) of
%% Failed; remove the request from the queue, as we cannot pop %% Failed; remove the request from the queue, as we cannot pop
%% from it again, but we'll retry it using the inflight table. %% from it again, but we'll retry it using the inflight table.
nack -> nack ->
@ -586,7 +576,15 @@ do_flush(Data0, #{
%% calls the corresponding callback function. Also, we %% calls the corresponding callback function. Also, we
%% must ensure the async worker is being monitored for %% must ensure the async worker is being monitored for
%% such requests. %% such requests.
is_async(Id) orelse ack_inflight(InflightTID, Ref, Id, Index), IsUnrecoverableError = is_unrecoverable_error(Result),
case is_async_return(Result) of
true when IsUnrecoverableError ->
ack_inflight(InflightTID, Ref, Id, Index);
true ->
ok;
false ->
ack_inflight(InflightTID, Ref, Id, Index)
end,
{Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result), {Data1, WorkerMRef} = ensure_async_worker_monitored(Data0, Result),
store_async_worker_reference(InflightTID, Ref, WorkerMRef), store_async_worker_reference(InflightTID, Ref, WorkerMRef),
emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)),
@ -609,16 +607,16 @@ do_flush(Data0, #{
end end
end. end.
batch_reply_caller(Id, BatchResult, Batch) -> batch_reply_caller(Id, BatchResult, Batch, QueryOpts) ->
{ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch), {ShouldBlock, PostFn} = batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts),
PostFn(), PostFn(),
ShouldBlock. ShouldBlock.
batch_reply_caller_defer_metrics(Id, BatchResult, Batch) -> batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
{ShouldAck, PostFns} = {ShouldAck, PostFns} =
lists:foldl( lists:foldl(
fun(Reply, {_ShouldAck, PostFns}) -> fun(Reply, {_ShouldAck, PostFns}) ->
{ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
{ShouldAck, [PostFn | PostFns]} {ShouldAck, [PostFn | PostFns]}
end, end,
{ack, []}, {ack, []},
@ -629,37 +627,53 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch) ->
PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end, PostFn = fun() -> lists:foreach(fun(F) -> F() end, PostFns) end,
{ShouldAck, PostFn}. {ShouldAck, PostFn}.
reply_caller(Id, Reply) -> reply_caller(Id, Reply, QueryOpts) ->
{ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply), {ShouldAck, PostFn} = reply_caller_defer_metrics(Id, Reply, QueryOpts),
PostFn(), PostFn(),
ShouldAck. ShouldAck.
%% Should only reply to the caller when the decision is final (not %% Should only reply to the caller when the decision is final (not
%% retriable). See comment on `handle_query_result_pure'. %% retriable). See comment on `handle_query_result_pure'.
reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result)) -> reply_caller_defer_metrics(Id, ?REPLY(undefined, _, HasBeenSent, Result), _QueryOpts) ->
handle_query_result_pure(Id, Result, HasBeenSent); handle_query_result_pure(Id, Result, HasBeenSent);
reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result)) when reply_caller_defer_metrics(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), QueryOpts) when
is_function(ReplyFun) is_function(ReplyFun)
-> ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result} of case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{nack, _} -> {ack, {async_return, _}, true, _} ->
apply(ReplyFun, Args ++ [Result]),
ok; ok;
{ack, {async_return, _}} -> {ack, {async_return, _}, false, _} ->
ok; ok;
{ack, _} -> {_, _, _, true} ->
apply(ReplyFun, Args ++ [Result]),
ok;
{nack, _, _, _} ->
ok;
{ack, _, _, _} ->
apply(ReplyFun, Args ++ [Result]), apply(ReplyFun, Args ++ [Result]),
ok ok
end, end,
{ShouldAck, PostFn}; {ShouldAck, PostFn};
reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result)) -> reply_caller_defer_metrics(Id, ?REPLY(From, _, HasBeenSent, Result), QueryOpts) ->
IsSimpleQuery = maps:get(simple_query, QueryOpts, false),
IsUnrecoverableError = is_unrecoverable_error(Result),
{ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent), {ShouldAck, PostFn} = handle_query_result_pure(Id, Result, HasBeenSent),
case {ShouldAck, Result} of case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of
{nack, _} -> {ack, {async_return, _}, true, _} ->
gen_statem:reply(From, Result),
ok; ok;
{ack, {async_return, _}} -> {ack, {async_return, _}, false, _} ->
ok; ok;
{ack, _} -> {_, _, _, true} ->
gen_statem:reply(From, Result),
ok;
{nack, _, _, _} ->
ok;
{ack, _, _, _} ->
gen_statem:reply(From, Result), gen_statem:reply(From, Result),
ok ok
end, end,
@ -679,7 +693,6 @@ handle_query_result(Id, Result, HasBeenSent) ->
handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{msg => resource_exception, info => Msg}), ?SLOG(error, #{msg => resource_exception, info => Msg}),
%% inc_sent_failed(Id, HasBeenSent),
ok ok
end, end,
{nack, PostFn}; {nack, PostFn};
@ -704,12 +717,16 @@ handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) ->
handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}),
%% emqx_resource_metrics:dropped_other_inc(Id),
ok ok
end, end,
{nack, PostFn}; {nack, PostFn};
%% TODO: invert this logic: we should differentiate errors that are handle_query_result_pure(Id, {error, {unrecoverable_error, Reason}}, HasBeenSent) ->
%% irrecoverable; all others are deemed recoverable. PostFn = fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) -> handle_query_result_pure(Id, {error, {recoverable_error, Reason}}, _HasBeenSent) ->
%% the message will be queued in replayq or inflight window, %% the message will be queued in replayq or inflight window,
%% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not
@ -725,6 +742,13 @@ handle_query_result_pure(Id, {error, Reason}, _HasBeenSent) ->
ok ok
end, end,
{nack, PostFn}; {nack, PostFn};
handle_query_result_pure(Id, {async_return, {error, {unrecoverable_error, Reason}}}, HasBeenSent) ->
PostFn = fun() ->
?SLOG(error, #{id => Id, msg => unrecoverable_error, reason => Reason}),
inc_sent_failed(Id, HasBeenSent),
ok
end,
{ack, PostFn};
handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) -> handle_query_result_pure(Id, {async_return, {error, Msg}}, _HasBeenSent) ->
PostFn = fun() -> PostFn = fun() ->
?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}),
@ -799,8 +823,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _) = Query, ResSt
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_query_async, call_query_async,
begin begin
ReplyFun = fun ?MODULE:reply_after_query/7, ReplyFun = fun ?MODULE:reply_after_query/8,
Args = [self(), Id, Index, InflightTID, Ref, Query], Args = [self(), Id, Index, InflightTID, Ref, Query, QueryOpts],
IsRetriable = false, IsRetriable = false,
WorkerMRef = undefined, WorkerMRef = undefined,
InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef), InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, WorkerMRef),
@ -824,8 +848,8 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
?APPLY_RESOURCE( ?APPLY_RESOURCE(
call_batch_query_async, call_batch_query_async,
begin begin
ReplyFun = fun ?MODULE:batch_reply_after_query/7, ReplyFun = fun ?MODULE:batch_reply_after_query/8,
ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch]}, ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, InflightTID, Ref, Batch, QueryOpts]},
Requests = [Request || ?QUERY(_From, Request, _) <- Batch], Requests = [Request || ?QUERY(_From, Request, _) <- Batch],
IsRetriable = false, IsRetriable = false,
WorkerMRef = undefined, WorkerMRef = undefined,
@ -837,11 +861,15 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _) | _] = Batch, ResSt
Batch Batch
). ).
reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), Result) -> reply_after_query(
Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBeenSent), QueryOpts, Result
) ->
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
{Action, PostFn} = reply_caller_defer_metrics(Id, ?REPLY(From, Request, HasBeenSent, Result)), {Action, PostFn} = reply_caller_defer_metrics(
Id, ?REPLY(From, Request, HasBeenSent, Result), QueryOpts
),
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
@ -867,11 +895,11 @@ reply_after_query(Pid, Id, Index, InflightTID, Ref, ?QUERY(From, Request, HasBee
ok ok
end. end.
batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, Result) -> batch_reply_after_query(Pid, Id, Index, InflightTID, Ref, Batch, QueryOpts, Result) ->
%% NOTE: 'inflight' is the count of messages that were sent async %% NOTE: 'inflight' is the count of messages that were sent async
%% but received no ACK, NOT the number of messages queued in the %% but received no ACK, NOT the number of messages queued in the
%% inflight window. %% inflight window.
{Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch), {Action, PostFn} = batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts),
case Action of case Action of
nack -> nack ->
%% Keep retrying. %% Keep retrying.
@ -1118,12 +1146,18 @@ do_cancel_inflight_item(Data, Ref) ->
IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}), IsAcked andalso ets:update_counter(InflightTID, ?SIZE_REF, {2, -Count, 0, 0}),
emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)), emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(InflightTID)),
Result = {error, interrupted}, Result = {error, interrupted},
_ = batch_reply_caller(Id, Result, Batch), QueryOpts = #{simple_query => false},
_ = batch_reply_caller(Id, Result, Batch, QueryOpts),
?tp(resource_worker_cancelled_inflight, #{ref => Ref}), ?tp(resource_worker_cancelled_inflight, #{ref => Ref}),
ok. ok.
%%============================================================================== %%==============================================================================
inc_sent_failed(Id, _HasBeenSent = true) ->
emqx_resource_metrics:retried_failed_inc(Id);
inc_sent_failed(Id, _HasBeenSent) ->
emqx_resource_metrics:failed_inc(Id).
inc_sent_success(Id, _HasBeenSent = true) -> inc_sent_success(Id, _HasBeenSent = true) ->
emqx_resource_metrics:retried_success_inc(Id); emqx_resource_metrics:retried_success_inc(Id);
inc_sent_success(Id, _HasBeenSent) -> inc_sent_success(Id, _HasBeenSent) ->
@ -1204,10 +1238,14 @@ mark_as_sent(?QUERY(From, Req, _)) ->
HasBeenSent = true, HasBeenSent = true,
?QUERY(From, Req, HasBeenSent). ?QUERY(From, Req, HasBeenSent).
is_async(ResourceId) -> is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
case emqx_resource_manager:ets_lookup(ResourceId) of true;
{ok, _Group, #{query_mode := QM, callback_mode := CM}} -> is_unrecoverable_error({async_return, Result}) ->
call_mode(QM, CM) =:= async; is_unrecoverable_error(Result);
_ -> is_unrecoverable_error(_) ->
false false.
end.
is_async_return({async_return, _}) ->
true;
is_async_return(_) ->
false.

View File

@ -778,15 +778,25 @@ t_bad_timestamp(Config) ->
{async, false} -> {async, false} ->
?assertEqual(ok, Return), ?assertEqual(ok, Return),
?assertMatch( ?assertMatch(
[#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}], [
#{
error := [
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
]
}
],
?of_kind(influxdb_connector_send_query_error, Trace) ?of_kind(influxdb_connector_send_query_error, Trace)
); );
{sync, false} -> {sync, false} ->
?assertEqual( ?assertEqual(
{error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return {error,
{unrecoverable_error, [
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
]}},
Return
); );
{sync, true} -> {sync, true} ->
?assertEqual({error, points_trans_failed}, Return) ?assertEqual({error, {unrecoverable_error, points_trans_failed}}, Return)
end, end,
ok ok
end end
@ -894,11 +904,19 @@ t_write_failure(Config) ->
}, },
?check_trace( ?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
send_message(Config, SentData)
end),
fun(Result, _Trace) ->
case QueryMode of case QueryMode of
sync -> sync ->
?assertError(timeout, send_message(Config, SentData));
async ->
?assertEqual(ok, send_message(Config, SentData))
end
end),
fun(Trace0) ->
case QueryMode of
sync ->
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
?assertMatch([_ | _], Trace),
[#{result := Result} | _] = Trace,
?assert( ?assert(
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse {error, {error, {closed, "The connection was lost."}}} =:= Result orelse
{error, {error, closed}} =:= Result orelse {error, {error, closed}} =:= Result orelse
@ -906,7 +924,7 @@ t_write_failure(Config) ->
#{got => Result} #{got => Result}
); );
async -> async ->
?assertEqual(ok, Result) ok
end, end,
ok ok
end end
@ -938,11 +956,7 @@ t_missing_field(Config) ->
begin begin
emqx:publish(Msg0), emqx:publish(Msg0),
emqx:publish(Msg1), emqx:publish(Msg1),
NEvents = NEvents = 1,
case IsBatch of
true -> 1;
false -> 2
end,
{ok, _} = {ok, _} =
snabbkaffe:block_until( snabbkaffe:block_until(
?match_n_events(NEvents, #{ ?match_n_events(NEvents, #{
@ -964,7 +978,7 @@ t_missing_field(Config) ->
); );
false -> false ->
?assertMatch( ?assertMatch(
[#{error := [{error, no_fields}]}, #{error := [{error, no_fields}]} | _], [#{error := [{error, no_fields}]} | _],
?of_kind(influxdb_connector_send_query_error, Trace) ?of_kind(influxdb_connector_send_query_error, Trace)
) )
end, end,

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. % Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_ee_bridge_mysql_SUITE). -module(emqx_ee_bridge_mysql_SUITE).
@ -170,6 +170,7 @@ mysql_config(BridgeType, Config) ->
" password = ~p\n" " password = ~p\n"
" sql = ~p\n" " sql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
@ -397,20 +398,32 @@ t_write_failure(Config) ->
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()), Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace( ?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of
sync ->
?assertError(timeout, send_message(Config, SentData));
async ->
send_message(Config, SentData) send_message(Config, SentData)
end
end), end),
fun fun(Trace0) ->
({error, {resource_error, _}}, _Trace) -> ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace,
case Error of
{resource_error, _} ->
ok; ok;
({error, {recoverable_error, disconnected}}, _Trace) -> {recoverable_error, disconnected} ->
ok; ok;
(_, _Trace) -> _ ->
?assert(false) ct:fail("unexpected error: ~p", [Error])
end
end end
), ),
ok. ok.
@ -424,10 +437,10 @@ t_write_timeout(Config) ->
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()), Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 10, Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch( ?assertError(
{error, {resource_error, _}}, timeout,
query_resource(Config, {send_message, SentData, [], Timeout}) query_resource(Config, {send_message, SentData, [], Timeout})
) )
end), end),
@ -443,7 +456,7 @@ t_simple_sql_query(Config) ->
BatchSize = ?config(batch_size, Config), BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1, IsBatch = BatchSize > 1,
case IsBatch of case IsBatch of
true -> ?assertEqual({error, batch_select_not_implemented}, Result); true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
end, end,
ok. ok.
@ -459,10 +472,16 @@ t_missing_data(Config) ->
case IsBatch of case IsBatch of
true -> true ->
?assertMatch( ?assertMatch(
{error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result {error,
{unrecoverable_error,
{1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
Result
); );
false -> false ->
?assertMatch({error, {1048, _, <<"Column 'arrived' cannot be null">>}}, Result) ?assertMatch(
{error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
Result
)
end, end,
ok. ok.
@ -476,8 +495,10 @@ t_bad_sql_parameter(Config) ->
BatchSize = ?config(batch_size, Config), BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1, IsBatch = BatchSize > 1,
case IsBatch of case IsBatch of
true -> ?assertEqual({error, invalid_request}, Result); true ->
false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result) ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false ->
?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
end, end,
ok. ok.
@ -491,8 +512,8 @@ t_unprepared_statement_query(Config) ->
BatchSize = ?config(batch_size, Config), BatchSize = ?config(batch_size, Config),
IsBatch = BatchSize > 1, IsBatch = BatchSize > 1,
case IsBatch of case IsBatch of
true -> ?assertEqual({error, invalid_request}, Result); true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false -> ?assertEqual({error, prepared_statement_invalid}, Result) false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
end, end,
ok. ok.

View File

@ -191,6 +191,7 @@ pgsql_config(BridgeType, Config) ->
" password = ~p\n" " password = ~p\n"
" sql = ~p\n" " sql = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" query_mode = ~s\n" " query_mode = ~s\n"
" }\n" " }\n"
@ -415,20 +416,32 @@ t_write_failure(Config) ->
ProxyName = ?config(proxy_name, Config), ProxyName = ?config(proxy_name, Config),
ProxyPort = ?config(proxy_port, Config), ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config), ProxyHost = ?config(proxy_host, Config),
QueryMode = ?config(query_mode, Config),
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()), Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
?check_trace( ?check_trace(
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
case QueryMode of
sync ->
?assertError(timeout, send_message(Config, SentData));
async ->
send_message(Config, SentData) send_message(Config, SentData)
end
end), end),
fun fun(Trace0) ->
({error, {resource_error, _}}, _Trace) -> ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(resource_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace,
case Error of
{resource_error, _} ->
ok; ok;
({error, {recoverable_error, disconnected}}, _Trace) -> disconnected ->
ok; ok;
(_, _Trace) -> _ ->
?assert(false) ct:fail("unexpected error: ~p", [Error])
end
end end
), ),
ok. ok.
@ -442,12 +455,9 @@ t_write_timeout(Config) ->
{ok, _} = create_bridge(Config), {ok, _} = create_bridge(Config),
Val = integer_to_binary(erlang:unique_integer()), Val = integer_to_binary(erlang:unique_integer()),
SentData = #{payload => Val, timestamp => 1668602148000}, SentData = #{payload => Val, timestamp => 1668602148000},
Timeout = 10, Timeout = 1000,
emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch( ?assertError(timeout, query_resource(Config, {send_message, SentData, [], Timeout}))
{error, {resource_error, _}},
query_resource(Config, {send_message, SentData, [], Timeout})
)
end), end),
ok. ok.
@ -459,7 +469,7 @@ t_simple_sql_query(Config) ->
Request = {sql, <<"SELECT count(1) AS T">>}, Request = {sql, <<"SELECT count(1) AS T">>},
Result = query_resource(Config, Request), Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of case ?config(enable_batch, Config) of
true -> ?assertEqual({error, batch_prepare_not_implemented}, Result); true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
false -> ?assertMatch({ok, _, [{1}]}, Result) false -> ?assertMatch({ok, _, [{1}]}, Result)
end, end,
ok. ok.
@ -471,7 +481,8 @@ t_missing_data(Config) ->
), ),
Result = send_message(Config, #{}), Result = send_message(Config, #{}),
?assertMatch( ?assertMatch(
{error, {error, error, <<"23502">>, not_null_violation, _, _}}, Result {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}},
Result
), ),
ok. ok.
@ -484,10 +495,10 @@ t_bad_sql_parameter(Config) ->
Result = query_resource(Config, Request), Result = query_resource(Config, Request),
case ?config(enable_batch, Config) of case ?config(enable_batch, Config) of
true -> true ->
?assertEqual({error, invalid_request}, Result); ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
false -> false ->
?assertMatch( ?assertMatch(
{error, {resource_error, _}}, Result {error, {unrecoverable_error, _}}, Result
) )
end, end,
ok. ok.

View File

@ -56,13 +56,13 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
#{points => Points, batch => false, mode => sync} #{points => Points, batch => false, mode => sync}
), ),
do_query(InstId, Client, Points); do_query(InstId, Client, Points);
{error, ErrorPoints} = Err -> {error, ErrorPoints} ->
?tp( ?tp(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
#{batch => false, mode => sync, error => ErrorPoints} #{batch => false, mode => sync, error => ErrorPoints}
), ),
log_error_points(InstId, ErrorPoints), log_error_points(InstId, ErrorPoints),
Err {error, {unrecoverable_error, ErrorPoints}}
end. end.
%% Once a Batched Data trans to points failed. %% Once a Batched Data trans to points failed.
@ -80,7 +80,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
#{batch => true, mode => sync, error => Reason} #{batch => true, mode => sync, error => Reason}
), ),
{error, Reason} {error, {unrecoverable_error, Reason}}
end. end.
on_query_async( on_query_async(
@ -123,7 +123,7 @@ on_batch_query_async(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
#{batch => true, mode => async, error => Reason} #{batch => true, mode => async, error => Reason}
), ),
{error, Reason} {error, {unrecoverable_error, Reason}}
end. end.
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, #{client := Client}) ->