fix(rule_engine): mechanism of restarting resources (#3980)

This commit is contained in:
Shawn 2021-01-04 11:52:57 +08:00 committed by GitHub
parent 26021b37b3
commit e518828d8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 68 additions and 51 deletions

View File

@ -237,27 +237,32 @@ create_resource(#{type := Type, config := Config0} = Params) ->
},
ok = emqx_rule_registry:add_resource(Resource),
%% Note that we will return OK in case of resource creation failure,
%% users can always re-start the resource later.
catch cluster_call(init_resource, [M, F, ResId, Config]),
%% A timer is started to re-start the resource later.
catch cluster_call(init_resource, [M, F, ResId, Config, true]),
{ok, Resource};
not_found ->
{error, {resource_type_not_found, Type}}
end.
update_resource(#{id := Id, type := Type, config := NewConfig, description := Description}
= NewResource) ->
update_resource(#{id := Id, type := Type, config := NewConfig,
description := Description} = NewResource) ->
case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {Module, Create},
params_spec = ParamSpec}} ->
Config = emqx_rule_validator:validate_params(NewConfig, ParamSpec),
_ = delete_resource(Id),
emqx_rule_registry:add_resource(#resource{id = Id,
config = Config,
type = Type,
description = Description,
created_at = erlang:system_time(millisecond)}),
catch cluster_call(init_resource, [Module, Create, Id, Config]),
{ok, NewResource};
case delete_resource(Id) of
{error, not_found} -> {error, not_found};
_ -> %% deletion might fail because of an associted rule.
emqx_rule_registry:add_resource(
#resource{
id = Id,
config = Config,
type = Type,
description = Description,
created_at = erlang:system_time(millisecond)}),
catch cluster_call(init_resource, [Module, Create, Id, Config, true]),
{ok, NewResource}
end;
not_found ->
{error, {resource_type_not_found, Type}}
end.
@ -281,7 +286,9 @@ start_resource(ResId) ->
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config0}) ->
case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} ->
{ok, #resource_type{on_create = {ModC, Create},
on_destroy = {ModD, Destroy},
params_spec = ParamSpec}} ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
try
@ -319,7 +326,7 @@ get_resource_params(ResId) ->
delete_resource(ResId) ->
case emqx_rule_registry:find_resource(ResId) of
{ok, #resource{type = ResType}} ->
{ok, #resource_type{on_destroy = {ModD,Destroy}}}
{ok, #resource_type{on_destroy = {ModD, Destroy}}}
= emqx_rule_registry:find_resource_type(ResType),
try
ok = emqx_rule_registry:remove_resource(ResId),
@ -328,7 +335,7 @@ delete_resource(ResId) ->
throw:Reason -> {error, Reason}
end;
not_found ->
ok
{error, not_found}
end.
%%------------------------------------------------------------------------------
@ -344,7 +351,7 @@ refresh_resources() ->
"Can not re-stablish resource ~p: ~0p. The resource is disconnected."
"Fix the issue and establish it manually.\n"
"Stacktrace: ~0p",
[ResId, {Error,Reason}, ST])
[ResId, {Error, Reason}, ST])
end
end, emqx_rule_registry:get_resources()).
@ -486,10 +493,15 @@ cluster_call(Func, Args) ->
end.
init_resource(Module, OnCreate, ResId, Config) ->
Params = ?RAISE(Module:OnCreate(ResId, Config),
start_reinitial_loop(ResId),
{{init_resource_failure, node()},
{{Module, OnCreate}, {_EXCLASS_,_EXCPTION_, _ST_}}}),
init_resource(Module, OnCreate, ResId, Config, false).
init_resource(Module, OnCreate, ResId, Config, Restart) ->
Params = ?RAISE(
Module:OnCreate(ResId, Config),
Restart andalso
timer:apply_after(timer:seconds(60), ?MODULE, do_init_resource,
[Module, OnCreate, ResId, Config, Restart]),
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}),
ResParams = #resource_params{id = ResId,
params = Params,
status = #{is_alive => true}},
@ -497,7 +509,9 @@ init_resource(Module, OnCreate, ResId, Config) ->
init_action(Module, OnCreate, ActionInstId, Params) ->
ok = emqx_rule_metrics:create_metrics(ActionInstId),
case ?RAISE(Module:OnCreate(ActionInstId, Params), {{init_action_failure, node()}, {{Module,OnCreate},{_EXCLASS_,_EXCPTION_,_ST_}}}) of
case ?RAISE(Module:OnCreate(ActionInstId, Params),
{{init_action_failure, node()},
{{Module, OnCreate}, {_EXCLASS_, _EXCPTION_, _ST_}}}) of
{Apply, NewParams} when is_function(Apply) -> %% BACKW: =< e4.2.2
ok = emqx_rule_registry:add_action_instance_params(
#action_instance_params{id = ActionInstId, params = NewParams, apply = Apply});
@ -616,14 +630,3 @@ find_type(ResId) ->
alarm_name_of_resource_down(Type, ResId) ->
list_to_binary(io_lib:format("resource/~s/~s/down", [Type, ResId])).
start_reinitial_loop(Id) ->
spawn(fun() ->
timer:sleep(60000),
case emqx_rule_registry:find_resource(Id) of
{ok, _}->
?LOG(warning, "try to re-initialize resource: ~p", [Id]),
start_resource(Id);
not_found -> ok
end
end).

View File

@ -255,12 +255,18 @@ show_action(#{name := Name}, _Params) ->
%% Resources API
%%------------------------------------------------------------------------------
create_resource(#{}, Params) ->
if_test(fun() -> do_create_resource(test_resource, Params) end,
fun() -> do_create_resource(create_resource, Params) end,
Params).
case parse_resource_params(Params) of
{ok, ParsedParams} ->
if_test(fun() -> do_create_resource(test_resource, ParsedParams) end,
fun() -> do_create_resource(create_resource, ParsedParams) end,
Params);
{error, Reason} ->
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
return({error, 400, ?ERR_BADARGS(Reason)})
end.
do_create_resource(Create, Params) ->
case emqx_rule_engine:Create(parse_resource_params(Params)) of
do_create_resource(Create, ParsedParams) ->
case emqx_rule_engine:Create(ParsedParams) of
ok ->
return(ok);
{ok, Resource} ->
@ -322,23 +328,30 @@ start_resource(#{id := Id}, _Params) ->
end.
update_resource(#{id := Id}, Params) ->
case emqx_rule_registry:find_resource(Id) of
{ok, #resource{id = Id, type = Type} = _OldResource} ->
Config = maps:get(config, parse_resource_params(Params)),
Description = maps:get(description, parse_resource_params(Params)),
_ = emqx_rule_engine:update_resource(#{id => Id,
config => Config,
type => Type,
description => Description,
created_at => erlang:system_time(millisecond)}),
return(ok);
_Other ->
return({error, 400, ?ERR_NO_RESOURCE(Id)})
case parse_resource_params(Params) of
{ok, ParsedParams} ->
case emqx_rule_registry:find_resource(Id) of
{ok, #resource{id = Id, type = Type} = _OldResource} ->
Config = maps:get(config, ParsedParams),
Description = maps:get(description, ParsedParams),
emqx_rule_engine:update_resource(
#{id => Id,
config => Config,
type => Type,
description => Description,
created_at => erlang:system_time(millisecond)}),
return(ok);
_Other ->
return({error, 400, ?ERR_NO_RESOURCE(Id)})
end;
{error, Reason} ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.
delete_resource(#{id := Id}, _Params) ->
case emqx_rule_engine:delete_resource(Id) of
ok -> return(ok);
{error, not_found} -> return(ok);
{error, Reason} ->
return({error, 400, ?ERR_BADARGS(Reason)})
end.
@ -495,13 +508,13 @@ parse_action(Action) ->
parse_resource_params(Params) ->
parse_resource_params(Params, #{config => #{}, description => <<"">>}).
parse_resource_params([], Res) ->
Res;
{ok, Res};
parse_resource_params([{<<"id">>, Id} | Params], Res) ->
parse_resource_params(Params, Res#{id => Id});
parse_resource_params([{<<"type">>, ResourceType} | Params], Res) ->
try parse_resource_params(Params, Res#{type => binary_to_existing_atom(ResourceType, utf8)})
catch error:badarg ->
throw({resource_type_not_found, ResourceType})
{error, {resource_type_not_found, ResourceType}}
end;
parse_resource_params([{<<"config">>, Config} | Params], Res) ->
parse_resource_params(Params, Res#{config => json_term_to_map(Config)});

View File

@ -183,6 +183,7 @@ resources(["show", ResourceId]) ->
resources(["delete", ResourceId]) ->
case emqx_rule_engine:delete_resource(list_to_binary(ResourceId)) of
ok -> emqx_ctl:print("ok~n");
{error, not_found} -> emqx_ctl:print("ok~n");
{error, Reason} ->
emqx_ctl:print("Cannot delete resource as ~0p~n", [Reason])
end;