This commit is contained in:
Feng 2015-12-05 15:43:49 +08:00
parent fc6a49415f
commit e3cd170683
18 changed files with 201 additions and 124 deletions

View File

@ -64,7 +64,7 @@ print_vsn() ->
start_servers(Sup) -> start_servers(Sup) ->
Servers = [{"emqttd ctl", emqttd_ctl}, Servers = [{"emqttd ctl", emqttd_ctl},
{"emqttd trace", emqttd_trace}, {"emqttd trace", {supervisor, emqttd_trace_sup}},
{"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}},
{"emqttd stats", emqttd_stats}, {"emqttd stats", emqttd_stats},
{"emqttd metrics", emqttd_metrics}, {"emqttd metrics", emqttd_metrics},

View File

@ -29,6 +29,8 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
-include("emqttd_internal.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/3]). -export([start_link/3]).
@ -108,11 +110,11 @@ qname(Node, SubTopic) when is_atom(Node) ->
qname(Node, SubTopic) -> qname(Node, SubTopic) ->
list_to_binary(["Bridge:", Node, ":", SubTopic]). list_to_binary(["Bridge:", Node, ":", SubTopic]).
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, error, State}. ?UNEXPECTED_REQ(Req, State).
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. ?UNEXPECTED_MSG(Msg, State).
handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) -> handle_info({dispatch, Msg}, State = #state{mqueue = MQ, status = down}) ->
{noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}}; {noreply, State#state{mqueue = emqttd_mqueue:in(Msg, MQ)}};
@ -153,8 +155,7 @@ handle_info({'EXIT', _Pid, normal}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.

View File

@ -29,6 +29,8 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -219,7 +221,7 @@ init([]) ->
random:seed(os:timestamp()), random:seed(os:timestamp()),
ets:new(?BROKER_TAB, [set, public, named_table]), ets:new(?BROKER_TAB, [set, public, named_table]),
% Create $SYS Topics % Create $SYS Topics
emqttd_pubsub:create(<<"$SYS/brokers">>), emqttd_pubsub:create(topic, <<"$SYS/brokers">>),
[ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
% Tick % Tick
{ok, #state{started_at = os:timestamp(), {ok, #state{started_at = os:timestamp(),
@ -255,11 +257,10 @@ handle_call({unhook, Hook, Name}, _From, State) ->
{reply, Reply, State}; {reply, Reply, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Unexpected request: ~p", [Req]), ?UNEXPECTED_REQ(Req, State).
{reply, {error, badreq}, State}.
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. ?UNEXPECTED_MSG(Msg, State).
handle_info(heartbeat, State) -> handle_info(heartbeat, State) ->
publish(uptime, list_to_binary(uptime(State))), publish(uptime, list_to_binary(uptime(State))),
@ -272,8 +273,8 @@ handle_info(tick, State) ->
retain(sysdescr, list_to_binary(sysdescr())), retain(sysdescr, list_to_binary(sysdescr())),
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(_Info, State) -> handle_info(Info, State) ->
{noreply, State}. ?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) -> terminate(_Reason, #state{heartbeat = Hb, tick_tref = TRef}) ->
stop_tick(Hb), stop_tick(Hb),
@ -288,7 +289,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================= %%%=============================================================================
create_topic(Topic) -> create_topic(Topic) ->
emqttd_pubsub:create(emqttd_topic:systop(Topic)). emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)).
retain(brokers) -> retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),

View File

@ -128,8 +128,7 @@ handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(critical, "Unexpected request: ~p", [Req], State), ?UNEXPECTED_REQ(Req, State).
{reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) -> handle_cast({subscribe, TopicTable}, State) ->
with_session(fun(SessPid) -> with_session(fun(SessPid) ->
@ -142,8 +141,7 @@ handle_cast({unsubscribe, Topics}, State) ->
end, State); end, State);
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(critical, "Unexpected msg: ~p", [Msg], State), ?UNEXPECTED_MSG(Msg, State).
noreply(State).
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
@ -211,8 +209,7 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
end; end;
handle_info(Info, State) -> handle_info(Info, State) ->
?LOG(critical, "Unexpected info: ~p", [Info], State), ?UNEXPECTED_INFO(Info, State).
noreply(State).
terminate(Reason, #client_state{connection = Connection, terminate(Reason, #client_state{connection = Connection,
keepalive = KeepAlive, keepalive = KeepAlive,

View File

@ -126,8 +126,7 @@ prioritise_info(_Msg, _Len, _State) ->
3. 3.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Unexpected request: ~p", [Req]), ?UNEXPECTED_REQ(Req, State).
{reply, {error, unsupported_req}, State}.
handle_cast({register, Client = #mqtt_client{client_id = ClientId, handle_cast({register, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, State) -> client_pid = Pid}}, State) ->
@ -149,8 +148,7 @@ handle_cast({unregister, ClientId, Pid}, State) ->
end; end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
@ -168,8 +166,7 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
end; end;
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id), ok. ?GPROC_POOL(leave, Pool, Id), ok.

View File

@ -283,14 +283,14 @@ key(counter, Metric) ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
random:seed(os:timstamp()), random:seed(os:timestamp()),
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table % Create metrics table
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
% Init metrics % Init metrics
[create_metric(Metric) || Metric <- Metrics], [create_metric(Metric) || Metric <- Metrics],
% $SYS Topics for metrics % $SYS Topics for metrics
[ok = emqttd_pubsub:create(metric_topic(Topic)) || {_, Topic} <- Metrics], [ok = emqttd_pubsub:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics],
% Tick to publish metrics % Tick to publish metrics
{ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.

View File

@ -269,8 +269,7 @@ handle_call({subscribe, {SubId, SubPid}, TopicTable}, _From,
end; end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Bad Request: ~p", [Req]), ?UNEXPECTED_REQ(Req, State).
{reply, {error, badreq}, State}.
handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) -> handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = StatsFun}) ->
%% Delete routes first %% Delete routes first
@ -286,8 +285,7 @@ handle_cast({unsubscribe, {SubId, SubPid}, Topics}, State = #state{statsfun = St
{noreply, State}; {noreply, State};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Bad Msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) -> handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
Routes = ?ROUTER:lookup_routes(DownPid), Routes = ?ROUTER:lookup_routes(DownPid),
@ -300,8 +298,7 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id). ?GPROC_POOL(leave, Pool, Id).

View File

@ -29,6 +29,8 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/2, aging/1]). -export([start_link/2, aging/1]).
@ -89,8 +91,7 @@ start_tick(Secs) ->
timer:send_interval(timer:seconds(Secs), {clean, aged}). timer:send_interval(timer:seconds(Secs), {clean, aged}).
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Unexpected Request: ~p", [Req]), ?UNEXPECTED_REQ(Req, State).
{reply, {error, unsupported_request}, State}.
handle_cast({aging, Topics}, State = #state{aging = Aging}) -> handle_cast({aging, Topics}, State = #state{aging = Aging}) ->
#aging{topics = Dict} = Aging, #aging{topics = Dict} = Aging,
@ -104,8 +105,8 @@ handle_cast({aging, Topics}, State = #state{aging = Aging}) ->
end, Dict, Topics), end, Dict, Topics),
{noreply, State#state{aging = Aging#aging{topics = Dict1}}}; {noreply, State#state{aging = Aging#aging{topics = Dict1}}};
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. ?UNEXPECTED_MSG(Msg, State).
handle_info({clean, aged}, State = #state{aging = Aging}) -> handle_info({clean, aged}, State = #state{aging = Aging}) ->
@ -120,6 +121,7 @@ handle_info({clean, aged}, State = #state{aging = Aging}) ->
noreply(State#state{aging = NewAging}); noreply(State#state{aging = NewAging});
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
%% mnesia master?
Pattern = #mqtt_topic{_ = '_', node = Node}, Pattern = #mqtt_topic{_ = '_', node = Node},
F = fun() -> F = fun() ->
[mnesia:delete_object(topic, R, write) || [mnesia:delete_object(topic, R, write) ||
@ -128,8 +130,8 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
mnesia:async_dirty(F), mnesia:async_dirty(F),
noreply(State); noreply(State);
handle_info(_Info, State) -> handle_info(Info, State) ->
{noreply, State}. ?UNEXPECTED_INFO(Info, State).
terminate(_Reason, #state{aging = #aging{tref = TRef}}) -> terminate(_Reason, #state{aging = #aging{tref = TRef}}) ->
timer:cancel(TRef). timer:cancel(TRef).
@ -142,7 +144,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%============================================================================= %%%=============================================================================
noreply(State = #state{statsfun = StatsFun}) -> noreply(State = #state{statsfun = StatsFun}) ->
StatsFun(topic), {noreply, State, hibernate}. StatsFun(topic),
{noreply, State, hibernate}.
try_clean(ByTime, List) -> try_clean(ByTime, List) ->
try_clean(ByTime, List, []). try_clean(ByTime, List, []).
@ -163,7 +166,6 @@ try_clean2(ByTime, {Topic, TS}, Left, Acc) when TS > ByTime ->
try_clean2(ByTime, {Topic, _TS}, Left, Acc) -> try_clean2(ByTime, {Topic, _TS}, Left, Acc) ->
TopicR = #mqtt_topic{topic = Topic, node = node()}, TopicR = #mqtt_topic{topic = Topic, node = node()},
io:format("Try to remove topic: ~p~n", [Topic]),
mnesia:transaction(fun try_remove_topic/1, [TopicR]), mnesia:transaction(fun try_remove_topic/1, [TopicR]),
try_clean(ByTime, Left, Acc). try_clean(ByTime, Left, Acc).

View File

@ -33,6 +33,8 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia callbacks %% Mnesia callbacks
@ -157,12 +159,11 @@ init([]) ->
stats_timer = StatsTimer, stats_timer = StatsTimer,
expire_timer = ExpireTimer}}. expire_timer = ExpireTimer}}.
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, ok, State}. ?UNEXPECTED_REQ(Req, State).
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) -> handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsFun(mnesia:table_info(retained, size)), StatsFun(mnesia:table_info(retained, size)),
@ -177,8 +178,7 @@ handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -> terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) ->
timer:cancel(TRef1), timer:cancel(TRef1),

View File

@ -74,15 +74,17 @@ ensure_tab(Tab, Opts) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok. -spec add_routes(list({binary(), mqtt_qos()}), pid()) -> ok.
add_routes(TopicTable, Pid) when is_pid(Pid) -> add_routes(TopicTable, Pid) when is_pid(Pid) ->
case lookup_routes(Pid) of with_stats(fun() ->
[] -> case lookup_routes(Pid) of
erlang:monitor(process, Pid), [] ->
insert_routes(TopicTable, Pid); erlang:monitor(process, Pid),
TopicInEts -> insert_routes(TopicTable, Pid);
{NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts), TopicInEts ->
update_routes(UpdatedTopics, Pid), {NewTopics, UpdatedTopics} = diff(TopicTable, TopicInEts),
insert_routes(NewTopics, Pid) update_routes(UpdatedTopics, Pid),
end. insert_routes(NewTopics, Pid)
end
end).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Lookup Routes %% @doc Lookup Routes
@ -93,7 +95,7 @@ lookup_routes(Pid) when is_pid(Pid) ->
[{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)]. [{Topic, Qos} || {_, Topic, Qos} <- ets:lookup(reverse_route, Pid)].
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Has Route %% @doc Has Route?
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec has_route(binary()) -> boolean(). -spec has_route(binary()) -> boolean().
@ -101,19 +103,23 @@ has_route(Topic) ->
ets:member(route, Topic). ets:member(route, Topic).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Delete Routes. %% @doc Delete Routes
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec delete_routes(list(binary()), pid()) -> ok. -spec delete_routes(list(binary()), pid()) -> ok.
delete_routes(Topics, Pid) -> delete_routes(Topics, Pid) ->
Routes = [{Topic, Pid} || Topic <- Topics], with_stats(fun() ->
lists:foreach(fun delete_route/1, Routes). Routes = [{Topic, Pid} || Topic <- Topics],
lists:foreach(fun delete_route/1, Routes)
end).
-spec delete_routes(pid()) -> ok. -spec delete_routes(pid()) -> ok.
delete_routes(Pid) when is_pid(Pid) -> delete_routes(Pid) when is_pid(Pid) ->
Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)], with_stats(fun() ->
ets:delete(reverse_route, Pid), Routes = [{Topic, Pid} || {Topic, _Qos} <- lookup_routes(Pid)],
lists:foreach(fun delete_route_only/1, Routes). ets:delete(reverse_route, Pid),
lists:foreach(fun delete_route_only/1, Routes)
end).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Route Message on Local Node. %% @doc Route Message on Local Node.
@ -199,3 +205,13 @@ delete_route({Topic, Pid}) ->
delete_route_only({Topic, Pid}) -> delete_route_only({Topic, Pid}) ->
ets:match_delete(route, {Topic, Pid, '_'}). ets:match_delete(route, {Topic, Pid, '_'}).
with_stats(Fun) ->
Ok = Fun(), setstats(), Ok.
setstats() ->
lists:foreach(fun setstat/1, [{route, 'routes/count'},
{reverse_route, 'routes/reverse'}]).
setstat({Tab, Stat}) ->
emqttd_stats:setstat(Stat, ets:info(Tab, size)).

View File

@ -50,6 +50,8 @@
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
-include("emqttd_internal.hrl").
-behaviour(gen_server2). -behaviour(gen_server2).
%% Session API %% Session API
@ -83,6 +85,7 @@
%% Last packet id of the session %% Last packet id of the session
packet_id = 1, packet_id = 1,
%%TODO: Removed??
%% Clients subscriptions. %% Clients subscriptions.
subscriptions :: list(), subscriptions :: list(),
@ -306,8 +309,7 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
end; end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?LOG(critical, "Unexpected Request: ~p", [Req], State), ?UNEXPECTED_REQ(Req, State).
{reply, {error, unsupported_req}, State, hibernate}.
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId, handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
@ -481,8 +483,7 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp})
end; end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
?LOG(critical, "Unexpected Msg: ~p", [Msg], State), ?UNEXPECTED_MSG(Msg, State).
hibernate(State).
%% Queue messages when client is offline %% Queue messages when client is offline
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined, handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
@ -578,8 +579,7 @@ handle_info(expired, Session) ->
shutdown(expired, Session); shutdown(expired, Session);
handle_info(Info, Session) -> handle_info(Info, Session) ->
?LOG(critical, "Unexpected info: ~p", [Info], Session), ?UNEXPECTED_INFO(Info, Session).
hibernate(Session).
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) -> terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
emqttd_sm:unregister_session(CleanSess, ClientId). emqttd_sm:unregister_session(CleanSess, ClientId).

