diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 97263099d..ccc2685e1 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -79,6 +79,7 @@ , on_action_failed :: continue | stop , actions :: list(#action_instance{}) , enabled :: boolean() + , created_at :: integer() %% epoch in millisecond precision , description :: binary() }). diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 860b9e702..12f00c191 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -179,6 +179,7 @@ create_rule(Params = #{rawsql := Sql, actions := ActArgs}) -> on_action_failed = maps:get(on_action_failed, Params, continue), actions = Actions, enabled = Enabled, + created_at = erlang:system_time(millisecond), description = maps:get(description, Params, "") }, ok = emqx_rule_registry:add_rule(Rule), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index beb1204df..e7287d98d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -230,7 +230,7 @@ update_rule(#{id := Id}, Params) -> end. list_rules(_Bindings, _Params) -> - return_all(emqx_rule_registry:get_rules()). + return_all(emqx_rule_registry:get_rules_ordered_by_ts()). show_rule(#{id := Id}, _Params) -> reply_with(fun emqx_rule_registry:get_rule/1, Id). @@ -495,8 +495,8 @@ parse_rule_params([{<<"actions">>, Actions} | Params], Rule) -> parse_rule_params(Params, Rule#{actions => parse_actions(Actions)}); parse_rule_params([{<<"description">>, Descr} | Params], Rule) -> parse_rule_params(Params, Rule#{description => Descr}); -parse_rule_params([_ | Params], Res) -> - parse_rule_params(Params, Res). +parse_rule_params([_ | Params], Rule) -> + parse_rule_params(Params, Rule). on_failed(<<"continue">>) -> continue; on_failed(<<"stop">>) -> stop; diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl index 9edc198f9..dbcf3e0e5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_cli.erl @@ -98,7 +98,7 @@ unload() -> %%----------------------------------------------------------------------------- -dialyzer([{nowarn_function, [rules/1]}]). rules(["list"]) -> - print_all(emqx_rule_registry:get_rules()); + print_all(emqx_rule_registry:get_rules_ordered_by_ts()); rules(["show", RuleId]) -> print_with(fun emqx_rule_registry:get_rule/1, list_to_binary(RuleId)); diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index dc7a33805..80667f995 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -20,6 +20,7 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("stdlib/include/qlc.hrl"). -export([start_link/0]). @@ -27,6 +28,7 @@ -export([ get_rules/0 , get_rules_for/1 , get_rules_with_same_event/1 + , get_rules_ordered_by_ts/0 , get_rule/1 , add_rule/1 , add_rules/1 @@ -168,6 +170,14 @@ start_link() -> get_rules() -> get_all_records(?RULE_TAB). +get_rules_ordered_by_ts() -> + F = fun() -> + Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]), + qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}])) + end, + {atomic, List} = mnesia:transaction(F), + List. + -spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())). get_rules_for(Topic) -> [Rule || Rule = #rule{for = For} <- get_rules(), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 456438b6d..29c1ee6b6 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -694,44 +694,44 @@ t_update_rule(_Config) -> ok. t_disable_rule(_Config) -> - ets:new(simpile_action_2, [named_table, set, public]), - ets:insert(simpile_action_2, {created, 0}), - ets:insert(simpile_action_2, {destroyed, 0}), + ets:new(simple_action_2, [named_table, set, public]), + ets:insert(simple_action_2, {created, 0}), + ets:insert(simple_action_2, {destroyed, 0}), Now = erlang:timestamp(), emqx_rule_registry:add_action( - #action{name = 'simpile_action_2', app = ?APP, + #action{name = 'simple_action_2', app = ?APP, module = ?MODULE, - on_create = simpile_action_2_create, - on_destroy = simpile_action_2_destroy, + on_create = simple_action_2_create, + on_destroy = simple_action_2_destroy, types=[], params_spec = #{}, title = #{en => <<"Simple Action">>}, description = #{en => <<"Simple Action">>}}), {ok, #rule{actions = [#action_instance{id = ActInsId0}]}} = emqx_rule_engine:create_rule( #{id => <<"simple_rule_2">>, rawsql => <<"select * from \"t/#\"">>, - actions => [#{name => 'simpile_action_2', args => #{}}] + actions => [#{name => 'simple_action_2', args => #{}}] }), - [{_, CAt}] = ets:lookup(simpile_action_2, created), + [{_, CAt}] = ets:lookup(simple_action_2, created), ?assert(CAt > Now), - [{_, DAt}] = ets:lookup(simpile_action_2, destroyed), + [{_, DAt}] = ets:lookup(simple_action_2, destroyed), ?assert(DAt < Now), %% disable the rule and verify the old action instances has been cleared Now2 = erlang:timestamp(), emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>, enabled => false}), - [{_, CAt2}] = ets:lookup(simpile_action_2, created), + [{_, CAt2}] = ets:lookup(simple_action_2, created), ?assert(CAt2 < Now2), - [{_, DAt2}] = ets:lookup(simpile_action_2, destroyed), + [{_, DAt2}] = ets:lookup(simple_action_2, destroyed), ?assert(DAt2 > Now2), %% enable the rule again and verify the action instances has been created Now3 = erlang:timestamp(), emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>, enabled => true}), - [{_, CAt3}] = ets:lookup(simpile_action_2, created), + [{_, CAt3}] = ets:lookup(simple_action_2, created), ?assert(CAt3 > Now3), - [{_, DAt3}] = ets:lookup(simpile_action_2, destroyed), + [{_, DAt3}] = ets:lookup(simple_action_2, destroyed), ?assert(DAt3 < Now3), ok = emqx_rule_engine:delete_rule(<<"simple_rule_2">>). @@ -744,6 +744,19 @@ t_get_rules_for(_Config) -> ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]), ok. +t_get_rules_ordered_by_ts(_Config) -> + Now = fun() -> erlang:system_time(nanosecond) end, + ok = emqx_rule_registry:add_rules( + [make_simple_rule_with_ts(<<"rule-debug-0">>, Now()), + make_simple_rule_with_ts(<<"rule-debug-1">>, Now()), + make_simple_rule_with_ts(<<"rule-debug-2">>, Now()) + ]), + ?assertMatch([ + #rule{id = <<"rule-debug-0">>}, + #rule{id = <<"rule-debug-1">>}, + #rule{id = <<"rule-debug-2">>} + ], emqx_rule_registry:get_rules_ordered_by_ts()). + t_get_rules_for_2(_Config) -> Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/1">>)), ok = emqx_rule_registry:add_rules( @@ -2166,6 +2179,17 @@ make_simple_rule(RuleId) when is_binary(RuleId) -> actions = [{'inspect', #{}}], description = <<"simple rule">>}. +make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> + #rule{id = RuleId, + rawsql = <<"select * from \"simple/topic\"">>, + for = [<<"simple/topic">>], + fields = [<<"*">>], + is_foreach = false, + conditions = {}, + actions = [{'inspect', #{}}], + created_at = Ts, + description = <<"simple rule">>}. + make_simple_rule(RuleId, SQL, ForTopics) when is_binary(RuleId) -> #rule{id = RuleId, rawsql = SQL, @@ -2250,12 +2274,12 @@ crash_action(_Id, _Params) -> error(crash) end. -simpile_action_2_create(_Id, _Params) -> - ets:insert(simpile_action_2, {created, erlang:timestamp()}), +simple_action_2_create(_Id, _Params) -> + ets:insert(simple_action_2, {created, erlang:timestamp()}), fun(_Data, _Envs) -> ok end. -simpile_action_2_destroy(_Id, _Params) -> - ets:insert(simpile_action_2, {destroyed, erlang:timestamp()}), +simple_action_2_destroy(_Id, _Params) -> + ets:insert(simple_action_2, {destroyed, erlang:timestamp()}), fun(_Data, _Envs) -> ok end. init_plus_by_one_action() ->