Merge remote-tracking branch 'upstream/release-57' into 20240702-m-sync-r57-mix-umbrella

This commit is contained in:
zhongwencool 2024-07-03 15:15:58 +08:00
commit cfa7c3bf04
18 changed files with 483 additions and 165 deletions

View File

@ -95,4 +95,10 @@
until :: integer() until :: integer()
}). }).
%%--------------------------------------------------------------------
%% Configurations
%%--------------------------------------------------------------------
-define(KIND_REPLICATE, replicate).
-define(KIND_INITIATE, initiate).
-endif. -endif.

View File

@ -61,9 +61,12 @@
get_raw_config/2, get_raw_config/2,
update_config/2, update_config/2,
update_config/3, update_config/3,
update_config/4,
remove_config/1, remove_config/1,
remove_config/2, remove_config/2,
remove_config/3,
reset_config/2, reset_config/2,
reset_config/3,
data_dir/0, data_dir/0,
etc_file/1, etc_file/1,
cert_file/1, cert_file/1,
@ -195,7 +198,7 @@ get_raw_config(KeyPath, Default) ->
-spec update_config(emqx_utils_maps:config_key_path(), emqx_config:update_request()) -> -spec update_config(emqx_utils_maps:config_key_path(), emqx_config:update_request()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config(KeyPath, UpdateReq) -> update_config(KeyPath, UpdateReq) ->
update_config(KeyPath, UpdateReq, #{}). update_config(KeyPath, UpdateReq, #{}, #{}).
-spec update_config( -spec update_config(
emqx_utils_maps:config_key_path(), emqx_utils_maps:config_key_path(),
@ -203,30 +206,56 @@ update_config(KeyPath, UpdateReq) ->
emqx_config:update_opts() emqx_config:update_opts()
) -> ) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config([RootName | _] = KeyPath, UpdateReq, Opts) -> update_config(KeyPath, UpdateReq, Opts) ->
update_config(KeyPath, UpdateReq, Opts, #{}).
-spec update_config(
emqx_utils_maps:config_key_path(),
emqx_config:update_request(),
emqx_config:update_opts(),
emqx_config:cluster_rpc_opts()
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config([RootName | _] = KeyPath, UpdateReq, Opts, ClusterRpcOpts) ->
emqx_config_handler:update_config( emqx_config_handler:update_config(
emqx_config:get_schema_mod(RootName), emqx_config:get_schema_mod(RootName),
KeyPath, KeyPath,
{{update, UpdateReq}, Opts} {{update, UpdateReq}, Opts},
ClusterRpcOpts
). ).
-spec remove_config(emqx_utils_maps:config_key_path()) -> -spec remove_config(emqx_utils_maps:config_key_path()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config(KeyPath) -> remove_config(KeyPath) ->
remove_config(KeyPath, #{}). remove_config(KeyPath, #{}, #{}).
-spec remove_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec remove_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config([RootName | _] = KeyPath, Opts) -> remove_config([_RootName | _] = KeyPath, Opts) ->
remove_config(KeyPath, Opts, #{}).
-spec remove_config(
emqx_utils_maps:config_key_path(), emqx_config:update_opts(), emqx_config:cluster_rpc_opts()
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove_config([RootName | _] = KeyPath, Opts, ClusterRpcOpts) ->
emqx_config_handler:update_config( emqx_config_handler:update_config(
emqx_config:get_schema_mod(RootName), emqx_config:get_schema_mod(RootName),
KeyPath, KeyPath,
{remove, Opts} {remove, Opts},
ClusterRpcOpts
). ).
-spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> -spec reset_config(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset_config([RootName | SubKeys] = KeyPath, Opts) -> reset_config([RootName | SubKeys] = KeyPath, Opts) ->
reset_config([RootName | SubKeys] = KeyPath, Opts, #{}).
-spec reset_config(
emqx_utils_maps:config_key_path(), emqx_config:update_opts(), emqx_config:cluster_rpc_opts()
) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset_config([RootName | SubKeys] = KeyPath, Opts, ClusterRpcOpts) ->
case emqx_config:get_default_value(KeyPath) of case emqx_config:get_default_value(KeyPath) of
{ok, Default} -> {ok, Default} ->
Mod = emqx_config:get_schema_mod(RootName), Mod = emqx_config:get_schema_mod(RootName),
@ -235,7 +264,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) ->
emqx_config_handler:update_config( emqx_config_handler:update_config(
Mod, Mod,
KeyPath, KeyPath,
{{update, Default}, Opts} {{update, Default}, Opts},
ClusterRpcOpts
); );
false -> false ->
NewConf = NewConf =
@ -247,7 +277,8 @@ reset_config([RootName | SubKeys] = KeyPath, Opts) ->
emqx_config_handler:update_config( emqx_config_handler:update_config(
Mod, Mod,
[RootName], [RootName],
{{update, NewConf}, Opts} {{update, NewConf}, Opts},
ClusterRpcOpts
) )
end; end;
{error, _} = Error -> {error, _} = Error ->

View File

@ -118,6 +118,7 @@
config/0, config/0,
app_envs/0, app_envs/0,
update_opts/0, update_opts/0,
cluster_rpc_opts/0,
update_cmd/0, update_cmd/0,
update_args/0, update_args/0,
update_error/0, update_error/0,
@ -147,6 +148,7 @@
raw_config => emqx_config:raw_config(), raw_config => emqx_config:raw_config(),
post_config_update => #{module() => any()} post_config_update => #{module() => any()}
}. }.
-type cluster_rpc_opts() :: #{kind => ?KIND_INITIATE | ?KIND_REPLICATE}.
%% raw_config() is the config that is NOT parsed and translated by hocon schema %% raw_config() is the config that is NOT parsed and translated by hocon schema
-type raw_config() :: #{binary() => term()} | list() | undefined. -type raw_config() :: #{binary() => term()} | list() | undefined.

View File

@ -18,6 +18,7 @@
-module(emqx_config_handler). -module(emqx_config_handler).
-include("logger.hrl"). -include("logger.hrl").
-include("emqx.hrl").
-include("emqx_schema.hrl"). -include("emqx_schema.hrl").
-include_lib("hocon/include/hocon_types.hrl"). -include_lib("hocon/include/hocon_types.hrl").
@ -30,6 +31,7 @@
add_handler/2, add_handler/2,
remove_handler/1, remove_handler/1,
update_config/3, update_config/3,
update_config/4,
get_raw_cluster_override_conf/0, get_raw_cluster_override_conf/0,
info/0 info/0
]). ]).
@ -53,9 +55,13 @@
-optional_callbacks([ -optional_callbacks([
pre_config_update/3, pre_config_update/3,
pre_config_update/4,
propagated_pre_config_update/3, propagated_pre_config_update/3,
propagated_pre_config_update/4,
post_config_update/5, post_config_update/5,
propagated_post_config_update/5 post_config_update/6,
propagated_post_config_update/5,
propagated_post_config_update/6
]). ]).
-callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) -> -callback pre_config_update([atom()], emqx_config:update_request(), emqx_config:raw_config()) ->
@ -83,6 +89,38 @@
) -> ) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}. ok | {ok, Result :: any()} | {error, Reason :: term()}.
-callback pre_config_update(
[atom()], emqx_config:update_request(), emqx_config:raw_config(), emqx_config:cluster_rpc_opts()
) ->
ok | {ok, emqx_config:update_request()} | {error, term()}.
-callback propagated_pre_config_update(
[binary()],
emqx_config:update_request(),
emqx_config:raw_config(),
emqx_config:cluster_rpc_opts()
) ->
ok | {ok, emqx_config:update_request()} | {error, term()}.
-callback post_config_update(
[atom()],
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs(),
emqx_config:cluster_rpc_opts()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-callback propagated_post_config_update(
[atom()],
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs(),
emqx_config:cluster_rpc_opts()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-type state() :: #{handlers := any()}. -type state() :: #{handlers := any()}.
-type config_key_path() :: emqx_utils_maps:config_key_path(). -type config_key_path() :: emqx_utils_maps:config_key_path().
@ -92,12 +130,17 @@ start_link() ->
stop() -> stop() ->
gen_server:stop(?MODULE). gen_server:stop(?MODULE).
-spec update_config(module(), config_key_path(), emqx_config:update_args()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config(SchemaModule, ConfKeyPath, UpdateArgs) -> update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
update_config(SchemaModule, ConfKeyPath, UpdateArgs, #{}).
-spec update_config(module(), config_key_path(), emqx_config:update_args(), map()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update_config(SchemaModule, ConfKeyPath, UpdateArgs, ClusterOpts) ->
%% force convert the path to a list of atoms, as there maybe some wildcard names/ids in the path %% force convert the path to a list of atoms, as there maybe some wildcard names/ids in the path
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath], AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity). gen_server:call(
?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs, ClusterOpts}, infinity
).
-spec add_handler(config_key_path(), handler_name()) -> -spec add_handler(config_key_path(), handler_name()) ->
ok | {error, {conflict, list()}}. ok | {error, {conflict, list()}}.
@ -130,11 +173,11 @@ handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers :
{error, _Reason} = Error -> {reply, Error, State} {error, _Reason} = Error -> {reply, Error, State}
end; end;
handle_call( handle_call(
{change_config, SchemaModule, ConfKeyPath, UpdateArgs}, {change_config, SchemaModule, ConfKeyPath, UpdateArgs, ClusterRpcOpts},
_From, _From,
#{handlers := Handlers} = State #{handlers := Handlers} = State
) -> ) ->
Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs), Result = handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts),
{reply, Result, State}; {reply, Result, State};
handle_call(get_raw_cluster_override_conf, _From, State) -> handle_call(get_raw_cluster_override_conf, _From, State) ->
Reply = emqx_config:read_override_conf(#{override_to => cluster}), Reply = emqx_config:read_override_conf(#{override_to => cluster}),
@ -203,9 +246,9 @@ filter_top_level_handlers(Handlers) ->
Handlers Handlers
). ).
handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts) ->
try try
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterRpcOpts)
catch catch
throw:Reason -> throw:Reason ->
{error, Reason}; {error, Reason};
@ -217,13 +260,14 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
update_req => UpdateArgs, update_req => UpdateArgs,
module => SchemaModule, module => SchemaModule,
key_path => ConfKeyPath, key_path => ConfKeyPath,
cluster_rpc_opts => ClusterRpcOpts,
stacktrace => ST stacktrace => ST
}), }),
{error, {config_update_crashed, Reason}} {error, {config_update_crashed, Reason}}
end. end.
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) -> do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) ->
case process_update_request(ConfKeyPath, Handlers, UpdateArgs) of case process_update_request(ConfKeyPath, Handlers, UpdateArgs, ClusterOpts) of
{ok, NewRawConf, OverrideConf, Opts} -> {ok, NewRawConf, OverrideConf, Opts} ->
check_and_save_configs( check_and_save_configs(
SchemaModule, SchemaModule,
@ -232,23 +276,24 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
NewRawConf, NewRawConf,
OverrideConf, OverrideConf,
UpdateArgs, UpdateArgs,
Opts Opts,
ClusterOpts
); );
{error, Result} -> {error, Result} ->
{error, Result} {error, Result}
end. end.
process_update_request([_], _Handlers, {remove, _Opts}) -> process_update_request([_], _Handlers, {remove, _Opts}, _ClusterRpcOpts) ->
{error, "remove_root_is_forbidden"}; {error, "remove_root_is_forbidden"};
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) -> process_update_request(ConfKeyPath, _Handlers, {remove, Opts}, _ClusterRpcOpts) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath), OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
BinKeyPath = bin_path(ConfKeyPath), BinKeyPath = bin_path(ConfKeyPath),
NewRawConf = emqx_utils_maps:deep_remove(BinKeyPath, OldRawConf), NewRawConf = emqx_utils_maps:deep_remove(BinKeyPath, OldRawConf),
OverrideConf = remove_from_override_config(BinKeyPath, Opts), OverrideConf = remove_from_override_config(BinKeyPath, Opts),
{ok, NewRawConf, OverrideConf, Opts}; {ok, NewRawConf, OverrideConf, Opts};
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) -> process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}, ClusterRpcOpts) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath), OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) of
{ok, NewRawConf} -> {ok, NewRawConf} ->
OverrideConf = merge_to_override_config(NewRawConf, Opts), OverrideConf = merge_to_override_config(NewRawConf, Opts),
{ok, NewRawConf, OverrideConf, Opts}; {ok, NewRawConf, OverrideConf, Opts};
@ -256,15 +301,16 @@ process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
Error Error
end. end.
do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) -> do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts) ->
do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, []). do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, []).
do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) -> do_update_config([], Handlers, OldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath) ->
call_pre_config_update(#{ call_pre_config_update(#{
handlers => Handlers, handlers => Handlers,
old_raw_conf => OldRawConf, old_raw_conf => OldRawConf,
update_req => UpdateReq, update_req => UpdateReq,
conf_key_path => ConfKeyPath, conf_key_path => ConfKeyPath,
cluster_rpc_opts => ClusterRpcOpts,
callback => pre_config_update, callback => pre_config_update,
is_propagated => false is_propagated => false
}); });
@ -273,13 +319,18 @@ do_update_config(
Handlers, Handlers,
OldRawConf, OldRawConf,
UpdateReq, UpdateReq,
ClusterRpcOpts,
ConfKeyPath0 ConfKeyPath0
) -> ) ->
ConfKeyPath = ConfKeyPath0 ++ [ConfKey], ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
ConfKeyBin = bin(ConfKey), ConfKeyBin = bin(ConfKey),
SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf), SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf),
SubHandlers = get_sub_handlers(ConfKey, Handlers), SubHandlers = get_sub_handlers(ConfKey, Handlers),
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of case
do_update_config(
SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ClusterRpcOpts, ConfKeyPath
)
of
{ok, NewUpdateReq} -> {ok, NewUpdateReq} ->
merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf); merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf);
Error -> Error ->
@ -293,12 +344,18 @@ check_and_save_configs(
NewRawConf, NewRawConf,
OverrideConf, OverrideConf,
UpdateArgs, UpdateArgs,
Opts Opts,
ClusterOpts
) -> ) ->
Schema = schema(SchemaModule, ConfKeyPath), Schema = schema(SchemaModule, ConfKeyPath),
Kind = maps:get(kind, ClusterOpts, ?KIND_INITIATE),
{AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf), {AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf),
OldConf = emqx_config:get_root(ConfKeyPath), OldConf = emqx_config:get_root(ConfKeyPath),
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of case
do_post_config_update(
ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, #{}
)
of
{ok, Result0} -> {ok, Result0} ->
post_update_ok( post_update_ok(
AppEnvs, AppEnvs,
@ -310,21 +367,24 @@ check_and_save_configs(
UpdateArgs, UpdateArgs,
Result0 Result0
); );
{error, {post_config_update, HandlerName, Reason}} -> {error, {post_config_update, HandlerName, Reason}} when Kind =/= ?KIND_INITIATE ->
HandlePostFailureFun = ?SLOG(critical, #{
fun() -> msg => "post_config_update_failed_but_save_the_config_anyway",
post_update_ok( handler => HandlerName,
AppEnvs, reason => Reason
NewConf, }),
NewRawConf, post_update_ok(
OverrideConf, AppEnvs,
Opts, NewConf,
ConfKeyPath, NewRawConf,
UpdateArgs, OverrideConf,
#{} Opts,
) ConfKeyPath,
end, UpdateArgs,
{error, {post_config_update, HandlerName, {Reason, HandlePostFailureFun}}} #{}
);
{error, _} = Error ->
Error
end. end.
post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, UpdateArgs, Result0) -> post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, UpdateArgs, Result0) ->
@ -332,7 +392,9 @@ post_update_ok(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts, ConfKeyPath, Up
Result1 = return_change_result(ConfKeyPath, UpdateArgs), Result1 = return_change_result(ConfKeyPath, UpdateArgs),
{ok, Result1#{post_config_update => Result0}}. {ok, Result1#{post_config_update => Result0}}.
do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) -> do_post_config_update(
ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, ClusterOpts, Result
) ->
do_post_config_update( do_post_config_update(
ConfKeyPath, ConfKeyPath,
Handlers, Handlers,
@ -340,6 +402,7 @@ do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateAr
NewConf, NewConf,
AppEnvs, AppEnvs,
UpdateArgs, UpdateArgs,
ClusterOpts,
Result, Result,
[] []
). ).
@ -352,6 +415,7 @@ do_post_config_update(
AppEnvs, AppEnvs,
UpdateArgs, UpdateArgs,
Result, Result,
ClusterOpts,
ConfKeyPath ConfKeyPath
) -> ) ->
call_post_config_update(#{ call_post_config_update(#{
@ -362,6 +426,7 @@ do_post_config_update(
update_req => up_req(UpdateArgs), update_req => up_req(UpdateArgs),
result => Result, result => Result,
conf_key_path => ConfKeyPath, conf_key_path => ConfKeyPath,
cluster_rpc_opts => ClusterOpts,
callback => post_config_update callback => post_config_update
}); });
do_post_config_update( do_post_config_update(
@ -371,6 +436,7 @@ do_post_config_update(
NewConf, NewConf,
AppEnvs, AppEnvs,
UpdateArgs, UpdateArgs,
ClusterOpts,
Result, Result,
ConfKeyPath0 ConfKeyPath0
) -> ) ->
@ -385,6 +451,7 @@ do_post_config_update(
SubNewConf, SubNewConf,
AppEnvs, AppEnvs,
UpdateArgs, UpdateArgs,
ClusterOpts,
Result, Result,
ConfKeyPath ConfKeyPath
). ).
@ -428,37 +495,41 @@ call_proper_pre_config_update(
#{ #{
handlers := #{?MOD := Module}, handlers := #{?MOD := Module},
callback := Callback, callback := Callback,
update_req := UpdateReq, update_req := UpdateReq
old_raw_conf := OldRawConf
} = Ctx } = Ctx
) -> ) ->
case erlang:function_exported(Module, Callback, 3) of Arity = get_function_arity(Module, Callback, [3, 4]),
true -> case apply_pre_config_update(Module, Callback, Arity, Ctx) of
case apply_pre_config_update(Module, Ctx) of {ok, NewUpdateReq} ->
{ok, NewUpdateReq} -> {ok, NewUpdateReq};
{ok, NewUpdateReq}; ok ->
ok -> {ok, UpdateReq};
{ok, UpdateReq}; {error, Reason} ->
{error, Reason} -> {error, {pre_config_update, Module, Reason}}
{error, {pre_config_update, Module, Reason}}
end;
false ->
merge_to_old_config(UpdateReq, OldRawConf)
end; end;
call_proper_pre_config_update( call_proper_pre_config_update(
#{update_req := UpdateReq} #{update_req := UpdateReq}
) -> ) ->
{ok, UpdateReq}. {ok, UpdateReq}.
apply_pre_config_update(Module, #{ apply_pre_config_update(Module, Callback, 3, #{
conf_key_path := ConfKeyPath,
update_req := UpdateReq,
old_raw_conf := OldRawConf
}) ->
Module:Callback(ConfKeyPath, UpdateReq, OldRawConf);
apply_pre_config_update(Module, Callback, 4, #{
conf_key_path := ConfKeyPath, conf_key_path := ConfKeyPath,
update_req := UpdateReq, update_req := UpdateReq,
old_raw_conf := OldRawConf, old_raw_conf := OldRawConf,
callback := Callback cluster_rpc_opts := ClusterRpcOpts
}) -> }) ->
Module:Callback( Module:Callback(ConfKeyPath, UpdateReq, OldRawConf, ClusterRpcOpts);
ConfKeyPath, UpdateReq, OldRawConf apply_pre_config_update(_Module, _Callback, false, #{
). update_req := UpdateReq,
old_raw_conf := OldRawConf
}) ->
merge_to_old_config(UpdateReq, OldRawConf).
propagate_pre_config_updates_to_subconf( propagate_pre_config_updates_to_subconf(
#{handlers := #{?WKEY := _}} = Ctx #{handlers := #{?WKEY := _}} = Ctx
@ -560,28 +631,23 @@ call_proper_post_config_update(
result := Result result := Result
} = Ctx } = Ctx
) -> ) ->
case erlang:function_exported(Module, Callback, 5) of Arity = get_function_arity(Module, Callback, [5, 6]),
true -> case apply_post_config_update(Module, Callback, Arity, Ctx) of
case apply_post_config_update(Module, Ctx) of ok -> {ok, Result};
ok -> {ok, Result}; {ok, Result1} -> {ok, Result#{Module => Result1}};
{ok, Result1} -> {ok, Result#{Module => Result1}}; {error, Reason} -> {error, {post_config_update, Module, Reason}}
{error, Reason} -> {error, {post_config_update, Module, Reason}}
end;
false ->
{ok, Result}
end; end;
call_proper_post_config_update( call_proper_post_config_update(
#{result := Result} = _Ctx #{result := Result} = _Ctx
) -> ) ->
{ok, Result}. {ok, Result}.
apply_post_config_update(Module, #{ apply_post_config_update(Module, Callback, 5, #{
conf_key_path := ConfKeyPath, conf_key_path := ConfKeyPath,
update_req := UpdateReq, update_req := UpdateReq,
new_conf := NewConf, new_conf := NewConf,
old_conf := OldConf, old_conf := OldConf,
app_envs := AppEnvs, app_envs := AppEnvs
callback := Callback
}) -> }) ->
Module:Callback( Module:Callback(
ConfKeyPath, ConfKeyPath,
@ -589,7 +655,25 @@ apply_post_config_update(Module, #{
NewConf, NewConf,
OldConf, OldConf,
AppEnvs AppEnvs
). );
apply_post_config_update(Module, Callback, 6, #{
conf_key_path := ConfKeyPath,
update_req := UpdateReq,
cluster_rpc_opts := ClusterRpcOpts,
new_conf := NewConf,
old_conf := OldConf,
app_envs := AppEnvs
}) ->
Module:Callback(
ConfKeyPath,
UpdateReq,
NewConf,
OldConf,
AppEnvs,
ClusterRpcOpts
);
apply_post_config_update(_Module, _Callback, false, _Ctx) ->
ok.
propagate_post_config_updates_to_subconf( propagate_post_config_updates_to_subconf(
#{handlers := #{?WKEY := _}} = Ctx #{handlers := #{?WKEY := _}} = Ctx
@ -768,7 +852,9 @@ assert_callback_function(Mod) ->
_ = apply(Mod, module_info, []), _ = apply(Mod, module_info, []),
case case
erlang:function_exported(Mod, pre_config_update, 3) orelse erlang:function_exported(Mod, pre_config_update, 3) orelse
erlang:function_exported(Mod, post_config_update, 5) erlang:function_exported(Mod, post_config_update, 5) orelse
erlang:function_exported(Mod, pre_config_update, 4) orelse
erlang:function_exported(Mod, post_config_update, 6)
of of
true -> ok; true -> ok;
false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod}) false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod})
@ -811,3 +897,13 @@ load_prev_handlers() ->
save_handlers(Handlers) -> save_handlers(Handlers) ->
application:set_env(emqx, ?MODULE, Handlers). application:set_env(emqx, ?MODULE, Handlers).
get_function_arity(_Module, _Callback, []) ->
false;
get_function_arity(Module, Callback, [Arity | Opts]) ->
%% ensure module is loaded
Module = Module:module_info(module),
case erlang:function_exported(Module, Callback, Arity) of
true -> Arity;
false -> get_function_arity(Module, Callback, Opts)
end.