View File

@ -183,8 +183,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
%%TODO: fix this issue that index_read is really slow... %%TODO: fix this issue that index_read is really slow...
handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
@ -195,8 +194,7 @@ handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State) ->
{noreply, State}; {noreply, State};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id). ?GPROC_POOL(leave, Pool, Id).

View File

@ -29,6 +29,8 @@
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_internal.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
%% API Function Exports %% API Function Exports
@ -53,12 +55,11 @@ init([StatsFun]) ->
{ok, TRef} = timer:send_interval(timer:seconds(1), tick), {ok, TRef} = timer:send_interval(timer:seconds(1), tick),
{ok, #state{stats_fun = StatsFun, tick_tref = TRef}}. {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, ok, State}. ?UNEXPECTED_REQ(Req, State).
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected Msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
lager:error("!!!Mnesia node down: ~s", [Node]), lager:error("!!!Mnesia node down: ~s", [Node]),
@ -79,8 +80,7 @@ handle_info(tick, State) ->
{noreply, setstats(State), hibernate}; {noreply, setstats(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected Info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, _State = #state{tick_tref = TRef}) -> terminate(_Reason, _State = #state{tick_tref = TRef}) ->
timer:cancel(TRef), timer:cancel(TRef),

View File

@ -60,10 +60,12 @@
%% $SYS Topics for Subscribers %% $SYS Topics for Subscribers
-define(SYSTOP_PUBSUB, [ -define(SYSTOP_PUBSUB, [
'routes/count', % ...
'routes/reverse', % ...
'topics/count', % ... 'topics/count', % ...
'topics/max', % ... 'topics/max', % ...
'subscribers/count', % ... 'subscriptions/count', % ...
'subscribers/max', % ... 'subscriptions/max', % ...
'queues/count', % ... 'queues/count', % ...
'queues/max' % ... 'queues/max' % ...
]). ]).
@ -138,12 +140,12 @@ setstats(Stat, MaxStat, Val) ->
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
random:seed(now()), random:seed(os:timestamp()),
ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]),
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
% Create $SYS Topics % Create $SYS Topics
[ok = emqttd_pubsub:create(stats_topic(Topic)) || Topic <- Topics], [ok = emqttd_pubsub:create(topic, stats_topic(Topic)) || Topic <- Topics],
% Tick to publish stats % Tick to publish stats
{ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}. {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.

View File

@ -27,12 +27,22 @@
-behavior(gen_server). -behavior(gen_server).
-include("emqttd_internal.hrl").
-export([start_link/1]). -export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {tick_tref, events = []}). -record(state, {tickref, events = [], tracelog}).
-define(LOG_FMT, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
-define(LOG(Msg, ProcInfo),
lager:warning([{sysmon, true}], "~s~n~p", [WarnMsg, ProcInfo])).
-define(LOG(Msg, ProcInfo, PortInfo),
lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Start system monitor %% @doc Start system monitor
@ -50,7 +60,8 @@ start_link(Opts) ->
init([Opts]) -> init([Opts]) ->
erlang:system_monitor(self(), parse_opt(Opts)), erlang:system_monitor(self(), parse_opt(Opts)),
{ok, TRef} = timer:send_interval(timer:seconds(1), reset), {ok, TRef} = timer:send_interval(timer:seconds(1), reset),
{ok, #state{tick_tref = TRef}}. {ok, TraceLog} = start_tracelog(proplists:get_value(logfile, Opts)),
{ok, #state{tickref = TRef, tracelog = TraceLog}}.
parse_opt(Opts) -> parse_opt(Opts) ->
parse_opt(Opts, []). parse_opt(Opts, []).
@ -71,55 +82,55 @@ parse_opt([{busy_port, false}|Opts], Acc) ->
parse_opt([{busy_dist_port, true}|Opts], Acc) -> parse_opt([{busy_dist_port, true}|Opts], Acc) ->
parse_opt(Opts, [busy_dist_port|Acc]); parse_opt(Opts, [busy_dist_port|Acc]);
parse_opt([{busy_dist_port, false}|Opts], Acc) -> parse_opt([{busy_dist_port, false}|Opts], Acc) ->
parse_opt(Opts, Acc);
parse_opt([{logfile, _}|Opts], Acc) ->
parse_opt(Opts, Acc). parse_opt(Opts, Acc).
handle_call(Request, _From, State) -> handle_call(Req, _From, State) ->
lager:error("Unexpected request: ~p", [Request]), ?UNEXPECTED_REQ(Req, State).
{reply, {error, unexpected_request}, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("Unexpected msg: ~p", [Msg]), ?UNEXPECTED_MSG(Msg, State).
{noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) -> handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid}, fun() -> suppress({long_gc, Pid}, fun() ->
WarnMsg = io_lib:format("long_gc: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), ?LOG(WarnMsg, procinfo(Pid)),
publish(long_gc, WarnMsg) publish(long_gc, WarnMsg)
end, State); end, State);
handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) -> handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
suppress({long_schedule, Pid}, fun() -> suppress({long_schedule, Pid}, fun() ->
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), ?LOG(WarnMsg, procinfo(Pid)),
publish(long_schedule, WarnMsg) publish(long_schedule, WarnMsg)
end, State); end, State);
handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) -> handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port}, fun() -> suppress({long_schedule, Port}, fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]), WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
lager:error("~s~n~p", [WarnMsg, erlang:port_info(Port)]), ?LOG(WarnMsg, erlang:port_info(Port)),
publish(long_schedule, WarnMsg) publish(long_schedule, WarnMsg)
end, State); end, State);
handle_info({monitor, Pid, large_heap, Info}, State) -> handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid}, fun() -> suppress({large_heap, Pid}, fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]), WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
lager:error("~s~n~p", [WarnMsg, procinfo(Pid)]), ?LOG(WarnMsg, procinfo(Pid)),
publish(large_heap, WarnMsg) publish(large_heap, WarnMsg)
end, State); end, State);
handle_info({monitor, SusPid, busy_port, Port}, State) -> handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port}, fun() -> suppress({busy_port, Port}, fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
publish(busy_port, WarnMsg) publish(busy_port, WarnMsg)
end, State); end, State);
handle_info({monitor, SusPid, busy_dist_port, Port}, State) -> handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port}, fun() -> suppress({busy_dist_port, Port}, fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]), WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
lager:error("~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]), ?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
publish(busy_dist_port, WarnMsg) publish(busy_dist_port, WarnMsg)
end, State); end, State);
@ -127,11 +138,11 @@ handle_info(reset, State) ->
{noreply, State#state{events = []}}; {noreply, State#state{events = []}};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("Unexpected info: ~p", [Info]), ?UNEXPECTED_INFO(Info, State).
{noreply, State}.
terminate(_Reason, #state{tick_tref = TRef}) -> terminate(_Reason, #state{tickref = TRef, tracelog = TraceLog}) ->
timer:cancel(TRef). timer:cancel(TRef),
cancel_tracelog(TraceLog).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -159,3 +170,13 @@ publish(Sysmon, WarnMsg) ->
topic(Sysmon) -> topic(Sysmon) ->
emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))). emqttd_topic:systop(list_to_binary(lists:concat(['sysmon/', Sysmon]))).
start_tracelog(undefined) ->
{ok, undefined};
start_tracelog(LogFile) ->
lager:trace_file(LogFile, [{sysmon, true}], info, ?LOG_FMT).
cancel_tracelog(undefined) ->
ok;
cancel_tracelog(TraceLog) ->
lager:stop_trace(TraceLog).

