From a9e4414d3c11e65e5ad15ce399fada6d780f7030 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 21 Apr 2015 18:10:02 +0800 Subject: [PATCH] fix issues#93, support trace. --- apps/emqttd/src/emqttd.erl | 2 +- apps/emqttd/src/emqttd_app.erl | 3 +- apps/emqttd/src/emqttd_ctl.erl | 30 ++++++++ apps/emqttd/src/emqttd_protocol.erl | 11 ++- apps/emqttd/src/emqttd_pubsub.erl | 3 +- apps/emqttd/src/emqttd_session.erl | 7 +- apps/emqttd/src/emqttd_trace.erl | 115 +++++++++++++++++++++++----- rel/files/app.config | 23 +++--- rel/files/emqttd_ctl | 32 ++++++++ 9 files changed, 187 insertions(+), 39 deletions(-) diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index f9d70fc04..f118272e8 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -68,7 +68,7 @@ open({http, Port, Options}) -> mochiweb:start_http(Port, Options, MFArgs). open(Protocol, Port, Options) -> - {ok, PktOpts} = application:get_env(emqttd, packet), + {ok, PktOpts} = application:get_env(emqttd, mqtt_packet), MFArgs = {emqttd_client, start_link, [PktOpts]}, esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs). diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index e7e116dd6..b798f1061 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -66,7 +66,7 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - {ok, SessOpts} = application:get_env(session), + {ok, SessOpts} = application:get_env(mqtt_session), {ok, PubSubOpts} = application:get_env(pubsub), {ok, BrokerOpts} = application:get_env(broker), {ok, MetricOpts} = application:get_env(metrics), @@ -74,6 +74,7 @@ start_servers(Sup) -> Servers = [ {"emqttd config", emqttd_config}, {"emqttd event", emqttd_event}, + {"emqttd trace", emqttd_trace}, {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, {"emqttd client manager", emqttd_cm}, {"emqttd session manager", emqttd_sm}, diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 26da2246f..4fbaebd53 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -45,6 +45,7 @@ listeners/1, bridges/1, plugins/1, + trace/1, useradd/1, userdel/1]). @@ -156,6 +157,35 @@ plugins(["unload", Name]) -> {error, Reason} -> ?PRINT("error: ~s~n", [Reason]) end. +trace(["list"]) -> + lists:foreach(fun({{Who, Name}, LogFile}) -> + ?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile]) + end, emqttd_trace:all_traces()); + +trace(["client", ClientId, "off"]) -> + stop_trace(client, ClientId); +trace(["client", ClientId, LogFile]) -> + start_trace(client, ClientId, LogFile); +trace(["topic", Topic, "off"]) -> + stop_trace(topic, Topic); +trace(["topic", Topic, LogFile]) -> + start_trace(topic, Topic, LogFile). + +start_trace(Who, Name, LogFile) -> + case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of + ok -> + ?PRINT("trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error]) + end. +stop_trace(Who, Name) -> + case emqttd_trace:stop_trace({Who, bin(Name)}) of + ok -> + ?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]); + {error, Error} -> + ?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error]) + end. + node_name(SNode) -> SNode1 = case string:tokens(SNode, "@") of diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index a4d50a55e..8f595632b 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -113,7 +113,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = keep_alive = KeepAlive, clientid = ClientId} = Var, - trace(recv, Packet, State), + trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later... State1 = State#proto_state{proto_ver = ProtoVer, username = Username, @@ -239,10 +239,12 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername {ok, State}. trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> - lager:info("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); + lager:info([{client, ClientId}], "RECV from ~s@~s: ~s", + [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> - lager:info("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). + lager:info([{client, ClientId}], "SEND to ~s@~s: ~s", + [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). %% @doc redeliver PUBREL PacketId redeliver({?PUBREL, PacketId}, State) -> @@ -251,7 +253,8 @@ redeliver({?PUBREL, PacketId}, State) -> shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> send_willmsg(ClientId, WillMsg), try_unregister(ClientId, self()), - lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]), + lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p", + [ClientId, emqttd_net:format(Peername), Error]), ok. willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 5e726b54c..33193a397 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -130,7 +130,6 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) -> call(Req) -> Pid = gproc_pool:pick_worker(?POOL, self()), - lager:info("~p call ~p", [self(), Pid]), gen_server:call(Pid, Req, infinity). cast(Msg) -> @@ -142,7 +141,7 @@ cast(Msg) -> %%------------------------------------------------------------------------------ -spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok. publish(From, Msg=#mqtt_message{topic=Topic}) -> - lager:info("~s PUBLISH to ~s", [From, Topic]), + lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]), %% Retain message first. Don't create retained topic. case emqttd_msg_store:retain(Msg) of ok -> diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index a2a9a3ecd..a921efd91 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -186,7 +186,8 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi end, SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), - lager:info("Client ~s subscribe ~p. Granted QoS: ~p", [ClientId, Topics, GrantedQos]), + lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p", + [ClientId, Topics, GrantedQos]), %%TODO: should be gen_event and notification... [emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics], {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; @@ -210,7 +211,7 @@ unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, To end, %%unsubscribe from topic tree ok = emqttd_pubsub:unsubscribe(Topics), - lager:info("Client ~s unsubscribe ~p.", [ClientId, Topics]), + lager:info([{client, ClientId}], "Client ~s unsubscribe ~p.", [ClientId, Topics]), SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics), {ok, SessState#session_state{submap = SubMap1}}; @@ -288,7 +289,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{ awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, expire_timer = ETimer}) -> - lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), + lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]), %cancel timeout timer erlang:cancel_timer(ETimer), diff --git a/apps/emqttd/src/emqttd_trace.erl b/apps/emqttd/src/emqttd_trace.erl index 3c25a2e3c..c057ecf09 100644 --- a/apps/emqttd/src/emqttd_trace.erl +++ b/apps/emqttd/src/emqttd_trace.erl @@ -20,32 +20,111 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd trace. +%%% Trace MQTT packets/messages by clientid or topic. %%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_trace). -%% Trace publish messages and write to file.. -%%------------------------------------------------------------------------------ -%% @doc -%% Start to trace client or topic. -%% -%% @end -%%------------------------------------------------------------------------------ -start_trace(client, ClientId) -> - ok; -start_trace(topic, Topic) -> - ok. +-author("Feng Lee "). + +-behaviour(gen_server). + +%% API Function Exports +-export([start_link/0]). + +-export([start_trace/2, stop_trace/1, all_traces/0]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {level, trace_map}). + +-type trace_who() :: {client | topic, binary()}. + +-define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). + +%%%============================================================================= +%%% API +%%%============================================================================= +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + %%------------------------------------------------------------------------------ -%% @doc -%% Stop tracing client or topic. -%% +%% @doc Start to trace client or topic. %% @end %%------------------------------------------------------------------------------ -stop_trace(client, ClientId) -> - ok; -stop_trace(topic, Topic) -> +-spec start_trace(trace_who(), string()) -> ok | {error, any()}. +start_trace({client, ClientId}, LogFile) -> + start_trace({start_trace, {client, ClientId}, LogFile}); + +start_trace({topic, Topic}, LogFile) -> + start_trace({start_trace, {topic, Topic}, LogFile}). + +start_trace(Req) -> + gen_server:call(?MODULE, Req, infinity). + +%%------------------------------------------------------------------------------ +%% @doc Stop tracing client or topic. +%% @end +%%------------------------------------------------------------------------------ +-spec stop_trace(trace_who()) -> ok | {error, any()}. +stop_trace({client, ClientId}) -> + gen_server:call(?MODULE, {stop_trace, {client, ClientId}}); +stop_trace({topic, Topic}) -> + gen_server:call(?MODULE, {stop_trace, {topic, Topic}}). + +%%------------------------------------------------------------------------------ +%% @doc Lookup all traces. +%% @end +%%------------------------------------------------------------------------------ +-spec all_traces() -> [{Who :: trace_who(), LogFile :: string()}]. +all_traces() -> + gen_server:call(?MODULE, all_traces). + +init([]) -> + {ok, #state{level = info, trace_map = #{}}}. + +handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) -> + case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of + {ok, exists} -> + {reply, {error, existed}, State}; + {ok, Trace} -> + {reply, ok, State#state{trace_map = maps:put(Who, {Trace, LogFile}, TraceMap)}}; + {error, Error} -> + {reply, {error, Error}, State} + end; + +handle_call({stop_trace, Who}, _From, State = #state{trace_map = TraceMap}) -> + case maps:find(Who, TraceMap) of + {ok, {Trace, _LogFile}} -> + case lager:stop_trace(Trace) of + ok -> ok; + {error, Error} -> lager:error("Stop trace ~p error: ~p", [Who, Error]) + end, + {reply, ok, State#state{trace_map = maps:remove(Who, TraceMap)}}; + error -> + {reply, {error, not_found}, State} + end; + +handle_call(all_traces, _From, State = #state{trace_map = TraceMap}) -> + {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(TraceMap)], State}; + +handle_call(_Req, _From, State) -> + {reply, error, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + diff --git a/rel/files/app.config b/rel/files/app.config index 86ef3a181..3abc80b36 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -14,6 +14,8 @@ %{versions, ['tlsv1.2', 'tlsv1.1']} ]}, {lager, [ + {colored, true}, + {async_threshold, 1000}, {error_logger_redirect, false}, {crash_log, "log/emqttd_crash.log"}, {handlers, [ @@ -24,15 +26,15 @@ {level, info}, {size, 104857600}, {date, "$D0"}, - {count, 10} + {count, 30} ]}, {lager_file_backend, [ {formatter_config, [time, " [",severity,"] ", message, "\n"]}, {file, "log/emqttd_error.log"}, {level, error}, - {size, 10485760}, + {size, 104857600}, {date, "$D0"}, - {count, 10} + {count, 30} ]} ]} ]}, @@ -56,21 +58,21 @@ {internal, [{file, "etc/acl.config"}, {nomatch, allow}]} ]} ]}, - %% Packet - {packet, [ + %% MQTT Packet + {mqtt_packet, [ {max_clientid_len, 1024}, - {max_packet_size, 16#ffff} + {max_packet_size, 4096} ]}, - %% Session - {session, [ - {expires, 1}, %hour + %% MQTT Session + {mqtt_session, [ + {expires, 24}, %hour {max_queue, 1000}, {store_qos0, false} ]}, %% Retain messages {retained, [ {max_message_num, 100000}, - {max_playload_size, 16#ffff} + {max_playload_size, 4096} ]}, %% PubSub {pubsub, []}, @@ -91,6 +93,7 @@ {listen, [ {mqtt, 1883, [ {backlog, 512}, + %{buffer, 4096}, {acceptors, 16}, {max_clients, 1024}, {access, [{allow, all}]} diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index 4f1218525..d88f9ff03 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -271,6 +271,28 @@ case "$1" in $NODETOOL rpc emqttd_ctl listeners $@ ;; + trace) + # Make sure the local node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "emqttd is not running!" + exit 1 + fi + if [ $# -eq 2 -a $2 = "list" ]; then + $NODETOOL rpc emqttd_ctl trace list + elif [ $# -eq 4 ]; then + shift + $NODETOOL rpc emqttd_ctl trace $@ + else + echo "Usage: " + echo "$SCRIPT trace list" + echo "$SCRIPT trace client " + echo "$SCRIPT trace client off" + echo "$SCRIPT trace topic " + echo "$SCRIPT trace topic off" + exit 1 + fi + ;; *) echo "Usage: $SCRIPT" @@ -280,15 +302,25 @@ case "$1" in echo " stats #query broker statistics of clients, topics, subscribers" echo " metrics #query broker metrics" echo " cluster [] #query or cluster nodes" + echo " ----------------------------------------------------------------" echo " plugins list #query loaded plugins" echo " plugins load #load plugin" echo " plugins unload #unload plugin" + echo " ----------------------------------------------------------------" echo " bridges list #query bridges" echo " bridges start #start bridge" echo " bridges stop #stop bridge" + echo " ----------------------------------------------------------------" echo " useradd #add user" echo " userdel #delete user" + echo " ----------------------------------------------------------------" echo " listeners #query broker listeners" + echo " ----------------------------------------------------------------" + echo " trace list #query all traces" + echo " trace client #trace client with ClientId" + echo " trace client off #stop to trace client" + echo " trace topic #trace topic with Topic" + echo " trace topic off #stop to trace Topic" exit 1 ;;