View File

@ -415,14 +415,7 @@ assert_update_result(FailedPath, Update, Expect) ->
assert_update_result(Paths, UpdatePath, Update, Expect) -> assert_update_result(Paths, UpdatePath, Update, Expect) ->
with_update_result(Paths, UpdatePath, Update, fun(Old, Result) -> with_update_result(Paths, UpdatePath, Update, fun(Old, Result) ->
case Expect of ?assertEqual(Expect, Result),
{error, {post_config_update, ?MODULE, post_config_update_error}} ->
?assertMatch(
{error, {post_config_update, ?MODULE, {post_config_update_error, _}}}, Result
);
_ ->
?assertEqual(Expect, Result)
end,
New = emqx:get_raw_config(UpdatePath, undefined), New = emqx:get_raw_config(UpdatePath, undefined),
?assertEqual(Old, New) ?assertEqual(Old, New)
end). end).

View File

@ -593,7 +593,7 @@ t_quic_update_opts_fail(Config) ->
%% THEN: Reload failed but old listener is rollbacked. %% THEN: Reload failed but old listener is rollbacked.
?assertMatch( ?assertMatch(
{error, {post_config_update, emqx_listeners, {{rollbacked, {error, tls_error}}, _}}}, {error, {post_config_update, emqx_listeners, {rollbacked, {error, tls_error}}}},
UpdateResult1 UpdateResult1
), ),

