diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa3f0a0f6..4d1c45eb4 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -77,7 +77,7 @@ ok | {ok, term()} | {error, term()} - | {resource_down, term()}. + | {recoverable_error, term()}. -define(WORKER_POOL_SIZE, 16). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 82db194dc..f90d042b0 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -128,7 +128,7 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, - [matched, success, failed, exception, resource_down], + [matched, success, failed, exception], [matched] ), ok = emqx_resource_worker_sup:start_workers(ResId, Opts), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 9c632b662..8034bcb9e 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -355,13 +355,14 @@ handle_query_result(_Id, ?RESOURCE_ERROR_M(_, _), BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, failed), BlockWorker; -handle_query_result(Id, {resource_down, _}, _BlockWorker) -> - emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down), +handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, failed), true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> true; -handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) -> - true; +handle_query_result(Id, {async_return, {error, _}}, BlockWorker) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, failed), + BlockWorker; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> @@ -390,8 +391,8 @@ call_query(QM0, Id, Query, QueryOpts) -> -define(APPLY_RESOURCE(EXPR, REQ), try %% if the callback module (connector) wants to return an error that - %% makes the current resource goes into the `error` state, it should - %% return `{resource_down, Reason}` + %% makes the current resource goes into the `blocked` state, it should + %% return `{recoverable_error, Reason}` EXPR catch ERR:REASON:STACKTRACE -> diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 6e7bca18a..4999b9410 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -96,7 +96,7 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> Pid ! {From, {inc, N}}, receive {ReqRef, ok} -> ok; - {ReqRef, incorrect_status} -> {resource_down, incorrect_status} + {ReqRef, incorrect_status} -> {recoverable_error, incorrect_status} after 1000 -> {error, timeout} end; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 06160f6c7..0bfa67d07 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -419,7 +419,7 @@ t_query_counter_async_inflight(_) -> {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), ct:pal("metrics: ~p", [C]), ?assertMatch( - #{matched := M, success := S, exception := E, failed := F, resource_down := RD} when + #{matched := M, success := S, exception := E, failed := F, recoverable_error := RD} when M >= Sent andalso M == S + E + F + RD, C ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index d7fa65829..aafce137c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -506,12 +506,22 @@ nested_put(Alias, Val, Columns0) -> -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). inc_action_metrics(ok, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({ok, _}, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({resource_down, _}, RuleId) -> +inc_action_metrics({recoverable_error, _}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics(_, RuleId) -> - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), - emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'). +inc_action_metrics(R, RuleId) -> + case is_ok_result(R) of + false -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); + true -> + emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success') + end. + +is_ok_result(ok) -> + true; +is_ok_result(R) when is_tuple(R) -> + ok = erlang:element(1, R); +is_ok_result(ok) -> + false. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index b83aec4bd..2024ba8ce 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -85,18 +85,13 @@ on_batch_query_async( InstId, BatchData, {ReplayFun, Args}, - State = #{write_syntax := SyntaxLines, client := Client} + #{write_syntax := SyntaxLines, client := Client} ) -> - case on_get_status(InstId, State) of - connected -> - case parse_batch_data(InstId, BatchData, SyntaxLines) of - {ok, Points} -> - do_async_query(InstId, Client, Points, {ReplayFun, Args}); - {error, Reason} -> - {error, Reason} - end; - disconnected -> - {resource_down, disconnected} + case parse_batch_data(InstId, BatchData, SyntaxLines) of + {ok, Points} -> + do_async_query(InstId, Client, Points, {ReplayFun, Args}); + {error, Reason} -> + {error, Reason} end. on_get_status(_InstId, #{client := Client}) ->