refactor: rename the error return resource_down -> recoverable_error

This commit is contained in:
Shawn 2022-08-24 22:18:00 +08:00
parent 0c9d12fd07
commit 6b0ccfbc43
7 changed files with 33 additions and 27 deletions

View File

@ -77,7 +77,7 @@
ok
| {ok, term()}
| {error, term()}
| {resource_down, term()}.
| {recoverable_error, term()}.
-define(WORKER_POOL_SIZE, 16).

View File

@ -128,7 +128,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
ok = emqx_metrics_worker:create_metrics(
?RES_METRICS,
ResId,
[matched, success, failed, exception, resource_down],
[matched, success, failed, exception],
[matched]
),
ok = emqx_resource_worker_sup:start_workers(ResId, Opts),

View File

@ -355,13 +355,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) ->
handle_query_result(Id, {error, _}, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
BlockWorker;
handle_query_result(Id, {resource_down, _}, _BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
handle_query_result(Id, {recoverable_error, _}, _BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
true;
handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) ->
true;
handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) ->
true;
handle_query_result(Id, {async_return, {error, _}}, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
BlockWorker;
handle_query_result(_Id, {async_return, ok}, BlockWorker) ->
BlockWorker;
handle_query_result(Id, Result, BlockWorker) ->
@ -390,8 +391,8 @@ call_query(QM0, Id, Query, QueryOpts) ->
-define(APPLY_RESOURCE(EXPR, REQ),
try
%% if the callback module (connector) wants to return an error that
%% makes the current resource goes into the `error` state, it should
%% return `{resource_down, Reason}`
%% makes the current resource goes into the `blocked` state, it should
%% return `{recoverable_error, Reason}`
EXPR
catch
ERR:REASON:STACKTRACE ->

View File

@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) ->
Pid ! {From, {inc, N}},
receive
{ReqRef, ok} -> ok;
{ReqRef, incorrect_status} -> {resource_down, incorrect_status}
{ReqRef, incorrect_status} -> {recoverable_error, incorrect_status}
after 1000 ->
{error, timeout}
end;

View File

@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) ->
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID),
ct:pal("metrics: ~p", [C]),
?assertMatch(
#{matched := M, success := S, exception := E, failed := F, resource_down := RD} when
#{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when
M >= Sent andalso M == S + E + F + RD,
C
),

View File

@ -506,12 +506,22 @@ nested_put(Alias, Val, Columns0) ->
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics(ok, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({ok, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({resource_down, _}, RuleId) ->
inc_action_metrics({recoverable_error, _}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(_, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown').
inc_action_metrics(R, RuleId) ->
case is_ok_result(R) of
false ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
true ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
end.
is_ok_result(ok) ->
true;
is_ok_result(R) when is_tuple(R) ->
ok = erlang:element(1, R);
is_ok_result(ok) ->
false.

View File

@ -85,18 +85,13 @@ on_batch_query_async(
InstId,
BatchData,
{ReplayFun, Args},
State = #{write_syntax := SyntaxLines, client := Client}
#{write_syntax := SyntaxLines, client := Client}
) ->
case on_get_status(InstId, State) of
connected ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
do_async_query(InstId, Client, Points, {ReplayFun, Args});
{error, Reason} ->
{error, Reason}
end;
disconnected ->
{resource_down, disconnected}
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
do_async_query(InstId, Client, Points, {ReplayFun, Args});
{error, Reason} ->
{error, Reason}
end.
on_get_status(_InstId, #{client := Client}) ->