View File

@ -51,7 +51,7 @@
set_feature_available/2 set_feature_available/2
]). ]).
-export([post_config_update/5, pre_config_update/3]). -export([pre_config_update/4, post_config_update/5]).
-export([ -export([
maybe_read_source_files/1, maybe_read_source_files/1,
@ -194,8 +194,8 @@ update({?CMD_DELETE, Type}, Sources) ->
update(Cmd, Sources) -> update(Cmd, Sources) ->
emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}). emqx_authz_utils:update_config(?CONF_KEY_PATH, {Cmd, Sources}).
pre_config_update(Path, Cmd, Sources) -> pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) ->
try do_pre_config_update(Path, Cmd, Sources) of try do_pre_config_update(Path, Cmd, Sources, ClusterRpcOpts) of
{error, Reason} -> {error, Reason}; {error, Reason} -> {error, Reason};
NSources -> {ok, NSources} NSources -> {ok, NSources}
catch catch
@ -215,58 +215,64 @@ pre_config_update(Path, Cmd, Sources) ->
{error, Reason} {error, Reason}
end. end.
do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) -> do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources, Opts) ->
do_pre_config_update(Cmd, Sources); do_pre_config_update(Cmd, Sources, Opts);
do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf) -> do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf, Opts) ->
do_pre_config_merge(NewConf, OldConf); do_pre_config_merge(NewConf, OldConf, Opts);
do_pre_config_update(?ROOT_KEY, NewConf, OldConf) -> do_pre_config_update(?ROOT_KEY, NewConf, OldConf, Opts) ->
do_pre_config_replace(NewConf, OldConf). do_pre_config_replace(NewConf, OldConf, Opts).
do_pre_config_merge(NewConf, OldConf) -> do_pre_config_merge(NewConf, OldConf, Opts) ->
MergeConf = emqx_utils_maps:deep_merge(OldConf, NewConf), MergeConf = emqx_utils_maps:deep_merge(OldConf, NewConf),
NewSources = merge_sources(OldConf, NewConf), NewSources = merge_sources(OldConf, NewConf),
do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf). do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf, Opts).
%% override the entire config when updating the root key %% override the entire config when updating the root key
%% emqx_conf:update(?ROOT_KEY, Conf); %% emqx_conf:update(?ROOT_KEY, Conf);
do_pre_config_replace(Conf, Conf) -> do_pre_config_replace(Conf, Conf, _Opts) ->
Conf; Conf;
do_pre_config_replace(NewConf, OldConf) -> do_pre_config_replace(NewConf, OldConf, Opts) ->
NewSources = get_sources(NewConf), NewSources = get_sources(NewConf),
OldSources = get_sources(OldConf), OldSources = get_sources(OldConf),
ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources), ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources, Opts),
NewConf#{<<"sources">> => ReplaceSources}. NewConf#{<<"sources">> => ReplaceSources}.
do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) -> do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources, _Opts) ->
do_move(Cmd, Sources); do_move(Cmd, Sources);
do_pre_config_update({?CMD_PREPEND, Source}, Sources) -> do_pre_config_update({?CMD_PREPEND, Source}, Sources, _Opts) ->
NSource = maybe_write_source_files(Source), NSource = maybe_write_source_files(Source),
NSources = [NSource] ++ Sources, NSources = [NSource] ++ Sources,
ok = check_dup_types(NSources), ok = check_dup_types(NSources),
NSources; NSources;
do_pre_config_update({?CMD_APPEND, Source}, Sources) -> do_pre_config_update({?CMD_APPEND, Source}, Sources, _Opts) ->
NSource = maybe_write_source_files(Source), NSource = maybe_write_source_files(Source),
NSources = Sources ++ [NSource], NSources = Sources ++ [NSource],
ok = check_dup_types(NSources), ok = check_dup_types(NSources),
NSources; NSources;
do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources) -> do_pre_config_update({{?CMD_REPLACE, Type}, Source}, Sources, _Opts) ->
NSource = maybe_write_source_files(Source), NSource = maybe_write_source_files(Source),
{_Old, Front, Rear} = take(Type, Sources), {_Old, Front, Rear} = take(Type, Sources),
NSources = Front ++ [NSource | Rear], NSources = Front ++ [NSource | Rear],
ok = check_dup_types(NSources), ok = check_dup_types(NSources),
NSources; NSources;
do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources) -> do_pre_config_update({{?CMD_DELETE, Type}, _Source}, Sources, Opts) ->
{_Old, Front, Rear} = take(Type, Sources), case type_take(Type, Sources) of
NSources = Front ++ Rear, not_found ->
NSources; case emqx_cluster_rpc:is_initiator(Opts) of
do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources) -> true -> throw({not_found_source, Type});
false -> Sources
end;
{_Found, NSources} ->
NSources
end;
do_pre_config_update({?CMD_REPLACE, Sources}, _OldSources, _Opts) ->
%% overwrite the entire config! %% overwrite the entire config!
NSources = lists:map(fun maybe_write_source_files/1, Sources), NSources = lists:map(fun maybe_write_source_files/1, Sources),
ok = check_dup_types(NSources), ok = check_dup_types(NSources),
NSources; NSources;
do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources) -> do_pre_config_update({?CMD_REORDER, NewSourcesOrder}, OldSources, _Opts) ->
reorder_sources(NewSourcesOrder, OldSources); reorder_sources(NewSourcesOrder, OldSources);
do_pre_config_update({Op, Source}, Sources) -> do_pre_config_update({Op, Source}, Sources, _Opts) ->
throw({bad_request, #{op => Op, source => Source, sources => Sources}}). throw({bad_request, #{op => Op, source => Source, sources => Sources}}).
post_config_update(_, _, undefined, _OldSource, _AppEnvs) -> post_config_update(_, _, undefined, _OldSource, _AppEnvs) ->
@ -293,9 +299,13 @@ do_post_config_update(?CONF_KEY_PATH, {{?CMD_REPLACE, Type}, RawNewSource}, Sour
Front ++ [InitedSources] ++ Rear; Front ++ [InitedSources] ++ Rear;
do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) -> do_post_config_update(?CONF_KEY_PATH, {{?CMD_DELETE, Type}, _RawNewSource}, _Sources) ->
OldInitedSources = lookup(), OldInitedSources = lookup(),
{OldSource, Front, Rear} = take(Type, OldInitedSources), case type_take(Type, OldInitedSources) of
ok = ensure_deleted(OldSource, #{clear_metric => true}), not_found ->
Front ++ Rear; OldInitedSources;
{Found, NSources} ->
ok = ensure_deleted(Found, #{clear_metric => true}),
NSources
end;
do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) -> do_post_config_update(?CONF_KEY_PATH, {?CMD_REPLACE, _RawNewSources}, Sources) ->
overwrite_entire_sources(Sources); overwrite_entire_sources(Sources);
do_post_config_update(?CONF_KEY_PATH, {?CMD_REORDER, NewSourcesOrder}, _Sources) -> do_post_config_update(?CONF_KEY_PATH, {?CMD_REORDER, NewSourcesOrder}, _Sources) ->

View File

@ -0,0 +1,128 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authz_api_cluster_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_placeholder.hrl").
-define(SOURCE_HTTP, #{
<<"type">> => <<"http">>,
<<"enable">> => true,
<<"url">> => <<"https://fake.com:443/acl?username=", ?PH_USERNAME/binary>>,
<<"ssl">> => #{<<"enable">> => true},
<<"headers">> => #{},
<<"method">> => <<"get">>,
<<"request_timeout">> => <<"5s">>
}).
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
WorkDir = ?config(priv_dir, Config),
Cluster = mk_cluster_spec(#{}),
Nodes = [NodePrimary | _] = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}),
lists:foreach(fun(N) -> ?ON(N, emqx_authz_test_lib:register_fake_sources([http])) end, Nodes),
{ok, App} = ?ON(NodePrimary, emqx_common_test_http:create_default_app()),
[{api, App}, {cluster_nodes, Nodes}, {node, NodePrimary} | Config].
end_per_suite(Config) ->
ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)).
t_api(Config) ->
APINode = ?config(node, Config),
{ok, 200, Result1} = request(get, uri(["authorization", "sources"]), [], Config),
?assertEqual([], get_sources(Result1)),
{ok, 204, _} = request(post, uri(["authorization", "sources"]), ?SOURCE_HTTP, Config),
AllNodes = ?config(cluster_nodes, Config),
[OtherNode] = AllNodes -- [APINode],
lists:foreach(
fun(N) ->
?ON(
N,
?assertMatch(
[#{<<"type">> := <<"http">>}],
emqx:get_raw_config([authorization, sources])
)
)
end,
AllNodes
),
%% delete the source from the second node.
?ON(OtherNode, begin
{ok, _} = emqx:update_config([authorization], #{<<"sources">> => []}),
?assertMatch([], emqx:get_raw_config([authorization, sources]))
end),
?assertMatch(
{ok, 204, _},
request(
delete,
uri(["authorization", "sources", "http"]),
[],
Config
)
),
lists:foreach(
fun(N) ->
?ON(
N,
?assertMatch(
[],
emqx:get_raw_config([authorization, sources])
)
)
end,
AllNodes
),
?assertMatch(
{ok, 404, _},
request(
delete,
uri(["authorization", "sources", "http"]),
[],
Config
)
),
ok.
get_sources(Result) ->
maps:get(<<"sources">>, emqx_utils_json:decode(Result, [return_maps])).
mk_cluster_spec(Opts) ->
Apps = [
emqx,
{emqx_conf, "authorization {cache{enable=false},no_match=deny,sources=[]}"},
emqx_auth,
emqx_management
],
Node1Apps = Apps ++ [{emqx_dashboard, "dashboard.listeners.http {enable=true,bind=18083}"}],
Node2Apps = Apps,
[
{emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Node1Apps}},
{emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Node2Apps}}
].
request(Method, URL, Body, Config) ->
AuthHeader = emqx_common_test_http:auth_header(?config(api, Config)),
Opts = #{compatible_mode => true, httpc_req_opts => [{body_format, binary}]},
emqx_mgmt_api_test_util:request_api(Method, URL, [], AuthHeader, Body, Opts).

