diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 73e1cd144..82cded4d1 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -95,4 +95,10 @@ until :: integer() }). +%%-------------------------------------------------------------------- +%% Configurations +%%-------------------------------------------------------------------- +-define(KIND_REPLICATE, replicate). +-define(KIND_INITIATE, initiate). + -endif. diff --git a/apps/emqx/src/emqx.erl b/apps/emqx/src/emqx.erl index ab33c6dbc..097583ed2 100644 --- a/apps/emqx/src/emqx.erl +++ b/apps/emqx/src/emqx.erl @@ -61,9 +61,12 @@ get_raw_config/2, update_config/2, update_config/3, + update_config/4, remove_config/1, remove_config/2, + remove_config/3, reset_config/2, + reset_config/3, data_dir/0, etc_file/1, cert_file/1, @@ -195,7 +198,7 @@ get_raw_config(KeyPath, Default) -> -spec update_config(emqx_utils_maps:config_key_path(), emqx_config:update_request()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update_config(KeyPath, UpdateReq) -> - update_config(KeyPath, UpdateReq, #{}). + update_config(KeyPath, UpdateReq, #{}, #{}). -spec update_config( emqx_utils_maps:config_key_path(), @@ -203,30 +206,56 @@ update_config(KeyPath, UpdateReq) -> emqx_config:update_opts() ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -update_config([RootName | _] = KeyPath, UpdateReq, Opts) -> +update_config(KeyPath, UpdateReq, Opts) -> + update_config(KeyPath, UpdateReq, Opts, #{}). + +-spec update_config( + emqx_utils_maps:config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts(), + emqx_config:cluster_rpc_opts() +) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update_config([RootName | _] = KeyPath, UpdateReq, Opts, ClusterRpcOpts) -> emqx_config_handler:update_config( emqx_config:get_schema_mod(RootName), KeyPath, - {{update, UpdateReq}, Opts} + {{update, UpdateReq}, Opts}, + ClusterRpcOpts ). -spec remove_config(emqx_utils_maps:config_key_path()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove_config(KeyPath) -> - remove_config(KeyPath, #{}). + remove_config(KeyPath, #{}, #{}). -spec remove_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. -remove_config([RootName | _] = KeyPath, Opts) -> +remove_config([_RootName | _] = KeyPath, Opts) -> + remove_config(KeyPath, Opts, #{}). + +-spec remove_config( + emqx_utils_maps:config_key_path(), emqx_config:update_opts(), emqx_config:cluster_rpc_opts() +) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config([RootName | _] = KeyPath, Opts, ClusterRpcOpts) -> emqx_config_handler:update_config( emqx_config:get_schema_mod(RootName), KeyPath, - {remove, Opts} + {remove, Opts}, + ClusterRpcOpts ). -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset_config([RootName | SubKeys] = KeyPath, Opts) -> + reset_config([RootName | SubKeys] = KeyPath, Opts, #{}). + +-spec reset_config( + emqx_utils_maps:config_key_path(), emqx_config:update_opts(), emqx_config:cluster_rpc_opts() +) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset_config([RootName | SubKeys] = KeyPath, Opts, ClusterRpcOpts) -> case emqx_config:get_default_value(KeyPath) of {ok, Default} -> Mod = emqx_config:get_schema_mod(RootName), @@ -235,7 +264,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) -> emqx_config_handler:update_config( Mod, KeyPath, - {{update, Default}, Opts} + {{update, Default}, Opts}, + ClusterRpcOpts ); false -> NewConf = @@ -247,7 +277,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) -> emqx_config_handler:update_config( Mod, [RootName], - {{update, NewConf}, Opts} + {{update, NewConf}, Opts}, + ClusterRpcOpts ) end; {error, _} = Error -> diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 9da453260..aeaa4c9e2 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -118,6 +118,7 @@ config/0, app_envs/0, update_opts/0, + cluster_rpc_opts/0, update_cmd/0, update_args/0, update_error/0, @@ -147,6 +148,7 @@ raw_config => emqx_config:raw_config(), post_config_update => #{module() => any()} }. +-type cluster_rpc_opts() :: #{kind => ?KIND_INITIATE | ?KIND_REPLICATE}. %% raw_config() is the config that is NOT parsed and translated by hocon schema -type raw_config() :: #{binary() => term()} | list() | undefined. diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index b6fd63e26..729dccc28 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -18,6 +18,7 @@ -module(emqx_config_handler). -include("logger.hrl"). +-include("emqx.hrl"). -include("emqx_schema.hrl"). -include_lib("hocon/include/hocon_types.hrl"). @@ -30,6 +31,7 @@ add_handler/2, remove_handler/1, update_config/3, + update_config/4, get_raw_cluster_override_conf/0, info/0 ]). @@ -53,9 +55,13 @@ -optional_callbacks([ pre_config_update/3, + pre_config_update/4, propagated_pre_config_update/3, + propagated_pre_config_update/4, post_config_update/5, - propagated_post_config_update/5 + post_config_update/6, + propagated_post_config_update/5, + propagated_post_config_update/6 ]). -callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) -> @@ -83,6 +89,38 @@ ) -> ok | {ok, Result :: any()} | {error, Reason :: term()}. +-callback pre_config_update( + [atom()], emqx_config:update_request(), emqx_config:raw_config(), emqx_config:cluster_rpc_opts() +) -> + ok | {ok, emqx_config:update_request()} | {error, term()}. +-callback propagated_pre_config_update( + [binary()], + emqx_config:update_request(), + emqx_config:raw_config(), + emqx_config:cluster_rpc_opts() +) -> + ok | {ok, emqx_config:update_request()} | {error, term()}. + +-callback post_config_update( + [atom()], + emqx_config:update_request(), + emqx_config:config(), + emqx_config:config(), + emqx_config:app_envs(), + emqx_config:cluster_rpc_opts() +) -> + ok | {ok, Result :: any()} | {error, Reason :: term()}. + +-callback propagated_post_config_update( + [atom()], + emqx_config:update_request(), + emqx_config:config(), + emqx_config:config(), + emqx_config:app_envs(), + emqx_config:cluster_rpc_opts() +) -> + ok | {ok, Result :: any()} | {error, Reason :: term()}. + -type state() :: #{handlers := any()}. -type config_key_path() :: emqx_utils_maps:config_key_path(). @@ -92,12 +130,17 @@ start_link() -> stop() -> gen_server:stop(?MODULE). --spec update_config(module(), config_key_path(), emqx_config:update_args()) -> - {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update_config(SchemaModule, ConfKeyPath, UpdateArgs) -> + update_config(SchemaModule, ConfKeyPath, UpdateArgs, #{}). + +-spec update_config(module(), config_key_path(), emqx_config:update_args(), map()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update_config(SchemaModule, ConfKeyPath, UpdateArgs, ClusterOpts) -> %% force convert the path to a list of atoms, as there maybe some wildcard names/ids in the path AtomKeyPath = [atom(Key) || Key <- ConfKeyPath], - gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity). + gen_server:call( + ?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs, ClusterOpts}, infinity + ). -spec add_handler(config_key_path(), handler_name()) -> ok | {error, {conflict, list()}}. @@ -130,11 +173,11 @@ handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers : {error, _Reason} = Error -> {reply, Error, State} end; handle_call( - {change_config, SchemaModule, ConfKeyPath, UpdateArgs}, + {change_config, SchemaModule, ConfKeyPath, UpdateArgs, ClusterRpcOpts}, _From, #{handlers := Handlers} = State ) -> - Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs), + Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts), {reply, Result, State}; handle_call(get_raw_cluster_override_conf, _From, State) -> Reply = emqx_config:read_override_conf(#{override_to => cluster}), @@ -203,9 +246,9 @@ filter_top_level_handlers(Handlers) -> Handlers ). -handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> +handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts) -> try - do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) + do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts) catch throw:Reason -> {error, Reason}; @@ -217,13 +260,14 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> update_req => UpdateArgs, module => SchemaModule, key_path => ConfKeyPath, + cluster_rpc_opts => ClusterRpcOpts, stacktrace => ST }), {error, {config_update_crashed, Reason}} end. -do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> - case process_update_request(ConfKeyPath, Handlers, UpdateArgs) of +do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) -> + case process_update_request(ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) of {ok, NewRawConf, OverrideConf, Opts} -> check_and_save_configs( SchemaModule, @@ -232,23 +276,24 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> NewRawConf, OverrideConf, UpdateArgs, - Opts + Opts, + ClusterOpts ); {error, Result} -> {error, Result} end. -process_update_request([_], _Handlers, {remove, _Opts}) -> +process_update_request([_], _Handlers, {remove, _Opts}, _ClusterRpcOpts) -> {error, "remove_root_is_forbidden"}; -process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) -> +process_update_request(ConfKeyPath, _Handlers, {remove, Opts}, _ClusterRpcOpts) -> OldRawConf = emqx_config:get_root_raw(ConfKeyPath), BinKeyPath = bin_path(ConfKeyPath), NewRawConf = emqx_utils_maps:deep_remove(BinKeyPath, OldRawConf), OverrideConf = remove_from_override_config(BinKeyPath, Opts), {ok, NewRawConf, OverrideConf, Opts}; -process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) -> +process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}, ClusterRpcOpts) -> OldRawConf = emqx_config:get_root_raw(ConfKeyPath), - case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of + case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) of {ok, NewRawConf} -> OverrideConf = merge_to_override_config(NewRawConf, Opts), {ok, NewRawConf, OverrideConf, Opts}; @@ -256,15 +301,16 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) -> Error end. -do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) -> - do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []). +do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) -> + do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, []). -do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) -> +do_update_config([], Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath) -> call_pre_config_update(#{ handlers => Handlers, old_raw_conf => OldRawConf, update_req => UpdateReq, conf_key_path => ConfKeyPath, + cluster_rpc_opts => ClusterRpcOpts, callback => pre_config_update, is_propagated => false }); @@ -273,13 +319,18 @@ do_update_config( Handlers, OldRawConf, UpdateReq, + ClusterRpcOpts, ConfKeyPath0 ) -> ConfKeyPath = ConfKeyPath0 ++ [ConfKey], ConfKeyBin = bin(ConfKey), SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf), SubHandlers = get_sub_handlers(ConfKey, Handlers), - case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of + case + do_update_config( + SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath + ) + of {ok, NewUpdateReq} -> merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf); Error -> @@ -293,12 +344,18 @@ check_and_save_configs( NewRawConf, OverrideConf, UpdateArgs, - Opts + Opts, + ClusterOpts ) -> Schema = schema(SchemaModule, ConfKeyPath), + Kind = maps:get(kind, ClusterOpts, ?KIND_INITIATE), {AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf), OldConf = emqx_config:get_root(ConfKeyPath), - case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of + case + do_post_config_update( + ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, #{} + ) + of {ok, Result0} -> post_update_ok( AppEnvs, @@ -310,21 +367,24 @@ check_and_save_configs( UpdateArgs, Result0 ); - {error, {post_config_update, HandlerName, Reason}} -> - HandlePostFailureFun = - fun() -> - post_update_ok( - AppEnvs, - NewConf, - NewRawConf, - OverrideConf, - Opts, - ConfKeyPath, - UpdateArgs, - #{} - ) - end, - {error, {post_config_update, HandlerName, {Reason, HandlePostFailureFun}}} + {error, {post_config_update, HandlerName, Reason}} when Kind =/= ?KIND_INITIATE -> + ?SLOG(critical, #{ + msg => "post_config_update_failed_but_save_the_config_anyway", + handler => HandlerName, + reason => Reason + }), + post_update_ok( + AppEnvs, + NewConf, + NewRawConf, + OverrideConf, + Opts, + ConfKeyPath, + UpdateArgs, + #{} + ); + {error, _} = Error -> + Error end. post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, UpdateArgs, Result0) -> @@ -332,7 +392,9 @@ post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, Up Result1 = return_change_result(ConfKeyPath, UpdateArgs), {ok, Result1#{post_config_update => Result0}}. -do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) -> +do_post_config_update( + ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, Result +) -> do_post_config_update( ConfKeyPath, Handlers, @@ -340,6 +402,7 @@ do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateAr NewConf, AppEnvs, UpdateArgs, + ClusterOpts, Result, [] ). @@ -352,6 +415,7 @@ do_post_config_update( AppEnvs, UpdateArgs, Result, + ClusterOpts, ConfKeyPath ) -> call_post_config_update(#{ @@ -362,6 +426,7 @@ do_post_config_update( update_req => up_req(UpdateArgs), result => Result, conf_key_path => ConfKeyPath, + cluster_rpc_opts => ClusterOpts, callback => post_config_update }); do_post_config_update( @@ -371,6 +436,7 @@ do_post_config_update( NewConf, AppEnvs, UpdateArgs, + ClusterOpts, Result, ConfKeyPath0 ) -> @@ -385,6 +451,7 @@ do_post_config_update( SubNewConf, AppEnvs, UpdateArgs, + ClusterOpts, Result, ConfKeyPath ). @@ -428,37 +495,41 @@ call_proper_pre_config_update( #{ handlers := #{?MOD := Module}, callback := Callback, - update_req := UpdateReq, - old_raw_conf := OldRawConf + update_req := UpdateReq } = Ctx ) -> - case erlang:function_exported(Module, Callback, 3) of - true -> - case apply_pre_config_update(Module, Ctx) of - {ok, NewUpdateReq} -> - {ok, NewUpdateReq}; - ok -> - {ok, UpdateReq}; - {error, Reason} -> - {error, {pre_config_update, Module, Reason}} - end; - false -> - merge_to_old_config(UpdateReq, OldRawConf) + Arity = get_function_arity(Module, Callback, [3, 4]), + case apply_pre_config_update(Module, Callback, Arity, Ctx) of + {ok, NewUpdateReq} -> + {ok, NewUpdateReq}; + ok -> + {ok, UpdateReq}; + {error, Reason} -> + {error, {pre_config_update, Module, Reason}} end; call_proper_pre_config_update( #{update_req := UpdateReq} ) -> {ok, UpdateReq}. -apply_pre_config_update(Module, #{ +apply_pre_config_update(Module, Callback, 3, #{ + conf_key_path := ConfKeyPath, + update_req := UpdateReq, + old_raw_conf := OldRawConf +}) -> + Module:Callback(ConfKeyPath, UpdateReq, OldRawConf); +apply_pre_config_update(Module, Callback, 4, #{ conf_key_path := ConfKeyPath, update_req := UpdateReq, old_raw_conf := OldRawConf, - callback := Callback + cluster_rpc_opts := ClusterRpcOpts }) -> - Module:Callback( - ConfKeyPath, UpdateReq, OldRawConf - ). + Module:Callback(ConfKeyPath, UpdateReq, OldRawConf, ClusterRpcOpts); +apply_pre_config_update(_Module, _Callback, false, #{ + update_req := UpdateReq, + old_raw_conf := OldRawConf +}) -> + merge_to_old_config(UpdateReq, OldRawConf). propagate_pre_config_updates_to_subconf( #{handlers := #{?WKEY := _}} = Ctx @@ -560,28 +631,23 @@ call_proper_post_config_update( result := Result } = Ctx ) -> - case erlang:function_exported(Module, Callback, 5) of - true -> - case apply_post_config_update(Module, Ctx) of - ok -> {ok, Result}; - {ok, Result1} -> {ok, Result#{Module => Result1}}; - {error, Reason} -> {error, {post_config_update, Module, Reason}} - end; - false -> - {ok, Result} + Arity = get_function_arity(Module, Callback, [5, 6]), + case apply_post_config_update(Module, Callback, Arity, Ctx) of + ok -> {ok, Result}; + {ok, Result1} -> {ok, Result#{Module => Result1}}; + {error, Reason} -> {error, {post_config_update, Module, Reason}} end; call_proper_post_config_update( #{result := Result} = _Ctx ) -> {ok, Result}. -apply_post_config_update(Module, #{ +apply_post_config_update(Module, Callback, 5, #{ conf_key_path := ConfKeyPath, update_req := UpdateReq, new_conf := NewConf, old_conf := OldConf, - app_envs := AppEnvs, - callback := Callback + app_envs := AppEnvs }) -> Module:Callback( ConfKeyPath, @@ -589,7 +655,25 @@ apply_post_config_update(Module, #{ NewConf, OldConf, AppEnvs - ). + ); +apply_post_config_update(Module, Callback, 6, #{ + conf_key_path := ConfKeyPath, + update_req := UpdateReq, + cluster_rpc_opts := ClusterRpcOpts, + new_conf := NewConf, + old_conf := OldConf, + app_envs := AppEnvs +}) -> + Module:Callback( + ConfKeyPath, + UpdateReq, + NewConf, + OldConf, + AppEnvs, + ClusterRpcOpts + ); +apply_post_config_update(_Module, _Callback, false, _Ctx) -> + ok. propagate_post_config_updates_to_subconf( #{handlers := #{?WKEY := _}} = Ctx @@ -768,7 +852,9 @@ assert_callback_function(Mod) -> _ = apply(Mod, module_info, []), case erlang:function_exported(Mod, pre_config_update, 3) orelse - erlang:function_exported(Mod, post_config_update, 5) + erlang:function_exported(Mod, post_config_update, 5) orelse + erlang:function_exported(Mod, pre_config_update, 4) orelse + erlang:function_exported(Mod, post_config_update, 6) of true -> ok; false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod}) @@ -811,3 +897,13 @@ load_prev_handlers() -> save_handlers(Handlers) -> application:set_env(emqx, ?MODULE, Handlers). + +get_function_arity(_Module, _Callback, []) -> + false; +get_function_arity(Module, Callback, [Arity | Opts]) -> + %% ensure module is loaded + Module = Module:module_info(module), + case erlang:function_exported(Module, Callback, Arity) of + true -> Arity; + false -> get_function_arity(Module, Callback, Opts) + end. diff --git a/apps/emqx/test/emqx_config_handler_SUITE.erl b/apps/emqx/test/emqx_config_handler_SUITE.erl index 1e36143b3..d71122c90 100644 --- a/apps/emqx/test/emqx_config_handler_SUITE.erl +++ b/apps/emqx/test/emqx_config_handler_SUITE.erl @@ -415,14 +415,7 @@ assert_update_result(FailedPath, Update, Expect) -> assert_update_result(Paths, UpdatePath, Update, Expect) -> with_update_result(Paths, UpdatePath, Update, fun(Old, Result) -> - case Expect of - {error, {post_config_update, ?MODULE, post_config_update_error}} -> - ?assertMatch( - {error, {post_config_update, ?MODULE, {post_config_update_error, _}}}, Result - ); - _ -> - ?assertEqual(Expect, Result) - end, + ?assertEqual(Expect, Result), New = emqx:get_raw_config(UpdatePath, undefined), ?assertEqual(Old, New) end). diff --git a/apps/emqx/test/emqx_listeners_SUITE.erl b/apps/emqx/test/emqx_listeners_SUITE.erl index ba84699c6..9f3107213 100644 --- a/apps/emqx/test/emqx_listeners_SUITE.erl +++ b/apps/emqx/test/emqx_listeners_SUITE.erl @@ -593,7 +593,7 @@ t_quic_update_opts_fail(Config) -> %% THEN: Reload failed but old listener is rollbacked. ?assertMatch( - {error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}}, + {error, {post_config_update, emqx_listeners, {rollbacked, {error, tls_error}}}}, UpdateResult1 ), diff --git a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl index e7594ed6b..99fdfaf5c 100644 --- a/apps/emqx_auth/src/emqx_authz/emqx_authz.erl +++ b/apps/emqx_auth/src/emqx_authz/emqx_authz.erl @@ -51,7 +51,7 @@ set_feature_available/2 ]). --export([post_config_update/5, pre_config_update/3]). +-export([pre_config_update/4, post_config_update/5]). -export([ maybe_read_source_files/1, @@ -194,8 +194,8 @@ update({?CMD_DELETE, Type}, Sources) -> update(Cmd, Sources) -> emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). -pre_config_update(Path, Cmd, Sources) -> - try do_pre_config_update(Path, Cmd, Sources) of +pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) -> + try do_pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) of {error, Reason} -> {error, Reason}; NSources -> {ok, NSources} catch @@ -215,58 +215,64 @@ pre_config_update(Path, Cmd, Sources) -> {error, Reason} end. -do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) -> - do_pre_config_update(Cmd, Sources); -do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf) -> - do_pre_config_merge(NewConf, OldConf); -do_pre_config_update(?ROOT_KEY, NewConf, OldConf) -> - do_pre_config_replace(NewConf, OldConf). +do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources, Opts) -> + do_pre_config_update(Cmd, Sources, Opts); +do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf, Opts) -> + do_pre_config_merge(NewConf, OldConf, Opts); +do_pre_config_update(?ROOT_KEY, NewConf, OldConf, Opts) -> + do_pre_config_replace(NewConf, OldConf, Opts). -do_pre_config_merge(NewConf, OldConf) -> +do_pre_config_merge(NewConf, OldConf, Opts) -> MergeConf = emqx_utils_maps:deep_merge(OldConf, NewConf), NewSources = merge_sources(OldConf, NewConf), - do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf). + do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf, Opts). %% override the entire config when updating the root key %% emqx_conf:update(?ROOT_KEY, Conf); -do_pre_config_replace(Conf, Conf) -> +do_pre_config_replace(Conf, Conf, _Opts) -> Conf; -do_pre_config_replace(NewConf, OldConf) -> +do_pre_config_replace(NewConf, OldConf, Opts) -> NewSources = get_sources(NewConf), OldSources = get_sources(OldConf), - ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources), + ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources, Opts), NewConf#{<<"sources">> => ReplaceSources}. -do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) -> +do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources, _Opts) -> do_move(Cmd, Sources); -do_pre_config_update({?CMD_PREPEND, Source}, Sources) -> +do_pre_config_update({?CMD_PREPEND, Source}, Sources, _Opts) -> NSource = maybe_write_source_files(Source), NSources = [NSource] ++ Sources, ok = check_dup_types(NSources), NSources; -do_pre_config_update({?CMD_APPEND, Source}, Sources) -> +do_pre_config_update({?CMD_APPEND, Source}, Sources, _Opts) -> NSource = maybe_write_source_files(Source), NSources = Sources ++ [NSource], ok = check_dup_types(NSources), NSources; -do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources) -> +do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources, _Opts) -> NSource = maybe_write_source_files(Source), {_Old, Front, Rear} = take(Type, Sources), NSources = Front ++ [NSource | Rear], ok = check_dup_types(NSources), NSources; -do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources) -> - {_Old, Front, Rear} = take(Type, Sources), - NSources = Front ++ Rear, - NSources; -do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources) -> +do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources, Opts) -> + case type_take(Type, Sources) of + not_found -> + case emqx_cluster_rpc:is_initiator(Opts) of + true -> throw({not_found_source, Type}); + false -> Sources + end; + {_Found, NSources} -> + NSources + end; +do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources, _Opts) -> %% overwrite the entire config! NSources = lists:map(fun maybe_write_source_files/1, Sources), ok = check_dup_types(NSources), NSources; -do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources) -> +do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources, _Opts) -> reorder_sources(NewSourcesOrder, OldSources); -do_pre_config_update({Op, Source}, Sources) -> +do_pre_config_update({Op, Source}, Sources, _Opts) -> throw({bad_request, #{op => Op, source => Source, sources => Sources}}). post_config_update(_, _, undefined, _OldSource, _AppEnvs) -> @@ -293,9 +299,13 @@ do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sour Front ++ [InitedSources] ++ Rear; do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> OldInitedSources = lookup(), - {OldSource, Front, Rear} = take(Type, OldInitedSources), - ok = ensure_deleted(OldSource, #{clear_metric => true}), - Front ++ Rear; + case type_take(Type, OldInitedSources) of + not_found -> + OldInitedSources; + {Found, NSources} -> + ok = ensure_deleted(Found, #{clear_metric => true}), + NSources + end; do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) -> overwrite_entire_sources(Sources); do_post_config_update(?CONF_KEY_PATH, {?CMD_REORDER, NewSourcesOrder}, _Sources) -> diff --git a/apps/emqx_auth/test/emqx_authz/emqx_authz_api_cluster_SUITE.erl b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_cluster_SUITE.erl new file mode 100644 index 000000000..a77b3df51 --- /dev/null +++ b/apps/emqx_auth/test/emqx_authz/emqx_authz_api_cluster_SUITE.erl @@ -0,0 +1,128 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_authz_api_cluster_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx/include/emqx_placeholder.hrl"). + +-define(SOURCE_HTTP, #{ + <<"type">> => <<"http">>, + <<"enable">> => true, + <<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>, + <<"ssl">> => #{<<"enable">> => true}, + <<"headers">> => #{}, + <<"method">> => <<"get">>, + <<"request_timeout">> => <<"5s">> +}). +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = ?config(priv_dir, Config), + Cluster = mk_cluster_spec(#{}), + Nodes = [NodePrimary | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), + lists:foreach(fun(N) -> ?ON(N, emqx_authz_test_lib:register_fake_sources([http])) end, Nodes), + {ok, App} = ?ON(NodePrimary, emqx_common_test_http:create_default_app()), + [{api, App}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)). + +t_api(Config) -> + APINode = ?config(node, Config), + {ok, 200, Result1} = request(get, uri(["authorization", "sources"]), [], Config), + ?assertEqual([], get_sources(Result1)), + {ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_HTTP, Config), + AllNodes = ?config(cluster_nodes, Config), + [OtherNode] = AllNodes -- [APINode], + lists:foreach( + fun(N) -> + ?ON( + N, + ?assertMatch( + [#{<<"type">> := <<"http">>}], + emqx:get_raw_config([authorization, sources]) + ) + ) + end, + AllNodes + ), + %% delete the source from the second node. + ?ON(OtherNode, begin + {ok, _} = emqx:update_config([authorization], #{<<"sources">> => []}), + ?assertMatch([], emqx:get_raw_config([authorization, sources])) + end), + ?assertMatch( + {ok, 204, _}, + request( + delete, + uri(["authorization", "sources", "http"]), + [], + Config + ) + ), + lists:foreach( + fun(N) -> + ?ON( + N, + ?assertMatch( + [], + emqx:get_raw_config([authorization, sources]) + ) + ) + end, + AllNodes + ), + ?assertMatch( + {ok, 404, _}, + request( + delete, + uri(["authorization", "sources", "http"]), + [], + Config + ) + ), + ok. + +get_sources(Result) -> + maps:get(<<"sources">>, emqx_utils_json:decode(Result, [return_maps])). + +mk_cluster_spec(Opts) -> + Apps = [ + emqx, + {emqx_conf, "authorization {cache{enable=false},no_match=deny,sources=[]}"}, + emqx_auth, + emqx_management + ], + Node1Apps = Apps ++ [{emqx_dashboard, "dashboard.listeners.http {enable=true,bind=18083}"}], + Node2Apps = Apps, + [ + {emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Node1Apps}}, + {emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Node2Apps}} + ]. + +request(Method, URL, Body, Config) -> + AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)), + Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]}, + emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts). diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 37f052f56..e42505b49 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -27,6 +27,7 @@ query/1, reset/0, status/0, + is_initiator/1, skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, @@ -66,6 +67,7 @@ -export_type([tnx_id/0, succeed_num/0]). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_conf.hrl"). @@ -78,8 +80,6 @@ -define(INITIATE(MFA), {initiate, MFA}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). --define(APPLY_KIND_REPLICATE, replicate). --define(APPLY_KIND_INITIATE, initiate). -define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -type tnx_id() :: pos_integer(). @@ -224,6 +224,9 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun ?MODULE:trans_status/0, []). +is_initiator(Opts) -> + ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE). + %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. on_leave_clean() -> on_leave_clean(node()). @@ -398,7 +401,7 @@ catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State ?tp(cluster_rpc_caught_up, #{}), ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> - {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), + {Succeed, _} = apply_mfa(NextId, MFA, ?KIND_REPLICATE), case Succeed orelse SkipResult of true -> case transaction(fun ?MODULE:commit/2, [Node, NextId]) of @@ -520,7 +523,7 @@ init_mfa(Node, MFA) -> }, ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = commit(Node, TnxId), - case apply_mfa(TnxId, MFA, ?APPLY_KIND_INITIATE) of + case apply_mfa(TnxId, MFA, ?KIND_INITIATE) of {true, Result} -> {ok, TnxId, Result}; {false, Error} -> mnesia:abort(Error) end; @@ -571,23 +574,7 @@ trans_query(TnxId) -> apply_mfa(TnxId, {M, F, A}, Kind) -> Res = try - case erlang:apply(M, F, A) of - {error, {post_config_update, HandlerName, {Reason0, PostFailureFun}}} when - Kind =/= ?APPLY_KIND_INITIATE - -> - ?SLOG(error, #{ - msg => "post_config_update_failed", - handler => HandlerName, - reason => Reason0 - }), - PostFailureFun(); - {error, {post_config_update, HandlerName, {Reason0, _Fun}}} when - Kind =:= ?APPLY_KIND_INITIATE - -> - {error, {post_config_update, HandlerName, Reason0}}; - Result -> - Result - end + erlang:apply(M, F, A ++ [#{kind => Kind}]) catch throw:Reason -> {error, #{reason => Reason}}; @@ -607,7 +594,7 @@ is_success(ok) -> true; is_success({ok, _}) -> true; is_success(_) -> false. -log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) -> +log_and_alarm(IsSuccess, Res, #{kind := ?KIND_INITIATE} = Meta) -> %% no alarm or error log in case of failure at originating a new cluster-call %% because nothing is committed case IsSuccess of diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index cfdc5820e..97b80ab6b 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -234,7 +234,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> {atomic, [_Status | L]} = emqx_cluster_rpc:status(), ?assertEqual([], L), ets:insert(test, {other_mfa_result, ok}), - {ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), + {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), ct:sleep(1000), {atomic, NewStatus} = emqx_cluster_rpc:status(), ?assertEqual(3, length(NewStatus)), @@ -251,7 +251,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) -> t_del_stale_mfa(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]}, + MFA = {M, F, A} = {?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]]}, Keys = lists:seq(1, 50), Keys2 = lists:seq(51, 150), Ids = @@ -292,7 +292,7 @@ t_del_stale_mfa(_Config) -> t_skip_failed_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), + {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -312,7 +312,7 @@ t_skip_failed_commit(_Config) -> t_fast_forward_commit(_Config) -> {atomic, []} = emqx_cluster_rpc:status(), - {ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), + {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), ct:sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), @@ -393,22 +393,25 @@ receive_msg(Count, Msg) when Count > 0 -> {Msg, flush_msg([])} end. -echo(Pid, Msg) -> +echo(Pid, Msg, _) -> erlang:send(Pid, Msg), ok. -echo_delay(Pid, Msg) -> +format(Fmt, Args, _Opts) -> + io:format(Fmt, Args). + +echo_delay(Pid, Msg, _) -> timer:sleep(rand:uniform(150)), erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}), ok. -failed_on_node(Pid) -> +failed_on_node(Pid, _) -> case Pid =:= self() of true -> ok; false -> "MFA return not ok" end. -failed_on_node_by_odd(Pid) -> +failed_on_node_by_odd(Pid, _) -> case Pid =:= self() of true -> ok; @@ -421,7 +424,7 @@ failed_on_node_by_odd(Pid) -> end end. -failed_on_other_recover_after_retry(Pid) -> +failed_on_other_recover_after_retry(Pid, _) -> case Pid =:= self() of true -> ok; diff --git a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl index 6c7f9783c..5e6d07a49 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_plugins.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_plugins.erl @@ -47,8 +47,10 @@ get_plugins/0, install_package/2, delete_package/1, + delete_package/2, describe_package/1, ensure_action/2, + ensure_action/3, do_update_plugin_config/3 ]). @@ -399,7 +401,7 @@ validate_name(Name) -> %% API CallBack Begin list_plugins(get, _) -> Nodes = emqx:running_nodes(), - {Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes), + {Plugins, []} = emqx_mgmt_api_plugins_proto_v3:get_plugins(Nodes), {200, format_plugins(Plugins)}. get_plugins() -> @@ -451,7 +453,7 @@ upload_install(post, #{}) -> do_install_package(FileName, Bin) -> %% TODO: handle bad nodes Nodes = emqx:running_nodes(), - {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin), + {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v3:install_package(Nodes, FileName, Bin), case lists:filter(fun(R) -> R =/= ok end, Res) of [] -> {204}; @@ -477,17 +479,17 @@ do_install_package(FileName, Bin) -> plugin(get, #{bindings := #{name := Name}}) -> Nodes = emqx:running_nodes(), - {Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name), + {Plugins, _} = emqx_mgmt_api_plugins_proto_v3:describe_package(Nodes, Name), case format_plugins(Plugins) of [Plugin] -> {200, Plugin}; [] -> {404, #{code => 'NOT_FOUND', message => Name}} end; plugin(delete, #{bindings := #{name := Name}}) -> - Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name), + Res = emqx_mgmt_api_plugins_proto_v3:delete_package(Name), return(204, Res). update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> - Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), + Res = emqx_mgmt_api_plugins_proto_v3:ensure_action(Name, Action), return(204, Res). plugin_config(get, #{bindings := #{name := NameVsn}}) -> @@ -583,8 +585,12 @@ describe_package(NameVsn) -> _ -> {Node, []} end. -%% For RPC plugin delete +%% Tip: Don't delete delete_package/1, use before v571 cluster_rpc delete_package(Name) -> + delete_package(Name, #{}). + +%% For RPC plugin delete +delete_package(Name, _Opts) -> case emqx_plugins:ensure_stopped(Name) of ok -> _ = emqx_plugins:ensure_disabled(Name), @@ -595,19 +601,24 @@ delete_package(Name) -> Error end. +%% Tip: Don't delete ensure_action/2, use before v571 cluster_rpc +ensure_action(Name, Action) -> + ensure_action(Name, Action, #{}). + %% for RPC plugin update %% TODO: catch thrown error to return 400 %% - plugin_not_found %% - otp vsn assertion failed -ensure_action(Name, start) -> + +ensure_action(Name, start, _Opts) -> _ = emqx_plugins:ensure_started(Name), _ = emqx_plugins:ensure_enabled(Name), ok; -ensure_action(Name, stop) -> +ensure_action(Name, stop, _Opts) -> _ = emqx_plugins:ensure_stopped(Name), _ = emqx_plugins:ensure_disabled(Name), ok; -ensure_action(Name, restart) -> +ensure_action(Name, restart, _Opts) -> _ = emqx_plugins:ensure_enabled(Name), _ = emqx_plugins:restart(Name), ok. diff --git a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl index f896468b7..cb3451fc1 100644 --- a/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_cli_SUITE.erl @@ -97,7 +97,7 @@ t_broker(_Config) -> t_cluster(_Config) -> SelfNode = node(), FakeNode = 'fake@127.0.0.1', - MFA = {io, format, [""]}, + MFA = {?MODULE, format, [""]}, meck:new(mria_mnesia, [non_strict, passthrough, no_link]), meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]), {atomic, {ok, TnxId, _}} = @@ -353,3 +353,5 @@ t_autocluster_leave(Config) -> 10_000 ) ). + +format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 9bd6b1390..15c0a0293 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -50,6 +50,7 @@ remove_local/1, reset_metrics/1, reset_metrics_local/1, + reset_metrics_local/2, %% Create metrics for a resource ID create_metrics/1, %% Delete metrics for a resource ID @@ -337,8 +338,13 @@ remove_local(ResId) -> ok end. +%% Tip: Don't delete reset_metrics_local/1, use before v572 rpc -spec reset_metrics_local(resource_id()) -> ok. reset_metrics_local(ResId) -> + reset_metrics_local(ResId, #{}). + +-spec reset_metrics_local(resource_id(), map()) -> ok. +reset_metrics_local(ResId, _ClusterOpts) -> emqx_resource_manager:reset_metrics(ResId). -spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 373dd2622..2adf511a2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -56,7 +56,8 @@ unload_hooks_for_rule/1, maybe_add_metrics_for_rule/1, clear_metrics_for_rule/1, - reset_metrics_for_rule/1 + reset_metrics_for_rule/1, + reset_metrics_for_rule/2 ]). %% exported for `emqx_telemetry' @@ -302,8 +303,13 @@ maybe_add_metrics_for_rule(Id) -> clear_metrics_for_rule(Id) -> ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id). +%% Tip: Don't delete reset_metrics_for_rule/1, use before v572 rpc -spec reset_metrics_for_rule(rule_id()) -> ok. reset_metrics_for_rule(Id) -> + reset_metrics_for_rule(Id, #{}). + +-spec reset_metrics_for_rule(rule_id(), map()) -> ok. +reset_metrics_for_rule(Id, _Opts) -> emqx_metrics_worker:reset_metrics(rule_metrics, Id). unload_hooks_for_rule(#{id := Id, from := Topics}) ->