diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 83db8e480..f53b71901 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -41,13 +41,6 @@ -define(MOD, {mod}). -define(WKEY, '?'). --define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL), - try [safe_atom(Key) || Key <- PATH] of - AtomKeyPath -> EXP - catch - error:badarg -> EXP_ON_FAIL - end). - -type handler_name() :: module(). -type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}. @@ -76,8 +69,9 @@ stop() -> -spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_args()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update_config(SchemaModule, ConfKeyPath, UpdateArgs) -> - ?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule, - AtomKeyPath, UpdateArgs}), {error, {not_found, ConfKeyPath}}). + %% force covert the path to a list of atoms, as there maybe some wildcard names/ids in the path + AtomKeyPath = [atom(Key) || Key <- ConfKeyPath], + gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}). -spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok. add_handler(ConfKeyPath, HandlerName) -> @@ -310,9 +304,9 @@ bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath]. bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(B) when is_binary(B) -> B. -safe_atom(Bin) when is_binary(Bin) -> - binary_to_existing_atom(Bin, latin1); -safe_atom(Str) when is_list(Str) -> - list_to_existing_atom(Str); -safe_atom(Atom) when is_atom(Atom) -> +atom(Bin) when is_binary(Bin) -> + binary_to_atom(Bin, utf8); +atom(Str) when is_list(Str) -> + list_to_atom(Str); +atom(Atom) when is_atom(Atom) -> Atom. diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 8cb325e20..2f3601646 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -27,6 +27,7 @@ start(_StartType, _StartArgs) -> {ok, Sup}. stop(_State) -> + emqx_config_handler:remove_handler(emqx_bridge:config_key_path()), ok = emqx_bridge:unload_hook(), ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 1038af4b7..7ed8d19d6 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -17,6 +17,7 @@ -module(emqx_rule_engine). -behaviour(gen_server). +-behaviour(emqx_config_handler). -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -24,6 +25,10 @@ -export([start_link/0]). +-export([ post_config_update/4 + , config_key_path/0 + ]). + %% Rule Management -export([ load_rules/0 @@ -66,14 +71,33 @@ -define(T_CALL, 10000). -%%------------------------------------------------------------------------------ -%% Start the gen_server -%%------------------------------------------------------------------------------ +-define(FOREACH_RULE(RULES, EXPR), + lists:foreach(fun({ID0, _ITEM}) -> + ID = bin(ID0), + EXPR + end, maps:to_list(RULES))). + +config_key_path() -> + [rule_engine, rules]. -spec(start_link() -> {ok, pid()} | ignore | {error, Reason :: term()}). start_link() -> gen_server:start_link({local, ?RULE_ENGINE}, ?MODULE, [], []). +%%------------------------------------------------------------------------------ +%% The config handler for emqx_rule_engine +%%------------------------------------------------------------------------------ +post_config_update(_Req, NewRules, OldRules, _AppEnvs) -> + #{added := Added, removed := Removed, changed := Updated} + = emqx_map_lib:diff_maps(NewRules, OldRules), + ?FOREACH_RULE(Updated, begin + {_Old, New} = _ITEM, + {ok, _} = update_rule(New#{id => ID}) + end), + ?FOREACH_RULE(Removed, ok = delete_rule(ID)), + ?FOREACH_RULE(Added, {ok, _} = create_rule(_ITEM#{id => ID})), + {ok, get_rules()}. + %%------------------------------------------------------------------------------ %% APIs for rules %%------------------------------------------------------------------------------ @@ -81,23 +105,23 @@ start_link() -> -spec load_rules() -> ok. load_rules() -> lists:foreach(fun({Id, Rule}) -> - {ok, _} = create_rule(Rule#{id => Id}) + {ok, _} = create_rule(Rule#{id => bin(Id)}) end, maps:to_list(emqx:get_config([rule_engine, rules], #{}))). -spec create_rule(map()) -> {ok, rule()} | {error, term()}. -create_rule(Params = #{id := RuleId}) -> +create_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> case get_rule(RuleId) of not_found -> do_create_rule(Params); {ok, _} -> {error, {already_exists, RuleId}} end. -spec update_rule(map()) -> {ok, rule()} | {error, term()}. -update_rule(Params = #{id := RuleId}) -> +update_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> ok = delete_rule(RuleId), do_create_rule(Params). -spec(delete_rule(RuleId :: rule_id()) -> ok). -delete_rule(RuleId) -> +delete_rule(RuleId) when is_binary(RuleId) -> gen_server:call(?RULE_ENGINE, {delete_rule, RuleId}, ?T_CALL). -spec(insert_rule(Rule :: rule()) -> ok). @@ -259,3 +283,6 @@ parse_output_func(Func) when is_function(Func) -> get_all_records(Tab) -> [Rule#{id => Id} || {Id, Rule} <- ets:tab2list(Tab)]. + +bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +bin(B) when is_binary(B) -> B. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 851946094..3c1eaba67 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -245,13 +245,22 @@ crud_rules(get, _Params) -> Records = emqx_rule_engine:get_rules_ordered_by_ts(), {200, format_rule_resp(Records)}; -crud_rules(post, #{body := Params}) -> - ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:create_rule(CheckedParams) of - {ok, Rule} -> {201, format_rule_resp(Rule)}; - {error, Reason} -> - ?SLOG(error, #{msg => "create_rule_failed", reason => Reason}), - {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} - end). +crud_rules(post, #{body := #{<<"id">> := Id} = Params}) -> + ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + case emqx_rule_engine:get_rule(Id) of + {ok, _Rule} -> + {400, #{code => 'BAD_ARGS', message => <<"rule id already exists">>}}; + not_found -> + case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> + [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id], + {201, format_rule_resp(Rule)}; + {error, Reason} -> + ?SLOG(error, #{msg => "create_rule_failed", + id => Id, reason => Reason}), + {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + end + end. rule_test(post, #{body := Params}) -> ?CHECK_PARAMS(Params, rule_test, case emqx_rule_sqltester:test(CheckedParams) of @@ -267,20 +276,27 @@ crud_rules_by_id(get, #{bindings := #{id := Id}}) -> {404, #{code => 'NOT_FOUND', message => <<"Rule Id Not Found">>}} end; -crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params0}) -> - Params = maps:merge(Params0, #{id => Id}), - ?CHECK_PARAMS(Params, rule_creation, case emqx_rule_engine:update_rule(CheckedParams) of - {ok, Rule} -> {200, format_rule_resp(Rule)}; +crud_rules_by_id(put, #{bindings := #{id := Id}, body := Params}) -> + ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + case emqx:update_config(ConfPath, maps:remove(<<"id">>, Params), #{}) of + {ok, #{post_config_update := #{emqx_rule_engine := AllRules}}} -> + [Rule] = [R || R = #{id := Id0} <- AllRules, Id0 == Id], + {200, format_rule_resp(Rule)}; {error, Reason} -> ?SLOG(error, #{msg => "update_rule_failed", - id => Id, - reason => Reason}), + id => Id, reason => Reason}), {400, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} - end); + end; crud_rules_by_id(delete, #{bindings := #{id := Id}}) -> - ok = emqx_rule_engine:delete_rule(Id), - {200}. + ConfPath = emqx_rule_engine:config_key_path() ++ [Id], + case emqx:remove_config(ConfPath, #{}) of + {ok, _} -> {200}; + {error, Reason} -> + ?SLOG(error, #{msg => "delete_rule_failed", + id => Id, reason => Reason}), + {500, #{code => 'BAD_ARGS', message => ?ERR_BADARGS(Reason)}} + end. %%------------------------------------------------------------------------------ %% Internal functions diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index 04e644d02..e9ca443ec 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,7 +29,9 @@ start(_Type, _Args) -> ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(), + emqx_config_handler:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), SupRet. stop(_State) -> + emqx_config_handler:remove_handler(emqx_rule_engine:config_key_path()), ok = emqx_rule_events:unload(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 0e679f0ca..8634cd41d 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -6,11 +6,14 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>). + all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> application:load(emqx_machine), + ok = emqx_config:init_load(emqx_rule_engine_schema, ?CONF_DEFAULT), ok = emqx_ct_helpers:start_apps([emqx_rule_engine]), Config. @@ -35,6 +38,9 @@ t_crud_rule_api(_Config) -> <<"sql">> => <<"SELECT * from \"t/1\"">> }, {201, Rule} = emqx_rule_engine_api:crud_rules(post, #{body => Params0}), + %% if we post again with the same params, it return with 400 "rule id already exists" + ?assertMatch({400, #{code := _, message := _Message}}, + emqx_rule_engine_api:crud_rules(post, #{body => Params0})), ?assertEqual(RuleID, maps:get(id, Rule)), {200, Rules} = emqx_rule_engine_api:crud_rules(get, #{}),