feat(rule-engine): list rule support order by timestrap
This commit is contained in:
parent
7138e3a3a9
commit
f7b34cb098
|
@ -79,6 +79,7 @@
|
||||||
, on_action_failed :: continue | stop
|
, on_action_failed :: continue | stop
|
||||||
, actions :: list(#action_instance{})
|
, actions :: list(#action_instance{})
|
||||||
, enabled :: boolean()
|
, enabled :: boolean()
|
||||||
|
, created_at :: integer() %% epoch in millisecond precision
|
||||||
, description :: binary()
|
, description :: binary()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
|
@ -179,6 +179,7 @@ create_rule(Params = #{rawsql := Sql, actions := ActArgs}) ->
|
||||||
on_action_failed = maps:get(on_action_failed, Params, continue),
|
on_action_failed = maps:get(on_action_failed, Params, continue),
|
||||||
actions = Actions,
|
actions = Actions,
|
||||||
enabled = Enabled,
|
enabled = Enabled,
|
||||||
|
created_at = erlang:system_time(millisecond),
|
||||||
description = maps:get(description, Params, "")
|
description = maps:get(description, Params, "")
|
||||||
},
|
},
|
||||||
ok = emqx_rule_registry:add_rule(Rule),
|
ok = emqx_rule_registry:add_rule(Rule),
|
||||||
|
|
|
@ -230,7 +230,7 @@ update_rule(#{id := Id}, Params) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
list_rules(_Bindings, _Params) ->
|
list_rules(_Bindings, _Params) ->
|
||||||
return_all(emqx_rule_registry:get_rules()).
|
return_all(emqx_rule_registry:get_rules_ordered_by_ts()).
|
||||||
|
|
||||||
show_rule(#{id := Id}, _Params) ->
|
show_rule(#{id := Id}, _Params) ->
|
||||||
reply_with(fun emqx_rule_registry:get_rule/1, Id).
|
reply_with(fun emqx_rule_registry:get_rule/1, Id).
|
||||||
|
@ -495,8 +495,8 @@ parse_rule_params([{<<"actions">>, Actions} | Params], Rule) ->
|
||||||
parse_rule_params(Params, Rule#{actions => parse_actions(Actions)});
|
parse_rule_params(Params, Rule#{actions => parse_actions(Actions)});
|
||||||
parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
|
parse_rule_params([{<<"description">>, Descr} | Params], Rule) ->
|
||||||
parse_rule_params(Params, Rule#{description => Descr});
|
parse_rule_params(Params, Rule#{description => Descr});
|
||||||
parse_rule_params([_ | Params], Res) ->
|
parse_rule_params([_ | Params], Rule) ->
|
||||||
parse_rule_params(Params, Res).
|
parse_rule_params(Params, Rule).
|
||||||
|
|
||||||
on_failed(<<"continue">>) -> continue;
|
on_failed(<<"continue">>) -> continue;
|
||||||
on_failed(<<"stop">>) -> stop;
|
on_failed(<<"stop">>) -> stop;
|
||||||
|
|
|
@ -98,7 +98,7 @@ unload() ->
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
-dialyzer([{nowarn_function, [rules/1]}]).
|
-dialyzer([{nowarn_function, [rules/1]}]).
|
||||||
rules(["list"]) ->
|
rules(["list"]) ->
|
||||||
print_all(emqx_rule_registry:get_rules());
|
print_all(emqx_rule_registry:get_rules_ordered_by_ts());
|
||||||
|
|
||||||
rules(["show", RuleId]) ->
|
rules(["show", RuleId]) ->
|
||||||
print_with(fun emqx_rule_registry:get_rule/1, list_to_binary(RuleId));
|
print_with(fun emqx_rule_registry:get_rule/1, list_to_binary(RuleId));
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-include("rule_engine.hrl").
|
-include("rule_engine.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("stdlib/include/qlc.hrl").
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
|
@ -27,6 +28,7 @@
|
||||||
-export([ get_rules/0
|
-export([ get_rules/0
|
||||||
, get_rules_for/1
|
, get_rules_for/1
|
||||||
, get_rules_with_same_event/1
|
, get_rules_with_same_event/1
|
||||||
|
, get_rules_ordered_by_ts/0
|
||||||
, get_rule/1
|
, get_rule/1
|
||||||
, add_rule/1
|
, add_rule/1
|
||||||
, add_rules/1
|
, add_rules/1
|
||||||
|
@ -168,6 +170,14 @@ start_link() ->
|
||||||
get_rules() ->
|
get_rules() ->
|
||||||
get_all_records(?RULE_TAB).
|
get_all_records(?RULE_TAB).
|
||||||
|
|
||||||
|
get_rules_ordered_by_ts() ->
|
||||||
|
F = fun() ->
|
||||||
|
Query = qlc:q([E || E <- mnesia:table(?RULE_TAB)]),
|
||||||
|
qlc:e(qlc:keysort(#rule.created_at, Query, [{order, ascending}]))
|
||||||
|
end,
|
||||||
|
{atomic, List} = mnesia:transaction(F),
|
||||||
|
List.
|
||||||
|
|
||||||
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
-spec(get_rules_for(Topic :: binary()) -> list(emqx_rule_engine:rule())).
|
||||||
get_rules_for(Topic) ->
|
get_rules_for(Topic) ->
|
||||||
[Rule || Rule = #rule{for = For} <- get_rules(),
|
[Rule || Rule = #rule{for = For} <- get_rules(),
|
||||||
|
|
|
@ -694,44 +694,44 @@ t_update_rule(_Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_disable_rule(_Config) ->
|
t_disable_rule(_Config) ->
|
||||||
ets:new(simpile_action_2, [named_table, set, public]),
|
ets:new(simple_action_2, [named_table, set, public]),
|
||||||
ets:insert(simpile_action_2, {created, 0}),
|
ets:insert(simple_action_2, {created, 0}),
|
||||||
ets:insert(simpile_action_2, {destroyed, 0}),
|
ets:insert(simple_action_2, {destroyed, 0}),
|
||||||
Now = erlang:timestamp(),
|
Now = erlang:timestamp(),
|
||||||
emqx_rule_registry:add_action(
|
emqx_rule_registry:add_action(
|
||||||
#action{name = 'simpile_action_2', app = ?APP,
|
#action{name = 'simple_action_2', app = ?APP,
|
||||||
module = ?MODULE,
|
module = ?MODULE,
|
||||||
on_create = simpile_action_2_create,
|
on_create = simple_action_2_create,
|
||||||
on_destroy = simpile_action_2_destroy,
|
on_destroy = simple_action_2_destroy,
|
||||||
types=[], params_spec = #{},
|
types=[], params_spec = #{},
|
||||||
title = #{en => <<"Simple Action">>},
|
title = #{en => <<"Simple Action">>},
|
||||||
description = #{en => <<"Simple Action">>}}),
|
description = #{en => <<"Simple Action">>}}),
|
||||||
{ok, #rule{actions = [#action_instance{id = ActInsId0}]}} = emqx_rule_engine:create_rule(
|
{ok, #rule{actions = [#action_instance{id = ActInsId0}]}} = emqx_rule_engine:create_rule(
|
||||||
#{id => <<"simple_rule_2">>,
|
#{id => <<"simple_rule_2">>,
|
||||||
rawsql => <<"select * from \"t/#\"">>,
|
rawsql => <<"select * from \"t/#\"">>,
|
||||||
actions => [#{name => 'simpile_action_2', args => #{}}]
|
actions => [#{name => 'simple_action_2', args => #{}}]
|
||||||
}),
|
}),
|
||||||
[{_, CAt}] = ets:lookup(simpile_action_2, created),
|
[{_, CAt}] = ets:lookup(simple_action_2, created),
|
||||||
?assert(CAt > Now),
|
?assert(CAt > Now),
|
||||||
[{_, DAt}] = ets:lookup(simpile_action_2, destroyed),
|
[{_, DAt}] = ets:lookup(simple_action_2, destroyed),
|
||||||
?assert(DAt < Now),
|
?assert(DAt < Now),
|
||||||
|
|
||||||
%% disable the rule and verify the old action instances has been cleared
|
%% disable the rule and verify the old action instances has been cleared
|
||||||
Now2 = erlang:timestamp(),
|
Now2 = erlang:timestamp(),
|
||||||
emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>,
|
emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>,
|
||||||
enabled => false}),
|
enabled => false}),
|
||||||
[{_, CAt2}] = ets:lookup(simpile_action_2, created),
|
[{_, CAt2}] = ets:lookup(simple_action_2, created),
|
||||||
?assert(CAt2 < Now2),
|
?assert(CAt2 < Now2),
|
||||||
[{_, DAt2}] = ets:lookup(simpile_action_2, destroyed),
|
[{_, DAt2}] = ets:lookup(simple_action_2, destroyed),
|
||||||
?assert(DAt2 > Now2),
|
?assert(DAt2 > Now2),
|
||||||
|
|
||||||
%% enable the rule again and verify the action instances has been created
|
%% enable the rule again and verify the action instances has been created
|
||||||
Now3 = erlang:timestamp(),
|
Now3 = erlang:timestamp(),
|
||||||
emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>,
|
emqx_rule_engine:update_rule(#{ id => <<"simple_rule_2">>,
|
||||||
enabled => true}),
|
enabled => true}),
|
||||||
[{_, CAt3}] = ets:lookup(simpile_action_2, created),
|
[{_, CAt3}] = ets:lookup(simple_action_2, created),
|
||||||
?assert(CAt3 > Now3),
|
?assert(CAt3 > Now3),
|
||||||
[{_, DAt3}] = ets:lookup(simpile_action_2, destroyed),
|
[{_, DAt3}] = ets:lookup(simple_action_2, destroyed),
|
||||||
?assert(DAt3 < Now3),
|
?assert(DAt3 < Now3),
|
||||||
ok = emqx_rule_engine:delete_rule(<<"simple_rule_2">>).
|
ok = emqx_rule_engine:delete_rule(<<"simple_rule_2">>).
|
||||||
|
|
||||||
|
@ -744,6 +744,19 @@ t_get_rules_for(_Config) ->
|
||||||
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]),
|
ok = emqx_rule_registry:remove_rules([<<"rule-debug-1">>, <<"rule-debug-2">>]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_get_rules_ordered_by_ts(_Config) ->
|
||||||
|
Now = fun() -> erlang:system_time(nanosecond) end,
|
||||||
|
ok = emqx_rule_registry:add_rules(
|
||||||
|
[make_simple_rule_with_ts(<<"rule-debug-0">>, Now()),
|
||||||
|
make_simple_rule_with_ts(<<"rule-debug-1">>, Now()),
|
||||||
|
make_simple_rule_with_ts(<<"rule-debug-2">>, Now())
|
||||||
|
]),
|
||||||
|
?assertMatch([
|
||||||
|
#rule{id = <<"rule-debug-0">>},
|
||||||
|
#rule{id = <<"rule-debug-1">>},
|
||||||
|
#rule{id = <<"rule-debug-2">>}
|
||||||
|
], emqx_rule_registry:get_rules_ordered_by_ts()).
|
||||||
|
|
||||||
t_get_rules_for_2(_Config) ->
|
t_get_rules_for_2(_Config) ->
|
||||||
Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/1">>)),
|
Len0 = length(emqx_rule_registry:get_rules_for(<<"simple/1">>)),
|
||||||
ok = emqx_rule_registry:add_rules(
|
ok = emqx_rule_registry:add_rules(
|
||||||
|
@ -2166,6 +2179,17 @@ make_simple_rule(RuleId) when is_binary(RuleId) ->
|
||||||
actions = [{'inspect', #{}}],
|
actions = [{'inspect', #{}}],
|
||||||
description = <<"simple rule">>}.
|
description = <<"simple rule">>}.
|
||||||
|
|
||||||
|
make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
|
||||||
|
#rule{id = RuleId,
|
||||||
|
rawsql = <<"select * from \"simple/topic\"">>,
|
||||||
|
for = [<<"simple/topic">>],
|
||||||
|
fields = [<<"*">>],
|
||||||
|
is_foreach = false,
|
||||||
|
conditions = {},
|
||||||
|
actions = [{'inspect', #{}}],
|
||||||
|
created_at = Ts,
|
||||||
|
description = <<"simple rule">>}.
|
||||||
|
|
||||||
make_simple_rule(RuleId, SQL, ForTopics) when is_binary(RuleId) ->
|
make_simple_rule(RuleId, SQL, ForTopics) when is_binary(RuleId) ->
|
||||||
#rule{id = RuleId,
|
#rule{id = RuleId,
|
||||||
rawsql = SQL,
|
rawsql = SQL,
|
||||||
|
@ -2250,12 +2274,12 @@ crash_action(_Id, _Params) ->
|
||||||
error(crash)
|
error(crash)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
simpile_action_2_create(_Id, _Params) ->
|
simple_action_2_create(_Id, _Params) ->
|
||||||
ets:insert(simpile_action_2, {created, erlang:timestamp()}),
|
ets:insert(simple_action_2, {created, erlang:timestamp()}),
|
||||||
fun(_Data, _Envs) -> ok end.
|
fun(_Data, _Envs) -> ok end.
|
||||||
|
|
||||||
simpile_action_2_destroy(_Id, _Params) ->
|
simple_action_2_destroy(_Id, _Params) ->
|
||||||
ets:insert(simpile_action_2, {destroyed, erlang:timestamp()}),
|
ets:insert(simple_action_2, {destroyed, erlang:timestamp()}),
|
||||||
fun(_Data, _Envs) -> ok end.
|
fun(_Data, _Envs) -> ok end.
|
||||||
|
|
||||||
init_plus_by_one_action() ->
|
init_plus_by_one_action() ->
|
||||||
|
|
Loading…
Reference in New Issue