From 0ef0b68de48cb7e9a4127c7913401fb524417d1b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 31 Aug 2022 18:25:00 +0800 Subject: [PATCH] refactor: change '{recoverable_error,Reason}' to '{error,{recoverable_error,Reason}}' --- .../src/emqx_connector_http.erl | 25 ++++++++++++++----- .../src/emqx_connector_mysql.erl | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 4 +-- .../src/emqx_resource_worker.erl | 8 +++--- .../src/emqx_rule_runtime.erl | 2 +- 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 2562bf272..3eb55c9a4 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -46,7 +46,7 @@ namespace/0 ]). --export([check_ssl_opts/2]). +-export([check_ssl_opts/2, validate_method/1]). -type connect_timeout() :: emqx_schema:duration() | infinity. -type pool_type() :: random | hash. @@ -137,8 +137,10 @@ fields(config) -> fields("request") -> [ {method, - hoconsc:mk(hoconsc:enum([post, put, get, delete]), #{ - required => false, desc => ?DESC("method") + hoconsc:mk(binary(), #{ + required => false, + desc => ?DESC("method"), + validator => fun ?MODULE:validate_method/1 })}, {path, hoconsc:mk(binary(), #{required => false, desc => ?DESC("path")})}, {body, hoconsc:mk(binary(), #{required => false, desc => ?DESC("body")})}, @@ -171,6 +173,17 @@ desc(_) -> validations() -> [{check_ssl_opts, fun check_ssl_opts/1}]. +validate_method(M) when M =:= <<"post">>; M =:= <<"put">>; M =:= <<"get">>; M =:= <<"delete">> -> + ok; +validate_method(M) -> + case string:find(M, "${") of + nomatch -> + {error, + <<"Invalid method, should be one of 'post', 'put', 'get', 'delete' or variables in ${field} format.">>}; + _ -> + ok + end. + sc(Type, Meta) -> hoconsc:mk(Type, Meta). ref(Field) -> hoconsc:ref(?MODULE, Field). @@ -286,13 +299,13 @@ on_query( Retry ) of - {error, econnrefused} -> + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> ?SLOG(warning, #{ msg => "http_connector_do_request_failed", - reason => econnrefused, + reason => Reason, connector => InstId }), - {recoverable_error, econnrefused}; + {error, {recoverable_error, Reason}}; {error, Reason} = Result -> ?SLOG(error, #{ msg => "http_connector_do_request_failed", diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index b9c200316..802427134 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -420,7 +420,7 @@ on_sql_query( error, LogMeta#{msg => "mysql_connector_do_sql_query_failed", reason => Reason} ), - {recoverable_error, Reason}; + {error, {recoverable_error, Reason}}; {error, Reason} -> ?SLOG( error, diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 2409a7069..aab0129d1 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -76,8 +76,8 @@ -type query_result() :: ok | {ok, term()} - | {error, term()} - | {recoverable_error, term()}. + | {error, {recoverable_error, term()}} + | {error, term()}. -define(WORKER_POOL_SIZE, 16). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 3a12a6b91..d951d7a9f 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -396,7 +396,7 @@ handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), BlockWorker) -> handle_query_result(Id, {error, _}, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent.failed'), BlockWorker; -handle_query_result(Id, {recoverable_error, _}, _BlockWorker) -> +handle_query_result(Id, {error, {recoverable_error, _}}, _BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', -1), true; handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> @@ -433,7 +433,7 @@ call_query(QM0, Id, Query, QueryOpts) -> try %% if the callback module (connector) wants to return an error that %% makes the current resource goes into the `blocked` state, it should - %% return `{recoverable_error, Reason}` + %% return `{error, {recoverable_error, Reason}}` EXPR catch ERR:REASON:STACKTRACE -> @@ -457,7 +457,7 @@ apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?APPLY_RESOURCE( case inflight_is_full(Name, WinSize) of true -> - ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent'), @@ -483,7 +483,7 @@ apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?APPLY_RESOURCE( case inflight_is_full(Name, WinSize) of true -> - ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + ?tp(warning, inflight_full, #{id => Id, wind_size => WinSize}), {async_return, inflight_full}; false -> ok = emqx_metrics_worker:inc(?RES_METRICS, Id, 'sent', length(Batch)), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index b1d563291..29f2c6bf6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -510,7 +510,7 @@ nested_put(Alias, Val, Columns0) -> -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). inc_action_metrics(ok, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success'); -inc_action_metrics({recoverable_error, _}, RuleId) -> +inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');