From 869dc1a081e5fefcbc84712988ebbb4362229cbb Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 15 Mar 2022 10:43:42 +0800 Subject: [PATCH 1/3] fix(rules): ensure create_at unchanged after rule is updated --- .../emqx_rule_engine/src/emqx_rule_engine.erl | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 60befa3ab..66e7f4377 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -124,14 +124,19 @@ load_rules() -> -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> case get_rule(RuleId) of - not_found -> do_create_rule(Params); - {ok, _} -> {error, {already_exists, RuleId}} + not_found -> parse_and_insert(Params, now_ms()); + {ok, _} -> {error, already_exists} end. -spec update_rule(map()) -> {ok, rule()} | {error, term()}. update_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> - ok = delete_rule(RuleId), - do_create_rule(Params). + case get_rule(RuleId) of + not_found -> + {error, not_found}; + {ok, #{created_at := CreatedAt}} -> + ok = delete_rule(RuleId), + parse_and_insert(Params, CreatedAt) + end. -spec(delete_rule(RuleId :: rule_id()) -> ok). delete_rule(RuleId) when is_binary(RuleId) -> @@ -232,13 +237,14 @@ code_change(_OldVsn, State, _Extra) -> %% Internal Functions %%------------------------------------------------------------------------------ -do_create_rule(Params = #{id := RuleId, sql := Sql, outputs := Outputs}) -> +parse_and_insert(Params = #{id := RuleId, sql := Sql, outputs := Outputs}, CreatedAt) -> case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> Rule = #{ id => RuleId, name => maps:get(name, Params, <<"">>), - created_at => erlang:system_time(millisecond), + created_at => CreatedAt, + updated_at => now_ms(), enable => maps:get(enable, Params, true), sql => Sql, outputs => parse_outputs(Outputs), @@ -287,5 +293,8 @@ get_all_records(Tab) -> maps_foreach(Fun, Map) -> lists:foreach(Fun, maps:to_list(Map)). +now_ms() -> + erlang:system_time(millisecond). + bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B. From 93c5fa60b45a2948c012553398d0dc2d5ea66fb7 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 15 Mar 2022 10:40:48 +0800 Subject: [PATCH 2/3] fix(rules): store rule configs to cluster config files --- apps/emqx_rule_engine/src/emqx_rule_engine_api.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 2e3a09172..ed103ce92 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -207,7 +207,7 @@ param_path_id() -> {ok, _Rule} -> {400, #{code => 'BAD_REQUEST', message => <<"rule id already exists">>}}; not_found -> - case emqx_conf:update(ConfPath, Params, #{}) of + case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {201, format_rule_resp(Rule)}; @@ -238,7 +238,7 @@ param_path_id() -> '/rules/:id'(put, #{bindings := #{id := Id}, body := Params0}) -> Params = filter_out_request_body(Params0), ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx_conf:update(ConfPath, Params, #{}) of + case emqx_conf:update(ConfPath, Params, #{override_to => cluster}) of {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> [Rule] = get_one_rule(AllRules, Id), {200, format_rule_resp(Rule)}; @@ -250,7 +250,7 @@ param_path_id() -> '/rules/:id'(delete, #{bindings := #{id := Id}}) -> ConfPath = emqx_rule_engine:config_key_path() ++ [Id], - case emqx_conf:remove(ConfPath, #{}) of + case emqx_conf:remove(ConfPath, #{override_to => cluster}) of {ok, _} -> {204}; {error, Reason} -> ?SLOG(error, #{msg => "delete_rule_failed", From a892ff9006cf003ee5505127a232c854f1ecd532 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 16 Mar 2022 10:13:16 +0800 Subject: [PATCH 3/3] fix(dialyzer): update the specs for the rule() --- apps/emqx_rule_engine/include/rule_engine.hrl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_rule_engine/include/rule_engine.hrl b/apps/emqx_rule_engine/include/rule_engine.hrl index 4862f970a..a8b8ae1bb 100644 --- a/apps/emqx_rule_engine/include/rule_engine.hrl +++ b/apps/emqx_rule_engine/include/rule_engine.hrl @@ -50,6 +50,7 @@ , enable := boolean() , description => binary() , created_at := integer() %% epoch in millisecond precision + , updated_at := integer() %% epoch in millisecond precision , from := list(topic()) , is_foreach := boolean() , fields := list()