From 657ecef67bec35b769bb59ec2087caaadd474d6a Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 31 Dec 2021 20:57:34 +0800 Subject: [PATCH] fix(resource): don't crash on resource stopped --- apps/emqx_bridge/src/emqx_bridge.erl | 5 ++++- apps/emqx_resource/include/emqx_resource.hrl | 4 +++- apps/emqx_resource/src/emqx_resource.erl | 16 +++++++++++----- .../emqx_resource/src/emqx_resource_instance.erl | 14 +++++++++++--- apps/emqx_resource/test/emqx_resource_SUITE.erl | 2 +- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 3 ++- 6 files changed, 32 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 1814a11fe..3ae4e5820 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -94,7 +94,10 @@ send_message(BridgeId, Message) -> not_found -> throw({bridge_not_found, BridgeId}); #{enable := true} -> - emqx_resource:query(ResId, {send_message, Message}); + case emqx_resource:query(ResId, {send_message, Message}) of + {error, {emqx_resource, Reason}} -> throw({bridge_not_ready, Reason}); + Result -> Result + end; #{enable := false} -> throw({bridge_stopped, BridgeId}) end. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 08c230401..ed1de18cf 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,7 +25,7 @@ mod := module(), config := resource_config(), state := resource_state(), - status := started | stopped, + status := started | stopped | starting, metrics := emqx_plugin_libs_metrics:metrics() }. -type resource_group() :: binary(). @@ -41,3 +41,5 @@ %% the `after_query_fun()` is mainly for callbacks that increment counters or do some fallback %% actions upon query failure -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. + +-define(TEST_ID_PREFIX, "_test_:"). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 37c4caa2e..5ec5fd92a 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -82,7 +82,6 @@ ]). -define(HOCON_CHECK_OPTS, #{atom_key => true, nullable => true}). - -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -optional_callbacks([ on_query/4 @@ -170,7 +169,7 @@ create_dry_run(ResourceType, Config) -> -spec create_dry_run_local(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run_local(ResourceType, Config) -> - InstId = iolist_to_binary(emqx_misc:gen_id(16)), + InstId = emqx_resource_instance:make_test_id(), call_instance(InstId, {create_dry_run, InstId, ResourceType, Config}). -spec recreate(instance_id(), resource_type(), resource_config(), term()) -> @@ -201,14 +200,18 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of + {ok, #{status := starting}} -> + query_error(starting, <<"cannot serve query when the resource " + "instance is still starting">>); {ok, #{status := stopped}} -> - error({resource_stopped, InstId}); + query_error(stopped, <<"cannot serve query when the resource " + "instance is stopped">>); {ok, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe Mod:on_query(InstId, Request, AfterQuery, ResourceState); - {error, Reason} -> - error({get_instance, {InstId, Reason}}) + {error, not_found} -> + query_error(not_found, <<"the resource id not exists">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. @@ -368,3 +371,6 @@ cluster_call(Func, Args) -> {ok, _TxnId, Result} -> Result; Failed -> Failed end. + +query_error(Reason, Msg) -> + {error, {?MODULE, #{reason => Reason, msg => Msg}}}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index 745b8b684..c413691d9 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,6 +26,7 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 + , make_test_id/0 ]). -export([ hash_call/2 @@ -61,7 +62,7 @@ hash_call(InstId, Request) -> hash_call(InstId, Request, Timeout) -> gen_server:call(pick(InstId), Request, Timeout). --spec lookup(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}. lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; @@ -69,6 +70,10 @@ lookup(InstId) -> {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} end. +make_test_id() -> + RandId = iolist_to_binary(emqx_misc:gen_id(16)), + <>. + get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). @@ -146,7 +151,7 @@ do_recreate(InstId, ResourceType, NewConfig, Params) -> {ok, #{mod := ResourceType, state := ResourceState, config := OldConfig}} -> Config = emqx_resource:call_config_merge(ResourceType, OldConfig, NewConfig, Params), - TestInstId = iolist_to_binary(emqx_misc:gen_id(16)), + TestInstId = make_test_id(), case do_create_dry_run(TestInstId, ResourceType, Config) of ok -> do_remove(ResourceType, InstId, ResourceState, false), @@ -166,7 +171,9 @@ do_create(InstId, ResourceType, Config, Opts) -> {ok, _} -> {ok, already_created}; _ -> Res0 = #{id => InstId, mod => ResourceType, config => Config, - status => stopped, state => undefined}, + status => starting, state => undefined}, + %% The `emqx_resource:call_start/3` need the instance exist beforehand + ets:insert(emqx_resource_instance, {InstId, Res0}), case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId), @@ -181,6 +188,7 @@ do_create(InstId, ResourceType, Config, Opts) -> ets:insert(emqx_resource_instance, {InstId, Res0}), {ok, Res0}; {error, Reason} when ForceCreate == false -> + ets:delete(emqx_resource_instance, InstId), {error, Reason} end end. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 6b2e5903e..0b65b5a78 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -142,7 +142,7 @@ t_stop_start(_) -> ?assertNot(is_process_alive(Pid0)), - ?assertException(error, {resource_stopped, ?ID}, emqx_resource:query(?ID, get_state)), + ?assertMatch({emqx_resource, #{reason := stopped}}, emqx_resource:query(?ID, get_state)), ok = emqx_resource:restart(?ID), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 428a1ef8c..049829c59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -240,7 +240,8 @@ handle_output(RuleId, OutId, Selected, Envs) -> catch Err:Reason:ST -> ok = emqx_plugin_libs_metrics:inc_failed(rule_metrics, RuleId), - ?SLOG(error, #{msg => "output_failed", + Level = case Err of throw -> debug; _ -> error end, + ?SLOG(Level, #{msg => "output_failed", output => OutId, exception => Err, reason => Reason,