Merge branch 'master' into improve_rule_bridge_apis
This commit is contained in:
commit
c5f92ef856
|
@ -256,7 +256,7 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
|
||||||
init_load(SchemaMod, parse_hocon(Conf));
|
init_load(SchemaMod, parse_hocon(Conf));
|
||||||
init_load(SchemaMod, RawConf) when is_map(RawConf) ->
|
init_load(SchemaMod, RawConf) when is_map(RawConf) ->
|
||||||
ok = save_schema_mod_and_names(SchemaMod),
|
ok = save_schema_mod_and_names(SchemaMod),
|
||||||
%% Merge environment varialbe overrides on top
|
%% Merge environment variable overrides on top
|
||||||
RawConfWithEnvs = merge_envs(SchemaMod, RawConf),
|
RawConfWithEnvs = merge_envs(SchemaMod, RawConf),
|
||||||
ClusterOverrides = read_override_conf(#{override_to => cluster}),
|
ClusterOverrides = read_override_conf(#{override_to => cluster}),
|
||||||
LocalOverrides = read_override_conf(#{override_to => local}),
|
LocalOverrides = read_override_conf(#{override_to => local}),
|
||||||
|
@ -267,7 +267,7 @@ init_load(SchemaMod, RawConf) when is_map(RawConf) ->
|
||||||
check_config(SchemaMod, RawConfWithOverrides , #{}),
|
check_config(SchemaMod, RawConfWithOverrides , #{}),
|
||||||
RootNames = get_root_names(),
|
RootNames = get_root_names(),
|
||||||
ok = save_to_config_map(maps:with(get_atom_root_names(), CheckedConf),
|
ok = save_to_config_map(maps:with(get_atom_root_names(), CheckedConf),
|
||||||
maps:with(RootNames, RawConfWithEnvs)).
|
maps:with(RootNames, RawConfWithOverrides)).
|
||||||
|
|
||||||
parse_hocon(Conf) ->
|
parse_hocon(Conf) ->
|
||||||
IncDirs = include_dirs(),
|
IncDirs = include_dirs(),
|
||||||
|
@ -301,7 +301,7 @@ merge_envs(SchemaMod, RawConf) ->
|
||||||
},
|
},
|
||||||
hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts).
|
hocon_tconf:merge_env_overrides(SchemaMod, RawConf, all, Opts).
|
||||||
|
|
||||||
-spec check_config(module(), raw_config()) -> {AppEnvs, CheckedConf}
|
-spec check_config(hocon_schema:schema(), raw_config()) -> {AppEnvs, CheckedConf}
|
||||||
when AppEnvs :: app_envs(), CheckedConf :: config().
|
when AppEnvs :: app_envs(), CheckedConf :: config().
|
||||||
check_config(SchemaMod, RawConf) ->
|
check_config(SchemaMod, RawConf) ->
|
||||||
check_config(SchemaMod, RawConf, #{}).
|
check_config(SchemaMod, RawConf, #{}).
|
||||||
|
@ -454,7 +454,7 @@ do_get(Type, [], Default) ->
|
||||||
AccIn#{conf_key(Type0, RootName) => Conf};
|
AccIn#{conf_key(Type0, RootName) => Conf};
|
||||||
(_, AccIn) -> AccIn
|
(_, AccIn) -> AccIn
|
||||||
end, #{}, persistent_term:get()),
|
end, #{}, persistent_term:get()),
|
||||||
case map_size(AllConf) == 0 of
|
case AllConf =:= #{} of
|
||||||
true -> Default;
|
true -> Default;
|
||||||
false -> AllConf
|
false -> AllConf
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
-module(emqx_config_handler).
|
-module(emqx_config_handler).
|
||||||
|
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@
|
||||||
, remove_handler/1
|
, remove_handler/1
|
||||||
, update_config/3
|
, update_config/3
|
||||||
, get_raw_cluster_override_conf/0
|
, get_raw_cluster_override_conf/0
|
||||||
|
, info/0
|
||||||
, merge_to_old_config/2
|
, merge_to_old_config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -74,9 +76,11 @@ update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
|
||||||
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
|
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
|
||||||
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
|
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}, infinity).
|
||||||
|
|
||||||
-spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
|
-spec add_handler(emqx_config:config_key_path(), handler_name()) ->
|
||||||
|
ok | {error, {conflict, list()}}.
|
||||||
add_handler(ConfKeyPath, HandlerName) ->
|
add_handler(ConfKeyPath, HandlerName) ->
|
||||||
gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}, infinity).
|
assert_callback_function(HandlerName),
|
||||||
|
gen_server:call(?MODULE, {add_handler, ConfKeyPath, HandlerName}).
|
||||||
|
|
||||||
%% @doc Remove handler asynchronously
|
%% @doc Remove handler asynchronously
|
||||||
-spec remove_handler(emqx_config:config_key_path()) -> ok.
|
-spec remove_handler(emqx_config:config_key_path()) -> ok.
|
||||||
|
@ -86,19 +90,21 @@ remove_handler(ConfKeyPath) ->
|
||||||
get_raw_cluster_override_conf() ->
|
get_raw_cluster_override_conf() ->
|
||||||
gen_server:call(?MODULE, get_raw_cluster_override_conf).
|
gen_server:call(?MODULE, get_raw_cluster_override_conf).
|
||||||
|
|
||||||
|
info() ->
|
||||||
|
gen_server:call(?MODULE, info).
|
||||||
|
|
||||||
%%============================================================================
|
%%============================================================================
|
||||||
|
|
||||||
-spec init(term()) -> {ok, state()}.
|
-spec init(term()) -> {ok, state()}.
|
||||||
init(_) ->
|
init(_) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, #{handlers => #{?MOD => ?MODULE}}}.
|
Handlers = load_prev_handlers(),
|
||||||
|
{ok, #{handlers => Handlers#{?MOD => ?MODULE}}}.
|
||||||
|
|
||||||
handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers := Handlers}) ->
|
handle_call({add_handler, ConfKeyPath, HandlerName}, _From, State = #{handlers := Handlers}) ->
|
||||||
case deep_put_handler(ConfKeyPath, Handlers, HandlerName) of
|
case deep_put_handler(ConfKeyPath, Handlers, HandlerName) of
|
||||||
{ok, NewHandlers} ->
|
{ok, NewHandlers} -> {reply, ok, State#{handlers => NewHandlers}};
|
||||||
{reply, ok, State#{handlers => NewHandlers}};
|
{error, _Reason} = Error -> {reply, Error, State}
|
||||||
Error ->
|
|
||||||
{reply, Error, State}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
||||||
|
@ -108,48 +114,62 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
|
||||||
handle_call(get_raw_cluster_override_conf, _From, State) ->
|
handle_call(get_raw_cluster_override_conf, _From, State) ->
|
||||||
Reply = emqx_config:read_override_conf(#{override_to => cluster}),
|
Reply = emqx_config:read_override_conf(#{override_to => cluster}),
|
||||||
{reply, Reply, State};
|
{reply, Reply, State};
|
||||||
|
handle_call(info, _From, State) ->
|
||||||
|
{reply, State, State};
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
Reply = ok,
|
{reply, ok, State}.
|
||||||
{reply, Reply, State}.
|
|
||||||
|
|
||||||
handle_cast({remove_handler, ConfKeyPath},
|
handle_cast({remove_handler, ConfKeyPath}, State = #{handlers := Handlers}) ->
|
||||||
State = #{handlers := Handlers}) ->
|
NewHandlers = do_remove_handler(ConfKeyPath, Handlers),
|
||||||
{noreply, State#{handlers => emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers)}};
|
{noreply, State#{handlers => NewHandlers}};
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, _State) ->
|
terminate(_Reason, #{handlers := Handlers}) ->
|
||||||
|
save_handlers(Handlers),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
deep_put_handler([], Handlers, Mod) when is_map(Handlers) ->
|
deep_put_handler([], Handlers, Mod) ->
|
||||||
{ok, Handlers#{?MOD => Mod}};
|
{ok, Handlers#{?MOD => Mod}};
|
||||||
deep_put_handler([], _Handlers, Mod) ->
|
|
||||||
{ok, #{?MOD => Mod}};
|
|
||||||
deep_put_handler([?WKEY | KeyPath], Handlers, Mod) ->
|
|
||||||
deep_put_handler2(?WKEY, KeyPath, Handlers, Mod);
|
|
||||||
deep_put_handler([Key | KeyPath], Handlers, Mod) ->
|
deep_put_handler([Key | KeyPath], Handlers, Mod) ->
|
||||||
case maps:find(?WKEY, Handlers) of
|
|
||||||
error ->
|
|
||||||
deep_put_handler2(Key, KeyPath, Handlers, Mod);
|
|
||||||
{ok, _SubHandlers} ->
|
|
||||||
{error, {cannot_override_a_wildcard_path, [?WKEY | KeyPath]}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
deep_put_handler2(Key, KeyPath, Handlers, Mod) ->
|
|
||||||
SubHandlers = maps:get(Key, Handlers, #{}),
|
SubHandlers = maps:get(Key, Handlers, #{}),
|
||||||
case deep_put_handler(KeyPath, SubHandlers, Mod) of
|
case deep_put_handler(KeyPath, SubHandlers, Mod) of
|
||||||
{ok, SubHandlers1} ->
|
{ok, NewSubHandlers} ->
|
||||||
{ok, Handlers#{Key => SubHandlers1}};
|
NewHandlers = Handlers#{Key => NewSubHandlers},
|
||||||
Error ->
|
case check_handler_conflict(NewHandlers) of
|
||||||
Error
|
ok -> {ok, NewHandlers};
|
||||||
|
{error, Reason} -> {error, Reason}
|
||||||
|
end;
|
||||||
|
{error, _Reason} = Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% Make sure that Specify Key and ?WKEY cannot be on the same level.
|
||||||
|
%%
|
||||||
|
%% [k1, ?, ?], [k1, ?], [k1] is allow.
|
||||||
|
%% [K1, ?, k2], [k1, ?, k3] is allow.
|
||||||
|
%% [k1, ?, ?], [k1, ?, k2] is not allow.
|
||||||
|
check_handler_conflict(Handlers) ->
|
||||||
|
Keys = filter_top_level_handlers(Handlers),
|
||||||
|
case lists:member(?WKEY, Keys) of
|
||||||
|
true when length(Keys) =:= 1 -> ok;
|
||||||
|
true -> {error, {conflict, Keys}};
|
||||||
|
false -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
filter_top_level_handlers(Handlers) ->
|
||||||
|
maps:fold(
|
||||||
|
fun
|
||||||
|
(K, #{?MOD := _}, Acc) -> [K | Acc];
|
||||||
|
(_K, #{}, Acc) -> Acc;
|
||||||
|
(?MOD, _, Acc) -> Acc
|
||||||
|
end, [], Handlers).
|
||||||
|
|
||||||
handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
||||||
try
|
try
|
||||||
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs)
|
do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs)
|
||||||
|
@ -157,9 +177,12 @@ handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
||||||
throw : Reason ->
|
throw : Reason ->
|
||||||
{error, Reason};
|
{error, Reason};
|
||||||
Error : Reason : ST ->
|
Error : Reason : ST ->
|
||||||
?SLOG(error, #{msg => "change_config_failed",
|
?SLOG(error, #{msg => "change_config_crashed",
|
||||||
exception => Error,
|
exception => Error,
|
||||||
reason => Reason,
|
reason => Reason,
|
||||||
|
update_req => UpdateArgs,
|
||||||
|
module => SchemaModule,
|
||||||
|
key_path => ConfKeyPath,
|
||||||
stacktrace => ST
|
stacktrace => ST
|
||||||
}),
|
}),
|
||||||
{error, config_update_crashed}
|
{error, config_update_crashed}
|
||||||
|
@ -174,11 +197,12 @@ do_handle_update_request(SchemaModule, ConfKeyPath, Handlers, UpdateArgs) ->
|
||||||
{error, Result}
|
{error, Result}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
process_update_request([_], _Handlers, {remove, _Opts}) ->
|
||||||
|
{error, "remove_root_is_forbidden"};
|
||||||
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
|
process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
|
||||||
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
|
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
|
||||||
BinKeyPath = bin_path(ConfKeyPath),
|
BinKeyPath = bin_path(ConfKeyPath),
|
||||||
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
|
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
|
||||||
_ = remove_from_local_if_cluster_change(BinKeyPath, Opts),
|
|
||||||
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
|
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
|
||||||
{ok, NewRawConf, OverrideConf, Opts};
|
{ok, NewRawConf, OverrideConf, Opts};
|
||||||
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
|
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
|
||||||
|
@ -198,25 +222,22 @@ do_update_config([], Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
||||||
do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
|
do_update_config([ConfKey | SubConfKeyPath], Handlers, OldRawConf,
|
||||||
UpdateReq, ConfKeyPath0) ->
|
UpdateReq, ConfKeyPath0) ->
|
||||||
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
|
ConfKeyPath = ConfKeyPath0 ++ [ConfKey],
|
||||||
SubOldRawConf = get_sub_config(bin(ConfKey), OldRawConf),
|
ConfKeyBin = bin(ConfKey),
|
||||||
|
SubOldRawConf = get_sub_config(ConfKeyBin, OldRawConf),
|
||||||
SubHandlers = get_sub_handlers(ConfKey, Handlers),
|
SubHandlers = get_sub_handlers(ConfKey, Handlers),
|
||||||
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
|
case do_update_config(SubConfKeyPath, SubHandlers, SubOldRawConf, UpdateReq, ConfKeyPath) of
|
||||||
{ok, NewUpdateReq} ->
|
{ok, NewUpdateReq} -> merge_to_old_config(#{ConfKeyBin => NewUpdateReq}, OldRawConf);
|
||||||
call_pre_config_update(Handlers, OldRawConf, #{bin(ConfKey) => NewUpdateReq},
|
Error -> Error
|
||||||
ConfKeyPath);
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf,
|
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf,
|
||||||
UpdateArgs, Opts) ->
|
UpdateArgs, Opts) ->
|
||||||
OldConf = emqx_config:get_root(ConfKeyPath),
|
OldConf = emqx_config:get_root(ConfKeyPath),
|
||||||
FullRawConf = with_full_raw_confs(NewRawConf),
|
Schema = schema(SchemaModule, ConfKeyPath),
|
||||||
{AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, FullRawConf),
|
{AppEnvs, #{root := NewConf}} = emqx_config:check_config(Schema, #{<<"root">> => NewRawConf}),
|
||||||
NewConf = maps:with(maps:keys(OldConf), CheckedConf),
|
|
||||||
_ = remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
|
||||||
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
||||||
{ok, Result0} ->
|
{ok, Result0} ->
|
||||||
|
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
||||||
case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf,
|
case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf,
|
||||||
UpdateArgs, Opts) of
|
UpdateArgs, Opts) of
|
||||||
{ok, Result1} ->
|
{ok, Result1} ->
|
||||||
|
@ -259,8 +280,7 @@ get_sub_config(ConfKey, Conf) when is_map(Conf) ->
|
||||||
get_sub_config(_, _Conf) -> %% the Conf is a primitive
|
get_sub_config(_, _Conf) -> %% the Conf is a primitive
|
||||||
undefined.
|
undefined.
|
||||||
|
|
||||||
call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
call_pre_config_update(#{?MOD := HandlerName}, OldRawConf, UpdateReq, ConfKeyPath) ->
|
||||||
HandlerName = maps:get(?MOD, Handlers, undefined),
|
|
||||||
case erlang:function_exported(HandlerName, pre_config_update, 3) of
|
case erlang:function_exported(HandlerName, pre_config_update, 3) of
|
||||||
true ->
|
true ->
|
||||||
case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
|
case HandlerName:pre_config_update(ConfKeyPath, UpdateReq, OldRawConf) of
|
||||||
|
@ -268,21 +288,25 @@ call_pre_config_update(Handlers, OldRawConf, UpdateReq, ConfKeyPath) ->
|
||||||
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
|
{error, Reason} -> {error, {pre_config_update, HandlerName, Reason}}
|
||||||
end;
|
end;
|
||||||
false -> merge_to_old_config(UpdateReq, OldRawConf)
|
false -> merge_to_old_config(UpdateReq, OldRawConf)
|
||||||
end.
|
end;
|
||||||
|
call_pre_config_update(_Handlers, OldRawConf, UpdateReq, _ConfKeyPath) ->
|
||||||
|
merge_to_old_config(UpdateReq, OldRawConf).
|
||||||
|
|
||||||
call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result, ConfKeyPath) ->
|
call_post_config_update(#{?MOD := HandlerName}, OldConf, NewConf,
|
||||||
HandlerName = maps:get(?MOD, Handlers, undefined),
|
AppEnvs, UpdateReq, Result, ConfKeyPath) ->
|
||||||
case erlang:function_exported(HandlerName, post_config_update, 5) of
|
case erlang:function_exported(HandlerName, post_config_update, 5) of
|
||||||
true ->
|
true ->
|
||||||
case HandlerName:post_config_update(ConfKeyPath, UpdateReq, NewConf, OldConf,
|
case HandlerName:post_config_update(ConfKeyPath, UpdateReq,
|
||||||
AppEnvs) of
|
NewConf, OldConf, AppEnvs) of
|
||||||
ok -> {ok, Result};
|
ok -> {ok, Result};
|
||||||
{ok, Result1} ->
|
{ok, Result1} -> {ok, Result#{HandlerName => Result1}};
|
||||||
{ok, Result#{HandlerName => Result1}};
|
|
||||||
{error, Reason} -> {error, {post_config_update, HandlerName, Reason}}
|
{error, Reason} -> {error, {post_config_update, HandlerName, Reason}}
|
||||||
end;
|
end;
|
||||||
false -> {ok, Result}
|
false -> {ok, Result}
|
||||||
end.
|
end;
|
||||||
|
call_post_config_update(_Handlers, _OldConf, _NewConf, _AppEnvs,
|
||||||
|
_UpdateReq, Result, _ConfKeyPath) ->
|
||||||
|
{ok, Result}.
|
||||||
|
|
||||||
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
|
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
|
||||||
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
|
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
|
||||||
|
@ -295,6 +319,7 @@ save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, Update
|
||||||
%% 1. the old config is undefined
|
%% 1. the old config is undefined
|
||||||
%% 2. either the old or the new config is not of map type
|
%% 2. either the old or the new config is not of map type
|
||||||
%% the behaviour is merging the new the config to the old config if they are maps.
|
%% the behaviour is merging the new the config to the old config if they are maps.
|
||||||
|
|
||||||
merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf) ->
|
merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf) ->
|
||||||
{ok, maps:merge(RawConf, UpdateReq)};
|
{ok, maps:merge(RawConf, UpdateReq)};
|
||||||
merge_to_old_config(UpdateReq, _RawConf) ->
|
merge_to_old_config(UpdateReq, _RawConf) ->
|
||||||
|
@ -302,13 +327,12 @@ merge_to_old_config(UpdateReq, _RawConf) ->
|
||||||
|
|
||||||
%% local-override.conf priority is higher than cluster-override.conf
|
%% local-override.conf priority is higher than cluster-override.conf
|
||||||
%% If we want cluster to take effect, we must remove the local.
|
%% If we want cluster to take effect, we must remove the local.
|
||||||
remove_from_local_if_cluster_change(BinKeyPath, Opts) ->
|
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
|
||||||
case maps:get(override, Opts, local) of
|
|
||||||
local -> ok;
|
|
||||||
cluster ->
|
|
||||||
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
|
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
|
||||||
emqx_config:save_to_override_conf(Local, Opts)
|
_ = emqx_config:save_to_override_conf(Local, Opts),
|
||||||
end.
|
ok;
|
||||||
|
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
|
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
|
||||||
undefined;
|
undefined;
|
||||||
|
@ -337,9 +361,6 @@ return_rawconf(ConfKeyPath, #{rawconf_with_defaults := true}) ->
|
||||||
return_rawconf(ConfKeyPath, _) ->
|
return_rawconf(ConfKeyPath, _) ->
|
||||||
emqx_config:get_raw(ConfKeyPath).
|
emqx_config:get_raw(ConfKeyPath).
|
||||||
|
|
||||||
with_full_raw_confs(PartialConf) ->
|
|
||||||
maps:merge(emqx_config:get_raw([]), PartialConf).
|
|
||||||
|
|
||||||
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
||||||
|
|
||||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||||
|
@ -351,3 +372,43 @@ atom(Str) when is_list(Str) ->
|
||||||
list_to_atom(Str);
|
list_to_atom(Str);
|
||||||
atom(Atom) when is_atom(Atom) ->
|
atom(Atom) when is_atom(Atom) ->
|
||||||
Atom.
|
Atom.
|
||||||
|
|
||||||
|
-dialyzer({nowarn_function, do_remove_handler/2}).
|
||||||
|
do_remove_handler(ConfKeyPath, Handlers) ->
|
||||||
|
NewHandlers = emqx_map_lib:deep_remove(ConfKeyPath ++ [?MOD], Handlers),
|
||||||
|
remove_empty_leaf(ConfKeyPath, NewHandlers).
|
||||||
|
|
||||||
|
remove_empty_leaf([], Handlers) -> Handlers;
|
||||||
|
remove_empty_leaf(KeyPath, Handlers) ->
|
||||||
|
case emqx_map_lib:deep_find(KeyPath, Handlers) =:= {ok, #{}} of
|
||||||
|
true -> %% empty leaf
|
||||||
|
Handlers1 = emqx_map_lib:deep_remove(KeyPath, Handlers),
|
||||||
|
SubKeyPath = lists:sublist(KeyPath, length(KeyPath) - 1),
|
||||||
|
remove_empty_leaf(SubKeyPath, Handlers1);
|
||||||
|
false -> Handlers
|
||||||
|
end.
|
||||||
|
|
||||||
|
assert_callback_function(Mod) ->
|
||||||
|
case erlang:function_exported(Mod, pre_config_update, 3) orelse
|
||||||
|
erlang:function_exported(Mod, post_config_update, 5) of
|
||||||
|
true -> ok;
|
||||||
|
false -> error(#{msg => "bad_emqx_config_handler_callback", module => Mod})
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
|
||||||
|
schema(SchemaModule, [RootKey | _]) ->
|
||||||
|
Roots = hocon_schema:roots(SchemaModule),
|
||||||
|
Field =
|
||||||
|
case lists:keyfind(bin(RootKey), 1, Roots) of
|
||||||
|
{_, {Ref, ?REF(Ref)}} -> {Ref, ?R_REF(SchemaModule, Ref)};
|
||||||
|
{_, Field0} -> Field0
|
||||||
|
end,
|
||||||
|
#{roots => [root], fields => #{root => [Field]}}.
|
||||||
|
|
||||||
|
load_prev_handlers() ->
|
||||||
|
Handlers = application:get_env(emqx, ?MODULE, #{}),
|
||||||
|
application:unset_env(emqx, ?MODULE),
|
||||||
|
Handlers.
|
||||||
|
|
||||||
|
save_handlers(Handlers) ->
|
||||||
|
application:set_env(emqx, ?MODULE, Handlers).
|
||||||
|
|
|
@ -61,8 +61,8 @@ deep_find([Key | KeyPath] = Path, Map) when is_map(Map) ->
|
||||||
{ok, SubMap} -> deep_find(KeyPath, SubMap);
|
{ok, SubMap} -> deep_find(KeyPath, SubMap);
|
||||||
error -> {not_found, Path, Map}
|
error -> {not_found, Path, Map}
|
||||||
end;
|
end;
|
||||||
deep_find(_KeyPath, Data) ->
|
deep_find(KeyPath, Data) ->
|
||||||
{not_found, _KeyPath, Data}.
|
{not_found, KeyPath, Data}.
|
||||||
|
|
||||||
-spec deep_put(config_key_path(), map(), term()) -> map().
|
-spec deep_put(config_key_path(), map(), term()) -> map().
|
||||||
deep_put([], _Map, Data) ->
|
deep_put([], _Map, Data) ->
|
||||||
|
@ -152,7 +152,7 @@ diff_maps(NewMap, OldMap) ->
|
||||||
binary_string_kv(K, V, JsonableFun) ->
|
binary_string_kv(K, V, JsonableFun) ->
|
||||||
case JsonableFun(K, V) of
|
case JsonableFun(K, V) of
|
||||||
drop -> drop;
|
drop -> drop;
|
||||||
{K1, V1} -> {binary_string(K1), binary_string(V1)}
|
{K1, V1} -> {binary_string(K1), V1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
binary_string([]) -> [];
|
binary_string([]) -> [];
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
-dialyzer(no_contracts).
|
-dialyzer(no_contracts).
|
||||||
-dialyzer(no_unused).
|
-dialyzer(no_unused).
|
||||||
-dialyzer(no_fail_call).
|
-dialyzer(no_fail_call).
|
||||||
|
-elvis([{elvis_style, invalid_dynamic_call, disable}]).
|
||||||
|
|
||||||
-include("emqx_authentication.hrl").
|
-include("emqx_authentication.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
@ -385,7 +386,8 @@ after idling for 'Keepalive * backoff * 2'."""
|
||||||
, {"max_inflight",
|
, {"max_inflight",
|
||||||
sc(range(1, 65535),
|
sc(range(1, 65535),
|
||||||
#{ default => 32,
|
#{ default => 32,
|
||||||
desc => "Maximum size of the Inflight Window storing QoS1/2 messages delivered but un-acked."
|
desc => "Maximum size of the Inflight Window storing QoS1/2"
|
||||||
|
" messages delivered but un-acked."
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
, {"retry_interval",
|
, {"retry_interval",
|
||||||
|
@ -403,7 +405,8 @@ after idling for 'Keepalive * backoff * 2'."""
|
||||||
, {"await_rel_timeout",
|
, {"await_rel_timeout",
|
||||||
sc(duration(),
|
sc(duration(),
|
||||||
#{ default => "300s",
|
#{ default => "300s",
|
||||||
desc => "The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout."
|
desc => "The QoS2 messages (Client -> Broker) will be dropped"
|
||||||
|
" if awaiting PUBREL timeout."
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
, {"session_expiry_interval",
|
, {"session_expiry_interval",
|
||||||
|
@ -1102,7 +1105,8 @@ fields("trace") ->
|
||||||
default => text,
|
default => text,
|
||||||
desc => """
|
desc => """
|
||||||
Determine the format of the payload format in the trace file.<br>
|
Determine the format of the payload format in the trace file.<br>
|
||||||
`text`: Text-based protocol or plain text protocol. It is recommended when payload is JSON encoded.<br>
|
`text`: Text-based protocol or plain text protocol.
|
||||||
|
It is recommended when payload is JSON encoded.<br>
|
||||||
`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.<br>
|
`hex`: Binary hexadecimal encode. It is recommended when payload is a custom binary protocol.<br>
|
||||||
`hidden`: payload is obfuscated as `******`
|
`hidden`: payload is obfuscated as `******`
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -35,7 +35,9 @@
|
||||||
%% Hocon Schema
|
%% Hocon Schema
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
roots() -> [{config, #{type => hoconsc:union([hoconsc:ref(type1), hoconsc:ref(type2)])}}].
|
roots() -> [{config, #{type => hoconsc:union([
|
||||||
|
hoconsc:ref(?MODULE, type1),
|
||||||
|
hoconsc:ref(?MODULE, type2)])}}].
|
||||||
|
|
||||||
fields(type1) ->
|
fields(type1) ->
|
||||||
[ {mechanism, {enum, ['password-based']}}
|
[ {mechanism, {enum, ['password-based']}}
|
||||||
|
|
|
@ -0,0 +1,314 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2019-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_config_handler_SUITE).
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
-define(MOD, {mod}).
|
||||||
|
-define(WKEY, '?').
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
emqx_common_test_helpers:boot_modules(all),
|
||||||
|
emqx_common_test_helpers:start_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_common_test_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_testcase(_Case, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_Case, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_handler(_Config) ->
|
||||||
|
BadCallBackMod = emqx,
|
||||||
|
RootKey = sysmon,
|
||||||
|
%% bad
|
||||||
|
?assertError(#{msg := "bad_emqx_config_handler_callback", module := BadCallBackMod},
|
||||||
|
emqx_config_handler:add_handler([RootKey], BadCallBackMod)),
|
||||||
|
%% simple
|
||||||
|
ok = emqx_config_handler:add_handler([RootKey], ?MODULE),
|
||||||
|
#{handlers := Handlers0} = emqx_config_handler:info(),
|
||||||
|
?assertMatch(#{RootKey := #{?MOD := ?MODULE}}, Handlers0),
|
||||||
|
ok = emqx_config_handler:remove_handler([RootKey]),
|
||||||
|
#{handlers := Handlers1} = emqx_config_handler:info(),
|
||||||
|
ct:pal("Key:~p simple: ~p~n", [RootKey, Handlers1]),
|
||||||
|
?assertEqual(false, maps:is_key(RootKey, Handlers1)),
|
||||||
|
%% wildcard 1
|
||||||
|
Wildcard1 = [RootKey, '?', cpu_check_interval],
|
||||||
|
ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
|
||||||
|
#{handlers := Handlers2} = emqx_config_handler:info(),
|
||||||
|
?assertMatch(#{RootKey := #{?WKEY := #{cpu_check_interval := #{?MOD := ?MODULE}}}}, Handlers2),
|
||||||
|
ok = emqx_config_handler:remove_handler(Wildcard1),
|
||||||
|
#{handlers := Handlers3} = emqx_config_handler:info(),
|
||||||
|
ct:pal("Key:~p wildcard1: ~p~n", [Wildcard1, Handlers3]),
|
||||||
|
?assertEqual(false, maps:is_key(RootKey, Handlers3)),
|
||||||
|
|
||||||
|
%% can_override_a_wildcard_path
|
||||||
|
ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
|
||||||
|
?assertEqual(ok, emqx_config_handler:add_handler([RootKey, os, cpu_check_interval], ?MODULE)),
|
||||||
|
ok = emqx_config_handler:remove_handler(Wildcard1),
|
||||||
|
ok = emqx_config_handler:remove_handler([RootKey, os, cpu_check_interval]),
|
||||||
|
|
||||||
|
ok = emqx_config_handler:add_handler([RootKey, os, cpu_check_interval], ?MODULE),
|
||||||
|
ok = emqx_config_handler:add_handler(Wildcard1, ?MODULE),
|
||||||
|
ok = emqx_config_handler:remove_handler([RootKey, os, cpu_check_interval]),
|
||||||
|
ok = emqx_config_handler:remove_handler(Wildcard1),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_conflict_handler(_Config) ->
|
||||||
|
ok = emqx_config_handler:add_handler([sysmon, '?', '?'], ?MODULE),
|
||||||
|
?assertMatch({error, {conflict, _}},
|
||||||
|
emqx_config_handler:add_handler([sysmon, '?', cpu_check_interval], ?MODULE)),
|
||||||
|
ok = emqx_config_handler:remove_handler([sysmon, '?', '?']),
|
||||||
|
|
||||||
|
ok = emqx_config_handler:add_handler([sysmon, '?', cpu_check_interval], ?MODULE),
|
||||||
|
?assertMatch({error, {conflict, _}},
|
||||||
|
emqx_config_handler:add_handler([sysmon, '?', '?'], ?MODULE)),
|
||||||
|
ok = emqx_config_handler:remove_handler([sysmon, '?', cpu_check_interval]),
|
||||||
|
|
||||||
|
%% override
|
||||||
|
ok = emqx_config_handler:add_handler([sysmon], emqx_logger),
|
||||||
|
?assertMatch(#{handlers := #{sysmon := #{{mod} := emqx_logger}}},
|
||||||
|
emqx_config_handler:info()),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_root_key_update(_Config) ->
|
||||||
|
PathKey = [sysmon],
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
|
||||||
|
%% update
|
||||||
|
Old = #{<<"os">> := OS} = emqx:get_raw_config(PathKey),
|
||||||
|
{ok, Res} = emqx:update_config(PathKey,
|
||||||
|
Old#{<<"os">> => OS#{<<"cpu_check_interval">> => <<"12s">>}}, Opts),
|
||||||
|
?assertMatch(#{config := #{os := #{cpu_check_interval := 12000}},
|
||||||
|
post_config_update := #{?MODULE := ok},
|
||||||
|
raw_config := #{<<"os">> := #{<<"cpu_check_interval">> := <<"12s">>}}},
|
||||||
|
Res),
|
||||||
|
?assertMatch(#{os := #{cpu_check_interval := 12000}}, emqx:get_config(PathKey)),
|
||||||
|
|
||||||
|
%% update sub key
|
||||||
|
SubKey = PathKey ++ [os, cpu_high_watermark],
|
||||||
|
?assertEqual({ok,#{config => 0.81,
|
||||||
|
post_config_update => #{?MODULE => ok},
|
||||||
|
raw_config => <<"81%">>}},
|
||||||
|
emqx:update_config(SubKey, "81%", Opts)),
|
||||||
|
?assertEqual(0.81, emqx:get_config(SubKey)),
|
||||||
|
?assertEqual("81%", emqx:get_raw_config(SubKey)),
|
||||||
|
%% remove
|
||||||
|
?assertEqual({error, "remove_root_is_forbidden"}, emqx:remove_config(PathKey)),
|
||||||
|
?assertMatch(true, is_map(emqx:get_raw_config(PathKey))),
|
||||||
|
|
||||||
|
ok = emqx_config_handler:remove_handler(PathKey),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_sub_key_update_remove(_Config) ->
|
||||||
|
KeyPath = [sysmon, os, cpu_check_interval],
|
||||||
|
Opts = #{},
|
||||||
|
ok = emqx_config_handler:add_handler(KeyPath, ?MODULE),
|
||||||
|
{ok, Res} = emqx:update_config(KeyPath, <<"60s">>, Opts),
|
||||||
|
?assertMatch(#{config := 60000,
|
||||||
|
post_config_update := #{?MODULE := ok},
|
||||||
|
raw_config := <<"60s">>},
|
||||||
|
Res),
|
||||||
|
?assertMatch(60000, emqx:get_config(KeyPath)),
|
||||||
|
|
||||||
|
KeyPath2 = [sysmon, os, cpu_low_watermark],
|
||||||
|
ok = emqx_config_handler:add_handler(KeyPath2, ?MODULE),
|
||||||
|
{ok, Res1} = emqx:update_config(KeyPath2, <<"40%">>, Opts),
|
||||||
|
?assertMatch(#{config := 0.4,
|
||||||
|
post_config_update := #{},
|
||||||
|
raw_config := <<"40%">>},
|
||||||
|
Res1),
|
||||||
|
?assertMatch(0.4, emqx:get_config(KeyPath2)),
|
||||||
|
|
||||||
|
%% remove
|
||||||
|
?assertEqual({ok,#{post_config_update => #{emqx_config_handler_SUITE => ok}}},
|
||||||
|
emqx:remove_config(KeyPath)),
|
||||||
|
?assertError({config_not_found, KeyPath}, emqx:get_raw_config(KeyPath)),
|
||||||
|
OSKey = maps:keys(emqx:get_raw_config([sysmon, os])),
|
||||||
|
?assertEqual(false, lists:member(<<"cpu_check_interval">>, OSKey)),
|
||||||
|
?assert(length(OSKey) > 0),
|
||||||
|
|
||||||
|
?assertEqual({ok,#{config => 60000,
|
||||||
|
post_config_update => #{?MODULE => ok},
|
||||||
|
raw_config => <<"60s">>}}, emqx:reset_config(KeyPath, Opts)),
|
||||||
|
OSKey1 = maps:keys(emqx:get_raw_config([sysmon, os])),
|
||||||
|
?assertEqual(true, lists:member(<<"cpu_check_interval">>, OSKey1)),
|
||||||
|
?assert(length(OSKey1) > 1),
|
||||||
|
|
||||||
|
ok = emqx_config_handler:remove_handler(KeyPath),
|
||||||
|
ok = emqx_config_handler:remove_handler(KeyPath2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_check_failed(_Config) ->
|
||||||
|
KeyPath = [sysmon, os, cpu_check_interval],
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
Origin = emqx:get_raw_config(KeyPath),
|
||||||
|
ok = emqx_config_handler:add_handler(KeyPath, ?MODULE),
|
||||||
|
%% It should be a duration("1h"), but we set it as a percent.
|
||||||
|
?assertMatch({error, _Res}, emqx:update_config(KeyPath, <<"80%">>, Opts)),
|
||||||
|
New = emqx:get_raw_config(KeyPath),
|
||||||
|
?assertEqual(Origin, New),
|
||||||
|
ok = emqx_config_handler:remove_handler(KeyPath),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_stop(_Config) ->
|
||||||
|
OldPid = erlang:whereis(emqx_config_handler),
|
||||||
|
OldInfo = emqx_config_handler:info(),
|
||||||
|
emqx_config_handler:stop(),
|
||||||
|
NewPid = wait_for_new_pid(),
|
||||||
|
NewInfo = emqx_config_handler:info(),
|
||||||
|
?assertNotEqual(OldPid, NewPid),
|
||||||
|
?assertEqual(OldInfo, NewInfo),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_callback_crash(_Config) ->
|
||||||
|
CrashPath = [sysmon, os, cpu_high_watermark],
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
ok = emqx_config_handler:add_handler(CrashPath, ?MODULE),
|
||||||
|
Old = emqx:get_raw_config(CrashPath),
|
||||||
|
?assertEqual({error, config_update_crashed}, emqx:update_config(CrashPath, <<"89%">>, Opts)),
|
||||||
|
New = emqx:get_raw_config(CrashPath),
|
||||||
|
?assertEqual(Old, New),
|
||||||
|
ok = emqx_config_handler:remove_handler(CrashPath),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_pre_callback_error(_Config) ->
|
||||||
|
callback_error([sysmon, os, mem_check_interval], <<"100s">>,
|
||||||
|
{error, {pre_config_update, ?MODULE, pre_config_update_error}}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_post_update_error(_Config) ->
|
||||||
|
callback_error([sysmon, os, sysmem_high_watermark], <<"60%">>,
|
||||||
|
{error, {post_config_update, ?MODULE, post_config_update_error}}),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_handler_root() ->
|
||||||
|
%% Don't rely on default emqx_config_handler's merge behaviour.
|
||||||
|
RootKey = [],
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
ok = emqx_config_handler:add_handler(RootKey, ?MODULE),
|
||||||
|
%% update
|
||||||
|
Old = #{<<"sysmon">> := #{<<"os">> := OS}} = emqx:get_raw_config(RootKey),
|
||||||
|
{ok, Res} = emqx:update_config(RootKey,
|
||||||
|
Old#{<<"sysmon">> => #{<<"os">> => OS#{<<"cpu_check_interval">> => <<"12s">>}}},
|
||||||
|
Opts),
|
||||||
|
?assertMatch(#{config := #{os := #{cpu_check_interval := 12000}},
|
||||||
|
post_config_update := #{?MODULE := ok},
|
||||||
|
raw_config := #{<<"os">> := #{<<"cpu_check_interval">> := <<"12s">>}}},
|
||||||
|
Res),
|
||||||
|
?assertMatch(#{sysmon := #{os := #{cpu_check_interval := 12000}}}, emqx:get_config(RootKey)),
|
||||||
|
ok = emqx_config_handler:remove_handler(RootKey),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_get_raw_cluster_override_conf(_Config) ->
|
||||||
|
Raw0 = emqx_config:read_override_conf(#{override_to => cluster}),
|
||||||
|
Raw1 = emqx_config_handler:get_raw_cluster_override_conf(),
|
||||||
|
?assertEqual(Raw0, Raw1),
|
||||||
|
OldPid = erlang:whereis(emqx_config_handler),
|
||||||
|
OldInfo = emqx_config_handler:info(),
|
||||||
|
|
||||||
|
?assertEqual(ok, gen_server:call(emqx_config_handler, bad_call_msg)),
|
||||||
|
gen_server:cast(emqx_config_handler, bad_cast_msg),
|
||||||
|
erlang:send(emqx_config_handler, bad_info_msg),
|
||||||
|
|
||||||
|
NewPid = erlang:whereis(emqx_config_handler),
|
||||||
|
NewInfo = emqx_config_handler:info(),
|
||||||
|
?assertEqual(OldPid, NewPid),
|
||||||
|
?assertEqual(OldInfo, NewInfo),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_save_config_failed(_Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_update_sub(_Config) ->
|
||||||
|
PathKey = [sysmon],
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
ok = emqx_config_handler:add_handler(PathKey, ?MODULE),
|
||||||
|
%% update sub key
|
||||||
|
#{<<"os">> := OS1} = emqx:get_raw_config(PathKey),
|
||||||
|
{ok, Res} = emqx:update_config(PathKey ++ [os, cpu_check_interval], <<"120s">>, Opts),
|
||||||
|
?assertMatch(#{config := 120000,
|
||||||
|
post_config_update := #{?MODULE := ok},
|
||||||
|
raw_config := <<"120s">>},
|
||||||
|
Res),
|
||||||
|
?assertMatch(#{os := #{cpu_check_interval := 120000}}, emqx:get_config(PathKey)),
|
||||||
|
#{<<"os">> := OS2} = emqx:get_raw_config(PathKey),
|
||||||
|
?assertEqual(lists:sort(maps:keys(OS1)), lists:sort(maps:keys(OS2))),
|
||||||
|
|
||||||
|
%% update sub key
|
||||||
|
SubKey = PathKey ++ [os, cpu_high_watermark],
|
||||||
|
?assertEqual({ok,#{config => 0.81,
|
||||||
|
post_config_update => #{?MODULE => ok},
|
||||||
|
raw_config => <<"81%">>}},
|
||||||
|
emqx:update_config(SubKey, "81%", Opts)),
|
||||||
|
?assertEqual(0.81, emqx:get_config(SubKey)),
|
||||||
|
?assertEqual("81%", emqx:get_raw_config(SubKey)),
|
||||||
|
|
||||||
|
ok = emqx_config_handler:remove_handler(PathKey),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
|
||||||
|
pre_config_update([sysmon], UpdateReq, _RawConf) ->
|
||||||
|
{ok, UpdateReq};
|
||||||
|
pre_config_update([sysmon, os], UpdateReq, _RawConf) ->
|
||||||
|
{ok, UpdateReq};
|
||||||
|
pre_config_update([sysmon, os, cpu_check_interval], UpdateReq, _RawConf) ->
|
||||||
|
{ok, UpdateReq};
|
||||||
|
pre_config_update([sysmon, os, cpu_low_watermark], UpdateReq, _RawConf) ->
|
||||||
|
{ok, UpdateReq};
|
||||||
|
pre_config_update([sysmon, os, sysmem_high_watermark], UpdateReq, _RawConf) ->
|
||||||
|
{ok, UpdateReq};
|
||||||
|
pre_config_update([sysmon, os, mem_check_interval], _UpdateReq, _RawConf) ->
|
||||||
|
{error, pre_config_update_error}.
|
||||||
|
|
||||||
|
post_config_update([sysmon], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
{ok, ok};
|
||||||
|
post_config_update([sysmon, os], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
{ok, ok};
|
||||||
|
post_config_update([sysmon, os, cpu_check_interval], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
{ok, ok};
|
||||||
|
post_config_update([sysmon, os, cpu_low_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
ok;
|
||||||
|
post_config_update([sysmon, os, sysmem_high_watermark], _UpdateReq, _NewConf, _OldConf, _AppEnvs) ->
|
||||||
|
{error, post_config_update_error}.
|
||||||
|
|
||||||
|
wait_for_new_pid() ->
|
||||||
|
case erlang:whereis(emqx_config_handler) of
|
||||||
|
undefined ->
|
||||||
|
ct:sleep(10),
|
||||||
|
wait_for_new_pid();
|
||||||
|
Pid -> Pid
|
||||||
|
end.
|
||||||
|
|
||||||
|
callback_error(FailedPath, Update, Error) ->
|
||||||
|
Opts = #{rawconf_with_defaults => true},
|
||||||
|
ok = emqx_config_handler:add_handler(FailedPath, ?MODULE),
|
||||||
|
Old = emqx:get_raw_config(FailedPath),
|
||||||
|
?assertEqual(Error, emqx:update_config(FailedPath, Update, Opts)),
|
||||||
|
New = emqx:get_raw_config(FailedPath),
|
||||||
|
?assertEqual(Old, New),
|
||||||
|
ok = emqx_config_handler:remove_handler(FailedPath),
|
||||||
|
ok.
|
|
@ -19,7 +19,7 @@
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, ref/1, ref/2, array/1, enum/1]).
|
-import(hoconsc, [mk/2, enum/1]).
|
||||||
-import(emqx_schema, [mk_duration/2]).
|
-import(emqx_schema, [mk_duration/2]).
|
||||||
|
|
||||||
-export([fields/1, authz_sources_types/1]).
|
-export([fields/1, authz_sources_types/1]).
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
-include("emqx_authz.hrl").
|
-include("emqx_authz.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
-import(hoconsc, [mk/1, mk/2, ref/1, ref/2, array/1, enum/1]).
|
-import(hoconsc, [mk/1, mk/2, ref/2, array/1, enum/1]).
|
||||||
|
|
||||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||||
-define(NOT_FOUND, 'NOT_FOUND').
|
-define(NOT_FOUND, 'NOT_FOUND').
|
||||||
|
@ -83,26 +83,33 @@ schema("/authorization/sources") ->
|
||||||
, get =>
|
, get =>
|
||||||
#{ description => <<"List all authorization sources">>
|
#{ description => <<"List all authorization sources">>
|
||||||
, responses =>
|
, responses =>
|
||||||
#{ 200 => mk( array(hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
|
#{ 200 => mk( array(hoconsc:union(
|
||||||
|
[ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
|
||||||
, #{desc => <<"Authorization source">>})
|
, #{desc => <<"Authorization source">>})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
, post =>
|
, post =>
|
||||||
#{ description => <<"Add a new source">>
|
#{ description => <<"Add a new source">>
|
||||||
, 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)])
|
, 'requestBody' => mk( hoconsc:union(
|
||||||
|
[ref(?API_SCHEMA_MODULE, Type)
|
||||||
|
|| Type <- authz_sources_types(detailed)])
|
||||||
, #{desc => <<"Source config">>})
|
, #{desc => <<"Source config">>})
|
||||||
, responses =>
|
, responses =>
|
||||||
#{ 204 => <<"Authorization source created successfully">>
|
#{ 204 => <<"Authorization source created successfully">>
|
||||||
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
|
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST],
|
||||||
|
<<"Bad Request">>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
, put =>
|
, put =>
|
||||||
#{ description => <<"Update all sources">>
|
#{ description => <<"Update all sources">>
|
||||||
, 'requestBody' => mk( array(hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
|
, 'requestBody' => mk( array(hoconsc:union(
|
||||||
|
[ref(?API_SCHEMA_MODULE, Type)
|
||||||
|
|| Type <- authz_sources_types(detailed)]))
|
||||||
, #{desc => <<"Sources">>})
|
, #{desc => <<"Sources">>})
|
||||||
, responses =>
|
, responses =>
|
||||||
#{ 204 => <<"Authorization source updated successfully">>
|
#{ 204 => <<"Authorization source updated successfully">>
|
||||||
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
|
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST],
|
||||||
|
<<"Bad Request">>)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -112,7 +119,9 @@ schema("/authorization/sources/:type") ->
|
||||||
#{ description => <<"Get a authorization source">>
|
#{ description => <<"Get a authorization source">>
|
||||||
, parameters => parameters_field()
|
, parameters => parameters_field()
|
||||||
, responses =>
|
, responses =>
|
||||||
#{ 200 => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)])
|
#{ 200 => mk( hoconsc:union(
|
||||||
|
[ref(?API_SCHEMA_MODULE, Type)
|
||||||
|
|| Type <- authz_sources_types(detailed)])
|
||||||
, #{desc => <<"Authorization source">>})
|
, #{desc => <<"Authorization source">>})
|
||||||
, 404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
|
, 404 => emqx_dashboard_swagger:error_codes([?NOT_FOUND], <<"Not Found">>)
|
||||||
}
|
}
|
||||||
|
@ -120,7 +129,8 @@ schema("/authorization/sources/:type") ->
|
||||||
, put =>
|
, put =>
|
||||||
#{ description => <<"Update source">>
|
#{ description => <<"Update source">>
|
||||||
, parameters => parameters_field()
|
, parameters => parameters_field()
|
||||||
, 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type) || Type <- authz_sources_types(detailed)]))
|
, 'requestBody' => mk( hoconsc:union([ref(?API_SCHEMA_MODULE, Type)
|
||||||
|
|| Type <- authz_sources_types(detailed)]))
|
||||||
, responses =>
|
, responses =>
|
||||||
#{ 204 => <<"Authorization source updated successfully">>
|
#{ 204 => <<"Authorization source updated successfully">>
|
||||||
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
|
, 400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
|
||||||
|
|
|
@ -29,8 +29,8 @@ start(_StartType, _StartArgs) ->
|
||||||
{ok, Sup} = emqx_bridge_sup:start_link(),
|
{ok, Sup} = emqx_bridge_sup:start_link(),
|
||||||
ok = emqx_bridge:load(),
|
ok = emqx_bridge:load(),
|
||||||
ok = emqx_bridge:load_hook(),
|
ok = emqx_bridge:load_hook(),
|
||||||
emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
|
ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE),
|
||||||
emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
|
ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
|
|
|
@ -72,23 +72,23 @@ roots() ->
|
||||||
end,
|
end,
|
||||||
emqx_schema_high_prio_roots() ++
|
emqx_schema_high_prio_roots() ++
|
||||||
[ {"node",
|
[ {"node",
|
||||||
sc(hoconsc:ref("node"),
|
sc(ref("node"),
|
||||||
#{ desc => "Node name, cookie, config & data directories "
|
#{ desc => "Node name, cookie, config & data directories "
|
||||||
"and the Erlang virtual machine (BEAM) boot parameters."
|
"and the Erlang virtual machine (BEAM) boot parameters."
|
||||||
})}
|
})}
|
||||||
, {"cluster",
|
, {"cluster",
|
||||||
sc(hoconsc:ref("cluster"),
|
sc(ref("cluster"),
|
||||||
#{ desc => "EMQX nodes can form a cluster to scale up the total capacity.<br>"
|
#{ desc => "EMQX nodes can form a cluster to scale up the total capacity.<br>"
|
||||||
"Here holds the configs to instruct how individual nodes "
|
"Here holds the configs to instruct how individual nodes "
|
||||||
"can discover each other."
|
"can discover each other."
|
||||||
})}
|
})}
|
||||||
, {"log",
|
, {"log",
|
||||||
sc(hoconsc:ref("log"),
|
sc(ref("log"),
|
||||||
#{ desc => "Configure logging backends (to console or to file), "
|
#{ desc => "Configure logging backends (to console or to file), "
|
||||||
"and logging level for each logger backend."
|
"and logging level for each logger backend."
|
||||||
})}
|
})}
|
||||||
, {"rpc",
|
, {"rpc",
|
||||||
sc(hoconsc:ref("rpc"),
|
sc(ref("rpc"),
|
||||||
#{ desc => "EMQX uses a library called <code>gen_rpc</code> for "
|
#{ desc => "EMQX uses a library called <code>gen_rpc</code> for "
|
||||||
"inter-broker communication.<br/>Most of the time the default config "
|
"inter-broker communication.<br/>Most of the time the default config "
|
||||||
"should work, but in case you need to do performance "
|
"should work, but in case you need to do performance "
|
||||||
|
@ -315,19 +315,22 @@ a crash dump"
|
||||||
sc(emqx_schema:duration(),
|
sc(emqx_schema:duration(),
|
||||||
#{ mapping => "vm_args.-kernel net_ticktime"
|
#{ mapping => "vm_args.-kernel net_ticktime"
|
||||||
, default => "2m"
|
, default => "2m"
|
||||||
, desc => "This is the approximate time an EMQX node may be unresponsive until it is considered down and thereby disconnected."
|
, desc => "This is the approximate time an EMQX node may"
|
||||||
|
" be unresponsive until it is considered down and thereby disconnected."
|
||||||
})}
|
})}
|
||||||
, {"dist_listen_min",
|
, {"dist_listen_min",
|
||||||
sc(range(1024, 65535),
|
sc(range(1024, 65535),
|
||||||
#{ mapping => "kernel.inet_dist_listen_min"
|
#{ mapping => "kernel.inet_dist_listen_min"
|
||||||
, default => 6369
|
, default => 6369
|
||||||
, desc => "Lower bound for the port range where EMQX broker listens for peer connections."
|
, desc => "Lower bound for the port range where"
|
||||||
|
" EMQX broker listens for peer connections."
|
||||||
})}
|
})}
|
||||||
, {"dist_listen_max",
|
, {"dist_listen_max",
|
||||||
sc(range(1024, 65535),
|
sc(range(1024, 65535),
|
||||||
#{ mapping => "kernel.inet_dist_listen_max"
|
#{ mapping => "kernel.inet_dist_listen_max"
|
||||||
, default => 6369
|
, default => 6369
|
||||||
, desc => "Upper bound for the port range where EMQX broker listens for peer connections."
|
, desc => "Upper bound for the port range "
|
||||||
|
"where EMQX broker listens for peer connections."
|
||||||
})}
|
})}
|
||||||
, {"backtrace_depth",
|
, {"backtrace_depth",
|
||||||
sc(integer(),
|
sc(integer(),
|
||||||
|
@ -455,7 +458,8 @@ fields("rpc") ->
|
||||||
#{ mapping => "gen_rpc.port_discovery"
|
#{ mapping => "gen_rpc.port_discovery"
|
||||||
, default => stateless
|
, default => stateless
|
||||||
, desc => "<code>manual</code>: discover ports by <code>tcp_server_port</code>.<br/>"
|
, desc => "<code>manual</code>: discover ports by <code>tcp_server_port</code>.<br/>"
|
||||||
"<code>stateless</code>: discover ports in a stateless manner, using the following algorithm. "
|
"<code>stateless</code>: discover ports in a stateless manner,"
|
||||||
|
" using the following algorithm. "
|
||||||
"If node name is <code>emqxN@127.0.0.1</code>, where the N is an integer, "
|
"If node name is <code>emqxN@127.0.0.1</code>, where the N is an integer, "
|
||||||
"then the listening port will be 5370 + N."
|
"then the listening port will be 5370 + N."
|
||||||
})}
|
})}
|
||||||
|
@ -464,7 +468,8 @@ fields("rpc") ->
|
||||||
#{ mapping => "gen_rpc.tcp_server_port"
|
#{ mapping => "gen_rpc.tcp_server_port"
|
||||||
, default => 5369
|
, default => 5369
|
||||||
, desc => "Listening port used by RPC local service.<br/> "
|
, desc => "Listening port used by RPC local service.<br/> "
|
||||||
"Note that this config only takes effect when rpc.port_discovery is set to manual."
|
"Note that this config only takes effect "
|
||||||
|
"when rpc.port_discovery is set to manual."
|
||||||
})}
|
})}
|
||||||
, {"ssl_server_port",
|
, {"ssl_server_port",
|
||||||
sc(integer(),
|
sc(integer(),
|
||||||
|
@ -497,7 +502,8 @@ fields("rpc") ->
|
||||||
sc(file(),
|
sc(file(),
|
||||||
#{ mapping => "gen_rpc.keyfile"
|
#{ mapping => "gen_rpc.keyfile"
|
||||||
, desc => "Path to the private key file for the <code>rpc.certfile</code>.<br/>"
|
, desc => "Path to the private key file for the <code>rpc.certfile</code>.<br/>"
|
||||||
"Note: contents of this file are secret, so it's necessary to set permissions to 600."
|
"Note: contents of this file are secret, so it's necessary to "
|
||||||
|
"set permissions to 600."
|
||||||
})}
|
})}
|
||||||
, {"cacertfile",
|
, {"cacertfile",
|
||||||
sc(file(),
|
sc(file(),
|
||||||
|
@ -528,7 +534,8 @@ fields("rpc") ->
|
||||||
sc(emqx_schema:duration_s(),
|
sc(emqx_schema:duration_s(),
|
||||||
#{ mapping => "gen_rpc.socket_keepalive_idle"
|
#{ mapping => "gen_rpc.socket_keepalive_idle"
|
||||||
, default => "7200s"
|
, default => "7200s"
|
||||||
, desc => "How long the connections between the brokers should remain open after the last message is sent."
|
, desc => "How long the connections between the brokers should remain open "
|
||||||
|
"after the last message is sent."
|
||||||
})}
|
})}
|
||||||
, {"socket_keepalive_interval",
|
, {"socket_keepalive_interval",
|
||||||
sc(emqx_schema:duration_s(),
|
sc(emqx_schema:duration_s(),
|
||||||
|
@ -941,7 +948,7 @@ roots(Module) ->
|
||||||
emqx_schema_high_prio_roots() ->
|
emqx_schema_high_prio_roots() ->
|
||||||
Roots = emqx_schema:roots(high),
|
Roots = emqx_schema:roots(high),
|
||||||
Authz = {"authorization",
|
Authz = {"authorization",
|
||||||
sc(hoconsc:ref("authorization"),
|
sc(hoconsc:ref(?MODULE, "authorization"),
|
||||||
#{ desc => """
|
#{ desc => """
|
||||||
Authorization a.k.a. ACL.<br>
|
Authorization a.k.a. ACL.<br>
|
||||||
In EMQX, MQTT client access control is extremely flexible.<br>
|
In EMQX, MQTT client access control is extremely flexible.<br>
|
||||||
|
|
|
@ -115,8 +115,14 @@ set_special_configs(_) ->
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
%% assert we there's no connectors and no bridges at first
|
%% assert we there's no connectors and no bridges at first
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["connectors"]), []),
|
{ok, 200, Connectors} = request(get, uri(["connectors"]), []),
|
||||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
lists:foreach(fun(#{<<"id">> := ConnectorID}) ->
|
||||||
|
{ok, 200, <<>>} = request(delete, uri(["connectors", ConnectorID]), [])
|
||||||
|
end, jsx:decode(Connectors)),
|
||||||
|
{ok, 200, Bridges} = request(get, uri(["bridges"]), []),
|
||||||
|
lists:foreach(fun(#{<<"id">> := BridgeID}) ->
|
||||||
|
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), [])
|
||||||
|
end, jsx:decode(Bridges)),
|
||||||
Config.
|
Config.
|
||||||
end_per_testcase(_, _Config) ->
|
end_per_testcase(_, _Config) ->
|
||||||
clear_resources(),
|
clear_resources(),
|
||||||
|
|
|
@ -45,7 +45,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
emqx_dashboard_swagger:spec(?MODULE).
|
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
|
||||||
|
|
||||||
namespace() -> "configuration".
|
namespace() -> "configuration".
|
||||||
|
|
||||||
|
@ -53,7 +53,6 @@ paths() ->
|
||||||
["/configs", "/configs_reset/:rootname"] ++
|
["/configs", "/configs_reset/:rootname"] ++
|
||||||
lists:map(fun({Name, _Type}) -> ?PREFIX ++ to_list(Name) end, config_list(?EXCLUDES)).
|
lists:map(fun({Name, _Type}) -> ?PREFIX ++ to_list(Name) end, config_list(?EXCLUDES)).
|
||||||
|
|
||||||
|
|
||||||
schema("/configs") ->
|
schema("/configs") ->
|
||||||
#{
|
#{
|
||||||
'operationId' => configs,
|
'operationId' => configs,
|
||||||
|
@ -156,7 +155,7 @@ config(put, #{body := Body}, Req) ->
|
||||||
Path = conf_path(Req),
|
Path = conf_path(Req),
|
||||||
case emqx:update_config(Path, Body, #{rawconf_with_defaults => true}) of
|
case emqx:update_config(Path, Body, #{rawconf_with_defaults => true}) of
|
||||||
{ok, #{raw_config := RawConf}} ->
|
{ok, #{raw_config := RawConf}} ->
|
||||||
{200, emqx_map_lib:jsonable_map(RawConf)};
|
{200, RawConf};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
|
{400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Reason)}}
|
||||||
end.
|
end.
|
||||||
|
@ -194,8 +193,7 @@ conf_path_reset(Req) ->
|
||||||
string:lexemes(Path, "/ ").
|
string:lexemes(Path, "/ ").
|
||||||
|
|
||||||
get_full_config() ->
|
get_full_config() ->
|
||||||
emqx_map_lib:jsonable_map(
|
emqx_config:fill_defaults(emqx:get_raw_config([])).
|
||||||
emqx_config:fill_defaults(emqx:get_raw_config([]))).
|
|
||||||
|
|
||||||
conf_path_from_querystr(Req) ->
|
conf_path_from_querystr(Req) ->
|
||||||
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
|
case proplists:get_value(<<"conf_path">>, cowboy_req:parse_qs(Req)) of
|
||||||
|
|
|
@ -160,20 +160,28 @@ t_update_disable(_Config) ->
|
||||||
|
|
||||||
t_update_re_failed(_Config) ->
|
t_update_re_failed(_Config) ->
|
||||||
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
|
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?REWRITE),
|
||||||
|
Re = <<"*^test/*">>,
|
||||||
Rules = [#{
|
Rules = [#{
|
||||||
<<"source_topic">> => <<"test/#">>,
|
<<"source_topic">> => <<"test/#">>,
|
||||||
<<"re">> => <<"*^test/*">>,
|
<<"re">> => Re,
|
||||||
<<"dest_topic">> => <<"test1/$2">>,
|
<<"dest_topic">> => <<"test1/$2">>,
|
||||||
<<"action">> => <<"publish">>
|
<<"action">> => <<"publish">>
|
||||||
}],
|
}],
|
||||||
Error = {badmatch,
|
?assertError({badmatch,
|
||||||
{error,
|
{error,
|
||||||
{emqx_modules_schema,
|
{_,
|
||||||
[{validation_error,
|
[
|
||||||
#{path => "rewrite.1.re",
|
{validation_error,
|
||||||
reason => {<<"*^test/*">>,{"nothing to repeat",0}},
|
#{
|
||||||
value => <<"*^test/*">>}}]}}},
|
path := "root.rewrite.1.re",
|
||||||
?assertError(Error, emqx_rewrite:update(Rules)),
|
reason := {Re, {"nothing to repeat", 0}},
|
||||||
|
value := Re
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, emqx_rewrite:update(Rules)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -68,7 +68,7 @@ root_fields() ->
|
||||||
, {install_dir, fun install_dir/1}
|
, {install_dir, fun install_dir/1}
|
||||||
].
|
].
|
||||||
|
|
||||||
states(type) -> hoconsc:array(hoconsc:ref(state));
|
states(type) -> hoconsc:array(hoconsc:ref(?MODULE, state));
|
||||||
states(required) -> false;
|
states(required) -> false;
|
||||||
states(default) -> [];
|
states(default) -> [];
|
||||||
states(desc) -> "An array of plugins in the desired states.<br>"
|
states(desc) -> "An array of plugins in the desired states.<br>"
|
||||||
|
|
|
@ -104,7 +104,7 @@ parameters() ->
|
||||||
})}].
|
})}].
|
||||||
|
|
||||||
fields(message_summary) ->
|
fields(message_summary) ->
|
||||||
[ {id, mk(binary(), #{desc => <<"Message ID">>})}
|
[ {msgid, mk(binary(), #{desc => <<"Message ID">>})}
|
||||||
, {topic, mk(binary(), #{desc => "The topic"})}
|
, {topic, mk(binary(), #{desc => "The topic"})}
|
||||||
, {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})}
|
, {qos, mk(emqx_schema:qos(), #{desc => "The QoS"})}
|
||||||
, {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})}
|
, {publish_at, mk(string(), #{desc => "Publish datetime, in RFC 3339 format"})}
|
||||||
|
|
|
@ -23,10 +23,8 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
{ok, Sup} = emqx_retainer_sup:start_link(),
|
emqx_retainer_sup:start_link().
|
||||||
emqx_retainer_cli:load(),
|
|
||||||
{ok, Sup}.
|
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
emqx_retainer_cli:unload().
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-module(emqx_retainer_cli).
|
|
||||||
|
|
||||||
-include("emqx_retainer.hrl").
|
|
||||||
|
|
||||||
%% APIs
|
|
||||||
-export([ load/0
|
|
||||||
, cmd/1
|
|
||||||
, unload/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
load() ->
|
|
||||||
emqx_ctl:register_command(retainer, {?MODULE, cmd}, []).
|
|
||||||
|
|
||||||
cmd(_) ->
|
|
||||||
emqx_ctl:usage([{"retainer info", "Show the count of retained messages"},
|
|
||||||
{"retainer topics", "Show all topics of retained messages"},
|
|
||||||
{"retainer clean", "Clean all retained messages"},
|
|
||||||
{"retainer clean <Topic>", "Clean retained messages by the specified topic filter"}]).
|
|
||||||
|
|
||||||
unload() ->
|
|
||||||
emqx_ctl:unregister_command(retainer).
|
|
|
@ -25,6 +25,7 @@
|
||||||
-export([ start_link/2
|
-export([ start_link/2
|
||||||
, dispatch/2
|
, dispatch/2
|
||||||
, refresh_limiter/0
|
, refresh_limiter/0
|
||||||
|
, worker/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
|
@ -50,6 +51,9 @@ refresh_limiter() ->
|
||||||
end,
|
end,
|
||||||
Workers).
|
Workers).
|
||||||
|
|
||||||
|
worker() ->
|
||||||
|
gproc_pool:pick_worker(?POOL, self()).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
%% Starts the server
|
%% Starts the server
|
||||||
|
@ -79,6 +83,7 @@ start_link(Pool, Id) ->
|
||||||
{stop, Reason :: term()} |
|
{stop, Reason :: term()} |
|
||||||
ignore.
|
ignore.
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
|
Bucket = emqx:get_config([retainer, flow_control, limiter_bucket_name]),
|
||||||
Limiter = emqx_limiter_server:connect(shared, Bucket),
|
Limiter = emqx_limiter_server:connect(shared, Bucket),
|
||||||
|
@ -188,10 +193,6 @@ format_status(_Opt, Status) ->
|
||||||
cast(Msg) ->
|
cast(Msg) ->
|
||||||
gen_server:cast(worker(), Msg).
|
gen_server:cast(worker(), Msg).
|
||||||
|
|
||||||
%% @private
|
|
||||||
worker() ->
|
|
||||||
gproc_pool:pick_worker(?POOL, self()).
|
|
||||||
|
|
||||||
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
|
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
|
||||||
dispatch(Context, Pid, Topic, Cursor, Limiter) ->
|
dispatch(Context, Pid, Topic, Cursor, Limiter) ->
|
||||||
Mod = emqx_retainer:get_backend_module(),
|
Mod = emqx_retainer:get_backend_module(),
|
||||||
|
|
|
@ -99,6 +99,10 @@ t_store_and_clean(_) ->
|
||||||
<<"this is a retained message">>,
|
<<"this is a retained message">>,
|
||||||
[{qos, 0}, {retain, true}]),
|
[{qos, 0}, {retain, true}]),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
|
|
||||||
|
{ok, List} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
||||||
|
?assertEqual(1, length(List)),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
|
||||||
|
@ -109,6 +113,10 @@ t_store_and_clean(_) ->
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
|
|
||||||
|
ok = emqx_retainer:clean(),
|
||||||
|
{ok, List2} = emqx_retainer:page_read(<<"retained">>, 1, 10),
|
||||||
|
?assertEqual(0, length(List2)),
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_retain_handling(_) ->
|
t_retain_handling(_) ->
|
||||||
|
@ -337,6 +345,96 @@ t_flow_control(_) ->
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_clear_expired(_) ->
|
||||||
|
ConfMod = fun(Conf) ->
|
||||||
|
Conf#{<<"msg_clear_interval">> := <<"1s">>, <<"msg_expiry_interval">> := <<"3s">>}
|
||||||
|
end,
|
||||||
|
|
||||||
|
Case = fun() ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
|
lists:foreach(fun(I) ->
|
||||||
|
emqtt:publish(C1,
|
||||||
|
<<"retained/", (I + 60):8/unsigned-integer>>,
|
||||||
|
#{'Message-Expiry-Interval' => 3},
|
||||||
|
<<"retained">>,
|
||||||
|
[{qos, 0}, {retain, true}])
|
||||||
|
end,
|
||||||
|
lists:seq(1, 5)),
|
||||||
|
timer:sleep(1000),
|
||||||
|
|
||||||
|
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
|
?assertEqual(5, erlang:length(List)),
|
||||||
|
|
||||||
|
timer:sleep(4500),
|
||||||
|
|
||||||
|
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
|
?assertEqual(0, erlang:length(List2)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1)
|
||||||
|
end,
|
||||||
|
with_conf(ConfMod, Case).
|
||||||
|
|
||||||
|
t_max_payload_size(_) ->
|
||||||
|
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end,
|
||||||
|
Case = fun() ->
|
||||||
|
emqx_retainer:clean(),
|
||||||
|
timer:sleep(500),
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
|
emqtt:publish(C1,
|
||||||
|
<<"retained/1">>, #{}, <<"1234">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
|
emqtt:publish(C1,
|
||||||
|
<<"retained/2">>, #{}, <<"1234567">>, [{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
|
timer:sleep(500),
|
||||||
|
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 10),
|
||||||
|
?assertEqual(1, erlang:length(List)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1)
|
||||||
|
end,
|
||||||
|
with_conf(ConfMod, Case).
|
||||||
|
|
||||||
|
t_page_read(_) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
ok = emqx_retainer:clean(),
|
||||||
|
timer:sleep(500),
|
||||||
|
|
||||||
|
Fun = fun(I) ->
|
||||||
|
emqtt:publish(C1,
|
||||||
|
<<"retained/", (I + 60)>>,
|
||||||
|
<<"this is a retained message">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
lists:foreach(Fun, lists:seq(1, 9)),
|
||||||
|
timer:sleep(200),
|
||||||
|
|
||||||
|
{ok, List} = emqx_retainer:page_read(<<"retained/+">>, 1, 5),
|
||||||
|
?assertEqual(5, length(List)),
|
||||||
|
|
||||||
|
{ok, List2} = emqx_retainer:page_read(<<"retained/+">>, 2, 5),
|
||||||
|
?assertEqual(4, length(List2)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_only_for_coverage(_) ->
|
||||||
|
?assertEqual("retainer", emqx_retainer_schema:namespace()),
|
||||||
|
ignored = gen_server:call(emqx_retainer, unexpected),
|
||||||
|
ok = gen_server:cast(emqx_retainer, unexpected),
|
||||||
|
unexpected = erlang:send(erlang:whereis(emqx_retainer), unexpected),
|
||||||
|
|
||||||
|
Dispatcher = emqx_retainer_dispatcher:worker(),
|
||||||
|
ignored = gen_server:call(Dispatcher, unexpected),
|
||||||
|
ok = gen_server:cast(Dispatcher, unexpected),
|
||||||
|
unexpected = erlang:send(Dispatcher, unexpected),
|
||||||
|
true = erlang:exit(Dispatcher, normal),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -356,3 +454,15 @@ receive_messages(Count, Msgs) ->
|
||||||
after 2000 ->
|
after 2000 ->
|
||||||
Msgs
|
Msgs
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
with_conf(ConfMod, Case) ->
|
||||||
|
Conf = emqx:get_raw_config([retainer]),
|
||||||
|
NewConf = ConfMod(Conf),
|
||||||
|
emqx_retainer:update_config(NewConf),
|
||||||
|
try
|
||||||
|
Case(),
|
||||||
|
emqx_retainer:update_config(Conf)
|
||||||
|
catch Type:Error:Strace ->
|
||||||
|
emqx_retainer:update_config(Conf),
|
||||||
|
erlang:raise(Type, Error, Strace)
|
||||||
|
end.
|
||||||
|
|
|
@ -22,128 +22,117 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-import(emqx_common_test_http, [ request_api/3
|
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
||||||
, request_api/5
|
-import(emqx_mgmt_api_test_util, [request_api/2, request_api/5, api_path/1, auth_header_/0]).
|
||||||
, get_http_data/1
|
|
||||||
, create_default_app/0
|
|
||||||
, delete_default_app/0
|
|
||||||
, default_auth_header/0
|
|
||||||
]).
|
|
||||||
|
|
||||||
-define(HOST, "http://127.0.0.1:8081/").
|
|
||||||
-define(API_VERSION, "v4").
|
|
||||||
-define(BASE_PATH, "api").
|
|
||||||
-define(CFG_URI, "/configs/retainer").
|
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
%% TODO: V5 API
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
%% emqx_common_test_helpers:all(?MODULE).
|
|
||||||
[].
|
|
||||||
|
|
||||||
groups() ->
|
|
||||||
[].
|
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
application:load(emqx_conf),
|
||||||
|
ok = ekka:start(),
|
||||||
|
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||||
|
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||||
|
meck:expect(emqx_alarm, activate, 3, ok),
|
||||||
|
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||||
|
|
||||||
application:stop(emqx_retainer),
|
application:stop(emqx_retainer),
|
||||||
emqx_common_test_helpers:start_apps([emqx_retainer, emqx_management], fun set_special_configs/1),
|
emqx_retainer_SUITE:load_base_conf(),
|
||||||
create_default_app(),
|
emqx_mgmt_api_test_util:init_suite([emqx_retainer]),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(Config) ->
|
||||||
delete_default_app(),
|
ekka:stop(),
|
||||||
emqx_common_test_helpers:stop_apps([emqx_management, emqx_retainer]).
|
mria:stop(),
|
||||||
|
mria_mnesia:delete_schema(),
|
||||||
|
meck:unload(emqx_alarm),
|
||||||
|
emqx_mgmt_api_test_util:end_suite([emqx_slow_subs]),
|
||||||
|
Config.
|
||||||
|
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
{ok, _} = emqx_cluster_rpc:start_link(),
|
||||||
|
application:ensure_all_started(emqx_retainer),
|
||||||
|
timer:sleep(500),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
set_special_configs(emqx_retainer) ->
|
|
||||||
emqx_retainer_SUITE:init_emqx_retainer_conf();
|
|
||||||
set_special_configs(emqx_management) ->
|
|
||||||
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
|
|
||||||
applications =>[#{id => "admin", secret => "public"}]}),
|
|
||||||
ok;
|
|
||||||
set_special_configs(_) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Test Cases
|
%% Test Cases
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
t_config(_Config) ->
|
t_config(_Config) ->
|
||||||
{ok, Return} = request_http_rest_lookup([?CFG_URI]),
|
Path = api_path(["mqtt", "retainer"]),
|
||||||
NowCfg = get_http_data(Return),
|
{ok, ConfJson} = request_api(get, Path),
|
||||||
NewCfg = NowCfg#{<<"msg_expiry_interval">> => timer:seconds(60)},
|
ReturnConf = decode_json(ConfJson),
|
||||||
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
|
?assertMatch(#{backend := _, enable := _, flow_control := _,
|
||||||
|
max_payload_size := _, msg_clear_interval := _,
|
||||||
|
msg_expiry_interval := _},
|
||||||
|
ReturnConf),
|
||||||
|
|
||||||
{ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
|
UpdateConf = fun(Enable) ->
|
||||||
{ok, UpdateReturn} = request_http_rest_lookup(["retainer"]),
|
RawConf = emqx_json:decode(ConfJson, [return_maps]),
|
||||||
?assertEqual(NewCfg, get_http_data(UpdateReturn)),
|
UpdateJson = RawConf#{<<"enable">> := Enable},
|
||||||
ok.
|
{ok, UpdateResJson} = request_api(put,
|
||||||
|
Path, [], auth_header_(), UpdateJson),
|
||||||
|
UpdateRawConf = emqx_json:decode(UpdateResJson, [return_maps]),
|
||||||
|
?assertEqual(Enable, maps:get(<<"enable">>, UpdateRawConf))
|
||||||
|
end,
|
||||||
|
|
||||||
t_enable_disable(_Config) ->
|
UpdateConf(false),
|
||||||
Conf = switch_emqx_retainer(undefined, true),
|
UpdateConf(true).
|
||||||
|
|
||||||
|
t_messages(_) ->
|
||||||
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqx_retainer:clean(),
|
||||||
|
timer:sleep(500),
|
||||||
|
|
||||||
|
Each = fun(I) ->
|
||||||
|
emqtt:publish(C1, <<"retained/", (I + 60)>>,
|
||||||
|
<<"retained">>,
|
||||||
|
[{qos, 0}, {retain, true}])
|
||||||
|
end,
|
||||||
|
|
||||||
|
lists:foreach(Each, lists:seq(1, 5)),
|
||||||
|
|
||||||
|
{ok, MsgsJson} = request_api(get, api_path(["mqtt", "retainer", "messages"])),
|
||||||
|
Msgs = decode_json(MsgsJson),
|
||||||
|
?assert(erlang:length(Msgs) >= 5), %% maybe has $SYS messages
|
||||||
|
|
||||||
|
[First | _] = Msgs,
|
||||||
|
?assertMatch(#{msgid := _, topic := _, qos := _,
|
||||||
|
publish_at := _, from_clientid := _, from_username := _
|
||||||
|
},
|
||||||
|
First),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
|
t_lookup_and_delete(_) ->
|
||||||
|
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
emqx_retainer:clean(),
|
||||||
|
timer:sleep(500),
|
||||||
|
|
||||||
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(C1, <<"retained/api">>, <<"retained">>, [{qos, 0}, {retain, true}]),
|
||||||
timer:sleep(100),
|
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
API = api_path(["mqtt", "retainer", "message", "retained%2Fapi"]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
{ok, LookupJson} = request_api(get, API),
|
||||||
|
LookupResult = decode_json(LookupJson),
|
||||||
|
|
||||||
_ = switch_emqx_retainer(Conf, false),
|
?assertMatch(#{msgid := _, topic := _, qos := _, payload := _,
|
||||||
|
publish_at := _, from_clientid := _, from_username := _
|
||||||
|
},
|
||||||
|
LookupResult),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained">>),
|
{ok, []} = request_api(delete, API),
|
||||||
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
|
||||||
timer:sleep(100),
|
{error, {"HTTP/1.1", 404, "Not Found"}} = request_api(get, API),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
|
||||||
|
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% HTTP Request
|
%% HTTP Request
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
request_http_rest_lookup(Path) ->
|
decode_json(Data) ->
|
||||||
request_api(get, uri([Path]), default_auth_header()).
|
BinJson = emqx_json:decode(Data, [return_maps]),
|
||||||
|
emqx_map_lib:unsafe_atom_key_map(BinJson).
|
||||||
request_http_rest_update(Path, Params) ->
|
|
||||||
request_api(put, uri([Path]), [], default_auth_header(), Params).
|
|
||||||
|
|
||||||
uri(Parts) when is_list(Parts) ->
|
|
||||||
NParts = [b2l(E) || E <- Parts],
|
|
||||||
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
b2l(B) when is_binary(B) ->
|
|
||||||
binary_to_list(B);
|
|
||||||
b2l(L) when is_list(L) ->
|
|
||||||
L.
|
|
||||||
|
|
||||||
receive_messages(Count) ->
|
|
||||||
receive_messages(Count, []).
|
|
||||||
receive_messages(0, Msgs) ->
|
|
||||||
Msgs;
|
|
||||||
receive_messages(Count, Msgs) ->
|
|
||||||
receive
|
|
||||||
{publish, Msg} ->
|
|
||||||
ct:log("Msg: ~p ~n", [Msg]),
|
|
||||||
receive_messages(Count-1, [Msg|Msgs]);
|
|
||||||
Other ->
|
|
||||||
ct:log("Other Msg: ~p~n",[Other]),
|
|
||||||
receive_messages(Count, Msgs)
|
|
||||||
after 2000 ->
|
|
||||||
Msgs
|
|
||||||
end.
|
|
||||||
|
|
||||||
switch_emqx_retainer(undefined, IsEnable) ->
|
|
||||||
{ok, Return} = request_http_rest_lookup([?COMMON_SHARD]),
|
|
||||||
NowCfg = get_http_data(Return),
|
|
||||||
switch_emqx_retainer(NowCfg, IsEnable);
|
|
||||||
|
|
||||||
switch_emqx_retainer(NowCfg, IsEnable) ->
|
|
||||||
NewCfg = NowCfg#{<<"enable">> => IsEnable},
|
|
||||||
RetainerConf = #{<<"emqx_retainer">> => NewCfg},
|
|
||||||
{ok, _} = request_http_rest_update([?CFG_URI], RetainerConf),
|
|
||||||
NewCfg.
|
|
||||||
|
|
|
@ -140,8 +140,7 @@ init([]) ->
|
||||||
Enable = emqx:get_config([slow_subs, enable]),
|
Enable = emqx:get_config([slow_subs, enable]),
|
||||||
{ok, check_enable(Enable, InitState)}.
|
{ok, check_enable(Enable, InitState)}.
|
||||||
|
|
||||||
handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
|
handle_call({update_settings, #{enable := Enable}}, _From, State) ->
|
||||||
emqx_config:put([slow_subs], Conf),
|
|
||||||
State2 = check_enable(Enable, State),
|
State2 = check_enable(Enable, State),
|
||||||
{reply, ok, State2};
|
{reply, ok, State2};
|
||||||
|
|
||||||
|
|
6
bin/emqx
6
bin/emqx
|
@ -225,15 +225,17 @@ if ! check_erlang_start >/dev/null 2>&1; then
|
||||||
if [ "$LD_LIBRARY_PATH" != "$DYNLIBS_DIR" ]; then
|
if [ "$LD_LIBRARY_PATH" != "$DYNLIBS_DIR" ]; then
|
||||||
export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH"
|
export LD_LIBRARY_PATH="$DYNLIBS_DIR:$LD_LIBRARY_PATH"
|
||||||
fi
|
fi
|
||||||
|
deps_hint="Please make sure openssl-1.1.1 (libcrypto), libncurses and libatomic1 are installed."
|
||||||
if ! check_erlang_start; then
|
if ! check_erlang_start; then
|
||||||
## it's hopeless
|
## it's hopeless
|
||||||
echoerr "FATAL: Unable to start Erlang."
|
echoerr "FATAL: Unable to start Erlang."
|
||||||
echoerr "Please make sure openssl-1.1.1 (libcrypto) and libncurses are installed."
|
echoerr "$deps_hint"
|
||||||
echoerr "Also ensure it's running on the correct platform:"
|
echoerr "Also ensure it's running on the correct platform:"
|
||||||
echoerr "$BUILD_INFO"
|
echoerr "$BUILD_INFO"
|
||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
echoerr "WARNING: There seem to be missing dynamic libs from the OS. Using libs from ${DYNLIBS_DIR}"
|
echoerr "Using libs from '${DYNLIBS_DIR}' due to missing from the OS."
|
||||||
|
echoerr "$deps_hint"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
## backward compatible
|
## backward compatible
|
||||||
|
|
Loading…
Reference in New Issue