feat: support emqx_ctl conf command

This commit is contained in:
Zhongwen Deng 2023-05-23 16:43:41 +08:00
parent b290d2543b
commit 8f12d81878
7 changed files with 151 additions and 44 deletions

View File

@ -20,6 +20,7 @@
-include_lib("emqx/include/logger.hrl").
-define(AUTHN_TRACE_TAG, "AUTHN").
-define(GLOBAL, 'mqtt:global').
-define(TRACE_AUTHN_PROVIDER(Msg), ?TRACE_AUTHN_PROVIDER(Msg, #{})).
-define(TRACE_AUTHN_PROVIDER(Msg, Meta), ?TRACE_AUTHN_PROVIDER(debug, Msg, Meta)).

View File

@ -86,12 +86,6 @@
%% utility functions
-export([authenticator_id/1, metrics_id/2]).
%% proxy callback
-export([
pre_config_update/3,
post_config_update/5
]).
-export_type([
authenticator_id/0,
position/0,
@ -275,12 +269,6 @@ get_enabled(Authenticators) ->
%% APIs
%%------------------------------------------------------------------------------
pre_config_update(Path, UpdateReq, OldConfig) ->
emqx_authentication_config:pre_config_update(Path, UpdateReq, OldConfig).
post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
emqx_authentication_config:post_config_update(Path, UpdateReq, NewConfig, OldConfig, AppEnvs).
%% @doc Get all registered authentication providers.
get_providers() ->
call(get_providers).
@ -447,8 +435,9 @@ list_users(ChainName, AuthenticatorID, FuzzyParams) ->
init(_Opts) ->
process_flag(trap_exit, true),
ok = emqx_config_handler:add_handler([?CONF_ROOT], ?MODULE),
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], ?MODULE),
Module = emqx_authentication_config,
ok = emqx_config_handler:add_handler([?CONF_ROOT], Module),
ok = emqx_config_handler:add_handler([listeners, '?', '?', ?CONF_ROOT], Module),
{ok, #{hooked => false, providers => #{}}}.
handle_call(get_providers, _From, #{providers := Providers} = State) ->

View File

@ -65,8 +65,8 @@
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
{ok, map() | list()} | {error, term()}.
pre_config_update(_, UpdateReq, OldConfig) ->
try do_pre_config_update(UpdateReq, to_list(OldConfig)) of
pre_config_update(Paths, UpdateReq, OldConfig) ->
try do_pre_config_update(Paths, UpdateReq, to_list(OldConfig)) of
{error, Reason} -> {error, Reason};
{ok, NewConfig} -> {ok, NewConfig}
catch
@ -74,9 +74,9 @@ pre_config_update(_, UpdateReq, OldConfig) ->
{error, Reason}
end.
do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
do_pre_config_update(_, {create_authenticator, ChainName, Config}, OldConfig) ->
NewId = authenticator_id(Config),
case lists:filter(fun(OldConfig0) -> authenticator_id(OldConfig0) =:= NewId end, OldConfig) of
case filter_authenticator(NewId, OldConfig) of
[] ->
CertsDir = certs_dir(ChainName, Config),
NConfig = convert_certs(CertsDir, Config),
@ -84,7 +84,7 @@ do_pre_config_update({create_authenticator, ChainName, Config}, OldConfig) ->
[_] ->
{error, {already_exists, {authenticator, NewId}}}
end;
do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
do_pre_config_update(_, {delete_authenticator, _ChainName, AuthenticatorID}, OldConfig) ->
NewConfig = lists:filter(
fun(OldConfig0) ->
AuthenticatorID =/= authenticator_id(OldConfig0)
@ -92,7 +92,7 @@ do_pre_config_update({delete_authenticator, _ChainName, AuthenticatorID}, OldCon
OldConfig
),
{ok, NewConfig};
do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
do_pre_config_update(_, {update_authenticator, ChainName, AuthenticatorID, Config}, OldConfig) ->
CertsDir = certs_dir(ChainName, AuthenticatorID),
NewConfig = lists:map(
fun(OldConfig0) ->
@ -104,7 +104,7 @@ do_pre_config_update({update_authenticator, ChainName, AuthenticatorID, Config},
OldConfig
),
{ok, NewConfig};
do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
do_pre_config_update(_, {move_authenticator, _ChainName, AuthenticatorID, Position}, OldConfig) ->
case split_by_id(AuthenticatorID, OldConfig) of
{error, Reason} ->
{error, Reason};
@ -129,7 +129,18 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
{ok, BeforeNFound ++ [FoundRelated, Found | AfterNFound]}
end
end
end.
end;
do_pre_config_update(_, OldConfig, OldConfig) ->
{ok, OldConfig};
do_pre_config_update(Paths, NewConfig, _OldConfig) ->
ChainName = chain_name(Paths),
{ok, [
begin
CertsDir = certs_dir(ChainName, New),
convert_certs(CertsDir, New)
end
|| New <- to_list(NewConfig)
]}.
-spec post_config_update(
list(atom()),
@ -139,13 +150,16 @@ do_pre_config_update({move_authenticator, _ChainName, AuthenticatorID, Position}
emqx_config:app_envs()
) ->
ok | {ok, map()} | {error, term()}.
post_config_update(_, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
post_config_update(Paths, UpdateReq, NewConfig, OldConfig, AppEnvs) ->
do_post_config_update(Paths, UpdateReq, to_list(NewConfig), OldConfig, AppEnvs).
do_post_config_update({create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs) ->
do_post_config_update(
_, {create_authenticator, ChainName, Config}, NewConfig, _OldConfig, _AppEnvs
) ->
NConfig = get_authenticator_config(authenticator_id(Config), NewConfig),
emqx_authentication:create_authenticator(ChainName, NConfig);
do_post_config_update(
_,
{delete_authenticator, ChainName, AuthenticatorID},
_NewConfig,
OldConfig,
@ -160,6 +174,7 @@ do_post_config_update(
{error, Reason}
end;
do_post_config_update(
_,
{update_authenticator, ChainName, AuthenticatorID, Config},
NewConfig,
_OldConfig,
@ -172,12 +187,49 @@ do_post_config_update(
emqx_authentication:update_authenticator(ChainName, AuthenticatorID, NConfig)
end;
do_post_config_update(
_,
{move_authenticator, ChainName, AuthenticatorID, Position},
_NewConfig,
_OldConfig,
_AppEnvs
) ->
emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position).
emqx_authentication:move_authenticator(ChainName, AuthenticatorID, Position);
do_post_config_update(_, _UpdateReq, OldConfig, OldConfig, _AppEnvs) ->
ok;
do_post_config_update(Paths, _UpdateReq, NewConfig0, OldConfig0, _AppEnvs) ->
ChainName = chain_name(Paths),
OldConfig = to_list(OldConfig0),
NewConfig = to_list(NewConfig0),
OldIds = lists:map(fun authenticator_id/1, OldConfig),
NewIds = lists:map(fun authenticator_id/1, NewConfig),
%% delete authenticators that are not in the new config
lists:foreach(
fun(Conf) ->
Id = authenticator_id(Conf),
case lists:member(Id, NewIds) of
true ->
ok;
false ->
_ = emqx_authentication:delete_authenticator(ChainName, Id),
CertsDir = certs_dir(ChainName, Conf),
ok = clear_certs(CertsDir, Conf)
end
end,
OldConfig
),
%% create new authenticators and update existing ones
lists:foreach(
fun(Conf) ->
Id = authenticator_id(Conf),
case lists:member(Id, OldIds) of
true ->
emqx_authentication:update_authenticator(ChainName, Id, Conf);
false ->
emqx_authentication:create_authenticator(ChainName, Conf)
end
end,
NewConfig
).
to_list(undefined) -> [];
to_list(M) when M =:= #{} -> [];
@ -213,14 +265,15 @@ clear_certs(CertsDir, Config) ->
ok = emqx_tls_lib:delete_ssl_files(CertsDir, undefined, OldSSL).
get_authenticator_config(AuthenticatorID, AuthenticatorsConfig) ->
case
lists:filter(fun(C) -> AuthenticatorID =:= authenticator_id(C) end, AuthenticatorsConfig)
of
case filter_authenticator(AuthenticatorID, AuthenticatorsConfig) of
[C] -> C;
[] -> {error, not_found};
_ -> error({duplicated_authenticator_id, AuthenticatorsConfig})
end.
filter_authenticator(ID, Authenticators) ->
lists:filter(fun(A) -> ID =:= authenticator_id(A) end, Authenticators).
split_by_id(ID, AuthenticatorsConfig) ->
case
lists:foldl(
@ -287,3 +340,8 @@ dir(ChainName, ID) when is_binary(ID) ->
emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID]));
dir(ChainName, Config) when is_map(Config) ->
dir(ChainName, authenticator_id(Config)).
chain_name([authentication]) ->
?GLOBAL;
chain_name([listeners, Type, Name, authentication]) ->
binary_to_existing_atom(<<(atom_to_binary(Type))/binary, ":", (atom_to_binary(Name))/binary>>).

View File

@ -288,12 +288,14 @@ get_default_value([RootName | _] = KeyPath) ->
end.
-spec get_raw(emqx_utils_maps:config_key_path()) -> term().
get_raw([Root | T]) when is_atom(Root) -> get_raw([bin(Root) | T]);
get_raw(KeyPath) -> do_get_raw(KeyPath).
get_raw([Root | _] = KeyPath) when is_binary(Root) -> do_get_raw(KeyPath);
get_raw([Root | T]) -> get_raw([bin(Root) | T]);
get_raw([]) -> do_get_raw([]).
-spec get_raw(emqx_utils_maps:config_key_path(), term()) -> term().
get_raw([Root | T], Default) when is_atom(Root) -> get_raw([bin(Root) | T], Default);
get_raw(KeyPath, Default) -> do_get_raw(KeyPath, Default).
get_raw([Root | _] = KeyPath, Default) when is_binary(Root) -> do_get_raw(KeyPath, Default);
get_raw([Root | T], Default) -> get_raw([bin(Root) | T], Default);
get_raw([], Default) -> do_get_raw([], Default).
-spec put_raw(map()) -> ok.
put_raw(Config) ->

View File

@ -23,8 +23,6 @@
-define(AUTHN, emqx_authentication).
-define(GLOBAL, 'mqtt:global').
-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}").
-define(AUTH_SHARD, emqx_authn_shard).

View File

@ -31,6 +31,7 @@
-define(NOT_FOUND, 'NOT_FOUND').
-define(ALREADY_EXISTS, 'ALREADY_EXISTS').
-define(INTERNAL_ERROR, 'INTERNAL_ERROR').
-define(CONFIG, emqx_authentication_config).
% Swagger
@ -833,12 +834,12 @@ with_chain(ListenerID, Fun) ->
create_authenticator(ConfKeyPath, ChainName, Config) ->
case update_config(ConfKeyPath, {create_authenticator, ChainName, Config}) of
{ok, #{
post_config_update := #{emqx_authentication := #{id := ID}},
post_config_update := #{?CONFIG := #{id := ID}},
raw_config := AuthenticatorsConfig
}} ->
{ok, AuthenticatorConfig} = find_config(ID, AuthenticatorsConfig),
{200, maps:put(id, ID, convert_certs(fill_defaults(AuthenticatorConfig)))};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1017,7 +1018,7 @@ update_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Config) ->
of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1027,7 +1028,7 @@ delete_authenticator(ConfKeyPath, ChainName, AuthenticatorID) ->
case update_config(ConfKeyPath, {delete_authenticator, ChainName, AuthenticatorID}) of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)
@ -1044,7 +1045,7 @@ move_authenticator(ConfKeyPath, ChainName, AuthenticatorID, Position) ->
of
{ok, _} ->
{204};
{error, {_PrePostConfigUpdate, emqx_authentication, Reason}} ->
{error, {_PrePostConfigUpdate, ?CONFIG, Reason}} ->
serialize_error(Reason);
{error, Reason} ->
serialize_error(Reason)

View File

@ -18,16 +18,47 @@
-export([
load/0,
admins/1,
conf/1,
unload/0
]).
-define(CMD, cluster_call).
-define(CLUSTER_CALL, cluster_call).
-define(CONF, conf).
load() ->
emqx_ctl:register_command(?CMD, {?MODULE, admins}, []).
emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, []),
emqx_ctl:register_command(?CONF, {?MODULE, conf}, []).
unload() ->
emqx_ctl:unregister_command(?CMD).
emqx_ctl:unregister_command(?CLUSTER_CALL),
emqx_ctl:unregister_command(?CONF).
conf(["reload"]) ->
ConfFiles = lists:flatten(lists:join(",", application:get_env(emqx, config_files, []))),
case emqx_config:reload_etc_conf_on_local_node() of
[] ->
emqx_ctl:print("reload ~s success~n", [ConfFiles]);
Error ->
emqx_ctl:print("reload ~s failed:~n", [ConfFiles]),
print(Error)
end;
conf(["print", "--only-keys"]) ->
print(emqx_config:get_root_names());
conf(["print"]) ->
print_hocon(get_config());
conf(["print", Key]) ->
print_hocon(get_config(Key));
conf(["load", Path]) ->
load_config(Path);
conf(_) ->
emqx_ctl:usage(
[
%{"conf reload", "reload etc/emqx.conf on local node"},
{"conf print --only-keys", "print all keys"},
{"conf print", "print all running configures"},
{"conf print <key>", "print a specific configuration"}
]
).
admins(["status"]) ->
status();
@ -43,7 +74,7 @@ admins(["skip", Node0]) ->
status();
admins(["tnxid", TnxId0]) ->
TnxId = list_to_integer(TnxId0),
emqx_ctl:print("~p~n", [emqx_cluster_rpc:query(TnxId)]);
print(emqx_cluster_rpc:query(TnxId));
admins(["fast_forward"]) ->
status(),
Nodes = mria:running_nodes(),
@ -91,3 +122,30 @@ status() ->
Status
),
emqx_ctl:print("-----------------------------------------------\n").
print(Json) ->
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]).
print_hocon(Hocon) ->
emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]).
get_config() -> emqx_config:fill_defaults(emqx:get_raw_config([])).
get_config(Key) -> emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
-define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}).
load_config(Path) ->
case hocon:files([Path]) of
{ok, Conf} ->
maps:foreach(
fun(Key, Value) ->
case emqx_conf:update([Key], Value, ?OPTIONS) of
{ok, _} -> emqx_ctl:print("load ~ts ok~n", [Key]);
{error, Reason} -> emqx_ctl:print("load ~ts failed: ~p~n", [Key, Reason])
end
end,
Conf
);
{error, Reason} ->
emqx_ctl:print("load ~ts failed: ~p~n", [Path, Reason]),
{error, Reason}
end.