diff --git a/Makefile b/Makefile index b4b68259f..07fe13cbb 100644 --- a/Makefile +++ b/Makefile @@ -4,31 +4,27 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker PROJECT_VERSION = 3.0 -DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique lager_syslog +DEPS = jsx gproc gen_rpc ekka esockd cowboy clique dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0 -dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0 -dep_lager = git https://github.com/erlang-lager/lager 3.6.5 +dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0 dep_esockd = git https://github.com/emqx/esockd v5.4.2 -dep_ekka = git https://github.com/emqx/ekka v0.4.1 +dep_ekka = git https://github.com/emqx/ekka v0.5.0 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_clique = git https://github.com/emqx/clique develop -dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1 NO_AUTOPATCH = cuttlefish ERLC_OPTS += +debug_info -DAPPLICATION=emqx -ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish -dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30 +dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.0 #TEST_DEPS = emqx_ct_helplers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx -TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose @@ -47,7 +43,7 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) COVER = true -PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia +PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia DIALYZER_DIRS := ebin/ DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns diff --git a/etc/emqx.conf b/etc/emqx.conf index 1e04839e0..8b42eb682 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -328,91 +328,61 @@ rpc.socket_keepalive_count = 9 ## Log ##-------------------------------------------------------------------- -## Sets the log dir. +## Where to emit the logs. +## Enable the console (standard output) logs. +## +## Value: off | file | console | both +## - off: disable logs entirely +## - file: write logs to file +## - console: write logs to standard I/O +## - both: write logs both to file and standard I/O +log.to = console + +## The log severity level. +## +## Value: debug | info | notice | warning | error | critical | alert | emergency +## +## Note: Only the messages with severity level greater than or equal to +## this level will be logged. +## +## Default: error +log.level = error + +## The dir for log files. ## ## Value: Folder log.dir = {{ platform_log_dir }} -## Where to emit the console logs. +## The log filename for logs of level specified in "log.level". ## -## Value: off | file | console | both -## - off: disabled -## - file: write to file -## - console: write to stdout -## - both: file and stdout -log.console = console +## Value: String +## Default: emqx.log +log.file = emqx.log -## Sets the severity level of console log. -## -## Value: debug | info | notice | warning | error | critical | alert | emergency -## -## Default: error -log.console.level = error - -## The file where console logs will be writed to, when 'log.console' is set as 'file'. -## -## Value: File Name -## log.console.file = {{ platform_log_dir }}/console.log - -## Maximum file size for console log. -## -## Value: Number(bytes) -## log.console.size = 10485760 - -## The rotation count for console log. +## Maximum size of each log file. ## ## Value: Number -## log.console.count = 5 +## Default: 10M +## Supported Unit: KB | MB | G +log.rotation.size = 10MB -## The file where info logs will be writed to. -## -## Value: File Name -## log.info.file = {{ platform_log_dir }}/info.log - -## Maximum file size for info log. -## -## Value: Number(bytes) -## log.info.size = 10485760 - -## The rotation count for info log. +## Maximum rotation count of log files. ## ## Value: Number -## log.info.count = 5 +## Default: 5 +log.rotation.count = 5 -## The file where error logs will be writed to. +## To create additional log files for specific log levels. ## ## Value: File Name -log.error.file = {{ platform_log_dir }}/error.log - -## Maximum file size for error log. +## Format: log.$level.file = $filename, +## where "$level" can be one of: debug, info, notice, warning, +## error, critical, alert, emergency +## Note: Log files for a specific log level will contain all the logs +## that greater than or equal to that level ## -## Value: Number(bytes) -log.error.size = 10485760 - -## The rotation count for error log. -## -## Value: Number -log.error.count = 5 - -## Enable the crash log. -## -## Value: on | off -log.crash = on - -## The file for crash log. -## -## Value: File Name -log.crash.file = {{ platform_log_dir }}/crash.log - -## Enable syslog. -## -## Values: on | off -log.syslog = on - -## Sets the severity level for syslog. -## -## Value: debug | info | notice | warning | error | critical | alert | emergency -log.syslog.level = error +#log.info.file = info.log +#log.error.file = error.log ##-------------------------------------------------------------------- ## Authentication/Access Control diff --git a/priv/emqx.schema b/priv/emqx.schema index e9667b114..8026a6400 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -382,172 +382,114 @@ end}. %% Log %%-------------------------------------------------------------------- -{mapping, "log.dir", "lager.log_dir", [ +{mapping, "log.to", "kernel.logger", [ + {default, console}, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.level", "kernel.logger_level", [ + {default, error}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} +]}. + +{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "log.dir", "kernel.logger", [ {default, "log"}, {datatype, string} ]}. -{mapping, "log.console", "lager.handlers", [ - {default, file}, - {datatype, {enum, [off, file, console, both]}} -]}. - -{mapping, "log.console.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} -]}. - -{mapping, "log.console.file", "lager.handlers", [ - {default, "log/console.log"}, +{mapping, "log.file", "kernel.logger", [ + {default, "emqx.log"}, {datatype, file} ]}. -{mapping, "log.console.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} +{mapping, "log.rotation.size", "kernel.logger", [ + {default, "10MB"}, + {datatype, bytesize} ]}. -{mapping, "log.console.count", "lager.handlers", [ +{mapping, "log.rotation.count", "kernel.logger", [ {default, 5}, {datatype, integer} ]}. -{mapping, "log.info.file", "lager.handlers", [ +{mapping, "log.$level.file", "kernel.logger", [ {datatype, file} ]}. -{mapping, "log.info.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} -]}. - -{mapping, "log.info.count", "lager.handlers", [ - {default, 5}, - {datatype, integer} -]}. - -{mapping, "log.error.file", "lager.handlers", [ - {default, "log/error.log"}, - {datatype, file} -]}. - -{mapping, "log.error.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} -]}. - -{mapping, "log.error.count", "lager.handlers", [ - {default, 5}, - {datatype, integer} -]}. - -{mapping, "log.syslog", "lager.handlers", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "log.syslog.identity", "lager.handlers", [ - {default, "emqx"}, - {datatype, string} -]}. - -{mapping, "log.syslog.facility", "lager.handlers", [ - {default, local0}, - {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}} -]}. - -{mapping, "log.syslog.level", "lager.handlers", [ - {default, error}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} -]}. - -{mapping, "log.error.redirect", "lager.error_logger_redirect", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ - {default, 1000}, - {datatype, integer}, - hidden -]}. - -{translation, - "lager.handlers", - fun(Conf) -> - ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf, undefined) of - undefined -> []; - ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, - {level, error}, - {size, cuttlefish:conf_get("log.error.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.error.count", Conf)}]}] - end, - - InfoHandler = case cuttlefish:conf_get("log.info.file", Conf, undefined) of - undefined -> []; - InfoFilename -> [{lager_file_backend, [{file, InfoFilename}, - {level, info}, - {size, cuttlefish:conf_get("log.info.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.info.count", Conf)}]}] - end, - - ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), - ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - - ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]}, - ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, - {level, ConsoleLogLevel}, - {size, cuttlefish:conf_get("log.console.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.console.count", Conf)}]}, - - ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of - off -> []; - file -> [ConsoleFileHandler]; - console -> [ConsoleHandler]; - both -> [ConsoleHandler, ConsoleFileHandler]; - _ -> [] - end, - SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of - false -> []; - true -> [{lager_syslog_backend, - [cuttlefish:conf_get("log.syslog.identity", Conf), - cuttlefish:conf_get("log.syslog.facility", Conf), - cuttlefish:conf_get("log.syslog.level", Conf)]}] - end, - ConsoleHandlers ++ ErrorHandler ++ InfoHandler ++ SyslogHandler - end -}. - -{mapping, "log.crash", "lager.crash_log", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "log.crash.file", "lager.crash_log", [ - {default, "log/crash.log"}, - {datatype, file} -]}. - -{translation, - "lager.crash_log", - fun(Conf) -> - case cuttlefish:conf_get("log.crash", Conf) of - false -> undefined; - _ -> - cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") - end - end}. - {mapping, "sasl", "sasl.sasl_error_logger", [ {default, off}, {datatype, flag}, hidden ]}. +{translation, "kernel.logger", fun(Conf) -> + LogTo = cuttlefish:conf_get("log.to", Conf), + TopLogLevel = cuttlefish:conf_get("log.level", Conf), + Formatter = {emqx_logger_formatter, + #{template => + [time," [",level,"] ", + {client_id, + [{peername, + [client_id,"@",peername," "], + [client_id, " "]}], + []}, + msg,"\n"]}}, + FileConf = fun(Filename) -> + #{type => wrap, + file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename), + max_no_files => cuttlefish:conf_get("log.rotation.count", Conf), + max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)} + end, + + %% For the default logger that outputs to console + DefaultHandler = + if LogTo =:= console orelse LogTo =:= both -> + [{handler, default, logger_std_h, + #{level => TopLogLevel, + config => #{type => standard_io}, + formatter => Formatter}}]; + true -> + [{handler, default, undefined}] + end, + + %% For the file logger + FileHandler = + if LogTo =:= file orelse LogTo =:= both -> + [{handler, file, logger_disk_log_h, + #{level => TopLogLevel, + config => FileConf(cuttlefish:conf_get("log.file", Conf)), + formatter => Formatter, + filesync_repeat_interval => no_repeat}}]; + true -> [] + end, + + %% For creating additional log files for specific log levels. + AdditionalLogFiles = + lists:foldl( + fun({[_, Level, _] = K, Filename}, Acc) when LogTo =:= file; LogTo =:= both -> + case cuttlefish_variable:is_fuzzy_match(K, ["log", "$level", "file"]) of + true -> [{Level, Filename} | Acc]; + false -> Acc + end; + ({_K, _V}, Acc) -> + Acc + end, [], Conf), + AdditionalHandlers = + [{handler, list_to_atom("file_for_"++Level), logger_disk_log_h, + #{level => list_to_atom(Level), + config => FileConf(Filename), + formatter => Formatter, + filesync_repeat_interval => no_repeat}} + || {Level, Filename} <- AdditionalLogFiles], + + DefaultHandler ++ FileHandler ++ AdditionalHandlers +end}. + %%-------------------------------------------------------------------- %% Authentication/ACL %%-------------------------------------------------------------------- diff --git a/rebar.config b/rebar.config index c64e2cfcb..0af3f6b3e 100644 --- a/rebar.config +++ b/rebar.config @@ -1,17 +1,15 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {lager, "3.6.5"}, - {cowboy, "2.4.0"}, - {lager_syslog, {git, "https://github.com/basho/lager_syslog", {branch, "3.0.1"}}} + {cowboy, "2.4.0"} ]}. %% appended to deps in rebar.config.script {github_emqx_deps, - [{gen_rpc, "2.2.0"}, - {ekka, "v0.4.1"}, + [{gen_rpc, "2.3.0"}, + {ekka, "v0.5.0"}, {clique, "develop"}, {esockd, "v5.4.2"}, - {cuttlefish, "emqx30"} + {cuttlefish, "v2.1.0"} ]}. {edoc_opts, [{preprocess, true}]}. @@ -20,7 +18,6 @@ warn_unused_import, warn_obsolete_guard, debug_info, - {parse_transform, lager_transform}, {d, 'APPLICATION', emqx}]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, diff --git a/src/emqx.app.src b/src/emqx.app.src index d44707186..424801f52 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,8 +3,8 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd, - cowboy,lager_syslog]}, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd, + cowboy]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 4a39e46aa..a4e884cb9 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -47,12 +47,12 @@ stop(_State) -> %%-------------------------------------------------------------------- print_banner() -> - io:format("Starting ~s on node ~s~n", [?APP, node()]). + logger:info("Starting ~s on node ~s", [?APP, node()]). print_vsn() -> {ok, Descr} = application:get_key(description), {ok, Vsn} = application:get_key(vsn), - io:format("~s ~s is running now!~n", [Descr, Vsn]). + logger:info("~s ~s is running now!", [Descr, Vsn]). %%-------------------------------------------------------------------- %% Autocluster diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index b8f0dff56..b1ccdc44b 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -323,7 +323,7 @@ format_mountpoint(Prefix) -> store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> [Data | Queue]; store(memory, _Data, Queue, _MaxPendingMsg) -> - lager:error("Beyond max pending messages"), + logger:error("Beyond max pending messages"), Queue; store(disk, Data, Queue, _MaxPendingMsg)-> [Data | Queue]. diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index d3ef43069..6987f03f9 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -35,7 +35,7 @@ start_listener({Proto, ListenOn, Options}) -> {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!", + io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!", [Proto, format(ListenOn), Reason]) end. @@ -119,7 +119,7 @@ stop_listener({Proto, ListenOn, Opts}) -> ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.", + io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.", [Proto, format(ListenOn), Reason]) end. diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index 2af5cd6b7..64bac9968 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -22,38 +22,47 @@ -export([error/1, error/2, error/3]). -export([critical/1, critical/2, critical/3]). +-export([add_proc_metadata/1]). + debug(Msg) -> - lager:debug(Msg). + logger:debug(Msg). debug(Format, Args) -> - lager:debug(Format, Args). -debug(Metadata, Format, Args) when is_list(Metadata) -> - lager:debug(Metadata, Format, Args). + logger:debug(Format, Args). +debug(Metadata, Format, Args) when is_map(Metadata) -> + logger:debug(Format, Args, Metadata). info(Msg) -> - lager:info(Msg). + logger:info(Msg). info(Format, Args) -> - lager:info(Format, Args). -info(Metadata, Format, Args) when is_list(Metadata) -> - lager:info(Metadata, Format, Args). + logger:info(Format, Args). +info(Metadata, Format, Args) when is_map(Metadata) -> + logger:info(Format, Args, Metadata). warning(Msg) -> - lager:warning(Msg). + logger:warning(Msg). warning(Format, Args) -> - lager:warning(Format, Args). -warning(Metadata, Format, Args) when is_list(Metadata) -> - lager:warning(Metadata, Format, Args). + logger:warning(Format, Args). +warning(Metadata, Format, Args) when is_map(Metadata) -> + logger:warning(Format, Args, Metadata). error(Msg) -> - lager:error(Msg). + logger:error(Msg). error(Format, Args) -> - lager:error(Format, Args). -error(Metadata, Format, Args) when is_list(Metadata) -> - lager:error(Metadata, Format, Args). + logger:error(Format, Args). +error(Metadata, Format, Args) when is_map(Metadata) -> + logger:error(Format, Args, Metadata). critical(Msg) -> - lager:critical(Msg). + logger:critical(Msg). critical(Format, Args) -> - lager:critical(Format, Args). -critical(Metadata, Format, Args) when is_list(Metadata) -> - lager:critical(Metadata, Format, Args). + logger:critical(Format, Args). +critical(Metadata, Format, Args) when is_map(Metadata) -> + logger:critical(Format, Args, Metadata). +add_proc_metadata(Meta) -> + case logger:get_process_metadata() of + undefined -> + logger:set_process_metadata(Meta); + OldMeta -> + logger:set_process_metadata(maps:merge(OldMeta, Meta)) + end. \ No newline at end of file diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl new file mode 100644 index 000000000..3c5d9fc16 --- /dev/null +++ b/src/emqx_logger_formatter.erl @@ -0,0 +1,360 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% This file is copied from lib/kernel/src/logger_formatter.erl, and +%% modified for a more concise time format other than the default RFC3339. + +-module(emqx_logger_formatter). + +-export([format/2]). +-export([check_config/1]). + +-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]). + +-define(FormatP, "~0tp"). + +-define(IS_STRING(String), + (is_list(String) orelse is_binary(String))). + +%%%----------------------------------------------------------------- +%%% Types +-type config() :: #{chars_limit => pos_integer() | unlimited, + depth => pos_integer() | unlimited, + max_size => pos_integer() | unlimited, + report_cb => logger:report_cb(), + quit => template()}. +-type template() :: [metakey() | {metakey(),template(),template()} | string()]. +-type metakey() :: atom() | [atom()]. + +%%%----------------------------------------------------------------- +%%% API +-spec format(LogEvent,Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: config(). +format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0) + when is_map(Config0) -> + Config = add_default_config(Config0), + Template = maps:get(template,Config), + {BT,AT0} = lists:splitwith(fun(msg) -> false; (_) -> true end, Template), + {DoMsg,AT} = + case AT0 of + [msg|Rest] -> {true,Rest}; + _ ->{false,AT0} + end, + B = do_format(Level,Meta,BT,Config), + A = do_format(Level,Meta,AT,Config), + MsgStr = + if DoMsg -> + Config1 = + case maps:get(chars_limit,Config) of + unlimited -> + Config; + Size0 -> + Size = + case Size0 - string:length([B,A]) of + S when S>=0 -> S; + _ -> 0 + end, + Config#{chars_limit=>Size} + end, + string:trim(format_msg(Msg0,Meta,Config1)); + true -> + "" + end, + truncate([B,MsgStr,A],maps:get(max_size,Config)). + +do_format(Level,Data,[level|Format],Config) -> + [to_string(level,Level,Config)|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[{Key,IfExist,Else}|Format],Config) -> + String = + case value(Key,Data) of + {ok,Value} -> do_format(Level,Data#{Key=>Value},IfExist,Config); + error -> do_format(Level,Data,Else,Config) + end, + [String|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[Key|Format],Config) + when is_atom(Key) orelse + (is_list(Key) andalso is_atom(hd(Key))) -> + String = + case value(Key,Data) of + {ok,Value} -> to_string(Key,Value,Config); + error -> "" + end, + [String|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[Str|Format],Config) -> + [Str|do_format(Level,Data,Format,Config)]; +do_format(_Level,_Data,[],_Config) -> + []. + +value(Key,Meta) when is_map_key(Key,Meta) -> + {ok,maps:get(Key,Meta)}; +value([Key|Keys],Meta) when is_map_key(Key,Meta) -> + value(Keys,maps:get(Key,Meta)); +value([],Value) -> + {ok,Value}; +value(_,_) -> + error. + +to_string(time,Time,Config) -> + format_time(Time,Config); +to_string(mfa,MFA,Config) -> + format_mfa(MFA,Config); +to_string(_,Value,Config) -> + to_string(Value,Config). + +to_string(X,_) when is_atom(X) -> + atom_to_list(X); +to_string(X,_) when is_integer(X) -> + integer_to_list(X); +to_string(X,_) when is_pid(X) -> + pid_to_list(X); +to_string(X,_) when is_reference(X) -> + ref_to_list(X); +to_string(X,_) when is_list(X) -> + case printable_list(lists:flatten(X)) of + true -> X; + _ -> io_lib:format(?FormatP,[X]) + end; +to_string(X,_) -> + io_lib:format("~s",[X]). + +printable_list([]) -> + false; +printable_list(X) -> + io_lib:printable_list(X). + +format_msg({string,Chardata},Meta,Config) -> + format_msg({"~ts",[Chardata]},Meta,Config); +format_msg({report,_}=Msg,Meta,#{report_cb:=Fun}=Config) + when is_function(Fun,1); is_function(Fun,2) -> + format_msg(Msg,Meta#{report_cb=>Fun},maps:remove(report_cb,Config)); +format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,1) -> + try Fun(Report) of + {Format,Args} when is_list(Format), is_list(Args) -> + format_msg({Format,Args},maps:remove(report_cb,Meta),Config); + Other -> + format_msg({"REPORT_CB/1 ERROR: ~0tp; Returned: ~0tp", + [Report,Other]},Meta,Config) + catch C:R:S -> + format_msg({"REPORT_CB/1 CRASH: ~0tp; Reason: ~0tp", + [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},Meta,Config) + end; +format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,2) -> + try Fun(Report,maps:with([depth,chars_limit,single_line],Config)) of + Chardata when ?IS_STRING(Chardata) -> + try chardata_to_list(Chardata) % already size limited by report_cb + catch _:_ -> + format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp", + [Report,Chardata]},Meta,Config) + end; + Other -> + format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp", + [Report,Other]},Meta,Config) + catch C:R:S -> + format_msg({"REPORT_CB/2 CRASH: ~0tp; Reason: ~0tp", + [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]}, + Meta,Config) + end; +format_msg({report,Report},Meta,Config) -> + format_msg({report,Report}, + Meta#{report_cb=>fun logger:format_report/1}, + Config); +format_msg(Msg,_Meta,#{depth:=Depth,chars_limit:=CharsLimit}) -> + Opts = chars_limit_to_opts(CharsLimit), + do_format_msg(Msg, Depth, Opts). + +chars_limit_to_opts(unlimited) -> []; +chars_limit_to_opts(CharsLimit) -> [{chars_limit,CharsLimit}]. + +do_format_msg({Format0,Args},Depth,Opts) -> + try + Format1 = io_lib:scan_format(Format0, Args), + Format = reformat(Format1, Depth), + io_lib:build_text(Format,Opts) + catch C:R:S -> + FormatError = "FORMAT ERROR: ~0tp - ~0tp", + case Format0 of + FormatError -> + %% already been here - avoid failing cyclically + erlang:raise(C,R,S); + _ -> + format_msg({FormatError,[Format0,Args]},Depth,Opts) + end + end. + +reformat(Format,unlimited) -> + Format; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $p -> + [limit_depth(M#{width => 0}, Depth)|reformat(T, Depth)]; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $P -> + [M#{width => 0}|reformat(T, Depth)]; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $p; C =:= $w -> + [limit_depth(M, Depth)|reformat(T, Depth)]; +reformat([H|T], Depth) -> + [H|reformat(T, Depth)]; +reformat([], _) -> + []. + +limit_depth(M0, unlimited) -> + M0; +limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) -> + C = C0 - ($a - $A), %To uppercase. + M0#{control_char:=C,args:=Args++[Depth]}. + +chardata_to_list(Chardata) -> + case unicode:characters_to_list(Chardata,unicode) of + List when is_list(List) -> + List; + Error -> + throw(Error) + end. + +truncate(String,unlimited) -> + String; +truncate(String,Size) -> + Length = string:length(String), + if Length>Size -> + case lists:reverse(lists:flatten(String)) of + [$\n|_] -> + string:slice(String,0,Size-4)++"...\n"; + _ -> + string:slice(String,0,Size-3)++"..." + end; + true -> + String + end. + +%% Convert microseconds-timestamp into local datatime string in milliseconds +format_time(SysTime,#{}) + when is_integer(SysTime) -> + Ms = SysTime rem 1000000 div 1000, + {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond), + format_time({Date, {H, Mi, S, Ms}}). +format_time({{Y, M, D}, {H, Mi, S, Ms}}) -> + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]); +format_time({{Y, M, D}, {H, Mi, S}}) -> + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]). + +format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) -> + atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A); +format_mfa({M,F,A},Config) when is_atom(M), is_atom(F), is_list(A) -> + format_mfa({M,F,length(A)},Config); +format_mfa(MFA,Config) -> + to_string(MFA,Config). + +%% Ensure that all valid configuration parameters exist in the final +%% configuration map +add_default_config(Config0) -> + Default = + #{chars_limit=>unlimited, + error_logger_notice_header=>info}, + MaxSize = get_max_size(maps:get(max_size,Config0,undefined)), + Depth = get_depth(maps:get(depth,Config0,undefined)), + add_default_template(maps:merge(Default,Config0#{max_size=>MaxSize, + depth=>Depth})). + +add_default_template(#{template:=_}=Config) -> + Config; +add_default_template(Config) -> + Config#{template=>?DEFAULT_FORMAT_TEMPLATE_SINGLE}. + +get_max_size(undefined) -> + unlimited; +get_max_size(S) -> + max(10,S). + +get_depth(undefined) -> + error_logger:get_format_depth(); +get_depth(S) -> + max(5,S). + +-spec check_config(Config) -> ok | {error,term()} when + Config :: config(). +check_config(Config) when is_map(Config) -> + do_check_config(maps:to_list(Config)); +check_config(Config) -> + {error,{invalid_formatter_config,?MODULE,Config}}. + +do_check_config([{Type,L}|Config]) when Type == chars_limit; + Type == depth; + Type == max_size -> + case check_limit(L) of + ok -> do_check_config(Config); + error -> {error,{invalid_formatter_config,?MODULE,{Type,L}}} + end; +do_check_config([{error_logger_notice_header,ELNH}|Config]) when ELNH == info; + ELNH == notice -> + do_check_config(Config); +do_check_config([{report_cb,RCB}|Config]) when is_function(RCB,1); + is_function(RCB,2) -> + do_check_config(Config); +do_check_config([{template,T}|Config]) -> + case check_template(T) of + ok -> do_check_config(Config); + error -> {error,{invalid_formatter_template,?MODULE,T}} + end; + +do_check_config([C|_]) -> + {error,{invalid_formatter_config,?MODULE,C}}; +do_check_config([]) -> + ok. + +check_limit(L) when is_integer(L), L>0 -> + ok; +check_limit(unlimited) -> + ok; +check_limit(_) -> + error. + +check_template([Key|T]) when is_atom(Key) -> + check_template(T); +check_template([Key|T]) when is_list(Key), is_atom(hd(Key)) -> + case lists:all(fun(X) when is_atom(X) -> true; + (_) -> false + end, + Key) of + true -> + check_template(T); + false -> + error + end; +check_template([{Key,IfExist,Else}|T]) + when is_atom(Key) orelse + (is_list(Key) andalso is_atom(hd(Key))) -> + case check_template(IfExist) of + ok -> + case check_template(Else) of + ok -> + check_template(T); + error -> + error + end; + error -> + error + end; +check_template([Str|T]) when is_list(Str) -> + case io_lib:printable_unicode_list(Str) of + true -> check_template(T); + false -> error + end; +check_template([]) -> + ok; +check_template(_) -> + error. diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 09e732f68..60c4b6351 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -21,7 +21,7 @@ load() -> lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), - io:format("Load ~s module successfully.~n", [Mod]) + logger:info("Load ~s module successfully.", [Mod]) end, emqx_config:get_env(modules, [])). -spec(unload() -> ok). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 51951ccda..9289dd77c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -72,9 +72,8 @@ -define(NO_PROPS, undefined). --define(LOG(Level, Format, Args, PState), - emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format, - [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). +-define(LOG(Level, Format, Args, _PState), + emqx_logger:Level("[MQTT] " ++ Format, Args)). %%------------------------------------------------------------------------------ %% Init @@ -82,6 +81,7 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> + emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}), Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET( client_id = ClientId, username = Username, password = Password} = Connect), PState) -> - + emqx_logger:add_proc_metadata(#{client_id => ClientId}), %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = make_will_msg(Connect), diff --git a/src/emqx_session.erl b/src/emqx_session.erl index cd7e80c84..6cfef70d2 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -161,9 +161,8 @@ -define(TIMEOUT, 60000). --define(LOG(Level, Format, Args, State), - emqx_logger:Level([{client, State#state.client_id}], - "Session(~s): " ++ Format, [State#state.client_id | Args])). +-define(LOG(Level, Format, Args, _State), + emqx_logger:Level("[Session] " ++ Format, Args)). %% @doc Start a session proc. -spec(start_link(SessAttrs :: map()) -> {ok, pid()}). @@ -341,6 +340,7 @@ init([Parent, #{zone := Zone, max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum, will_msg := WillMsg}]) -> + emqx_logger:add_proc_metadata(#{client_id => ClientId}), process_flag(trap_exit, true), true = link(ConnPid), IdleTimout = get_env(Zone, idle_timeout, 30000), diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index b42d96aa4..e3bd4a2a9 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -25,9 +25,9 @@ -define(SYSMON, ?MODULE). -define(LOG(Msg, ProcInfo), - emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). + emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). -define(LOG(Msg, ProcInfo, PortInfo), - emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). + emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). %% @doc Start system monitor -spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}). @@ -130,7 +130,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#state{events = []}, hibernate}; handle_info(Info, State) -> - lager:error("[SYSMON] unexpected Info: ~p", [Info]), + logger:error("[SYSMON] unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 44ad6c26f..3d060dee9 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -25,12 +25,20 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {level, traces}). +-record(state, {level, org_top_level, traces}). --type(trace_who() :: {client | topic, binary()}). +-type(trace_who() :: {client_id | topic, binary()}). -define(TRACER, ?MODULE). --define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). +-define(FORMAT, {emqx_logger_formatter, + #{template => + [time," [",level,"] ", + {client_id, + [{peername, + [client_id,"@",peername," "], + [client_id, " "]}], + []}, + msg,"\n"]}}). -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> @@ -40,26 +48,26 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> %% Dont' trace '$SYS' publish ignore; trace(publish, #message{from = From, topic = Topic, payload = Payload}) - when is_binary(From); is_atom(From) -> - emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + when is_binary(From); is_atom(From) -> + emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]). %%------------------------------------------------------------------------------ %% Start/Stop trace %%------------------------------------------------------------------------------ -%% @doc Start to trace client or topic. +%% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), string()) -> ok | {error, term()}). -start_trace({client, ClientId}, LogFile) -> - start_trace({start_trace, {client, ClientId}, LogFile}); +start_trace({client_id, ClientId}, LogFile) -> + start_trace({start_trace, {client_id, ClientId}, LogFile}); start_trace({topic, Topic}, LogFile) -> start_trace({start_trace, {topic, Topic}, LogFile}). start_trace(Req) -> gen_server:call(?MODULE, Req, infinity). -%% @doc Stop tracing client or topic. +%% @doc Stop tracing client_id or topic. -spec(stop_trace(trace_who()) -> ok | {error, term()}). -stop_trace({client, ClientId}) -> - gen_server:call(?MODULE, {stop_trace, {client, ClientId}}); +stop_trace({client_id, ClientId}) -> + gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}}); stop_trace({topic, Topic}) -> gen_server:call(?MODULE, {stop_trace, {topic, Topic}}). @@ -73,37 +81,45 @@ lookup_traces() -> %%------------------------------------------------------------------------------ init([]) -> - {ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}. + {ok, #state{level = emqx_config:get_env(trace_level, debug), + org_top_level = get_top_level(), + traces = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> - case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of - {ok, exists} -> - {reply, {error, already_exists}, State}; - {ok, Trace} -> - {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}}; + case logger:add_handler(handler_id(Who), logger_disk_log_h, + #{level => Level, + formatter => ?FORMAT, + filesync_repeat_interval => 1000, + config => #{type => halt, file => LogFile}, + filter_default => stop, + filters => [{meta_key_filter, + {fun filter_by_meta_key/2, Who} }]}) of + ok -> + set_top_level(all), % open the top logger level to 'all' + emqx_logger:info("[Tracer] start trace for ~p", [Who]), + {reply, ok, State#state{traces = maps:put(Who, LogFile, Traces)}}; {error, Reason} -> - emqx_logger:error("[Tracer] trace error: ~p", [Reason]), - {reply, {error, Reason}, State}; - {'EXIT', Error} -> - emqx_logger:error("[Tracer] trace exit: ~p", [Error]), - {reply, {error, Error}, State} + emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]), + {reply, {error, Reason}, State} end; -handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> +handle_call({stop_trace, Who}, _From, State = #state{org_top_level = OrgTopLevel, traces = Traces}) -> case maps:find(Who, Traces) of - {ok, {Trace, _LogFile}} -> - case lager:stop_trace(Trace) of - ok -> ok; - {error, Error} -> - emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error]) + {ok, _LogFile} -> + case logger:remove_handler(handler_id(Who)) of + ok -> + emqx_logger:info("[Tracer] stop trace for ~p", [Who]); + {error, Reason} -> + emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason]) end, + set_top_level(OrgTopLevel), % reset the top logger level to original value {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> {reply, {error, not_found}, State} end; handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> - {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(Traces)], State}; + {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> emqx_logger:error("[Tracer] unexpected call: ~p", [Req]), @@ -123,3 +139,25 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handler_id({topic, Topic}) -> + list_to_atom("topic_" ++ binary_to_list(Topic)); +handler_id({client_id, ClientId}) -> + list_to_atom("clientid_" ++ binary_to_list(ClientId)). + +get_top_level() -> + #{level := OrgTopLevel} = logger:get_primary_config(), + OrgTopLevel. + +set_top_level(Level) -> + logger:set_primary_config(level, Level). + +filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> + case maps:find(MetaKey, Meta) of + {ok, MetaValue} -> LogEvent; + {ok, Topic} when MetaKey =:= topic -> + case emqx_topic:match(Topic, MetaValue) of + true -> LogEvent; + false -> ignore + end; + _ -> ignore + end. \ No newline at end of file diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 7ff0b55a8..6a8b71094 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -45,9 +45,8 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("MQTT/WS(~s): " ++ Format, - [esockd_net:format(State#state.peername) | Args])). +-define(WSLOG(Level, Format, Args, _State), + emqx_logger:Level("[MQTT/WS] " ++ Format, Args)). %%------------------------------------------------------------------------------ %% API diff --git a/test/emqx_SUITE_data/slave.config b/test/emqx_SUITE_data/slave.config index dd3a5f32b..1cdf851b5 100644 --- a/test/emqx_SUITE_data/slave.config +++ b/test/emqx_SUITE_data/slave.config @@ -33,21 +33,6 @@ {large_heap,8388608}, {busy_port,false}, {busy_dist_port,true}]}]}, - {sasl,[{sasl_error_logger,false}]}, - {lager, - [{error_logger_hwm,1000}, - {error_logger_redirect,true}, - {log_dir,"{{ platform_log_dir }}"}, - {handlers, - [{lager_console_backend,error}, - {lager_file_backend, - [{file,"{{ platform_log_dir }}/error.log"}, - {level,error}, - {size,10485760}, - {date,"$D0"}, - {count,5}]}, - {lager_syslog_backend,["emq",local0,error]}]}, - {crash_log,"{{ platform_log_dir }}/crash.log"}]}, {gen_rpc, [{socket_keepalive_count,2}, {socket_keepalive_interval,5}, diff --git a/test/ws_client.erl b/test/ws_client.erl index 25f38164d..39f01467a 100644 --- a/test/ws_client.erl +++ b/test/ws_client.erl @@ -52,10 +52,10 @@ init(_, _WSReq) -> {ok, #state{}}. websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) -> - lager:info("Client received frame~p", [Frame]), + logger:info("Client received frame~p", [Frame]), {ok, State#state{buffer = [Frame|Buffer]}}; websocket_handle(Frame, _, State = #state{waiting = From}) -> - lager:info("Client received frame~p", [Frame]), + logger:info("Client received frame~p", [Frame]), From ! Frame, {ok, State#state{waiting = undefined}}.