Use the new logger macros
This commit is contained in:
parent
fe0f5333b3
commit
e882af9369
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_event).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([alarm_fun/0, get_alarms/0, set_alarm/1, clear_alarm/1]).
|
||||
|
@ -84,7 +85,7 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alar
|
|||
{ok, Json} ->
|
||||
emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
|
||||
{error, Reason} ->
|
||||
emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
|
||||
?ERROR("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
|
||||
end,
|
||||
{ok, State#{alarms := [Alarm|Alarms]}};
|
||||
|
||||
|
@ -93,23 +94,23 @@ handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
|
|||
{ok, Json} ->
|
||||
emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
|
||||
{error, Reason} ->
|
||||
emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason])
|
||||
?ERROR("[AlarmMgr] Failed to encode clear: ~p", [Reason])
|
||||
end,
|
||||
{ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
|
||||
|
||||
handle_event(Event, State)->
|
||||
emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
|
||||
?ERROR("[AlarmMgr] unexpected event: ~p", [Event]),
|
||||
{ok, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
|
||||
?ERROR("[AlarmMgr] unexpected info: ~p", [Info]),
|
||||
{ok, State}.
|
||||
|
||||
handle_call(get_alarms, State = #{alarms := Alarms}) ->
|
||||
{ok, Alarms, State};
|
||||
|
||||
handle_call(Req, State) ->
|
||||
emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
|
||||
?ERROR("[AlarmMgr] unexpected call: ~p", [Req]),
|
||||
{ok, ignored, State}.
|
||||
|
||||
terminate(swap, State) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
@ -77,11 +78,11 @@ init([]) ->
|
|||
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Banned] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Banned] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Banned] unexpected msg: ~p", [Msg]),
|
||||
?ERROR("[Banned] unexpected msg: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
||||
|
@ -89,7 +90,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
|||
{noreply, ensure_expiry_timer(State), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Banned] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Banned] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{expiry_timer := TRef}) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/2]).
|
||||
-export([subscribe/1, subscribe/2, subscribe/3]).
|
||||
|
@ -170,7 +171,7 @@ publish(Msg) when is_record(Msg, message) ->
|
|||
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
|
||||
Delivery#delivery.results;
|
||||
{stop, _} ->
|
||||
emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
|
||||
?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]),
|
||||
[]
|
||||
end.
|
||||
|
||||
|
@ -181,7 +182,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
|
|||
publish(Msg)
|
||||
catch
|
||||
_:Error:Stacktrace ->
|
||||
emqx_logger:error("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
|
||||
?ERROR("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
|
||||
after
|
||||
ok
|
||||
end.
|
||||
|
@ -228,7 +229,7 @@ forward(Node, To, Delivery) ->
|
|||
%% rpc:call to ensure the delivery, but the latency:(
|
||||
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
|
||||
{badrpc, Reason} ->
|
||||
emqx_logger:error("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
|
||||
?ERROR("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
|
||||
Delivery;
|
||||
Delivery1 -> Delivery1
|
||||
end.
|
||||
|
@ -396,14 +397,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
|
|||
{reply, Ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Broker] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({subscribe, Topic}, State) ->
|
||||
case emqx_router:do_add_route(Topic) of
|
||||
ok -> ok;
|
||||
{error, Reason} ->
|
||||
emqx_logger:error("[Broker] Failed to add route: ~p", [Reason])
|
||||
?ERROR("[Broker] Failed to add route: ~p", [Reason])
|
||||
end,
|
||||
{noreply, State};
|
||||
|
||||
|
@ -426,11 +427,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Broker] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Broker] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([register_sub/2]).
|
||||
-export([lookup_subid/1, lookup_subpid/1]).
|
||||
|
@ -96,7 +98,7 @@ init([]) ->
|
|||
{ok, #{pmon => emqx_pmon:new()}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]),
|
||||
?ERROR("[BrokerHelper] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
|
||||
|
@ -105,7 +107,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
|
|||
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[BrokerHelper] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
|
||||
|
@ -116,7 +118,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
|
|||
{noreply, State#{pmon := PMon1}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]),
|
||||
?ERROR("[BrokerHelper] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
|
@ -140,7 +141,7 @@ init([]) ->
|
|||
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[CM] unexpected call: ~p", [Req]),
|
||||
?ERROR("[CM] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
||||
|
@ -150,7 +151,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
|
|||
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[CM] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
|
||||
|
@ -161,7 +162,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}
|
|||
{noreply, State#{conn_pmon := PMon1}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[CM] unexpected info: ~p", [Info]),
|
||||
?ERROR("[CM] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([register_command/2, register_command/3, unregister_command/1]).
|
||||
-export([run_command/1, run_command/2, lookup_command/1]).
|
||||
|
@ -63,7 +65,7 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
|
|||
_ -> ok
|
||||
catch
|
||||
_:Reason:Stacktrace ->
|
||||
emqx_logger:error("[CTL] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
|
||||
?ERROR("[Ctl] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
|
||||
{error, Reason}
|
||||
end;
|
||||
[] ->
|
||||
|
@ -91,14 +93,14 @@ init([]) ->
|
|||
{ok, #state{seq = 0}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("unexpected call: ~p", [Req]),
|
||||
?ERROR("[Ctl] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
|
||||
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
|
||||
[] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
|
||||
[[OriginSeq] | _] ->
|
||||
emqx_logger:warning("[CTL] cmd ~s is overidden by ~p", [Cmd, MF]),
|
||||
?WARN("[Ctl] cmd ~s is overidden by ~p", [Cmd, MF]),
|
||||
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
|
||||
end,
|
||||
noreply(next_seq(State));
|
||||
|
@ -108,11 +110,11 @@ handle_cast({unregister_command, Cmd}, State) ->
|
|||
noreply(State);
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("Unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Ctl] unexpected cast: ~p", [Msg]),
|
||||
noreply(State).
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("unexpected info: ~p", [Info]),
|
||||
?ERROR("[Ctl] unexpected info: ~p", [Info]),
|
||||
noreply(State).
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
@ -151,3 +153,4 @@ register_command_test_() ->
|
|||
}.
|
||||
|
||||
-endif.
|
||||
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0, stop/0]).
|
||||
|
||||
%% Hooks API
|
||||
|
@ -152,7 +154,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat
|
|||
{reply, Reply, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Hooks] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Hooks] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({del, HookPoint, Action}, State) ->
|
||||
|
@ -165,11 +167,11 @@ handle_cast({del, HookPoint, Action}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]),
|
||||
?ERROR("[Hooks] unexpected msg: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Hooks] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Hooks] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
-module(emqx_metrics).
|
||||
|
||||
-include("logger.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
|
@ -182,7 +183,7 @@ commit() ->
|
|||
case get('$metrics') of
|
||||
undefined -> ok;
|
||||
Metrics ->
|
||||
maps:fold(fun({Type, Metric}, Val, _Acc) ->
|
||||
maps:fold(fun({Type, Metric}, Val, _Acc) ->
|
||||
update_counter(key(Type, Metric), {2, Val})
|
||||
end, 0, Metrics),
|
||||
erase('$metrics')
|
||||
|
@ -279,9 +280,9 @@ qos_sent(?QOS_1) ->
|
|||
qos_sent(?QOS_2) ->
|
||||
inc('messages/qos2/sent').
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%%------------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%-----------------------------------------------------------------------------
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
% Create metrics table
|
||||
|
@ -290,15 +291,15 @@ init([]) ->
|
|||
{ok, #{}, hibernate}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Metrics] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Metrics] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Metrics] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Metrics] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Metrics] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Metrics] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{}) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
-include_lib("ekka/include/ekka.hrl").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
|
@ -177,15 +178,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
|
|||
{reply, Ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Router] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Router] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Router] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Router] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Router] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
@ -94,11 +95,11 @@ init([]) ->
|
|||
{ok, #{nodes => Nodes}, hibernate}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[RouterHelper] unexpected call: ~p", [Req]),
|
||||
?ERROR("[RouterHelper] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[RouterHelper] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[RouterHelper] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
|
||||
|
@ -114,7 +115,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_info({mnesia_table_event, Event}, State) ->
|
||||
emqx_logger:error("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
|
||||
?ERROR("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
|
||||
|
@ -132,7 +133,7 @@ handle_info({membership, _Event}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[RouteHelper] unexpected info: ~p", [Info]),
|
||||
?ERROR("[RouteHelper] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/1]).
|
||||
-export([start_session/1, count_sessions/0]).
|
||||
|
||||
|
@ -34,8 +36,6 @@
|
|||
|
||||
-define(SUP, ?MODULE).
|
||||
-define(BATCH_EXIT, 100000).
|
||||
-define(ERROR_MSG(Format, Args),
|
||||
error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])).
|
||||
|
||||
%% @doc Start session supervisor.
|
||||
-spec(start_link(map()) -> emqx_types:startlink_ret()).
|
||||
|
@ -83,8 +83,8 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From,
|
|||
reply({error, Reason}, State)
|
||||
catch
|
||||
_:Error:Stk ->
|
||||
?ERROR_MSG("Failed to start session ~p: ~p, stacktrace:~n~p",
|
||||
[ClientId, Error, Stk]),
|
||||
?ERROR("Failed to start session ~p: ~p, stacktrace:~n~p",
|
||||
[ClientId, Error, Stk]),
|
||||
reply({error, Error}, State)
|
||||
end;
|
||||
|
||||
|
@ -92,11 +92,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) ->
|
|||
{reply, maps:size(SessMap), State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?ERROR_MSG("unexpected call: ~p", [Req]),
|
||||
?ERROR("unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?ERROR_MSG("unexpected cast: ~p", [Msg]),
|
||||
?ERROR("unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) ->
|
||||
|
@ -108,7 +108,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow
|
|||
{noreply, State#state{sessions = SessMap1}};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?ERROR_MSG("unexpected info: ~p", [Info]),
|
||||
?ERROR("unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, State) ->
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
-include("emqx.hrl").
|
||||
-include("emqx_mqtt.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
%% Mnesia bootstrap
|
||||
-export([mnesia/1]).
|
||||
|
@ -284,11 +285,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
|||
{reply, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]),
|
||||
?ERROR("[SharedSub] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[SharedSub] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
|
||||
|
@ -303,12 +304,12 @@ handle_info({mnesia_table_event, _Event}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
|
||||
emqx_logger:info("[SharedSub] shared subscriber down: ~p", [SubPid]),
|
||||
?INFO("[SharedSub] shared subscriber down: ~p", [SubPid]),
|
||||
cleanup_down(SubPid),
|
||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[SharedSub] unexpected info: ~p", [Info]),
|
||||
?ERROR("[SharedSub] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
|
||||
|
@ -90,7 +91,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
|
|||
try emqx_session:discard(SessPid, ConnPid)
|
||||
catch
|
||||
_:Error:_Stk ->
|
||||
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SessPid, Error])
|
||||
?ERROR("[SM] Failed to discard ~p: ~p", [SessPid, Error])
|
||||
end
|
||||
end, lookup_session_pids(ClientId)).
|
||||
|
||||
|
@ -104,7 +105,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
|
|||
{ok, SessPid};
|
||||
SessPids ->
|
||||
[SessPid|StalePids] = lists:reverse(SessPids),
|
||||
emqx_logger:error("[SM] More than one session found: ~p", [SessPids]),
|
||||
?ERROR("[SM] More than one session found: ~p", [SessPids]),
|
||||
lists:foreach(fun(StalePid) ->
|
||||
catch emqx_session:discard(StalePid, ConnPid)
|
||||
end, StalePids),
|
||||
|
@ -226,15 +227,15 @@ init([]) ->
|
|||
{ok, #{}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
|
||||
?ERROR("[SM] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[SM] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[SM] unexpected info: ~p", [Info]),
|
||||
?ERROR("[SM] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([is_enabled/0]).
|
||||
|
@ -81,11 +82,11 @@ init([]) ->
|
|||
{ok, #{}}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Registry] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Registry] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Registry] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Registry] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({membership, {mnesia, down, Node}}, State) ->
|
||||
|
@ -99,7 +100,7 @@ handle_info({membership, _Event}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Registry] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Registry] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0, start_link/1, stop/0]).
|
||||
|
||||
|
@ -164,7 +165,7 @@ start_timer(#state{tick_ms = Ms} = State) ->
|
|||
handle_call(stop, _From, State) ->
|
||||
{stop, normal, _Reply = ok, State};
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Stats] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Stats] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
|
||||
|
@ -182,7 +183,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
|
|||
handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) ->
|
||||
case lists:keyfind(Name, #update.name, Updates) of
|
||||
#update{} ->
|
||||
emqx_logger:error("[Stats]: duplicated update: ~s", [Name]),
|
||||
?ERROR("[Stats]: duplicated update: ~s", [Name]),
|
||||
{noreply, State};
|
||||
false ->
|
||||
{noreply, State#state{updates = [Update | Updates]}}
|
||||
|
@ -192,7 +193,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
|
|||
{noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Stats] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Stats] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
|
||||
|
@ -200,8 +201,9 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
|||
fun(Update = #update{name = Name, countdown = C, interval = I,
|
||||
func = UpFun}, Acc) when C =< 0 ->
|
||||
try UpFun()
|
||||
catch _:Error ->
|
||||
emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error])
|
||||
catch
|
||||
_:Error ->
|
||||
?ERROR("[Stats] update ~s error: ~p", [Name, Error])
|
||||
end,
|
||||
[Update#update{countdown = I} | Acc];
|
||||
(Update = #update{countdown = C}, Acc) ->
|
||||
|
@ -210,7 +212,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
|
|||
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Stats] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Stats] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{timer = TRef}) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]).
|
||||
|
@ -98,11 +99,11 @@ handle_call(uptime, _From, State) ->
|
|||
{reply, uptime(State), State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[SYS] unexpected call: ~p", [Req]),
|
||||
?ERROR("[SYS] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[SYS] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[SYS] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
|
||||
|
@ -119,7 +120,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi
|
|||
{noreply, tick(State), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[SYS] unexpected info: ~p", [Info]),
|
||||
?ERROR("[SYS] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
|
||||
-behavior(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/1]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
|
@ -24,13 +26,9 @@
|
|||
-record(state, {timer, events}).
|
||||
|
||||
-define(SYSMON, ?MODULE).
|
||||
-define(LOG(Msg, ProcInfo),
|
||||
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
|
||||
-define(LOG(Msg, ProcInfo, PortInfo),
|
||||
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
|
||||
|
||||
%% @doc Start system monitor
|
||||
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
|
||||
-spec(start_link(Opts :: list(tuple())) -> emqx_types:startlink_ret()).
|
||||
start_link(Opts) ->
|
||||
gen_server:start_link({local, ?SYSMON}, ?MODULE, [Opts], []).
|
||||
|
||||
|
@ -40,6 +38,7 @@ start_link(Opts) ->
|
|||
|
||||
init([Opts]) ->
|
||||
erlang:system_monitor(self(), parse_opt(Opts)),
|
||||
emqx_logger:set_proc_metadata(#{sysmon => true}),
|
||||
{ok, start_timer(#state{events = []})}.
|
||||
|
||||
start_timer(State) ->
|
||||
|
@ -71,18 +70,18 @@ parse_opt([_Opt|Opts], Acc) ->
|
|||
parse_opt(Opts, Acc).
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[SYSMON] unexpected call: ~p", [Req]),
|
||||
?ERROR("[SYSMON] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[SYSMON] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[SYSMON] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({monitor, Pid, long_gc, Info}, State) ->
|
||||
suppress({long_gc, Pid},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
|
||||
?LOG(WarnMsg, procinfo(Pid)),
|
||||
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
|
||||
safe_publish(long_gc, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -90,7 +89,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
|
|||
suppress({long_schedule, Pid},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
|
||||
?LOG(WarnMsg, procinfo(Pid)),
|
||||
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
|
||||
safe_publish(long_schedule, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -98,7 +97,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
|
|||
suppress({long_schedule, Port},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
|
||||
?LOG(WarnMsg, erlang:port_info(Port)),
|
||||
?WARN("[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]),
|
||||
safe_publish(long_schedule, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -106,7 +105,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) ->
|
|||
suppress({large_heap, Pid},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
|
||||
?LOG(WarnMsg, procinfo(Pid)),
|
||||
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
|
||||
safe_publish(large_heap, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -114,7 +113,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
|
|||
suppress({busy_port, Port},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||
?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
|
||||
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
|
||||
safe_publish(busy_port, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -122,7 +121,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
|
|||
suppress({busy_dist_port, Port},
|
||||
fun() ->
|
||||
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
|
||||
?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
|
||||
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
|
||||
safe_publish(busy_dist_port, WarnMsg)
|
||||
end, State);
|
||||
|
||||
|
@ -130,7 +129,7 @@ handle_info({timeout, _Ref, reset}, State) ->
|
|||
{noreply, State#state{events = []}, hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
logger:error("[SYSMON] unexpected Info: ~p", [Info]),
|
||||
?ERROR("[SYSMON] unexpected Info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{timer = TRef}) ->
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([get_env/2, get_env/3]).
|
||||
|
@ -76,7 +77,7 @@ handle_call(force_reload, _From, State) ->
|
|||
{reply, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
emqx_logger:error("[Zone] unexpected call: ~p", [Req]),
|
||||
?ERROR("[Zone] unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
||||
handle_cast({set_env, Zone, Key, Val}, State) ->
|
||||
|
@ -84,7 +85,7 @@ handle_cast({set_env, Zone, Key, Val}, State) ->
|
|||
{noreply, State};
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]),
|
||||
?ERROR("[Zone] unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(reload, State) ->
|
||||
|
@ -92,7 +93,7 @@ handle_info(reload, State) ->
|
|||
{noreply, ensure_reload_timer(State#{timer := undefined}), hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Zone] unexpected info: ~p", [Info]),
|
||||
?ERROR("[Zone] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
|
|
Loading…
Reference in New Issue