View File

@ -519,6 +519,7 @@ wait_acked(Opts) ->
%% no need to check return value; we check the property in %% no need to check return value; we check the property in
%% the check phase. this is just to give it a chance to do %% the check phase. this is just to give it a chance to do
%% so and avoid flakiness. should be fast. %% so and avoid flakiness. should be fast.
ct:pal("waiting ~b ms until acked...", [Timeout]),
Res = snabbkaffe:block_until( Res = snabbkaffe:block_until(
?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}), ?match_n_events(N, #{?snk_kind := gcp_pubsub_consumer_worker_acknowledged}),
Timeout Timeout
@ -1265,11 +1266,12 @@ t_multiple_topic_mappings(Config) ->
%% 2+ pull workers do not duplicate delivered messages %% 2+ pull workers do not duplicate delivered messages
t_multiple_pull_workers(Config) -> t_multiple_pull_workers(Config) ->
ct:timetrap({seconds, 120}), ct:timetrap({seconds, 121}),
BridgeName = ?config(consumer_name, Config), BridgeName = ?config(consumer_name, Config),
TopicMapping = ?config(topic_mapping, Config), TopicMapping = ?config(topic_mapping, Config),
ResourceId = resource_id(Config), ResourceId = resource_id(Config),
?check_trace( ?check_trace(
#{timetrap => 120_000},
begin begin
NConsumers = 3, NConsumers = 3,
start_and_subscribe_mqtt(Config), start_and_subscribe_mqtt(Config),
@ -1277,7 +1279,7 @@ t_multiple_pull_workers(Config) ->
snabbkaffe:subscribe( snabbkaffe:subscribe(
?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}), ?match_event(#{?snk_kind := "gcp_pubsub_consumer_worker_subscription_ready"}),
NConsumers, NConsumers,
40_000 infinity
), ),
{ok, _} = create_bridge( {ok, _} = create_bridge(
Config, Config,
@ -1287,6 +1289,14 @@ t_multiple_pull_workers(Config) ->
<<"ack_deadline">> => <<"10m">>, <<"ack_deadline">> => <<"10m">>,
<<"ack_retry_interval">> => <<"1s">>, <<"ack_retry_interval">> => <<"1s">>,
<<"consumer_workers_per_topic">> => NConsumers <<"consumer_workers_per_topic">> => NConsumers
},
<<"resource_opts">> => #{
%% Used by worker when patching subscritpion; we increase it a bit
%% here because (at least) the gcp emulator tends to time out /
%% throttle (?) workers targeting the same subscription, making
%% the test flakier.
<<"request_ttl">> => <<"5s">>,
<<"resume_interval">> => <<"1s">>
} }
} }
), ),
@ -1294,6 +1304,14 @@ t_multiple_pull_workers(Config) ->
[#{pubsub_topic := Topic}] = TopicMapping, [#{pubsub_topic := Topic}] = TopicMapping,
Payload = emqx_guid:to_hexstr(emqx_guid:gen()), Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
Messages = [#{<<"data">> => Payload}], Messages = [#{<<"data">> => Payload}],
?retry(
500,
20,
?assertMatch(
{ok, connected},
health_check(Config)
)
),
pubsub_publish(Config, Topic, Messages), pubsub_publish(Config, Topic, Messages),
{ok, Published} = receive_published(), {ok, Published} = receive_published(),
?assertMatch( ?assertMatch(

View File

@ -27,6 +27,7 @@
query/1, query/1,
reset/0, reset/0,
status/0, status/0,
is_initiator/1,
skip_failed_commit/1, skip_failed_commit/1,
fast_forward_to_commit/2, fast_forward_to_commit/2,
on_mria_stop/1, on_mria_stop/1,
@ -66,6 +67,7 @@
-export_type([tnx_id/0, succeed_num/0]). -export_type([tnx_id/0, succeed_num/0]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_conf.hrl"). -include("emqx_conf.hrl").
@ -78,8 +80,6 @@
-define(INITIATE(MFA), {initiate, MFA}). -define(INITIATE(MFA), {initiate, MFA}).
-define(CATCH_UP, catch_up). -define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)). -define(TIMEOUT, timer:minutes(1)).
-define(APPLY_KIND_REPLICATE, replicate).
-define(APPLY_KIND_INITIATE, initiate).
-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)). -define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).
-type tnx_id() :: pos_integer(). -type tnx_id() :: pos_integer().
@ -224,6 +224,9 @@ reset() -> gen_server:call(?MODULE, reset).
status() -> status() ->
transaction(fun ?MODULE:trans_status/0, []). transaction(fun ?MODULE:trans_status/0, []).
is_initiator(Opts) ->
?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE).
%% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560.
on_leave_clean() -> on_leave_clean() ->
on_leave_clean(node()). on_leave_clean(node()).
@ -398,7 +401,7 @@ catch_up(#{node := Node, retry_interval := RetryMs, is_leaving := false} = State
?tp(cluster_rpc_caught_up, #{}), ?tp(cluster_rpc_caught_up, #{}),
?TIMEOUT; ?TIMEOUT;
{atomic, {still_lagging, NextId, MFA}} -> {atomic, {still_lagging, NextId, MFA}} ->
{Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), {Succeed, _} = apply_mfa(NextId, MFA, ?KIND_REPLICATE),
case Succeed orelse SkipResult of case Succeed orelse SkipResult of
true -> true ->
case transaction(fun ?MODULE:commit/2, [Node, NextId]) of case transaction(fun ?MODULE:commit/2, [Node, NextId]) of
@ -520,7 +523,7 @@ init_mfa(Node, MFA) ->
}, },
ok = mnesia:write(?CLUSTER_MFA, MFARec, write), ok = mnesia:write(?CLUSTER_MFA, MFARec, write),
ok = commit(Node, TnxId), ok = commit(Node, TnxId),
case apply_mfa(TnxId, MFA, ?APPLY_KIND_INITIATE) of case apply_mfa(TnxId, MFA, ?KIND_INITIATE) of
{true, Result} -> {ok, TnxId, Result}; {true, Result} -> {ok, TnxId, Result};
{false, Error} -> mnesia:abort(Error) {false, Error} -> mnesia:abort(Error)
end; end;
@ -571,23 +574,7 @@ trans_query(TnxId) ->
apply_mfa(TnxId, {M, F, A}, Kind) -> apply_mfa(TnxId, {M, F, A}, Kind) ->
Res = Res =
try try
case erlang:apply(M, F, A) of erlang:apply(M, F, A ++ [#{kind => Kind}])
{error, {post_config_update, HandlerName, {Reason0, PostFailureFun}}} when
Kind =/= ?APPLY_KIND_INITIATE
->
?SLOG(error, #{
msg => "post_config_update_failed",
handler => HandlerName,
reason => Reason0
}),
PostFailureFun();
{error, {post_config_update, HandlerName, {Reason0, _Fun}}} when
Kind =:= ?APPLY_KIND_INITIATE
->
{error, {post_config_update, HandlerName, Reason0}};
Result ->
Result
end
catch catch
throw:Reason -> throw:Reason ->
{error, #{reason => Reason}}; {error, #{reason => Reason}};
@ -607,7 +594,7 @@ is_success(ok) -> true;
is_success({ok, _}) -> true; is_success({ok, _}) -> true;
is_success(_) -> false. is_success(_) -> false.
log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) -> log_and_alarm(IsSuccess, Res, #{kind := ?KIND_INITIATE} = Meta) ->
%% no alarm or error log in case of failure at originating a new cluster-call %% no alarm or error log in case of failure at originating a new cluster-call
%% because nothing is committed %% because nothing is committed
case IsSuccess of case IsSuccess of

View File

@ -234,7 +234,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
{atomic, [_Status | L]} = emqx_cluster_rpc:status(), {atomic, [_Status | L]} = emqx_cluster_rpc:status(),
?assertEqual([], L), ?assertEqual([], L),
ets:insert(test, {other_mfa_result, ok}), ets:insert(test, {other_mfa_result, ok}),
{ok, 2, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000), {ok, 2, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], 1, 1000),
ct:sleep(1000), ct:sleep(1000),
{atomic, NewStatus} = emqx_cluster_rpc:status(), {atomic, NewStatus} = emqx_cluster_rpc:status(),
?assertEqual(3, length(NewStatus)), ?assertEqual(3, length(NewStatus)),
@ -251,7 +251,7 @@ t_commit_ok_apply_fail_on_other_node_then_recover(_Config) ->
t_del_stale_mfa(_Config) -> t_del_stale_mfa(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
MFA = {M, F, A} = {io, format, ["format:~p~n", [?FUNCTION_NAME]]}, MFA = {M, F, A} = {?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]]},
Keys = lists:seq(1, 50), Keys = lists:seq(1, 50),
Keys2 = lists:seq(51, 150), Keys2 = lists:seq(51, 150),
Ids = Ids =
@ -292,7 +292,7 @@ t_del_stale_mfa(_Config) ->
t_skip_failed_commit(_Config) -> t_skip_failed_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180), ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(), {atomic, List1} = emqx_cluster_rpc:status(),
Node = node(), Node = node(),
@ -312,7 +312,7 @@ t_skip_failed_commit(_Config) ->
t_fast_forward_commit(_Config) -> t_fast_forward_commit(_Config) ->
{atomic, []} = emqx_cluster_rpc:status(), {atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = multicall(io, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000), {ok, 1, ok} = multicall(?MODULE, format, ["format:~p~n", [?FUNCTION_NAME]], all, 1000),
ct:sleep(180), ct:sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(), {atomic, List1} = emqx_cluster_rpc:status(),
Node = node(), Node = node(),
@ -393,22 +393,25 @@ receive_msg(Count, Msg) when Count > 0 ->
{Msg, flush_msg([])} {Msg, flush_msg([])}
end. end.
echo(Pid, Msg) -> echo(Pid, Msg, _) ->
erlang:send(Pid, Msg), erlang:send(Pid, Msg),
ok. ok.
echo_delay(Pid, Msg) -> format(Fmt, Args, _Opts) ->
io:format(Fmt, Args).
echo_delay(Pid, Msg, _) ->
timer:sleep(rand:uniform(150)), timer:sleep(rand:uniform(150)),
erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}), erlang:send(Pid, {msg, Msg, erlang:system_time(), self()}),
ok. ok.
failed_on_node(Pid) -> failed_on_node(Pid, _) ->
case Pid =:= self() of case Pid =:= self() of
true -> ok; true -> ok;
false -> "MFA return not ok" false -> "MFA return not ok"
end. end.
failed_on_node_by_odd(Pid) -> failed_on_node_by_odd(Pid, _) ->
case Pid =:= self() of case Pid =:= self() of
true -> true ->
ok; ok;
@ -421,7 +424,7 @@ failed_on_node_by_odd(Pid) ->
end end
end. end.
failed_on_other_recover_after_retry(Pid) -> failed_on_other_recover_after_retry(Pid, _) ->
case Pid =:= self() of case Pid =:= self() of
true -> true ->
ok; ok;

