feat: add emqx_config_logger.
This commit is contained in:
parent
eb4be03012
commit
18886f657b
|
@ -0,0 +1,55 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_config_logger).
|
||||
|
||||
-behaviour(emqx_config_handler).
|
||||
|
||||
%% API
|
||||
-export([add_handler/0, remove_handler/0]).
|
||||
-export([post_config_update/5]).
|
||||
|
||||
-define(LOG, [log]).
|
||||
|
||||
add_handler() ->
|
||||
ok = emqx_config_handler:add_handler(?LOG, ?MODULE),
|
||||
ok.
|
||||
|
||||
remove_handler() ->
|
||||
ok = emqx_config_handler:remove_handler(?LOG),
|
||||
ok.
|
||||
|
||||
post_config_update(?LOG, _Req, NewConf, _OldConf, _AppEnvs) ->
|
||||
NewLog = #{log => NewConf},
|
||||
[{"logger_level", LevelFunc}, {"logger", LoggerFunc}] =
|
||||
emqx_conf_schema:translation("kernel"),
|
||||
NewHandlers = LoggerFunc(NewLog),
|
||||
Level = LevelFunc(NewLog),
|
||||
ok = update_log_handlers(NewHandlers),
|
||||
ok = logger:set_primary_config(level, Level),
|
||||
application:set_env(kernel, logger_level, Level),
|
||||
ok;
|
||||
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
|
||||
ok.
|
||||
|
||||
update_log_handlers(NewHandlers) ->
|
||||
OldHandlers = application:get_env(kernel, logger, []),
|
||||
lists:foreach(fun({handler, HandlerId, _Mod, _Conf}) ->
|
||||
logger:remove_handler(HandlerId)
|
||||
end, OldHandlers -- NewHandlers),
|
||||
lists:foreach(fun({handler, HandlerId, Mod, Conf}) ->
|
||||
logger:add_handler(HandlerId, Mod, Conf)
|
||||
end, NewHandlers -- OldHandlers),
|
||||
application:set_env(kernel, logger, NewHandlers).
|
|
@ -47,11 +47,13 @@ start(_Type, _Args) ->
|
|||
{ok, Sup} = emqx_sup:start_link(),
|
||||
ok = maybe_start_listeners(),
|
||||
ok = emqx_alarm_handler:load(),
|
||||
emqx_config:add_handlers(),
|
||||
register(emqx, self()),
|
||||
{ok, Sup}.
|
||||
|
||||
prep_stop(_State) ->
|
||||
ok = emqx_alarm_handler:unload(),
|
||||
emqx_config:remove_handlers(),
|
||||
emqx_boot:is_enabled(listeners)
|
||||
andalso emqx_listeners:stop().
|
||||
|
||||
|
|
|
@ -70,6 +70,10 @@
|
|||
, find_listener_conf/3
|
||||
]).
|
||||
|
||||
-export([ add_handlers/0
|
||||
, remove_handlers/0
|
||||
]).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-define(CONF, conf).
|
||||
|
@ -431,6 +435,14 @@ save_to_override_conf(RawConf, Opts) ->
|
|||
end
|
||||
end.
|
||||
|
||||
add_handlers() ->
|
||||
ok = emqx_config_logger:add_handler(),
|
||||
ok.
|
||||
|
||||
remove_handlers() ->
|
||||
ok = emqx_config_logger:remove_handler(),
|
||||
ok.
|
||||
|
||||
load_hocon_file(FileName, LoadType) ->
|
||||
case filelib:is_regular(FileName) of
|
||||
true ->
|
||||
|
|
|
@ -234,7 +234,7 @@ check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, Override
|
|||
UpdateArgs, Opts) ->
|
||||
OldConf = emqx_config:get_root(ConfKeyPath),
|
||||
Schema = schema(SchemaModule, ConfKeyPath),
|
||||
{AppEnvs, #{root := NewConf}} = emqx_config:check_config(Schema, #{<<"root">> => NewRawConf}),
|
||||
{AppEnvs, NewConf} = emqx_config:check_config(Schema, NewRawConf),
|
||||
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
||||
{ok, Result0} ->
|
||||
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
||||
|
@ -403,7 +403,7 @@ schema(SchemaModule, [RootKey | _]) ->
|
|||
{_, {Ref, ?REF(Ref)}} -> {Ref, ?R_REF(SchemaModule, Ref)};
|
||||
{_, Field0} -> Field0
|
||||
end,
|
||||
#{roots => [root], fields => #{root => [Field]}}.
|
||||
#{roots => [Field]}.
|
||||
|
||||
load_prev_handlers() ->
|
||||
Handlers = application:get_env(emqx, ?MODULE, #{}),
|
||||
|
|
|
@ -91,7 +91,7 @@
|
|||
%% Stats Timer
|
||||
stats_timer :: disabled | maybe(reference()),
|
||||
%% Idle Timeout
|
||||
idle_timeout :: integer(),
|
||||
idle_timeout :: integer() | infinity,
|
||||
%% Idle Timer
|
||||
idle_timer :: maybe(reference()),
|
||||
%% Zone name
|
||||
|
|
|
@ -34,7 +34,6 @@ init([]) ->
|
|||
, child_spec(emqx_stats, worker)
|
||||
, child_spec(emqx_metrics, worker)
|
||||
, child_spec(emqx_ctl, worker)
|
||||
, child_spec(emqx_logger, worker)
|
||||
]}}.
|
||||
|
||||
child_spec(M, Type) ->
|
||||
|
|
|
@ -18,20 +18,8 @@
|
|||
|
||||
-compile({no_auto_import, [error/1]}).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(emqx_config_handler).
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ start_link/0
|
||||
, init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
%% Logs
|
||||
-export([ debug/1
|
||||
, debug/2
|
||||
|
@ -71,8 +59,6 @@
|
|||
, stop_log_handler/1
|
||||
]).
|
||||
|
||||
-export([post_config_update/5]).
|
||||
|
||||
-type(peername_str() :: list()).
|
||||
-type(logger_dst() :: file:filename() | console | unknown).
|
||||
-type(logger_handler_info() :: #{
|
||||
|
@ -84,49 +70,6 @@
|
|||
}).
|
||||
|
||||
-define(STOPPED_HANDLERS, {?MODULE, stopped_handlers}).
|
||||
-define(CONF_PATH, [log]).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
init([]) ->
|
||||
ok = emqx_config_handler:add_handler(?CONF_PATH, ?MODULE),
|
||||
{ok, #{}}.
|
||||
|
||||
handle_call({update_config, AppEnvs}, _From, State) ->
|
||||
OldEnvs = application:get_env(kernel, logger, []),
|
||||
NewEnvs = proplists:get_value(logger, proplists:get_value(kernel, AppEnvs, []), []),
|
||||
ok = application:set_env(kernel, logger, NewEnvs),
|
||||
_ = [logger:remove_handler(HandlerId) || {handler, HandlerId, _Mod, _Conf} <- OldEnvs],
|
||||
_ = [logger:add_handler(HandlerId, Mod, Conf) || {handler, HandlerId, Mod, Conf} <- NewEnvs],
|
||||
ok = tune_primary_log_level(),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(_Req, _From, State) ->
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok = emqx_config_handler:remove_handler(?CONF_PATH),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% emqx_config_handler callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
post_config_update(_, _Req, _NewConf, _OldConf, AppEnvs) ->
|
||||
gen_server:call(?MODULE, {update_config, AppEnvs}, 5000).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
|
|
|
@ -138,13 +138,14 @@ apply_fun(Fun, Input, State) ->
|
|||
{arity, 2} -> Fun(Input, State)
|
||||
end.
|
||||
|
||||
-spec(start_timer(integer(), term()) -> reference()).
|
||||
-spec(start_timer(integer() | atom(), term()) -> maybe(reference())).
|
||||
start_timer(Interval, Msg) ->
|
||||
start_timer(Interval, self(), Msg).
|
||||
|
||||
-spec(start_timer(integer(), pid() | atom(), term()) -> reference()).
|
||||
start_timer(Interval, Dest, Msg) ->
|
||||
erlang:start_timer(erlang:ceil(Interval), Dest, Msg).
|
||||
-spec(start_timer(integer() | atom(), pid() | atom(), term()) -> maybe(reference())).
|
||||
start_timer(Interval, Dest, Msg) when is_number(Interval) ->
|
||||
erlang:start_timer(erlang:ceil(Interval), Dest, Msg);
|
||||
start_timer(_Atom, _Dest, _Msg) -> undefined.
|
||||
|
||||
-spec(cancel_timer(maybe(reference())) -> ok).
|
||||
cancel_timer(Timer) when is_reference(Timer) ->
|
||||
|
|
|
@ -419,7 +419,7 @@ after idling for 'Keepalive * backoff * 2'."""
|
|||
})
|
||||
}
|
||||
, {"max_mqueue_len",
|
||||
sc(hoconsc:union([range(0, inf), infinity]),
|
||||
sc(hoconsc:union([non_neg_integer(), infinity]),
|
||||
#{ default => 1000,
|
||||
desc =>
|
||||
"""Maximum queue length. Enqueued messages when persistent client disconnected,
|
||||
|
|
|
@ -176,7 +176,7 @@ do_parse_versions([], Acc) -> lists:reverse(Acc);
|
|||
do_parse_versions([V | More], Acc) ->
|
||||
case parse_version(V) of
|
||||
unknown ->
|
||||
emqx_logger:warning("unknown_tls_version_discarded: ~p", [V]),
|
||||
?SLOG(warning, #{msg => "unknown_tls_version_discarded", version => V}),
|
||||
do_parse_versions(More, Acc);
|
||||
Parsed ->
|
||||
do_parse_versions(More, [Parsed | Acc])
|
||||
|
|
|
@ -65,9 +65,9 @@ handle_msg(ReqMsg, RequestHandler, Parent) ->
|
|||
props = RspProps,
|
||||
payload = RspPayload
|
||||
},
|
||||
emqx_logger:debug("~p sending response msg to topic ~ts with~n"
|
||||
"corr-data=~p~npayload=~p",
|
||||
[?MODULE, RspTopic, CorrData, RspPayload]),
|
||||
logger:debug("~p sending response msg to topic ~ts with~n"
|
||||
"corr-data=~p~npayload=~p",
|
||||
[?MODULE, RspTopic, CorrData, RspPayload]),
|
||||
ok = send_response(RspMsg);
|
||||
_ ->
|
||||
Parent ! {discarded, ReqPayload},
|
||||
|
|
|
@ -772,7 +772,7 @@ tr_logger(Conf) ->
|
|||
%% For the file logger
|
||||
FileHandlers =
|
||||
[begin
|
||||
{handler, binary_to_atom(HandlerName, latin1), logger_disk_log_h, #{
|
||||
{handler, to_atom(HandlerName), logger_disk_log_h, #{
|
||||
level => conf_get("level", SubConf),
|
||||
config => (log_handler_conf(SubConf)) #{
|
||||
type => case conf_get("rotation.enable", SubConf) of
|
||||
|
@ -846,7 +846,7 @@ log_handler_common_confs() ->
|
|||
#{ default => error
|
||||
})}
|
||||
, {"max_depth",
|
||||
sc(hoconsc:union([unlimited, integer()]),
|
||||
sc(hoconsc:union([unlimited, non_neg_integer()]),
|
||||
#{ default => 100
|
||||
, desc => "Maximum depth for Erlang term log formatting "
|
||||
"and Erlang process message queue inspection."
|
||||
|
|
|
@ -34,6 +34,8 @@
|
|||
-define(PREFIX, "/configs/").
|
||||
-define(PREFIX_RESET, "/configs_reset/").
|
||||
-define(ERR_MSG(MSG), list_to_binary(io_lib:format("~p", [MSG]))).
|
||||
-define(OPTS, #{rawconf_with_defaults => true, override_to => cluster}).
|
||||
|
||||
-define(EXCLUDES, [
|
||||
<<"exhook">>,
|
||||
<<"gateway">>,
|
||||
|
@ -177,7 +179,7 @@ config(get, _Params, Req) ->
|
|||
|
||||
config(put, #{body := Body}, Req) ->
|
||||
Path = conf_path(Req),
|
||||
case emqx:update_config(Path, Body, #{rawconf_with_defaults => true}) of
|
||||
case emqx_conf:update(Path, Body, ?OPTS) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
{200, RawConf};
|
||||
{error, Reason} ->
|
||||
|
@ -192,7 +194,7 @@ global_zone_configs(get, _Params, _Req) ->
|
|||
global_zone_configs(put, #{body := Body}, _Req) ->
|
||||
Res =
|
||||
maps:fold(fun(Path, Value, Acc) ->
|
||||
case emqx:update_config([Path], Value, #{rawconf_with_defaults => true}) of
|
||||
case emqx_conf:update([Path], Value, ?OPTS) of
|
||||
{ok, #{raw_config := RawConf}} ->
|
||||
Acc#{Path => RawConf};
|
||||
{error, Reason} ->
|
||||
|
|
|
@ -88,7 +88,10 @@ schema("/plugins/install") ->
|
|||
properties => #{
|
||||
plugin => #{type => string, format => binary}}},
|
||||
encoding => #{plugin => #{'contentType' => 'application/gzip'}}}}},
|
||||
responses => #{200 => <<"OK">>}
|
||||
responses => #{
|
||||
200 => <<"OK">>,
|
||||
400 => emqx_dashboard_swagger:error_codes(['UNEXPECTED_ERROR','ALREADY_INSTALLED'])
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/plugins/:name") ->
|
||||
|
@ -263,15 +266,29 @@ upload_install(post, #{body := #{<<"plugin">> := Plugin}}) when is_map(Plugin) -
|
|||
%% File bin is too large, we use rpc:multicall instead of cluster_rpc:multicall
|
||||
%% TODO what happened when a new node join in?
|
||||
%% emqx_plugins_monitor should copy plugins from other core node when boot-up.
|
||||
{Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
|
||||
case lists:filter(fun(R) -> R =/= ok end, Res) of
|
||||
[] -> {200};
|
||||
[{error, Reason} | _] ->
|
||||
{400, #{code => 'UNEXPECTED_ERROR',
|
||||
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
||||
case emqx_plugins:describe(string:trim(FileName, trailing, ".tar.gz")) of
|
||||
{error, #{error := "bad_info_file", return := {enoent, _}}} ->
|
||||
{AppName, _Vsn} = emqx_plugins:parse_name_vsn(FileName),
|
||||
AppDir = filename:join(emqx_plugins:install_dir(), AppName),
|
||||
case filelib:wildcard(AppDir ++ "*.tar.gz") of
|
||||
[] ->
|
||||
{Res, _} = emqx_mgmt_api_plugins_proto_v1:install_package(FileName, Bin),
|
||||
case lists:filter(fun(R) -> R =/= ok end, Res) of
|
||||
[] -> {200};
|
||||
[{error, Reason} | _] ->
|
||||
{400, #{code => 'UNEXPECTED_ERROR',
|
||||
message => iolist_to_binary(io_lib:format("~p", [Reason]))}}
|
||||
end;
|
||||
OtherVsn ->
|
||||
{400, #{code => 'ALREADY_INSTALLED',
|
||||
message => iolist_to_binary(io_lib:format("~p already installed",
|
||||
[OtherVsn]))}}
|
||||
end;
|
||||
{ok, _} ->
|
||||
{400, #{code => 'ALREADY_INSTALLED',
|
||||
message => iolist_to_binary(io_lib:format("~p is already installed", [FileName]))}}
|
||||
end;
|
||||
upload_install(post, #{} = Body) ->
|
||||
io:format("~p~n", [Body]),
|
||||
upload_install(post, #{}) ->
|
||||
{400, #{code => 'BAD_FORM_DATA',
|
||||
message =>
|
||||
<<"form-data should be `plugin=@packagename-vsn.tar.gz;type=application/x-gzip`">>}
|
||||
|
@ -309,7 +326,6 @@ update_boot_order(post, #{bindings := #{name := Name}, body := Body}) ->
|
|||
%% For RPC upload_install/2
|
||||
install_package(FileName, Bin) ->
|
||||
File = filename:join(emqx_plugins:install_dir(), FileName),
|
||||
io:format("xx:~p~n", [File]),
|
||||
ok = file:write_file(File, Bin),
|
||||
PackageName = string:trim(FileName, trailing, ".tar.gz"),
|
||||
emqx_plugins:ensure_installed(PackageName).
|
||||
|
|
|
@ -24,11 +24,11 @@ all() ->
|
|||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
emqx_mgmt_api_test_util:init_suite([emqx_conf]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_) ->
|
||||
emqx_mgmt_api_test_util:end_suite().
|
||||
emqx_mgmt_api_test_util:end_suite([emqx_conf]).
|
||||
|
||||
t_get(_Config) ->
|
||||
{ok, Configs} = get_configs(),
|
||||
|
@ -68,6 +68,16 @@ t_update(_Config) ->
|
|||
?assertMatch(#{<<"vm">> := #{<<"busy_port">> := false}}, SysMon4),
|
||||
ok.
|
||||
|
||||
t_log(_Config) ->
|
||||
{ok, Log} = get_config("log"),
|
||||
File = "log/emqx-test.log",
|
||||
Log1 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"enable">>], Log, true),
|
||||
Log2 = emqx_map_lib:deep_put([<<"file_handlers">>, <<"default">>, <<"file">>], Log1, File),
|
||||
{ok, #{}} = update_config(<<"log">>, Log2),
|
||||
{ok, Log3} = logger:get_handler_config(default),
|
||||
?assertMatch(#{config := #{file := File}}, Log3),
|
||||
ok.
|
||||
|
||||
t_zones(_Config) ->
|
||||
{ok, Zones} = get_zones(),
|
||||
ZonesKeys = lists:map(fun({K, _}) -> K end, hocon_schema:roots(emqx_zone_schema)),
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
%% <<"certfile">> => file_input()
|
||||
%% <<"cafile">> => file_input() %% backward compatible
|
||||
%% <<"cacertfile">> => file_input()
|
||||
%% <<"verify">> => boolean()
|
||||
%% <<"verify">> => verify_none | verify_peer
|
||||
%% <<"tls_versions">> => binary()
|
||||
%% <<"ciphers">> => binary()
|
||||
-type opts_key() :: binary() | atom().
|
||||
|
|
|
@ -32,7 +32,7 @@ prop_opts_input() ->
|
|||
[{keyfile, prop_file_or_content()},
|
||||
{certfile, prop_file_or_content()},
|
||||
{cacertfile, prop_file_or_content()},
|
||||
{verify, proper_types:boolean()},
|
||||
{verify, proper_types:oneof([verify_none, verify_peer])},
|
||||
{versions, prop_tls_versions()},
|
||||
{ciphers, prop_tls_ciphers()},
|
||||
{other, proper_types:binary()}].
|
||||
|
|
|
@ -35,6 +35,7 @@
|
|||
, restart/1
|
||||
, list/0
|
||||
, describe/1
|
||||
, parse_name_vsn/1
|
||||
]).
|
||||
|
||||
-export([ get_config/2
|
||||
|
@ -565,18 +566,27 @@ is_needed_by(AppToStop, RunningApp) ->
|
|||
|
||||
put_config(Key, Value) when is_atom(Key) ->
|
||||
put_config([Key], Value);
|
||||
put_config(Path, Value) when is_list(Path) ->
|
||||
emqx_config:put([?CONF_ROOT | Path], Value).
|
||||
put_config(Path, Values) when is_list(Path) ->
|
||||
Opts = #{rawconf_with_defaults => true, override_to => cluster},
|
||||
case emqx:update_config([?CONF_ROOT | Path], bin_key(Values), Opts) of
|
||||
{ok, _} -> ok;
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
bin_key(Map) when is_map(Map) ->
|
||||
maps:fold(fun(K, V, Acc) -> Acc#{bin(K) => V} end, #{}, Map);
|
||||
bin_key(List) when is_list(List) ->
|
||||
lists:map(fun(M) -> bin_key(M) end, List).
|
||||
|
||||
get_config(Key, Default) when is_atom(Key) ->
|
||||
get_config([Key], Default);
|
||||
get_config(Path, Default) ->
|
||||
emqx:get_config([?CONF_ROOT | Path], Default).
|
||||
emqx_conf:get([?CONF_ROOT | Path], Default).
|
||||
|
||||
install_dir() -> get_config(install_dir, "").
|
||||
|
||||
put_configured(Configured) ->
|
||||
ok = put_config(states, Configured).
|
||||
ok = put_config(states, bin_key(Configured)).
|
||||
|
||||
configured() ->
|
||||
get_config(states, []).
|
||||
|
|
Loading…
Reference in New Issue