diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index a374c822b..f453bb73c 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -237,27 +237,32 @@ create_resource(#{type := Type, config := Config0} = Params) -> }, ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, - %% users can always re-start the resource later. - catch cluster_call(init_resource, [M, F, ResId, Config]), + %% A timer is started to re-start the resource later. + catch cluster_call(init_resource, [M, F, ResId, Config, true]), {ok, Resource}; not_found -> {error, {resource_type_not_found, Type}} end. -update_resource(#{id := Id, type := Type, config := NewConfig, description := Description} - = NewResource) -> +update_resource(#{id := Id, type := Type, config := NewConfig, + description := Description} = NewResource) -> case emqx_rule_registry:find_resource_type(Type) of {ok, #resource_type{on_create = {Module, Create}, params_spec = ParamSpec}} -> Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec), - _ = delete_resource(Id), - emqx_rule_registry:add_resource(#resource{id = Id, - config = Config, - type = Type, - description = Description, - created_at = erlang:system_time(millisecond)}), - catch cluster_call(init_resource, [Module, Create, Id, Config]), - {ok, NewResource}; + case delete_resource(Id) of + {error, not_found} -> {error, not_found}; + _ -> %% deletion might fail because of an associted rule. + emqx_rule_registry:add_resource( + #resource{ + id = Id, + config = Config, + type = Type, + description = Description, + created_at = erlang:system_time(millisecond)}), + catch cluster_call(init_resource, [Module, Create, Id, Config, true]), + {ok, NewResource} + end; not_found -> {error, {resource_type_not_found, Type}} end. @@ -281,7 +286,9 @@ start_resource(ResId) -> -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). test_resource(#{type := Type, config := Config0}) -> case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} -> + {ok, #resource_type{on_create = {ModC, Create}, + on_destroy = {ModD, Destroy}, + params_spec = ParamSpec}} -> Config = emqx_rule_validator:validate_params(Config0, ParamSpec), ResId = resource_id(), try @@ -319,7 +326,7 @@ get_resource_params(ResId) -> delete_resource(ResId) -> case emqx_rule_registry:find_resource(ResId) of {ok, #resource{type = ResType}} -> - {ok, #resource_type{on_destroy = {ModD,Destroy}}} + {ok, #resource_type{on_destroy = {ModD, Destroy}}} = emqx_rule_registry:find_resource_type(ResType), try ok = emqx_rule_registry:remove_resource(ResId), @@ -328,7 +335,7 @@ delete_resource(ResId) -> throw:Reason -> {error, Reason} end; not_found -> - ok + {error, not_found} end. %%------------------------------------------------------------------------------ @@ -344,7 +351,7 @@ refresh_resources() -> "Can not re-stablish resource ~p: ~0p. The resource is disconnected." "Fix the issue and establish it manually.\n" "Stacktrace: ~0p", - [ResId, {Error,Reason}, ST]) + [ResId, {Error, Reason}, ST]) end end, emqx_rule_registry:get_resources()). @@ -486,10 +493,15 @@ cluster_call(Func, Args) -> end. init_resource(Module, OnCreate, ResId, Config) -> - Params = ?RAISE(Module:OnCreate(ResId, Config), - start_reinitial_loop(ResId), - {{init_resource_failure, node()}, - {{Module, OnCreate}, {_EXCLASS_,_EXCPTION_, _ST_}}}), + init_resource(Module, OnCreate, ResId, Config, false). + +init_resource(Module, OnCreate, ResId, Config, Restart) -> + Params = ?RAISE( + Module:OnCreate(ResId, Config), + Restart andalso + timer:apply_after(timer:seconds(60), ?MODULE, do_init_resource, + [Module, OnCreate, ResId, Config, Restart]), + {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}), ResParams = #resource_params{id = ResId, params = Params, status = #{is_alive => true}}, @@ -497,7 +509,9 @@ init_resource(Module, OnCreate, ResId, Config) -> init_action(Module, OnCreate, ActionInstId, Params) -> ok = emqx_rule_metrics:create_metrics(ActionInstId), - case ?RAISE(Module:OnCreate(ActionInstId, Params), {{init_action_failure, node()}, {{Module,OnCreate},{_EXCLASS_,_EXCPTION_,_ST_}}}) of + case ?RAISE(Module:OnCreate(ActionInstId, Params), + {{init_action_failure, node()}, + {{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of {Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2 ok = emqx_rule_registry:add_action_instance_params( #action_instance_params{id = ActionInstId, params = NewParams, apply = Apply}); @@ -616,14 +630,3 @@ find_type(ResId) -> alarm_name_of_resource_down(Type, ResId) -> list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])). - -start_reinitial_loop(Id) -> - spawn(fun() -> - timer:sleep(60000), - case emqx_rule_registry:find_resource(Id) of - {ok, _}-> - ?LOG(warning, "try to re-initialize resource: ~p", [Id]), - start_resource(Id); - not_found -> ok - end - end). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index aafaf0082..9b598e5b9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -255,12 +255,18 @@ show_action(#{name := Name}, _Params) -> %% Resources API %%------------------------------------------------------------------------------ create_resource(#{}, Params) -> - if_test(fun() -> do_create_resource(test_resource, Params) end, - fun() -> do_create_resource(create_resource, Params) end, - Params). + case parse_resource_params(Params) of + {ok, ParsedParams} -> + if_test(fun() -> do_create_resource(test_resource, ParsedParams) end, + fun() -> do_create_resource(create_resource, ParsedParams) end, + Params); + {error, Reason} -> + ?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]), + return({error, 400, ?ERR_BADARGS(Reason)}) + end. -do_create_resource(Create, Params) -> - case emqx_rule_engine:Create(parse_resource_params(Params)) of +do_create_resource(Create, ParsedParams) -> + case emqx_rule_engine:Create(ParsedParams) of ok -> return(ok); {ok, Resource} -> @@ -322,23 +328,30 @@ start_resource(#{id := Id}, _Params) -> end. update_resource(#{id := Id}, Params) -> - case emqx_rule_registry:find_resource(Id) of - {ok, #resource{id = Id, type = Type} = _OldResource} -> - Config = maps:get(config, parse_resource_params(Params)), - Description = maps:get(description, parse_resource_params(Params)), - _ = emqx_rule_engine:update_resource(#{id => Id, - config => Config, - type => Type, - description => Description, - created_at => erlang:system_time(millisecond)}), - return(ok); - _Other -> - return({error, 400, ?ERR_NO_RESOURCE(Id)}) + case parse_resource_params(Params) of + {ok, ParsedParams} -> + case emqx_rule_registry:find_resource(Id) of + {ok, #resource{id = Id, type = Type} = _OldResource} -> + Config = maps:get(config, ParsedParams), + Description = maps:get(description, ParsedParams), + emqx_rule_engine:update_resource( + #{id => Id, + config => Config, + type => Type, + description => Description, + created_at => erlang:system_time(millisecond)}), + return(ok); + _Other -> + return({error, 400, ?ERR_NO_RESOURCE(Id)}) + end; + {error, Reason} -> + return({error, 400, ?ERR_BADARGS(Reason)}) end. delete_resource(#{id := Id}, _Params) -> case emqx_rule_engine:delete_resource(Id) of ok -> return(ok); + {error, not_found} -> return(ok); {error, Reason} -> return({error, 400, ?ERR_BADARGS(Reason)}) end. @@ -495,13 +508,13 @@ parse_action(Action) -> parse_resource_params(Params) -> parse_resource_params(Params, #{config => #{}, description => <<"">>}). parse_resource_params([], Res) -> - Res; + {ok, Res}; parse_resource_params([{<<"id">>, Id} | Params], Res) -> parse_resource_params(Params, Res#{id => Id}); parse_resource_params([{<<"type">>, ResourceType} | Params], Res) -> try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)}) catch error:badarg -> - throw({resource_type_not_found, ResourceType}) + {error, {resource_type_not_found, ResourceType}} end; parse_resource_params([{<<"config">>, Config} | Params], Res) -> parse_resource_params(Params, Res#{config => json_term_to_map(Config)}); diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index a47b7fe80..f165b95b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -183,6 +183,7 @@ resources(["show", ResourceId]) -> resources(["delete", ResourceId]) -> case emqx_rule_engine:delete_resource(list_to_binary(ResourceId)) of ok -> emqx_ctl:print("ok~n"); + {error, not_found} -> emqx_ctl:print("ok~n"); {error, Reason} -> emqx_ctl:print("Cannot delete resource as ~0p~n", [Reason]) end;