View File

@ -37,9 +37,8 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
Env = emqttd:env(sysmon), Env = emqttd:env(sysmon),
Sysmon = {sysmon, {ok, {{one_for_one, 10, 100},
{emqttd_sysmon, start_link, [Env]}, [{sysmon, {emqttd_sysmon, start_link, [Env]},
permanent, 5000, worker, [emqttd_sysmon]}, permanent, 5000, worker, [emqttd_sysmon]}]}}.
{ok, {{one_for_one, 10, 100}, [Sysmon]}}.

View File

@ -19,7 +19,7 @@
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE %%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE. %%% SOFTWARE.
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
%%% @doc Trace MQTT packets/messages by clientid or topic. %%% @doc Trace MQTT packets/messages by ClientID or Topic.
%%% %%%
%%% @author Feng Lee <feng@emqtt.io> %%% @author Feng Lee <feng@emqtt.io>
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
@ -27,6 +27,8 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqttd_internal.hrl").
%% API Function Exports %% API Function Exports
-export([start_link/0]). -export([start_link/0]).
@ -36,7 +38,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {level, trace_map}). -record(state, {level, traces}).
-type trace_who() :: {client | topic, binary()}. -type trace_who() :: {client | topic, binary()}.
@ -82,41 +84,42 @@ all_traces() ->
gen_server:call(?MODULE, all_traces). gen_server:call(?MODULE, all_traces).
init([]) -> init([]) ->
{ok, #state{level = info, trace_map = #{}}}. {ok, #state{level = info, traces = #{}}}.
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of
{ok, exists} -> {ok, exists} ->
{reply, {error, existed}, State}; {reply, {error, existed}, State};
{ok, Trace} -> {ok, Trace} ->
{reply, ok, State#state{trace_map = maps:put(Who, {Trace, LogFile}, TraceMap)}}; {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}};
{error, Error} -> {error, Error} ->
{reply, {error, Error}, State} {reply, {error, Error}, State}
end; end;
handle_call({stop_trace, Who}, _From, State = #state{trace_map = TraceMap}) -> handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
case maps:find(Who, TraceMap) of case maps:find(Who, Traces) of
{ok, {Trace, _LogFile}} -> {ok, {Trace, _LogFile}} ->
case lager:stop_trace(Trace) of case lager:stop_trace(Trace) of
ok -> ok; ok -> ok;
{error, Error} -> lager:error("Stop trace ~p error: ~p", [Who, Error]) {error, Error} -> lager:error("Stop trace ~p error: ~p", [Who, Error])
end, end,
{reply, ok, State#state{trace_map = maps:remove(Who, TraceMap)}}; {reply, ok, State#state{traces = maps:remove(Who, Traces)}};
error -> error ->
{reply, {error, not_found}, State} {reply, {error, not_found}, State}
end; end;
handle_call(all_traces, _From, State = #state{trace_map = TraceMap}) -> handle_call(all_traces, _From, State = #state{traces = Traces}) ->
{reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(TraceMap)], State}; {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}}
<- maps:to_list(Traces)], State};
handle_call(_Req, _From, State) -> handle_call(Req, _From, State) ->
{reply, error, State}. ?UNEXPECTED_REQ(Req, State).
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. ?UNEXPECTED_MSG(Msg, State).
handle_info(_Info, State) -> handle_info(Info, State) ->
{noreply, State}. ?UNEXPECTED_INFO(Info, State).
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.

43
src/emqttd_trace_sup.erl Normal file
View File

@ -0,0 +1,43 @@
%%%-----------------------------------------------------------------------------
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
%%%
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
%%% of this software and associated documentation files (the "Software"), to deal
%%% in the Software without restriction, including without limitation the rights
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%%% copies of the Software, and to permit persons to whom the Software is
%%% furnished to do so, subject to the following conditions:
%%%
%%% The above copyright notice and this permission notice shall be included in all
%%% copies or substantial portions of the Software.
%%%
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc emqttd trace supervisor.
%%%
%%% @author Feng Lee <feng@emqtt.io>
%%%-----------------------------------------------------------------------------
-module(emqttd_trace_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, {{one_for_one, 10, 100},
[{trace, {emqttd_trace, start_link, []},
permanent, 5000, worker, [emqttd_trace]}]}}.