From e3cd170683611cda6f17e33e076855075b8348c4 Mon Sep 17 00:00:00 2001 From: Feng Date: Sat, 5 Dec 2015 15:43:49 +0800 Subject: [PATCH] 0.14 --- src/emqttd_app.erl | 2 +- src/emqttd_bridge.erl | 13 ++++---- src/emqttd_broker.erl | 17 ++++++----- src/emqttd_client.erl | 9 ++---- src/emqttd_cm.erl | 9 ++---- src/emqttd_metrics.erl | 4 +-- src/emqttd_pubsub.erl | 9 ++---- src/emqttd_pubsub_helper.erl | 18 +++++++----- src/emqttd_retainer.erl | 12 ++++---- src/emqttd_router.erl | 48 ++++++++++++++++++++---------- src/emqttd_session.erl | 12 ++++---- src/emqttd_sm.erl | 6 ++-- src/emqttd_sm_helper.erl | 12 ++++---- src/emqttd_stats.erl | 10 ++++--- src/emqttd_sysmon.erl | 57 ++++++++++++++++++++++++------------ src/emqttd_sysmon_sup.erl | 9 +++--- src/emqttd_trace.erl | 35 ++++++++++++---------- src/emqttd_trace_sup.erl | 43 +++++++++++++++++++++++++++ 18 files changed, 201 insertions(+), 124 deletions(-) create mode 100644 src/emqttd_trace_sup.erl diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index 09b20c440..7cfb10bd0 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -64,7 +64,7 @@ print_vsn() -> start_servers(Sup) -> Servers = [{"emqttd ctl", emqttd_ctl}, - {"emqttd trace", emqttd_trace}, + {"emqttd trace", {supervisor, emqttd_trace_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd stats", emqttd_stats}, {"emqttd metrics", emqttd_metrics}, diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 98a20cd62..a51ad4cf6 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -29,6 +29,8 @@ -include("emqttd_protocol.hrl"). +-include("emqttd_internal.hrl"). + %% API Function Exports -export([start_link/3]). @@ -108,11 +110,11 @@ qname(Node, SubTopic) when is_atom(Node) -> qname(Node, SubTopic) -> list_to_binary(["Bridge:", Node, ":", SubTopic]). -handle_call(_Request, _From, State) -> - {reply, error, State}. +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) -> {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}}; @@ -153,8 +155,7 @@ handle_info({'EXIT', _Pid, normal}, State) -> {noreply, State}; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, _State) -> ok. diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index a35416606..0b6c09fc3 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -29,6 +29,8 @@ -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + %% API Function Exports -export([start_link/0]). @@ -219,7 +221,7 @@ init([]) -> random:seed(os:timestamp()), ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics - emqttd_pubsub:create(<<"$SYS/brokers">>), + emqttd_pubsub:create(topic, <<"$SYS/brokers">>), [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], % Tick {ok, #state{started_at = os:timestamp(), @@ -255,11 +257,10 @@ handle_call({unhook, Hook, Name}, _From, State) -> {reply, Reply, State}; handle_call(Req, _From, State) -> - lager:error("Unexpected request: ~p", [Req]), - {reply, {error, badreq}, State}. + ?UNEXPECTED_REQ(Req, State). -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). handle_info(heartbeat, State) -> publish(uptime, list_to_binary(uptime(State))), @@ -272,8 +273,8 @@ handle_info(tick, State) -> retain(sysdescr, list_to_binary(sysdescr())), {noreply, State, hibernate}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> stop_tick(Hb), @@ -288,7 +289,7 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= create_topic(Topic) -> - emqttd_pubsub:create(emqttd_topic:systop(Topic)). + emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)). retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), diff --git a/src/emqttd_client.erl b/src/emqttd_client.erl index 216d285e7..d303fa332 100644 --- a/src/emqttd_client.erl +++ b/src/emqttd_client.erl @@ -128,8 +128,7 @@ handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call(Req, _From, State) -> - ?LOG(critical, "Unexpected request: ~p", [Req], State), - {reply, {error, unsupported_request}, State}. + ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, TopicTable}, State) -> with_session(fun(SessPid) -> @@ -142,8 +141,7 @@ handle_cast({unsubscribe, Topics}, State) -> end, State); handle_cast(Msg, State) -> - ?LOG(critical, "Unexpected msg: ~p", [Msg], State), - noreply(State). + ?UNEXPECTED_MSG(Msg, State). handle_info(timeout, State) -> shutdown(idle_timeout, State); @@ -211,8 +209,7 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> end; handle_info(Info, State) -> - ?LOG(critical, "Unexpected info: ~p", [Info], State), - noreply(State). + ?UNEXPECTED_INFO(Info, State). terminate(Reason, #client_state{connection = Connection, keepalive = KeepAlive, diff --git a/src/emqttd_cm.erl b/src/emqttd_cm.erl index 38ba067ec..9b3bffe2b 100644 --- a/src/emqttd_cm.erl +++ b/src/emqttd_cm.erl @@ -126,8 +126,7 @@ prioritise_info(_Msg, _Len, _State) -> 3. handle_call(Req, _From, State) -> - lager:error("Unexpected request: ~p", [Req]), - {reply, {error, unsupported_req}, State}. + ?UNEXPECTED_REQ(Req, State). handle_cast({register, Client = #mqtt_client{client_id = ClientId, client_pid = Pid}}, State) -> @@ -149,8 +148,7 @@ handle_cast({unregister, ClientId, Pid}, State) -> end; handle_cast(Msg, State) -> - lager:error("Unexpected Msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of @@ -168,8 +166,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> end; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> ?GPROC_POOL(leave, Pool, Id), ok. diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index adcc5890b..d75985ef3 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -283,14 +283,14 @@ key(counter, Metric) -> %%%============================================================================= init([]) -> - random:seed(os:timstamp()), + random:seed(os:timestamp()), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), % Init metrics [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], + [ok = emqttd_pubsub:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics], % Tick to publish metrics {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 403c2d060..2a87940fa 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -269,8 +269,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From, end; handle_call(Req, _From, State) -> - lager:error("Bad Request: ~p", [Req]), - {reply, {error, badreq}, State}. + ?UNEXPECTED_REQ(Req, State). handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> %% Delete routes first @@ -286,8 +285,7 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St {noreply, State}; handle_cast(Msg, State) -> - lager:error("Bad Msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> Routes = ?ROUTER:lookup_routes(DownPid), @@ -300,8 +298,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> {noreply, State, hibernate}; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> ?GPROC_POOL(leave, Pool, Id). diff --git a/src/emqttd_pubsub_helper.erl b/src/emqttd_pubsub_helper.erl index 6c8384b39..4ae8452fb 100644 --- a/src/emqttd_pubsub_helper.erl +++ b/src/emqttd_pubsub_helper.erl @@ -29,6 +29,8 @@ -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + %% API Function Exports -export([start_link/2, aging/1]). @@ -89,8 +91,7 @@ start_tick(Secs) -> timer:send_interval(timer:seconds(Secs), {clean, aged}). handle_call(Req, _From, State) -> - lager:error("Unexpected Request: ~p", [Req]), - {reply, {error, unsupported_request}, State}. + ?UNEXPECTED_REQ(Req, State). handle_cast({aging, Topics}, State = #state{aging = Aging}) -> #aging{topics = Dict} = Aging, @@ -104,8 +105,8 @@ handle_cast({aging, Topics}, State = #state{aging = Aging}) -> end, Dict, Topics), {noreply, State#state{aging = Aging#aging{topics = Dict1}}}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). handle_info({clean, aged}, State = #state{aging = Aging}) -> @@ -120,6 +121,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) -> noreply(State#state{aging = NewAging}); handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + %% mnesia master? Pattern = #mqtt_topic{_ = '_', node = Node}, F = fun() -> [mnesia:delete_object(topic, R, write) || @@ -128,8 +130,8 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> mnesia:async_dirty(F), noreply(State); -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{aging = #aging{tref = TRef}}) -> timer:cancel(TRef). @@ -142,7 +144,8 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= noreply(State = #state{statsfun = StatsFun}) -> - StatsFun(topic), {noreply, State, hibernate}. + StatsFun(topic), + {noreply, State, hibernate}. try_clean(ByTime, List) -> try_clean(ByTime, List, []). @@ -163,7 +166,6 @@ try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime -> try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> TopicR = #mqtt_topic{topic = Topic, node = node()}, - io:format("Try to remove topic: ~p~n", [Topic]), mnesia:transaction(fun try_remove_topic/1, [TopicR]), try_clean(ByTime, Left, Acc). diff --git a/src/emqttd_retainer.erl b/src/emqttd_retainer.erl index be6bc1c48..1b033f5fd 100644 --- a/src/emqttd_retainer.erl +++ b/src/emqttd_retainer.erl @@ -33,6 +33,8 @@ -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + -include_lib("stdlib/include/ms_transform.hrl"). %% Mnesia callbacks @@ -157,12 +159,11 @@ init([]) -> stats_timer = StatsTimer, expire_timer = ExpireTimer}}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). handle_cast(Msg, State) -> - lager:error("Unexpected Msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). handle_info(stats, State = #state{stats_fun = StatsFun}) -> StatsFun(mnesia:table_info(retained, size)), @@ -177,8 +178,7 @@ handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> {noreply, State, hibernate}; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -> timer:cancel(TRef1), diff --git a/src/emqttd_router.erl b/src/emqttd_router.erl index dd95c5cba..3d1489a5c 100644 --- a/src/emqttd_router.erl +++ b/src/emqttd_router.erl @@ -74,15 +74,17 @@ ensure_tab(Tab, Opts) -> %%------------------------------------------------------------------------------ -spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. add_routes(TopicTable, Pid) when is_pid(Pid) -> - case lookup_routes(Pid) of - [] -> - erlang:monitor(process, Pid), - insert_routes(TopicTable, Pid); - TopicInEts -> - {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), - update_routes(UpdatedTopics, Pid), - insert_routes(NewTopics, Pid) - end. + with_stats(fun() -> + case lookup_routes(Pid) of + [] -> + erlang:monitor(process, Pid), + insert_routes(TopicTable, Pid); + TopicInEts -> + {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), + update_routes(UpdatedTopics, Pid), + insert_routes(NewTopics, Pid) + end + end). %%------------------------------------------------------------------------------ %% @doc Lookup Routes @@ -93,7 +95,7 @@ lookup_routes(Pid) when is_pid(Pid) -> [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. %%------------------------------------------------------------------------------ -%% @doc Has Route +%% @doc Has Route? %% @end %%------------------------------------------------------------------------------ -spec has_route(binary()) -> boolean(). @@ -101,19 +103,23 @@ has_route(Topic) -> ets:member(route, Topic). %%------------------------------------------------------------------------------ -%% @doc Delete Routes. +%% @doc Delete Routes %% @end %%------------------------------------------------------------------------------ -spec delete_routes(list(binary()), pid()) -> ok. delete_routes(Topics, Pid) -> - Routes = [{Topic, Pid} || Topic <- Topics], - lists:foreach(fun delete_route/1, Routes). + with_stats(fun() -> + Routes = [{Topic, Pid} || Topic <- Topics], + lists:foreach(fun delete_route/1, Routes) + end). -spec delete_routes(pid()) -> ok. delete_routes(Pid) when is_pid(Pid) -> - Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], - ets:delete(reverse_route, Pid), - lists:foreach(fun delete_route_only/1, Routes). + with_stats(fun() -> + Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], + ets:delete(reverse_route, Pid), + lists:foreach(fun delete_route_only/1, Routes) + end). %%------------------------------------------------------------------------------ %% @doc Route Message on Local Node. @@ -199,3 +205,13 @@ delete_route({Topic, Pid}) -> delete_route_only({Topic, Pid}) -> ets:match_delete(route, {Topic, Pid, '_'}). +with_stats(Fun) -> + Ok = Fun(), setstats(), Ok. + +setstats() -> + lists:foreach(fun setstat/1, [{route, 'routes/count'}, + {reverse_route, 'routes/reverse'}]). + +setstat({Tab, Stat}) -> + emqttd_stats:setstat(Stat, ets:info(Tab, size)). + diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index bfc885ae4..07b9b0359 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -50,6 +50,8 @@ -include("emqttd_protocol.hrl"). +-include("emqttd_internal.hrl"). + -behaviour(gen_server2). %% Session API @@ -83,6 +85,7 @@ %% Last packet id of the session packet_id = 1, + %%TODO: Removed?? %% Client’s subscriptions. subscriptions :: list(), @@ -306,8 +309,7 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, end; handle_call(Req, _From, State) -> - ?LOG(critical, "Unexpected Request: ~p", [Req], State), - {reply, {error, unsupported_req}, State, hibernate}. + ?UNEXPECTED_REQ(Req, State). handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> @@ -481,8 +483,7 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp}) end; handle_cast(Msg, State) -> - ?LOG(critical, "Unexpected Msg: ~p", [Msg], State), - hibernate(State). + ?UNEXPECTED_MSG(Msg, State). %% Queue messages when client is offline handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, @@ -578,8 +579,7 @@ handle_info(expired, Session) -> shutdown(expired, Session); handle_info(Info, Session) -> - ?LOG(critical, "Unexpected info: ~p", [Info], Session), - hibernate(Session). + ?UNEXPECTED_INFO(Info, Session). terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> emqttd_sm:unregister_session(CleanSess, ClientId). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 9783cd3e5..6c81ee33f 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -183,8 +183,7 @@ handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(Msg, State) -> - lager:error("Unexpected Msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). %%TODO: fix this issue that index_read is really slow... handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> @@ -195,8 +194,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> {noreply, State}; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> ?GPROC_POOL(leave, Pool, Id). diff --git a/src/emqttd_sm_helper.erl b/src/emqttd_sm_helper.erl index 75f8abe1f..4fcf8c967 100644 --- a/src/emqttd_sm_helper.erl +++ b/src/emqttd_sm_helper.erl @@ -29,6 +29,8 @@ -include("emqttd.hrl"). +-include("emqttd_internal.hrl"). + -include_lib("stdlib/include/ms_transform.hrl"). %% API Function Exports @@ -53,12 +55,11 @@ init([StatsFun]) -> {ok, TRef} = timer:send_interval(timer:seconds(1), tick), {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). handle_cast(Msg, State) -> - lager:error("Unexpected Msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> lager:error("!!!Mnesia node down: ~s", [Node]), @@ -79,8 +80,7 @@ handle_info(tick, State) -> {noreply, setstats(State), hibernate}; handle_info(Info, State) -> - lager:error("Unexpected Info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, _State = #state{tick_tref = TRef}) -> timer:cancel(TRef), diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 2bafe89df..7135a2315 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -60,10 +60,12 @@ %% $SYS Topics for Subscribers -define(SYSTOP_PUBSUB, [ + 'routes/count', % ... + 'routes/reverse', % ... 'topics/count', % ... 'topics/max', % ... - 'subscribers/count', % ... - 'subscribers/max', % ... + 'subscriptions/count', % ... + 'subscriptions/max', % ... 'queues/count', % ... 'queues/max' % ... ]). @@ -138,12 +140,12 @@ setstats(Stat, MaxStat, Val) -> %%%============================================================================= init([]) -> - random:seed(now()), + random:seed(os:timestamp()), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), % Create $SYS Topics - [ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics], + [ok = emqttd_pubsub:create(topic, stats_topic(Topic)) || Topic <- Topics], % Tick to publish stats {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. diff --git a/src/emqttd_sysmon.erl b/src/emqttd_sysmon.erl index 7c2a1d772..6bbae1a68 100644 --- a/src/emqttd_sysmon.erl +++ b/src/emqttd_sysmon.erl @@ -27,12 +27,22 @@ -behavior(gen_server). +-include("emqttd_internal.hrl"). + -export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {tick_tref, events = []}). +-record(state, {tickref, events = [], tracelog}). + +-define(LOG_FMT, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). + +-define(LOG(Msg, ProcInfo), + lager:warning([{sysmon, true}], "~s~n~p", [WarnMsg, ProcInfo])). + +-define(LOG(Msg, ProcInfo, PortInfo), + lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). %%------------------------------------------------------------------------------ %% @doc Start system monitor @@ -50,7 +60,8 @@ start_link(Opts) -> init([Opts]) -> erlang:system_monitor(self(), parse_opt(Opts)), {ok, TRef} = timer:send_interval(timer:seconds(1), reset), - {ok, #state{tick_tref = TRef}}. + {ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)), + {ok, #state{tickref = TRef, tracelog = TraceLog}}. parse_opt(Opts) -> parse_opt(Opts, []). @@ -71,55 +82,55 @@ parse_opt([{busy_port, false}|Opts], Acc) -> parse_opt([{busy_dist_port, true}|Opts], Acc) -> parse_opt(Opts, [busy_dist_port|Acc]); parse_opt([{busy_dist_port, false}|Opts], Acc) -> + parse_opt(Opts, Acc); +parse_opt([{logfile, _}|Opts], Acc) -> parse_opt(Opts, Acc). -handle_call(Request, _From, State) -> - lager:error("Unexpected request: ~p", [Request]), - {reply, {error, unexpected_request}, State}. +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). handle_cast(Msg, State) -> - lager:error("Unexpected msg: ~p", [Msg]), - {noreply, State}. + ?UNEXPECTED_MSG(Msg, State). handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> - WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]), - lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]), + ?LOG(WarnMsg, procinfo(Pid)), publish(long_gc, WarnMsg) end, State); handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> suppress({long_schedule, Pid}, fun() -> WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), - lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(WarnMsg, procinfo(Pid)), publish(long_schedule, WarnMsg) end, State); handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> suppress({long_schedule, Port}, fun() -> WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), - lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]), + ?LOG(WarnMsg, erlang:port_info(Port)), publish(long_schedule, WarnMsg) end, State); handle_info({monitor, Pid, large_heap, Info}, State) -> suppress({large_heap, Pid}, fun() -> WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), - lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), + ?LOG(WarnMsg, procinfo(Pid)), publish(large_heap, WarnMsg) end, State); handle_info({monitor, SusPid, busy_port, Port}, State) -> suppress({busy_port, Port}, fun() -> WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)), publish(busy_port, WarnMsg) end, State); handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> suppress({busy_dist_port, Port}, fun() -> WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), - lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), + ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)), publish(busy_dist_port, WarnMsg) end, State); @@ -127,11 +138,11 @@ handle_info(reset, State) -> {noreply, State#state{events = []}}; handle_info(Info, State) -> - lager:error("Unexpected info: ~p", [Info]), - {noreply, State}. + ?UNEXPECTED_INFO(Info, State). -terminate(_Reason, #state{tick_tref = TRef}) -> - timer:cancel(TRef). +terminate(_Reason, #state{tickref = TRef, tracelog = TraceLog}) -> + timer:cancel(TRef), + cancel_tracelog(TraceLog). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -159,3 +170,13 @@ publish(Sysmon, WarnMsg) -> topic(Sysmon) -> emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))). +start_tracelog(undefined) -> + {ok, undefined}; +start_tracelog(LogFile) -> + lager:trace_file(LogFile, [{sysmon, true}], info, ?LOG_FMT). + +cancel_tracelog(undefined) -> + ok; +cancel_tracelog(TraceLog) -> + lager:stop_trace(TraceLog). + diff --git a/src/emqttd_sysmon_sup.erl b/src/emqttd_sysmon_sup.erl index 37bbd90bf..40649596d 100644 --- a/src/emqttd_sysmon_sup.erl +++ b/src/emqttd_sysmon_sup.erl @@ -37,9 +37,8 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Env = emqttd:env(sysmon), - Sysmon = {sysmon, - {emqttd_sysmon, start_link, [Env]}, - permanent, 5000, worker, [emqttd_sysmon]}, - {ok, {{one_for_one, 10, 100}, [Sysmon]}}. + Env = emqttd:env(sysmon), + {ok, {{one_for_one, 10, 100}, + [{sysmon, {emqttd_sysmon, start_link, [Env]}, + permanent, 5000, worker, [emqttd_sysmon]}]}}. diff --git a/src/emqttd_trace.erl b/src/emqttd_trace.erl index 1416917af..9ddeb8fb5 100644 --- a/src/emqttd_trace.erl +++ b/src/emqttd_trace.erl @@ -19,7 +19,7 @@ %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% SOFTWARE. %%%----------------------------------------------------------------------------- -%%% @doc Trace MQTT packets/messages by clientid or topic. +%%% @doc Trace MQTT packets/messages by ClientID or Topic. %%% %%% @author Feng Lee %%%----------------------------------------------------------------------------- @@ -27,6 +27,8 @@ -behaviour(gen_server). +-include("emqttd_internal.hrl"). + %% API Function Exports -export([start_link/0]). @@ -36,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {level, trace_map}). +-record(state, {level, traces}). -type trace_who() :: {client | topic, binary()}. @@ -82,41 +84,42 @@ all_traces() -> gen_server:call(?MODULE, all_traces). init([]) -> - {ok, #state{level = info, trace_map = #{}}}. + {ok, #state{level = info, traces = #{}}}. -handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> +handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of {ok, exists} -> {reply, {error, existed}, State}; {ok, Trace} -> - {reply, ok, State#state{trace_map = maps:put(Who, {Trace, LogFile}, TraceMap)}}; + {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}}; {error, Error} -> {reply, {error, Error}, State} end; -handle_call({stop_trace, Who}, _From, State = #state{trace_map = TraceMap}) -> - case maps:find(Who, TraceMap) of +handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> + case maps:find(Who, Traces) of {ok, {Trace, _LogFile}} -> case lager:stop_trace(Trace) of ok -> ok; {error, Error} -> lager:error("Stop trace ~p error: ~p", [Who, Error]) end, - {reply, ok, State#state{trace_map = maps:remove(Who, TraceMap)}}; + {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> {reply, {error, not_found}, State} end; -handle_call(all_traces, _From, State = #state{trace_map = TraceMap}) -> - {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(TraceMap)], State}; +handle_call(all_traces, _From, State = #state{traces = Traces}) -> + {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} + <- maps:to_list(Traces)], State}; -handle_call(_Req, _From, State) -> - {reply, error, State}. +handle_call(Req, _From, State) -> + ?UNEXPECTED_REQ(Req, State). -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + ?UNEXPECTED_MSG(Msg, State). -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, State) -> + ?UNEXPECTED_INFO(Info, State). terminate(_Reason, _State) -> ok. diff --git a/src/emqttd_trace_sup.erl b/src/emqttd_trace_sup.erl new file mode 100644 index 000000000..e5cfd4e78 --- /dev/null +++ b/src/emqttd_trace_sup.erl @@ -0,0 +1,43 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc emqttd trace supervisor. +%%% +%%% @author Feng Lee +%%%----------------------------------------------------------------------------- +-module(emqttd_trace_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, {{one_for_one, 10, 100}, + [{trace, {emqttd_trace, start_link, []}, + permanent, 5000, worker, [emqttd_trace]}]}}. +