diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 39c4daf69..49024bada 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -82,7 +82,7 @@ { id :: resource_id() , type :: resource_type_name() , config :: #{} %% the configs got from API for initializing resource - , created_at :: erlang:timestamp() + , created_at :: integer() %% epoch in millisecond precision , description :: binary() }). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 315637a12..fc40ff38e 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -39,6 +39,7 @@ , get_resource_status/1 , get_resource_params/1 , delete_resource/1 + , ensure_resource_deleted/1 ]). -export([ init_resource/4 @@ -154,7 +155,7 @@ module_attributes(Module) -> %% APIs for rules and resources %%------------------------------------------------------------------------------ --dialyzer([{nowarn_function, create_rule/1}]). +-dialyzer([{nowarn_function, [create_rule/1, rule_id/0]}]). -spec(create_rule(#{}) -> {ok, rule()} | no_return()). create_rule(Params = #{rawsql := Sql, actions := Actions}) -> case emqx_rule_sqlparser:parse_select(Sql) of @@ -179,7 +180,7 @@ create_rule(Params = #{rawsql := Sql, actions := Actions}) -> Error -> error(Error) end. --spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | no_return()). +-spec(update_rule(#{id := binary(), _=>_}) -> {ok, rule()} | {error, {not_found, rule_id()}}). update_rule(Params = #{id := RuleId}) -> case emqx_rule_registry:get_rule(RuleId) of {ok, Rule0} -> @@ -206,7 +207,7 @@ delete_rule(RuleId) -> ok end. --spec(create_resource(#{}) -> {ok, resource()} | {error, Reason :: term()}). +-spec(create_resource(#{type := _, config := _, _ => _}) -> {ok, resource()} | {error, Reason :: term()}). create_resource(#{type := Type, config := Config} = Params) -> case emqx_rule_registry:find_resource_type(Type) of {ok, #resource_type{on_create = {M, F}, params_spec = ParamSpec}} -> @@ -215,7 +216,9 @@ create_resource(#{type := Type, config := Config} = Params) -> Resource = #resource{id = ResId, type = Type, config = Config, - description = iolist_to_binary(maps:get(description, Params, ""))}, + description = iolist_to_binary(maps:get(description, Params, "")), + created_at = erlang:system_time(millisecond) + }, ok = emqx_rule_registry:add_resource(Resource), %% Note that we will return OK in case of resource creation failure, %% users can always re-start the resource later. @@ -231,14 +234,14 @@ start_resource(ResId) -> {ok, #resource{type = ResType, config = Config}} -> {ok, #resource_type{on_create = {Mod, Create}}} = emqx_rule_registry:find_resource_type(ResType), - init_resource(Mod, Create, ResId, Config), + _ = init_resource(Mod, Create, ResId, Config), refresh_actions_of_a_resource(ResId), ok; not_found -> {error, {resource_not_found, ResId}} end. --spec(test_resource(#{}) -> ok | {error, Reason :: term()}). +-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). test_resource(#{type := Type, config := Config}) -> case emqx_rule_registry:find_resource_type(Type) of {ok, #resource_type{on_create = {ModC,Create}, on_destroy = {ModD,Destroy}, params_spec = ParamSpec}} -> @@ -284,6 +287,12 @@ delete_resource(ResId) -> {error, {resource_not_found, ResId}} end. +%% @doc Ensure resource deleted. `resource_not_found` error is discarded. +-spec(ensure_resource_deleted(resource_id()) -> ok). +ensure_resource_deleted(ResId) -> + _ = delete_resource(ResId), + ok. + %%------------------------------------------------------------------------------ %% Re-establish resources %%------------------------------------------------------------------------------ @@ -530,12 +539,12 @@ fetch_resource_status(Module, OnStatus, ResId) -> end. refresh_actions_of_a_resource(ResId) -> - [refresh_actions(Actions, - fun (#action_instance{args = #{<<"$resource">> := ResId0}}) + R = fun (#action_instance{args = #{<<"$resource">> := ResId0}}) when ResId0 =:= ResId -> true; (_) -> false - end) - || #rule{actions = Actions} <- emqx_rule_registry:get_rules()]. + end, + F = fun(#rule{actions = Actions}) -> refresh_actions(Actions, R) end, + lists:foreach(F, emqx_rule_registry:get_rules()). refresh_actions(Actions) -> refresh_actions(Actions, fun(_) -> true end). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2a6ad1754..131ea9651 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -355,7 +355,7 @@ start_resource(#{id := Id}, _Params) -> delete_resource(#{id := Id}, _Params) -> try - emqx_rule_engine:delete_resource(Id), + ok = emqx_rule_engine:ensure_resource_deleted(Id), return(ok) catch _Error:{throw,Reason} ->