Fix conflicts
This commit is contained in:
commit
faeda253e1
14
Makefile
14
Makefile
|
@ -4,31 +4,27 @@ PROJECT = emqx
|
|||
PROJECT_DESCRIPTION = EMQ X Broker
|
||||
PROJECT_VERSION = 3.0
|
||||
|
||||
DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique lager_syslog
|
||||
DEPS = jsx gproc gen_rpc ekka esockd cowboy clique
|
||||
|
||||
dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
|
||||
dep_gproc = git https://github.com/uwiger/gproc 0.8.0
|
||||
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
|
||||
dep_lager = git https://github.com/erlang-lager/lager 3.6.5
|
||||
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0
|
||||
dep_esockd = git https://github.com/emqx/esockd v5.4.2
|
||||
dep_ekka = git https://github.com/emqx/ekka v0.4.1
|
||||
dep_ekka = git https://github.com/emqx/ekka v0.5.0
|
||||
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0
|
||||
dep_clique = git https://github.com/emqx/clique develop
|
||||
dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1
|
||||
|
||||
NO_AUTOPATCH = cuttlefish
|
||||
|
||||
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
BUILD_DEPS = cuttlefish
|
||||
dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
|
||||
dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.0
|
||||
|
||||
#TEST_DEPS = emqx_ct_helplers
|
||||
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
|
||||
|
||||
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
|
||||
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||
|
||||
EUNIT_OPTS = verbose
|
||||
|
||||
|
@ -47,7 +43,7 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
|
|||
|
||||
COVER = true
|
||||
|
||||
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia
|
||||
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia
|
||||
DIALYZER_DIRS := ebin/
|
||||
DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
|
||||
|
||||
|
|
110
etc/emqx.conf
110
etc/emqx.conf
|
@ -328,91 +328,61 @@ rpc.socket_keepalive_count = 9
|
|||
## Log
|
||||
##--------------------------------------------------------------------
|
||||
|
||||
## Sets the log dir.
|
||||
## Where to emit the logs.
|
||||
## Enable the console (standard output) logs.
|
||||
##
|
||||
## Value: off | file | console | both
|
||||
## - off: disable logs entirely
|
||||
## - file: write logs to file
|
||||
## - console: write logs to standard I/O
|
||||
## - both: write logs both to file and standard I/O
|
||||
log.to = console
|
||||
|
||||
## The log severity level.
|
||||
##
|
||||
## Value: debug | info | notice | warning | error | critical | alert | emergency
|
||||
##
|
||||
## Note: Only the messages with severity level greater than or equal to
|
||||
## this level will be logged.
|
||||
##
|
||||
## Default: error
|
||||
log.level = error
|
||||
|
||||
## The dir for log files.
|
||||
##
|
||||
## Value: Folder
|
||||
log.dir = {{ platform_log_dir }}
|
||||
|
||||
## Where to emit the console logs.
|
||||
## The log filename for logs of level specified in "log.level".
|
||||
##
|
||||
## Value: off | file | console | both
|
||||
## - off: disabled
|
||||
## - file: write to file
|
||||
## - console: write to stdout
|
||||
## - both: file and stdout
|
||||
log.console = console
|
||||
## Value: String
|
||||
## Default: emqx.log
|
||||
log.file = emqx.log
|
||||
|
||||
## Sets the severity level of console log.
|
||||
##
|
||||
## Value: debug | info | notice | warning | error | critical | alert | emergency
|
||||
##
|
||||
## Default: error
|
||||
log.console.level = error
|
||||
|
||||
## The file where console logs will be writed to, when 'log.console' is set as 'file'.
|
||||
##
|
||||
## Value: File Name
|
||||
## log.console.file = {{ platform_log_dir }}/console.log
|
||||
|
||||
## Maximum file size for console log.
|
||||
##
|
||||
## Value: Number(bytes)
|
||||
## log.console.size = 10485760
|
||||
|
||||
## The rotation count for console log.
|
||||
## Maximum size of each log file.
|
||||
##
|
||||
## Value: Number
|
||||
## log.console.count = 5
|
||||
## Default: 10M
|
||||
## Supported Unit: KB | MB | G
|
||||
log.rotation.size = 10MB
|
||||
|
||||
## The file where info logs will be writed to.
|
||||
##
|
||||
## Value: File Name
|
||||
## log.info.file = {{ platform_log_dir }}/info.log
|
||||
|
||||
## Maximum file size for info log.
|
||||
##
|
||||
## Value: Number(bytes)
|
||||
## log.info.size = 10485760
|
||||
|
||||
## The rotation count for info log.
|
||||
## Maximum rotation count of log files.
|
||||
##
|
||||
## Value: Number
|
||||
## log.info.count = 5
|
||||
## Default: 5
|
||||
log.rotation.count = 5
|
||||
|
||||
## The file where error logs will be writed to.
|
||||
## To create additional log files for specific log levels.
|
||||
##
|
||||
## Value: File Name
|
||||
log.error.file = {{ platform_log_dir }}/error.log
|
||||
|
||||
## Maximum file size for error log.
|
||||
## Format: log.$level.file = $filename,
|
||||
## where "$level" can be one of: debug, info, notice, warning,
|
||||
## error, critical, alert, emergency
|
||||
## Note: Log files for a specific log level will contain all the logs
|
||||
## that greater than or equal to that level
|
||||
##
|
||||
## Value: Number(bytes)
|
||||
log.error.size = 10485760
|
||||
|
||||
## The rotation count for error log.
|
||||
##
|
||||
## Value: Number
|
||||
log.error.count = 5
|
||||
|
||||
## Enable the crash log.
|
||||
##
|
||||
## Value: on | off
|
||||
log.crash = on
|
||||
|
||||
## The file for crash log.
|
||||
##
|
||||
## Value: File Name
|
||||
log.crash.file = {{ platform_log_dir }}/crash.log
|
||||
|
||||
## Enable syslog.
|
||||
##
|
||||
## Values: on | off
|
||||
log.syslog = on
|
||||
|
||||
## Sets the severity level for syslog.
|
||||
##
|
||||
## Value: debug | info | notice | warning | error | critical | alert | emergency
|
||||
log.syslog.level = error
|
||||
#log.info.file = info.log
|
||||
#log.error.file = error.log
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## Authentication/Access Control
|
||||
|
|
230
priv/emqx.schema
230
priv/emqx.schema
|
@ -382,172 +382,114 @@ end}.
|
|||
%% Log
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
{mapping, "log.dir", "lager.log_dir", [
|
||||
{mapping, "log.to", "kernel.logger", [
|
||||
{default, console},
|
||||
{datatype, {enum, [off, file, console, both]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.level", "kernel.logger_level", [
|
||||
{default, error},
|
||||
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible", [
|
||||
{default, true},
|
||||
{datatype, {enum, [true, false]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.dir", "kernel.logger", [
|
||||
{default, "log"},
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "log.console", "lager.handlers", [
|
||||
{default, file},
|
||||
{datatype, {enum, [off, file, console, both]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.console.level", "lager.handlers", [
|
||||
{default, info},
|
||||
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.console.file", "lager.handlers", [
|
||||
{default, "log/console.log"},
|
||||
{mapping, "log.file", "kernel.logger", [
|
||||
{default, "emqx.log"},
|
||||
{datatype, file}
|
||||
]}.
|
||||
|
||||
{mapping, "log.console.size", "lager.handlers", [
|
||||
{default, 10485760},
|
||||
{datatype, integer}
|
||||
{mapping, "log.rotation.size", "kernel.logger", [
|
||||
{default, "10MB"},
|
||||
{datatype, bytesize}
|
||||
]}.
|
||||
|
||||
{mapping, "log.console.count", "lager.handlers", [
|
||||
{mapping, "log.rotation.count", "kernel.logger", [
|
||||
{default, 5},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "log.info.file", "lager.handlers", [
|
||||
{mapping, "log.$level.file", "kernel.logger", [
|
||||
{datatype, file}
|
||||
]}.
|
||||
|
||||
{mapping, "log.info.size", "lager.handlers", [
|
||||
{default, 10485760},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "log.info.count", "lager.handlers", [
|
||||
{default, 5},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "log.error.file", "lager.handlers", [
|
||||
{default, "log/error.log"},
|
||||
{datatype, file}
|
||||
]}.
|
||||
|
||||
{mapping, "log.error.size", "lager.handlers", [
|
||||
{default, 10485760},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "log.error.count", "lager.handlers", [
|
||||
{default, 5},
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "log.syslog", "lager.handlers", [
|
||||
{default, off},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
{mapping, "log.syslog.identity", "lager.handlers", [
|
||||
{default, "emqx"},
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
||||
{mapping, "log.syslog.facility", "lager.handlers", [
|
||||
{default, local0},
|
||||
{datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.syslog.level", "lager.handlers", [
|
||||
{default, error},
|
||||
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
|
||||
]}.
|
||||
|
||||
{mapping, "log.error.redirect", "lager.error_logger_redirect", [
|
||||
{default, on},
|
||||
{datatype, flag},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
|
||||
{default, 1000},
|
||||
{datatype, integer},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{translation,
|
||||
"lager.handlers",
|
||||
fun(Conf) ->
|
||||
ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf, undefined) of
|
||||
undefined -> [];
|
||||
ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
|
||||
{level, error},
|
||||
{size, cuttlefish:conf_get("log.error.size", Conf)},
|
||||
{date, "$D0"},
|
||||
{count, cuttlefish:conf_get("log.error.count", Conf)}]}]
|
||||
end,
|
||||
|
||||
InfoHandler = case cuttlefish:conf_get("log.info.file", Conf, undefined) of
|
||||
undefined -> [];
|
||||
InfoFilename -> [{lager_file_backend, [{file, InfoFilename},
|
||||
{level, info},
|
||||
{size, cuttlefish:conf_get("log.info.size", Conf)},
|
||||
{date, "$D0"},
|
||||
{count, cuttlefish:conf_get("log.info.count", Conf)}]}]
|
||||
end,
|
||||
|
||||
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
|
||||
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
|
||||
|
||||
ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]},
|
||||
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
|
||||
{level, ConsoleLogLevel},
|
||||
{size, cuttlefish:conf_get("log.console.size", Conf)},
|
||||
{date, "$D0"},
|
||||
{count, cuttlefish:conf_get("log.console.count", Conf)}]},
|
||||
|
||||
ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
|
||||
off -> [];
|
||||
file -> [ConsoleFileHandler];
|
||||
console -> [ConsoleHandler];
|
||||
both -> [ConsoleHandler, ConsoleFileHandler];
|
||||
_ -> []
|
||||
end,
|
||||
SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
|
||||
false -> [];
|
||||
true -> [{lager_syslog_backend,
|
||||
[cuttlefish:conf_get("log.syslog.identity", Conf),
|
||||
cuttlefish:conf_get("log.syslog.facility", Conf),
|
||||
cuttlefish:conf_get("log.syslog.level", Conf)]}]
|
||||
end,
|
||||
ConsoleHandlers ++ ErrorHandler ++ InfoHandler ++ SyslogHandler
|
||||
end
|
||||
}.
|
||||
|
||||
{mapping, "log.crash", "lager.crash_log", [
|
||||
{default, on},
|
||||
{datatype, flag}
|
||||
]}.
|
||||
|
||||
{mapping, "log.crash.file", "lager.crash_log", [
|
||||
{default, "log/crash.log"},
|
||||
{datatype, file}
|
||||
]}.
|
||||
|
||||
{translation,
|
||||
"lager.crash_log",
|
||||
fun(Conf) ->
|
||||
case cuttlefish:conf_get("log.crash", Conf) of
|
||||
false -> undefined;
|
||||
_ ->
|
||||
cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
|
||||
end
|
||||
end}.
|
||||
|
||||
{mapping, "sasl", "sasl.sasl_error_logger", [
|
||||
{default, off},
|
||||
{datatype, flag},
|
||||
hidden
|
||||
]}.
|
||||
|
||||
{translation, "kernel.logger", fun(Conf) ->
|
||||
LogTo = cuttlefish:conf_get("log.to", Conf),
|
||||
TopLogLevel = cuttlefish:conf_get("log.level", Conf),
|
||||
Formatter = {emqx_logger_formatter,
|
||||
#{template =>
|
||||
[time," [",level,"] ",
|
||||
{client_id,
|
||||
[{peername,
|
||||
[client_id,"@",peername," "],
|
||||
[client_id, " "]}],
|
||||
[]},
|
||||
msg,"\n"]}},
|
||||
FileConf = fun(Filename) ->
|
||||
#{type => wrap,
|
||||
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
|
||||
max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
|
||||
max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
|
||||
end,
|
||||
|
||||
%% For the default logger that outputs to console
|
||||
DefaultHandler =
|
||||
if LogTo =:= console orelse LogTo =:= both ->
|
||||
[{handler, default, logger_std_h,
|
||||
#{level => TopLogLevel,
|
||||
config => #{type => standard_io},
|
||||
formatter => Formatter}}];
|
||||
true ->
|
||||
[{handler, default, undefined}]
|
||||
end,
|
||||
|
||||
%% For the file logger
|
||||
FileHandler =
|
||||
if LogTo =:= file orelse LogTo =:= both ->
|
||||
[{handler, file, logger_disk_log_h,
|
||||
#{level => TopLogLevel,
|
||||
config => FileConf(cuttlefish:conf_get("log.file", Conf)),
|
||||
formatter => Formatter,
|
||||
filesync_repeat_interval => no_repeat}}];
|
||||
true -> []
|
||||
end,
|
||||
|
||||
%% For creating additional log files for specific log levels.
|
||||
AdditionalLogFiles =
|
||||
lists:foldl(
|
||||
fun({[_, Level, _] = K, Filename}, Acc) when LogTo =:= file; LogTo =:= both ->
|
||||
case cuttlefish_variable:is_fuzzy_match(K, ["log", "$level", "file"]) of
|
||||
true -> [{Level, Filename} | Acc];
|
||||
false -> Acc
|
||||
end;
|
||||
({_K, _V}, Acc) ->
|
||||
Acc
|
||||
end, [], Conf),
|
||||
AdditionalHandlers =
|
||||
[{handler, list_to_atom("file_for_"++Level), logger_disk_log_h,
|
||||
#{level => list_to_atom(Level),
|
||||
config => FileConf(Filename),
|
||||
formatter => Formatter,
|
||||
filesync_repeat_interval => no_repeat}}
|
||||
|| {Level, Filename} <- AdditionalLogFiles],
|
||||
|
||||
DefaultHandler ++ FileHandler ++ AdditionalHandlers
|
||||
end}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Authentication/ACL
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
11
rebar.config
11
rebar.config
|
@ -1,17 +1,15 @@
|
|||
{deps, [{jsx, "2.9.0"},
|
||||
{gproc, "0.8.0"},
|
||||
{lager, "3.6.5"},
|
||||
{cowboy, "2.4.0"},
|
||||
{lager_syslog, {git, "https://github.com/basho/lager_syslog", {branch, "3.0.1"}}}
|
||||
{cowboy, "2.4.0"}
|
||||
]}.
|
||||
|
||||
%% appended to deps in rebar.config.script
|
||||
{github_emqx_deps,
|
||||
[{gen_rpc, "2.2.0"},
|
||||
{ekka, "v0.4.1"},
|
||||
[{gen_rpc, "2.3.0"},
|
||||
{ekka, "v0.5.0"},
|
||||
{clique, "develop"},
|
||||
{esockd, "v5.4.2"},
|
||||
{cuttlefish, "emqx30"}
|
||||
{cuttlefish, "v2.1.0"}
|
||||
]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
|
@ -20,7 +18,6 @@
|
|||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
debug_info,
|
||||
{parse_transform, lager_transform},
|
||||
{d, 'APPLICATION', emqx}]}.
|
||||
{xref_checks, [undefined_function_calls, undefined_functions,
|
||||
locals_not_used, deprecated_function_calls,
|
||||
|
|
|
@ -3,8 +3,8 @@
|
|||
{vsn,"3.0"},
|
||||
{modules,[]},
|
||||
{registered,[emqx_sup]},
|
||||
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,
|
||||
cowboy,lager_syslog]},
|
||||
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,
|
||||
cowboy]},
|
||||
{env,[]},
|
||||
{mod,{emqx_app,[]}},
|
||||
{maintainers,["Feng Lee <feng@emqx.io>"]},
|
||||
|
|
|
@ -47,12 +47,12 @@ stop(_State) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
print_banner() ->
|
||||
io:format("Starting ~s on node ~s~n", [?APP, node()]).
|
||||
logger:info("Starting ~s on node ~s", [?APP, node()]).
|
||||
|
||||
print_vsn() ->
|
||||
{ok, Descr} = application:get_key(description),
|
||||
{ok, Vsn} = application:get_key(vsn),
|
||||
io:format("~s ~s is running now!~n", [Descr, Vsn]).
|
||||
logger:info("~s ~s is running now!", [Descr, Vsn]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Autocluster
|
||||
|
|
|
@ -323,7 +323,7 @@ format_mountpoint(Prefix) ->
|
|||
store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
|
||||
[Data | Queue];
|
||||
store(memory, _Data, Queue, _MaxPendingMsg) ->
|
||||
lager:error("Beyond max pending messages"),
|
||||
logger:error("Beyond max pending messages"),
|
||||
Queue;
|
||||
store(disk, Data, Queue, _MaxPendingMsg)->
|
||||
[Data | Queue].
|
||||
|
|
|
@ -35,7 +35,7 @@ start_listener({Proto, ListenOn, Options}) ->
|
|||
{ok, _} ->
|
||||
io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!",
|
||||
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end.
|
||||
|
||||
|
@ -119,7 +119,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
|
|||
ok ->
|
||||
io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
|
||||
{error, Reason} ->
|
||||
io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.",
|
||||
io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
|
||||
[Proto, format(ListenOn), Reason])
|
||||
end.
|
||||
|
||||
|
|
|
@ -22,38 +22,47 @@
|
|||
-export([error/1, error/2, error/3]).
|
||||
-export([critical/1, critical/2, critical/3]).
|
||||
|
||||
-export([add_proc_metadata/1]).
|
||||
|
||||
debug(Msg) ->
|
||||
lager:debug(Msg).
|
||||
logger:debug(Msg).
|
||||
debug(Format, Args) ->
|
||||
lager:debug(Format, Args).
|
||||
debug(Metadata, Format, Args) when is_list(Metadata) ->
|
||||
lager:debug(Metadata, Format, Args).
|
||||
logger:debug(Format, Args).
|
||||
debug(Metadata, Format, Args) when is_map(Metadata) ->
|
||||
logger:debug(Format, Args, Metadata).
|
||||
|
||||
info(Msg) ->
|
||||
lager:info(Msg).
|
||||
logger:info(Msg).
|
||||
info(Format, Args) ->
|
||||
lager:info(Format, Args).
|
||||
info(Metadata, Format, Args) when is_list(Metadata) ->
|
||||
lager:info(Metadata, Format, Args).
|
||||
logger:info(Format, Args).
|
||||
info(Metadata, Format, Args) when is_map(Metadata) ->
|
||||
logger:info(Format, Args, Metadata).
|
||||
|
||||
warning(Msg) ->
|
||||
lager:warning(Msg).
|
||||
logger:warning(Msg).
|
||||
warning(Format, Args) ->
|
||||
lager:warning(Format, Args).
|
||||
warning(Metadata, Format, Args) when is_list(Metadata) ->
|
||||
lager:warning(Metadata, Format, Args).
|
||||
logger:warning(Format, Args).
|
||||
warning(Metadata, Format, Args) when is_map(Metadata) ->
|
||||
logger:warning(Format, Args, Metadata).
|
||||
|
||||
error(Msg) ->
|
||||
lager:error(Msg).
|
||||
logger:error(Msg).
|
||||
error(Format, Args) ->
|
||||
lager:error(Format, Args).
|
||||
error(Metadata, Format, Args) when is_list(Metadata) ->
|
||||
lager:error(Metadata, Format, Args).
|
||||
logger:error(Format, Args).
|
||||
error(Metadata, Format, Args) when is_map(Metadata) ->
|
||||
logger:error(Format, Args, Metadata).
|
||||
|
||||
critical(Msg) ->
|
||||
lager:critical(Msg).
|
||||
logger:critical(Msg).
|
||||
critical(Format, Args) ->
|
||||
lager:critical(Format, Args).
|
||||
critical(Metadata, Format, Args) when is_list(Metadata) ->
|
||||
lager:critical(Metadata, Format, Args).
|
||||
logger:critical(Format, Args).
|
||||
critical(Metadata, Format, Args) when is_map(Metadata) ->
|
||||
logger:critical(Format, Args, Metadata).
|
||||
|
||||
add_proc_metadata(Meta) ->
|
||||
case logger:get_process_metadata() of
|
||||
undefined ->
|
||||
logger:set_process_metadata(Meta);
|
||||
OldMeta ->
|
||||
logger:set_process_metadata(maps:merge(OldMeta, Meta))
|
||||
end.
|
|
@ -0,0 +1,360 @@
|
|||
%%
|
||||
%% %CopyrightBegin%
|
||||
%%
|
||||
%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% %CopyrightEnd%
|
||||
%%
|
||||
|
||||
%% This file is copied from lib/kernel/src/logger_formatter.erl, and
|
||||
%% modified for a more concise time format other than the default RFC3339.
|
||||
|
||||
-module(emqx_logger_formatter).
|
||||
|
||||
-export([format/2]).
|
||||
-export([check_config/1]).
|
||||
|
||||
-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]).
|
||||
|
||||
-define(FormatP, "~0tp").
|
||||
|
||||
-define(IS_STRING(String),
|
||||
(is_list(String) orelse is_binary(String))).
|
||||
|
||||
%%%-----------------------------------------------------------------
|
||||
%%% Types
|
||||
-type config() :: #{chars_limit => pos_integer() | unlimited,
|
||||
depth => pos_integer() | unlimited,
|
||||
max_size => pos_integer() | unlimited,
|
||||
report_cb => logger:report_cb(),
|
||||
quit => template()}.
|
||||
-type template() :: [metakey() | {metakey(),template(),template()} | string()].
|
||||
-type metakey() :: atom() | [atom()].
|
||||
|
||||
%%%-----------------------------------------------------------------
|
||||
%%% API
|
||||
-spec format(LogEvent,Config) -> unicode:chardata() when
|
||||
LogEvent :: logger:log_event(),
|
||||
Config :: config().
|
||||
format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
|
||||
when is_map(Config0) ->
|
||||
Config = add_default_config(Config0),
|
||||
Template = maps:get(template,Config),
|
||||
{BT,AT0} = lists:splitwith(fun(msg) -> false; (_) -> true end, Template),
|
||||
{DoMsg,AT} =
|
||||
case AT0 of
|
||||
[msg|Rest] -> {true,Rest};
|
||||
_ ->{false,AT0}
|
||||
end,
|
||||
B = do_format(Level,Meta,BT,Config),
|
||||
A = do_format(Level,Meta,AT,Config),
|
||||
MsgStr =
|
||||
if DoMsg ->
|
||||
Config1 =
|
||||
case maps:get(chars_limit,Config) of
|
||||
unlimited ->
|
||||
Config;
|
||||
Size0 ->
|
||||
Size =
|
||||
case Size0 - string:length([B,A]) of
|
||||
S when S>=0 -> S;
|
||||
_ -> 0
|
||||
end,
|
||||
Config#{chars_limit=>Size}
|
||||
end,
|
||||
string:trim(format_msg(Msg0,Meta,Config1));
|
||||
true ->
|
||||
""
|
||||
end,
|
||||
truncate([B,MsgStr,A],maps:get(max_size,Config)).
|
||||
|
||||
do_format(Level,Data,[level|Format],Config) ->
|
||||
[to_string(level,Level,Config)|do_format(Level,Data,Format,Config)];
|
||||
do_format(Level,Data,[{Key,IfExist,Else}|Format],Config) ->
|
||||
String =
|
||||
case value(Key,Data) of
|
||||
{ok,Value} -> do_format(Level,Data#{Key=>Value},IfExist,Config);
|
||||
error -> do_format(Level,Data,Else,Config)
|
||||
end,
|
||||
[String|do_format(Level,Data,Format,Config)];
|
||||
do_format(Level,Data,[Key|Format],Config)
|
||||
when is_atom(Key) orelse
|
||||
(is_list(Key) andalso is_atom(hd(Key))) ->
|
||||
String =
|
||||
case value(Key,Data) of
|
||||
{ok,Value} -> to_string(Key,Value,Config);
|
||||
error -> ""
|
||||
end,
|
||||
[String|do_format(Level,Data,Format,Config)];
|
||||
do_format(Level,Data,[Str|Format],Config) ->
|
||||
[Str|do_format(Level,Data,Format,Config)];
|
||||
do_format(_Level,_Data,[],_Config) ->
|
||||
[].
|
||||
|
||||
value(Key,Meta) when is_map_key(Key,Meta) ->
|
||||
{ok,maps:get(Key,Meta)};
|
||||
value([Key|Keys],Meta) when is_map_key(Key,Meta) ->
|
||||
value(Keys,maps:get(Key,Meta));
|
||||
value([],Value) ->
|
||||
{ok,Value};
|
||||
value(_,_) ->
|
||||
error.
|
||||
|
||||
to_string(time,Time,Config) ->
|
||||
format_time(Time,Config);
|
||||
to_string(mfa,MFA,Config) ->
|
||||
format_mfa(MFA,Config);
|
||||
to_string(_,Value,Config) ->
|
||||
to_string(Value,Config).
|
||||
|
||||
to_string(X,_) when is_atom(X) ->
|
||||
atom_to_list(X);
|
||||
to_string(X,_) when is_integer(X) ->
|
||||
integer_to_list(X);
|
||||
to_string(X,_) when is_pid(X) ->
|
||||
pid_to_list(X);
|
||||
to_string(X,_) when is_reference(X) ->
|
||||
ref_to_list(X);
|
||||
to_string(X,_) when is_list(X) ->
|
||||
case printable_list(lists:flatten(X)) of
|
||||
true -> X;
|
||||
_ -> io_lib:format(?FormatP,[X])
|
||||
end;
|
||||
to_string(X,_) ->
|
||||
io_lib:format("~s",[X]).
|
||||
|
||||
printable_list([]) ->
|
||||
false;
|
||||
printable_list(X) ->
|
||||
io_lib:printable_list(X).
|
||||
|
||||
format_msg({string,Chardata},Meta,Config) ->
|
||||
format_msg({"~ts",[Chardata]},Meta,Config);
|
||||
format_msg({report,_}=Msg,Meta,#{report_cb:=Fun}=Config)
|
||||
when is_function(Fun,1); is_function(Fun,2) ->
|
||||
format_msg(Msg,Meta#{report_cb=>Fun},maps:remove(report_cb,Config));
|
||||
format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,1) ->
|
||||
try Fun(Report) of
|
||||
{Format,Args} when is_list(Format), is_list(Args) ->
|
||||
format_msg({Format,Args},maps:remove(report_cb,Meta),Config);
|
||||
Other ->
|
||||
format_msg({"REPORT_CB/1 ERROR: ~0tp; Returned: ~0tp",
|
||||
[Report,Other]},Meta,Config)
|
||||
catch C:R:S ->
|
||||
format_msg({"REPORT_CB/1 CRASH: ~0tp; Reason: ~0tp",
|
||||
[Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},Meta,Config)
|
||||
end;
|
||||
format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,2) ->
|
||||
try Fun(Report,maps:with([depth,chars_limit,single_line],Config)) of
|
||||
Chardata when ?IS_STRING(Chardata) ->
|
||||
try chardata_to_list(Chardata) % already size limited by report_cb
|
||||
catch _:_ ->
|
||||
format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
|
||||
[Report,Chardata]},Meta,Config)
|
||||
end;
|
||||
Other ->
|
||||
format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
|
||||
[Report,Other]},Meta,Config)
|
||||
catch C:R:S ->
|
||||
format_msg({"REPORT_CB/2 CRASH: ~0tp; Reason: ~0tp",
|
||||
[Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},
|
||||
Meta,Config)
|
||||
end;
|
||||
format_msg({report,Report},Meta,Config) ->
|
||||
format_msg({report,Report},
|
||||
Meta#{report_cb=>fun logger:format_report/1},
|
||||
Config);
|
||||
format_msg(Msg,_Meta,#{depth:=Depth,chars_limit:=CharsLimit}) ->
|
||||
Opts = chars_limit_to_opts(CharsLimit),
|
||||
do_format_msg(Msg, Depth, Opts).
|
||||
|
||||
chars_limit_to_opts(unlimited) -> [];
|
||||
chars_limit_to_opts(CharsLimit) -> [{chars_limit,CharsLimit}].
|
||||
|
||||
do_format_msg({Format0,Args},Depth,Opts) ->
|
||||
try
|
||||
Format1 = io_lib:scan_format(Format0, Args),
|
||||
Format = reformat(Format1, Depth),
|
||||
io_lib:build_text(Format,Opts)
|
||||
catch C:R:S ->
|
||||
FormatError = "FORMAT ERROR: ~0tp - ~0tp",
|
||||
case Format0 of
|
||||
FormatError ->
|
||||
%% already been here - avoid failing cyclically
|
||||
erlang:raise(C,R,S);
|
||||
_ ->
|
||||
format_msg({FormatError,[Format0,Args]},Depth,Opts)
|
||||
end
|
||||
end.
|
||||
|
||||
reformat(Format,unlimited) ->
|
||||
Format;
|
||||
reformat([#{control_char:=C}=M|T], Depth) when C =:= $p ->
|
||||
[limit_depth(M#{width => 0}, Depth)|reformat(T, Depth)];
|
||||
reformat([#{control_char:=C}=M|T], Depth) when C =:= $P ->
|
||||
[M#{width => 0}|reformat(T, Depth)];
|
||||
reformat([#{control_char:=C}=M|T], Depth) when C =:= $p; C =:= $w ->
|
||||
[limit_depth(M, Depth)|reformat(T, Depth)];
|
||||
reformat([H|T], Depth) ->
|
||||
[H|reformat(T, Depth)];
|
||||
reformat([], _) ->
|
||||
[].
|
||||
|
||||
limit_depth(M0, unlimited) ->
|
||||
M0;
|
||||
limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) ->
|
||||
C = C0 - ($a - $A), %To uppercase.
|
||||
M0#{control_char:=C,args:=Args++[Depth]}.
|
||||
|
||||
chardata_to_list(Chardata) ->
|
||||
case unicode:characters_to_list(Chardata,unicode) of
|
||||
List when is_list(List) ->
|
||||
List;
|
||||
Error ->
|
||||
throw(Error)
|
||||
end.
|
||||
|
||||
truncate(String,unlimited) ->
|
||||
String;
|
||||
truncate(String,Size) ->
|
||||
Length = string:length(String),
|
||||
if Length>Size ->
|
||||
case lists:reverse(lists:flatten(String)) of
|
||||
[$\n|_] ->
|
||||
string:slice(String,0,Size-4)++"...\n";
|
||||
_ ->
|
||||
string:slice(String,0,Size-3)++"..."
|
||||
end;
|
||||
true ->
|
||||
String
|
||||
end.
|
||||
|
||||
%% Convert microseconds-timestamp into local datatime string in milliseconds
|
||||
format_time(SysTime,#{})
|
||||
when is_integer(SysTime) ->
|
||||
Ms = SysTime rem 1000000 div 1000,
|
||||
{Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond),
|
||||
format_time({Date, {H, Mi, S, Ms}}).
|
||||
format_time({{Y, M, D}, {H, Mi, S, Ms}}) ->
|
||||
io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]);
|
||||
format_time({{Y, M, D}, {H, Mi, S}}) ->
|
||||
io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]).
|
||||
|
||||
format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) ->
|
||||
atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A);
|
||||
format_mfa({M,F,A},Config) when is_atom(M), is_atom(F), is_list(A) ->
|
||||
format_mfa({M,F,length(A)},Config);
|
||||
format_mfa(MFA,Config) ->
|
||||
to_string(MFA,Config).
|
||||
|
||||
%% Ensure that all valid configuration parameters exist in the final
|
||||
%% configuration map
|
||||
add_default_config(Config0) ->
|
||||
Default =
|
||||
#{chars_limit=>unlimited,
|
||||
error_logger_notice_header=>info},
|
||||
MaxSize = get_max_size(maps:get(max_size,Config0,undefined)),
|
||||
Depth = get_depth(maps:get(depth,Config0,undefined)),
|
||||
add_default_template(maps:merge(Default,Config0#{max_size=>MaxSize,
|
||||
depth=>Depth})).
|
||||
|
||||
add_default_template(#{template:=_}=Config) ->
|
||||
Config;
|
||||
add_default_template(Config) ->
|
||||
Config#{template=>?DEFAULT_FORMAT_TEMPLATE_SINGLE}.
|
||||
|
||||
get_max_size(undefined) ->
|
||||
unlimited;
|
||||
get_max_size(S) ->
|
||||
max(10,S).
|
||||
|
||||
get_depth(undefined) ->
|
||||
error_logger:get_format_depth();
|
||||
get_depth(S) ->
|
||||
max(5,S).
|
||||
|
||||
-spec check_config(Config) -> ok | {error,term()} when
|
||||
Config :: config().
|
||||
check_config(Config) when is_map(Config) ->
|
||||
do_check_config(maps:to_list(Config));
|
||||
check_config(Config) ->
|
||||
{error,{invalid_formatter_config,?MODULE,Config}}.
|
||||
|
||||
do_check_config([{Type,L}|Config]) when Type == chars_limit;
|
||||
Type == depth;
|
||||
Type == max_size ->
|
||||
case check_limit(L) of
|
||||
ok -> do_check_config(Config);
|
||||
error -> {error,{invalid_formatter_config,?MODULE,{Type,L}}}
|
||||
end;
|
||||
do_check_config([{error_logger_notice_header,ELNH}|Config]) when ELNH == info;
|
||||
ELNH == notice ->
|
||||
do_check_config(Config);
|
||||
do_check_config([{report_cb,RCB}|Config]) when is_function(RCB,1);
|
||||
is_function(RCB,2) ->
|
||||
do_check_config(Config);
|
||||
do_check_config([{template,T}|Config]) ->
|
||||
case check_template(T) of
|
||||
ok -> do_check_config(Config);
|
||||
error -> {error,{invalid_formatter_template,?MODULE,T}}
|
||||
end;
|
||||
|
||||
do_check_config([C|_]) ->
|
||||
{error,{invalid_formatter_config,?MODULE,C}};
|
||||
do_check_config([]) ->
|
||||
ok.
|
||||
|
||||
check_limit(L) when is_integer(L), L>0 ->
|
||||
ok;
|
||||
check_limit(unlimited) ->
|
||||
ok;
|
||||
check_limit(_) ->
|
||||
error.
|
||||
|
||||
check_template([Key|T]) when is_atom(Key) ->
|
||||
check_template(T);
|
||||
check_template([Key|T]) when is_list(Key), is_atom(hd(Key)) ->
|
||||
case lists:all(fun(X) when is_atom(X) -> true;
|
||||
(_) -> false
|
||||
end,
|
||||
Key) of
|
||||
true ->
|
||||
check_template(T);
|
||||
false ->
|
||||
error
|
||||
end;
|
||||
check_template([{Key,IfExist,Else}|T])
|
||||
when is_atom(Key) orelse
|
||||
(is_list(Key) andalso is_atom(hd(Key))) ->
|
||||
case check_template(IfExist) of
|
||||
ok ->
|
||||
case check_template(Else) of
|
||||
ok ->
|
||||
check_template(T);
|
||||
error ->
|
||||
error
|
||||
end;
|
||||
error ->
|
||||
error
|
||||
end;
|
||||
check_template([Str|T]) when is_list(Str) ->
|
||||
case io_lib:printable_unicode_list(Str) of
|
||||
true -> check_template(T);
|
||||
false -> error
|
||||
end;
|
||||
check_template([]) ->
|
||||
ok;
|
||||
check_template(_) ->
|
||||
error.
|
|
@ -21,7 +21,7 @@ load() ->
|
|||
lists:foreach(
|
||||
fun({Mod, Env}) ->
|
||||
ok = Mod:load(Env),
|
||||
io:format("Load ~s module successfully.~n", [Mod])
|
||||
logger:info("Load ~s module successfully.", [Mod])
|
||||
end, emqx_config:get_env(modules, [])).
|
||||
|
||||
-spec(unload() -> ok).
|
||||
|
|
|
@ -72,9 +72,8 @@
|
|||
|
||||
-define(NO_PROPS, undefined).
|
||||
|
||||
-define(LOG(Level, Format, Args, PState),
|
||||
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
|
||||
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
|
||||
-define(LOG(Level, Format, Args, _PState),
|
||||
emqx_logger:Level("[MQTT] " ++ Format, Args)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Init
|
||||
|
@ -82,6 +81,7 @@
|
|||
|
||||
-spec(init(map(), list()) -> state()).
|
||||
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
|
||||
emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
|
||||
Zone = proplists:get_value(zone, Options),
|
||||
#pstate{zone = Zone,
|
||||
sendfun = SendFun,
|
||||
|
@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET(
|
|||
client_id = ClientId,
|
||||
username = Username,
|
||||
password = Password} = Connect), PState) ->
|
||||
|
||||
emqx_logger:add_proc_metadata(#{client_id => ClientId}),
|
||||
%% TODO: Mountpoint...
|
||||
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
|
||||
WillMsg = make_will_msg(Connect),
|
||||
|
|
|
@ -161,9 +161,8 @@
|
|||
|
||||
-define(TIMEOUT, 60000).
|
||||
|
||||
-define(LOG(Level, Format, Args, State),
|
||||
emqx_logger:Level([{client, State#state.client_id}],
|
||||
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
||||
-define(LOG(Level, Format, Args, _State),
|
||||
emqx_logger:Level("[Session] " ++ Format, Args)).
|
||||
|
||||
%% @doc Start a session proc.
|
||||
-spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
|
||||
|
@ -341,6 +340,7 @@ init([Parent, #{zone := Zone,
|
|||
max_inflight := MaxInflight,
|
||||
topic_alias_maximum := TopicAliasMaximum,
|
||||
will_msg := WillMsg}]) ->
|
||||
emqx_logger:add_proc_metadata(#{client_id => ClientId}),
|
||||
process_flag(trap_exit, true),
|
||||
true = link(ConnPid),
|
||||
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||
|
|
|
@ -25,9 +25,9 @@
|
|||
|
||||
-define(SYSMON, ?MODULE).
|
||||
-define(LOG(Msg, ProcInfo),
|
||||
emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
|
||||
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
|
||||
-define(LOG(Msg, ProcInfo, PortInfo),
|
||||
emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
|
||||
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
|
||||
|
||||
%% @doc Start system monitor
|
||||
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
|
||||
|
@ -130,7 +130,7 @@ handle_info({timeout, _Ref, reset}, State) ->
|
|||
{noreply, State#state{events = []}, hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
lager:error("[SYSMON] unexpected Info: ~p", [Info]),
|
||||
logger:error("[SYSMON] unexpected Info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{timer = TRef}) ->
|
||||
|
|
|
@ -25,12 +25,20 @@
|
|||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
code_change/3]).
|
||||
|
||||
-record(state, {level, traces}).
|
||||
-record(state, {level, org_top_level, traces}).
|
||||
|
||||
-type(trace_who() :: {client | topic, binary()}).
|
||||
-type(trace_who() :: {client_id | topic, binary()}).
|
||||
|
||||
-define(TRACER, ?MODULE).
|
||||
-define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
|
||||
-define(FORMAT, {emqx_logger_formatter,
|
||||
#{template =>
|
||||
[time," [",level,"] ",
|
||||
{client_id,
|
||||
[{peername,
|
||||
[client_id,"@",peername," "],
|
||||
[client_id, " "]}],
|
||||
[]},
|
||||
msg,"\n"]}}).
|
||||
|
||||
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
|
||||
start_link() ->
|
||||
|
@ -40,26 +48,26 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
|
|||
%% Dont' trace '$SYS' publish
|
||||
ignore;
|
||||
trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
||||
when is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
|
||||
when is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Start/Stop trace
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% @doc Start to trace client or topic.
|
||||
%% @doc Start to trace client_id or topic.
|
||||
-spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
|
||||
start_trace({client, ClientId}, LogFile) ->
|
||||
start_trace({start_trace, {client, ClientId}, LogFile});
|
||||
start_trace({client_id, ClientId}, LogFile) ->
|
||||
start_trace({start_trace, {client_id, ClientId}, LogFile});
|
||||
start_trace({topic, Topic}, LogFile) ->
|
||||
start_trace({start_trace, {topic, Topic}, LogFile}).
|
||||
|
||||
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
|
||||
|
||||
%% @doc Stop tracing client or topic.
|
||||
%% @doc Stop tracing client_id or topic.
|
||||
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
|
||||
stop_trace({client, ClientId}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
|
||||
stop_trace({client_id, ClientId}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
|
||||
stop_trace({topic, Topic}) ->
|
||||
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
|
||||
|
||||
|
@ -73,37 +81,45 @@ lookup_traces() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}.
|
||||
{ok, #state{level = emqx_config:get_env(trace_level, debug),
|
||||
org_top_level = get_top_level(),
|
||||
traces = #{}}}.
|
||||
|
||||
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
|
||||
case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of
|
||||
{ok, exists} ->
|
||||
{reply, {error, already_exists}, State};
|
||||
{ok, Trace} ->
|
||||
{reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}};
|
||||
case logger:add_handler(handler_id(Who), logger_disk_log_h,
|
||||
#{level => Level,
|
||||
formatter => ?FORMAT,
|
||||
filesync_repeat_interval => 1000,
|
||||
config => #{type => halt, file => LogFile},
|
||||
filter_default => stop,
|
||||
filters => [{meta_key_filter,
|
||||
{fun filter_by_meta_key/2, Who} }]}) of
|
||||
ok ->
|
||||
set_top_level(all), % open the top logger level to 'all'
|
||||
emqx_logger:info("[Tracer] start trace for ~p", [Who]),
|
||||
{reply, ok, State#state{traces = maps:put(Who, LogFile, Traces)}};
|
||||
{error, Reason} ->
|
||||
emqx_logger:error("[Tracer] trace error: ~p", [Reason]),
|
||||
{reply, {error, Reason}, State};
|
||||
{'EXIT', Error} ->
|
||||
emqx_logger:error("[Tracer] trace exit: ~p", [Error]),
|
||||
{reply, {error, Error}, State}
|
||||
emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]),
|
||||
{reply, {error, Reason}, State}
|
||||
end;
|
||||
|
||||
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
|
||||
handle_call({stop_trace, Who}, _From, State = #state{org_top_level = OrgTopLevel, traces = Traces}) ->
|
||||
case maps:find(Who, Traces) of
|
||||
{ok, {Trace, _LogFile}} ->
|
||||
case lager:stop_trace(Trace) of
|
||||
ok -> ok;
|
||||
{error, Error} ->
|
||||
emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error])
|
||||
{ok, _LogFile} ->
|
||||
case logger:remove_handler(handler_id(Who)) of
|
||||
ok ->
|
||||
emqx_logger:info("[Tracer] stop trace for ~p", [Who]);
|
||||
{error, Reason} ->
|
||||
emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason])
|
||||
end,
|
||||
set_top_level(OrgTopLevel), % reset the top logger level to original value
|
||||
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
|
||||
error ->
|
||||
{reply, {error, not_found}, State}
|
||||
end;
|
||||
|
||||
handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
|
||||
{reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(Traces)], State};
|
||||
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Tracer] unexpected call: ~p", [Req]),
|
||||
|
@ -123,3 +139,25 @@ terminate(_Reason, _State) ->
|
|||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
handler_id({topic, Topic}) ->
|
||||
list_to_atom("topic_" ++ binary_to_list(Topic));
|
||||
handler_id({client_id, ClientId}) ->
|
||||
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
|
||||
|
||||
get_top_level() ->
|
||||
#{level := OrgTopLevel} = logger:get_primary_config(),
|
||||
OrgTopLevel.
|
||||
|
||||
set_top_level(Level) ->
|
||||
logger:set_primary_config(level, Level).
|
||||
|
||||
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
|
||||
case maps:find(MetaKey, Meta) of
|
||||
{ok, MetaValue} -> LogEvent;
|
||||
{ok, Topic} when MetaKey =:= topic ->
|
||||
case emqx_topic:match(Topic, MetaValue) of
|
||||
true -> LogEvent;
|
||||
false -> ignore
|
||||
end;
|
||||
_ -> ignore
|
||||
end.
|
|
@ -45,9 +45,8 @@
|
|||
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||
|
||||
-define(WSLOG(Level, Format, Args, State),
|
||||
emqx_logger:Level("MQTT/WS(~s): " ++ Format,
|
||||
[esockd_net:format(State#state.peername) | Args])).
|
||||
-define(WSLOG(Level, Format, Args, _State),
|
||||
emqx_logger:Level("[MQTT/WS] " ++ Format, Args)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% API
|
||||
|
|
|
@ -33,21 +33,6 @@
|
|||
{large_heap,8388608},
|
||||
{busy_port,false},
|
||||
{busy_dist_port,true}]}]},
|
||||
{sasl,[{sasl_error_logger,false}]},
|
||||
{lager,
|
||||
[{error_logger_hwm,1000},
|
||||
{error_logger_redirect,true},
|
||||
{log_dir,"{{ platform_log_dir }}"},
|
||||
{handlers,
|
||||
[{lager_console_backend,error},
|
||||
{lager_file_backend,
|
||||
[{file,"{{ platform_log_dir }}/error.log"},
|
||||
{level,error},
|
||||
{size,10485760},
|
||||
{date,"$D0"},
|
||||
{count,5}]},
|
||||
{lager_syslog_backend,["emq",local0,error]}]},
|
||||
{crash_log,"{{ platform_log_dir }}/crash.log"}]},
|
||||
{gen_rpc,
|
||||
[{socket_keepalive_count,2},
|
||||
{socket_keepalive_interval,5},
|
||||
|
|
|
@ -52,10 +52,10 @@ init(_, _WSReq) ->
|
|||
{ok, #state{}}.
|
||||
|
||||
websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) ->
|
||||
lager:info("Client received frame~p", [Frame]),
|
||||
logger:info("Client received frame~p", [Frame]),
|
||||
{ok, State#state{buffer = [Frame|Buffer]}};
|
||||
websocket_handle(Frame, _, State = #state{waiting = From}) ->
|
||||
lager:info("Client received frame~p", [Frame]),
|
||||
logger:info("Client received frame~p", [Frame]),
|
||||
From ! Frame,
|
||||
{ok, State#state{waiting = undefined}}.
|
||||
|
||||
|
|
Loading…
Reference in New Issue