View File

@ -343,9 +343,13 @@ t_session_serialization(_Config) ->
ok = emqx_eviction_agent:disable(test_eviction), ok = emqx_eviction_agent:disable(test_eviction),
?assertEqual( ?retry(
1, 200,
emqx_eviction_agent:session_count() 10,
?assertEqual(
1,
emqx_eviction_agent:session_count()
)
), ),
?assertMatch( ?assertMatch(

View File

@ -47,8 +47,10 @@
get_plugins/0, get_plugins/0,
install_package/2, install_package/2,
delete_package/1, delete_package/1,
delete_package/2,
describe_package/1, describe_package/1,
ensure_action/2, ensure_action/2,
ensure_action/3,
do_update_plugin_config/3 do_update_plugin_config/3
]). ]).
@ -399,7 +401,7 @@ validate_name(Name) ->
%% API CallBack Begin %% API CallBack Begin
list_plugins(get, _) -> list_plugins(get, _) ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
{Plugins, []} = emqx_mgmt_api_plugins_proto_v2:get_plugins(Nodes), {Plugins, []} = emqx_mgmt_api_plugins_proto_v3:get_plugins(Nodes),
{200, format_plugins(Plugins)}. {200, format_plugins(Plugins)}.
get_plugins() -> get_plugins() ->
@ -451,7 +453,7 @@ upload_install(post, #{}) ->
do_install_package(FileName, Bin) -> do_install_package(FileName, Bin) ->
%% TODO: handle bad nodes %% TODO: handle bad nodes
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
{[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v2:install_package(Nodes, FileName, Bin), {[_ | _] = Res, []} = emqx_mgmt_api_plugins_proto_v3:install_package(Nodes, FileName, Bin),
case lists:filter(fun(R) -> R =/= ok end, Res) of case lists:filter(fun(R) -> R =/= ok end, Res) of
[] -> [] ->
{204}; {204};
@ -477,17 +479,17 @@ do_install_package(FileName, Bin) ->
plugin(get, #{bindings := #{name := Name}}) -> plugin(get, #{bindings := #{name := Name}}) ->
Nodes = emqx:running_nodes(), Nodes = emqx:running_nodes(),
{Plugins, _} = emqx_mgmt_api_plugins_proto_v2:describe_package(Nodes, Name), {Plugins, _} = emqx_mgmt_api_plugins_proto_v3:describe_package(Nodes, Name),
case format_plugins(Plugins) of case format_plugins(Plugins) of
[Plugin] -> {200, Plugin}; [Plugin] -> {200, Plugin};
[] -> {404, #{code => 'NOT_FOUND', message => Name}} [] -> {404, #{code => 'NOT_FOUND', message => Name}}
end; end;
plugin(delete, #{bindings := #{name := Name}}) -> plugin(delete, #{bindings := #{name := Name}}) ->
Res = emqx_mgmt_api_plugins_proto_v2:delete_package(Name), Res = emqx_mgmt_api_plugins_proto_v3:delete_package(Name),
return(204, Res). return(204, Res).
update_plugin(put, #{bindings := #{name := Name, action := Action}}) -> update_plugin(put, #{bindings := #{name := Name, action := Action}}) ->
Res = emqx_mgmt_api_plugins_proto_v2:ensure_action(Name, Action), Res = emqx_mgmt_api_plugins_proto_v3:ensure_action(Name, Action),
return(204, Res). return(204, Res).
plugin_config(get, #{bindings := #{name := NameVsn}}) -> plugin_config(get, #{bindings := #{name := NameVsn}}) ->
@ -583,8 +585,12 @@ describe_package(NameVsn) ->
_ -> {Node, []} _ -> {Node, []}
end. end.
%% For RPC plugin delete %% Tip: Don't delete delete_package/1, use before v571 cluster_rpc
delete_package(Name) -> delete_package(Name) ->
delete_package(Name, #{}).
%% For RPC plugin delete
delete_package(Name, _Opts) ->
case emqx_plugins:ensure_stopped(Name) of case emqx_plugins:ensure_stopped(Name) of
ok -> ok ->
_ = emqx_plugins:ensure_disabled(Name), _ = emqx_plugins:ensure_disabled(Name),
@ -595,19 +601,24 @@ delete_package(Name) ->
Error Error
end. end.
%% Tip: Don't delete ensure_action/2, use before v571 cluster_rpc
ensure_action(Name, Action) ->
ensure_action(Name, Action, #{}).
%% for RPC plugin update %% for RPC plugin update
%% TODO: catch thrown error to return 400 %% TODO: catch thrown error to return 400
%% - plugin_not_found %% - plugin_not_found
%% - otp vsn assertion failed %% - otp vsn assertion failed
ensure_action(Name, start) ->
ensure_action(Name, start, _Opts) ->
_ = emqx_plugins:ensure_started(Name), _ = emqx_plugins:ensure_started(Name),
_ = emqx_plugins:ensure_enabled(Name), _ = emqx_plugins:ensure_enabled(Name),
ok; ok;
ensure_action(Name, stop) -> ensure_action(Name, stop, _Opts) ->
_ = emqx_plugins:ensure_stopped(Name), _ = emqx_plugins:ensure_stopped(Name),
_ = emqx_plugins:ensure_disabled(Name), _ = emqx_plugins:ensure_disabled(Name),
ok; ok;
ensure_action(Name, restart) -> ensure_action(Name, restart, _Opts) ->
_ = emqx_plugins:ensure_enabled(Name), _ = emqx_plugins:ensure_enabled(Name),
_ = emqx_plugins:restart(Name), _ = emqx_plugins:restart(Name),
ok. ok.

View File

@ -97,7 +97,7 @@ t_broker(_Config) ->
t_cluster(_Config) -> t_cluster(_Config) ->
SelfNode = node(), SelfNode = node(),
FakeNode = 'fake@127.0.0.1', FakeNode = 'fake@127.0.0.1',
MFA = {io, format, [""]}, MFA = {?MODULE, format, [""]},
meck:new(mria_mnesia, [non_strict, passthrough, no_link]), meck:new(mria_mnesia, [non_strict, passthrough, no_link]),
meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]), meck:expect(mria_mnesia, running_nodes, 0, [SelfNode, FakeNode]),
{atomic, {ok, TnxId, _}} = {atomic, {ok, TnxId, _}} =
@ -353,3 +353,5 @@ t_autocluster_leave(Config) ->
10_000 10_000
) )
). ).
format(Str, Opts) -> io:format("str:~s: Opts:~p", [Str, Opts]).

View File

@ -50,6 +50,7 @@
remove_local/1, remove_local/1,
reset_metrics/1, reset_metrics/1,
reset_metrics_local/1, reset_metrics_local/1,
reset_metrics_local/2,
%% Create metrics for a resource ID %% Create metrics for a resource ID
create_metrics/1, create_metrics/1,
%% Delete metrics for a resource ID %% Delete metrics for a resource ID
@ -337,8 +338,13 @@ remove_local(ResId) ->
ok ok
end. end.
%% Tip: Don't delete reset_metrics_local/1, use before v572 rpc
-spec reset_metrics_local(resource_id()) -> ok. -spec reset_metrics_local(resource_id()) -> ok.
reset_metrics_local(ResId) -> reset_metrics_local(ResId) ->
reset_metrics_local(ResId, #{}).
-spec reset_metrics_local(resource_id(), map()) -> ok.
reset_metrics_local(ResId, _ClusterOpts) ->
emqx_resource_manager:reset_metrics(ResId). emqx_resource_manager:reset_metrics(ResId).
-spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}. -spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.

View File

@ -56,7 +56,8 @@
unload_hooks_for_rule/1, unload_hooks_for_rule/1,
maybe_add_metrics_for_rule/1, maybe_add_metrics_for_rule/1,
clear_metrics_for_rule/1, clear_metrics_for_rule/1,
reset_metrics_for_rule/1 reset_metrics_for_rule/1,
reset_metrics_for_rule/2
]). ]).
%% exported for `emqx_telemetry' %% exported for `emqx_telemetry'
@ -302,8 +303,13 @@ maybe_add_metrics_for_rule(Id) ->
clear_metrics_for_rule(Id) -> clear_metrics_for_rule(Id) ->
ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id). ok = emqx_metrics_worker:clear_metrics(rule_metrics, Id).
%% Tip: Don't delete reset_metrics_for_rule/1, use before v572 rpc
-spec reset_metrics_for_rule(rule_id()) -> ok. -spec reset_metrics_for_rule(rule_id()) -> ok.
reset_metrics_for_rule(Id) -> reset_metrics_for_rule(Id) ->
reset_metrics_for_rule(Id, #{}).
-spec reset_metrics_for_rule(rule_id(), map()) -> ok.
reset_metrics_for_rule(Id, _Opts) ->
emqx_metrics_worker:reset_metrics(rule_metrics, Id). emqx_metrics_worker:reset_metrics(rule_metrics, Id).
unload_hooks_for_rule(#{id := Id, from := Topics}) -> unload_hooks_for_rule(#{id := Id, from := Topics}) ->

View File

@ -77,7 +77,7 @@ is_existing_type(SchemaName, Path) ->
-spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}. -spec get_schema(schema_name()) -> {ok, map()} | {error, not_found}.
get_schema(SchemaName) -> get_schema(SchemaName) ->
case try
emqx_config:get( emqx_config:get(
[?CONF_KEY_ROOT, schemas, schema_name_bin_to_atom(SchemaName)], undefined [?CONF_KEY_ROOT, schemas, schema_name_bin_to_atom(SchemaName)], undefined
) )
@ -86,6 +86,9 @@ get_schema(SchemaName) ->
{error, not_found}; {error, not_found};
Config -> Config ->
{ok, Config} {ok, Config}
catch
throw:not_found ->
{error, not_found}
end. end.
-spec add_schema(schema_name(), schema()) -> ok | {error, term()}. -spec add_schema(schema_name(), schema()) -> ok | {error, term()}.
@ -340,7 +343,7 @@ to_bin(A) when is_atom(A) -> atom_to_binary(A);
to_bin(B) when is_binary(B) -> B. to_bin(B) when is_binary(B) -> B.
schema_name_bin_to_atom(Bin) when size(Bin) > 255 -> schema_name_bin_to_atom(Bin) when size(Bin) > 255 ->
erlang:throw( throw(
iolist_to_binary( iolist_to_binary(
io_lib:format( io_lib:format(
"Name is is too long." "Name is is too long."
@ -351,4 +354,9 @@ schema_name_bin_to_atom(Bin) when size(Bin) > 255 ->
) )
); );
schema_name_bin_to_atom(Bin) -> schema_name_bin_to_atom(Bin) ->
binary_to_atom(Bin, utf8). try
binary_to_existing_atom(Bin, utf8)
catch
error:badarg ->
throw(not_found)
end.

View File

@ -223,6 +223,13 @@ t_crud(Config) ->
%% no schemas at first %% no schemas at first
?assertMatch({ok, 200, []}, request(get)), ?assertMatch({ok, 200, []}, request(get)),
?assertMatch(
{ok, 404, #{
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Schema not found">>
}},
request({get, <<"some_name_that_is_not_an_atom_yet">>})
),
?assertMatch( ?assertMatch(
{ok, 404, #{ {ok, 404, #{
<<"code">> := <<"NOT_FOUND">>, <<"code">> := <<"NOT_FOUND">>,