fix issues#93, support trace.

This commit is contained in:
Feng Lee 2015-04-21 18:10:02 +08:00
parent 6907d4feed
commit a9e4414d3c
9 changed files with 187 additions and 39 deletions

View File

@ -68,7 +68,7 @@ open({http, Port, Options}) ->
mochiweb:start_http(Port, Options, MFArgs).
open(Protocol, Port, Options) ->
{ok, PktOpts} = application:get_env(emqttd, packet),
{ok, PktOpts} = application:get_env(emqttd, mqtt_packet),
MFArgs = {emqttd_client, start_link, [PktOpts]},
esockd:open(Protocol, Port, emqttd_opts:merge(?MQTT_SOCKOPTS, Options) , MFArgs).

View File

@ -66,7 +66,7 @@ print_vsn() ->
?PRINT("~s ~s is running now~n", [Desc, Vsn]).
start_servers(Sup) ->
{ok, SessOpts} = application:get_env(session),
{ok, SessOpts} = application:get_env(mqtt_session),
{ok, PubSubOpts} = application:get_env(pubsub),
{ok, BrokerOpts} = application:get_env(broker),
{ok, MetricOpts} = application:get_env(metrics),
@ -74,6 +74,7 @@ start_servers(Sup) ->
Servers = [
{"emqttd config", emqttd_config},
{"emqttd event", emqttd_event},
{"emqttd trace", emqttd_trace},
{"emqttd pooler", {supervisor, emqttd_pooler_sup}},
{"emqttd client manager", emqttd_cm},
{"emqttd session manager", emqttd_sm},

View File

@ -45,6 +45,7 @@
listeners/1,
bridges/1,
plugins/1,
trace/1,
useradd/1,
userdel/1]).
@ -156,6 +157,35 @@ plugins(["unload", Name]) ->
{error, Reason} -> ?PRINT("error: ~s~n", [Reason])
end.
trace(["list"]) ->
lists:foreach(fun({{Who, Name}, LogFile}) ->
?PRINT("trace ~s ~s -> ~s~n", [Who, Name, LogFile])
end, emqttd_trace:all_traces());
trace(["client", ClientId, "off"]) ->
stop_trace(client, ClientId);
trace(["client", ClientId, LogFile]) ->
start_trace(client, ClientId, LogFile);
trace(["topic", Topic, "off"]) ->
stop_trace(topic, Topic);
trace(["topic", Topic, LogFile]) ->
start_trace(topic, Topic, LogFile).
start_trace(Who, Name, LogFile) ->
case emqttd_trace:start_trace({Who, bin(Name)}, LogFile) of
ok ->
?PRINT("trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("trace ~s ~s error: ~p~n", [Who, Name, Error])
end.
stop_trace(Who, Name) ->
case emqttd_trace:stop_trace({Who, bin(Name)}) of
ok ->
?PRINT("stop to trace ~s ~s successfully.~n", [Who, Name]);
{error, Error} ->
?PRINT("stop to trace ~s ~s error: ~p.~n", [Who, Name, Error])
end.
node_name(SNode) ->
SNode1 =
case string:tokens(SNode, "@") of

View File

@ -113,7 +113,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
keep_alive = KeepAlive,
clientid = ClientId} = Var,
trace(recv, Packet, State),
trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later...
State1 = State#proto_state{proto_ver = ProtoVer,
username = Username,
@ -239,10 +239,12 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername
{ok, State}.
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:info("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]);
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
[ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]);
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:info("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]).
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
[ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]).
%% @doc redeliver PUBREL PacketId
redeliver({?PUBREL, PacketId}, State) ->
@ -251,7 +253,8 @@ redeliver({?PUBREL, PacketId}, State) ->
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
send_willmsg(ClientId, WillMsg),
try_unregister(ClientId, self()),
lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]),
lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p",
[ClientId, emqttd_net:format(Peername), Error]),
ok.
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->

View File

@ -130,7 +130,6 @@ unsubscribe(Topics = [Topic|_]) when is_binary(Topic) ->
call(Req) ->
Pid = gproc_pool:pick_worker(?POOL, self()),
lager:info("~p call ~p", [self(), Pid]),
gen_server:call(Pid, Req, infinity).
cast(Msg) ->
@ -142,7 +141,7 @@ cast(Msg) ->
%%------------------------------------------------------------------------------
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
publish(From, Msg=#mqtt_message{topic=Topic}) ->
lager:info("~s PUBLISH to ~s", [From, Topic]),
lager:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s", [From, Topic]),
%% Retain message first. Don't create retained topic.
case emqttd_msg_store:retain(Msg) of
ok ->

View File

@ -186,7 +186,8 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi
end,
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
lager:info("Client ~s subscribe ~p. Granted QoS: ~p", [ClientId, Topics, GrantedQos]),
lager:info([{client, ClientId}], "Client ~s subscribe ~p. Granted QoS: ~p",
[ClientId, Topics, GrantedQos]),
%%TODO: should be gen_event and notification...
[emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
@ -210,7 +211,7 @@ unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, To
end,
%%unsubscribe from topic tree
ok = emqttd_pubsub:unsubscribe(Topics),
lager:info("Client ~s unsubscribe ~p.", [ClientId, Topics]),
lager:info([{client, ClientId}], "Client ~s unsubscribe ~p.", [ClientId, Topics]),
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
{ok, SessState#session_state{submap = SubMap1}};
@ -288,7 +289,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
lager:info([{client, ClientId}], "Session ~s resumed by ~p",[ClientId, ClientPid]),
%cancel timeout timer
erlang:cancel_timer(ETimer),

View File

@ -20,32 +20,111 @@
%%% SOFTWARE.
%%%-----------------------------------------------------------------------------
%%% @doc
%%% emqttd trace.
%%% Trace MQTT packets/messages by clientid or topic.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_trace).
%% Trace publish messages and write to file..
%%------------------------------------------------------------------------------
%% @doc
%% Start to trace client or topic.
%%
%% @end
%%------------------------------------------------------------------------------
start_trace(client, ClientId) ->
ok;
start_trace(topic, Topic) ->
ok.
-author("Feng Lee <feng@emqtt.io>").
-behaviour(gen_server).
%% API Function Exports
-export([start_link/0]).
-export([start_trace/2, stop_trace/1, all_traces/0]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {level, trace_map}).
-type trace_who() :: {client | topic, binary()}.
-define(TRACE_OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
%%%=============================================================================
%%% API
%%%=============================================================================
-spec start_link() -> {ok, pid()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%------------------------------------------------------------------------------
%% @doc
%% Stop tracing client or topic.
%%
%% @doc Start to trace client or topic.
%% @end
%%------------------------------------------------------------------------------
stop_trace(client, ClientId) ->
ok;
stop_trace(topic, Topic) ->
-spec start_trace(trace_who(), string()) -> ok | {error, any()}.
start_trace({client, ClientId}, LogFile) ->
start_trace({start_trace, {client, ClientId}, LogFile});
start_trace({topic, Topic}, LogFile) ->
start_trace({start_trace, {topic, Topic}, LogFile}).
start_trace(Req) ->
gen_server:call(?MODULE, Req, infinity).
%%------------------------------------------------------------------------------
%% @doc Stop tracing client or topic.
%% @end
%%------------------------------------------------------------------------------
-spec stop_trace(trace_who()) -> ok | {error, any()}.
stop_trace({client, ClientId}) ->
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
stop_trace({topic, Topic}) ->
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
%%------------------------------------------------------------------------------
%% @doc Lookup all traces.
%% @end
%%------------------------------------------------------------------------------
-spec all_traces() -> [{Who :: trace_who(), LogFile :: string()}].
all_traces() ->
gen_server:call(?MODULE, all_traces).
init([]) ->
{ok, #state{level = info, trace_map = #{}}}.
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, trace_map = TraceMap}) ->
case lager:trace_file(LogFile, [Who], Level, ?TRACE_OPTIONS) of
{ok, exists} ->
{reply, {error, existed}, State};
{ok, Trace} ->
{reply, ok, State#state{trace_map = maps:put(Who, {Trace, LogFile}, TraceMap)}};
{error, Error} ->
{reply, {error, Error}, State}
end;
handle_call({stop_trace, Who}, _From, State = #state{trace_map = TraceMap}) ->
case maps:find(Who, TraceMap) of
{ok, {Trace, _LogFile}} ->
case lager:stop_trace(Trace) of
ok -> ok;
{error, Error} -> lager:error("Stop trace ~p error: ~p", [Who, Error])
end,
{reply, ok, State#state{trace_map = maps:remove(Who, TraceMap)}};
error ->
{reply, {error, not_found}, State}
end;
handle_call(all_traces, _From, State = #state{trace_map = TraceMap}) ->
{reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(TraceMap)], State};
handle_call(_Req, _From, State) ->
{reply, error, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -14,6 +14,8 @@
%{versions, ['tlsv1.2', 'tlsv1.1']}
]},
{lager, [
{colored, true},
{async_threshold, 1000},
{error_logger_redirect, false},
{crash_log, "log/emqttd_crash.log"},
{handlers, [
@ -24,15 +26,15 @@
{level, info},
{size, 104857600},
{date, "$D0"},
{count, 10}
{count, 30}
]},
{lager_file_backend, [
{formatter_config, [time, " [",severity,"] ", message, "\n"]},
{file, "log/emqttd_error.log"},
{level, error},
{size, 10485760},
{size, 104857600},
{date, "$D0"},
{count, 10}
{count, 30}
]}
]}
]},
@ -56,21 +58,21 @@
{internal, [{file, "etc/acl.config"}, {nomatch, allow}]}
]}
]},
%% Packet
{packet, [
%% MQTT Packet
{mqtt_packet, [
{max_clientid_len, 1024},
{max_packet_size, 16#ffff}
{max_packet_size, 4096}
]},
%% Session
{session, [
{expires, 1}, %hour
%% MQTT Session
{mqtt_session, [
{expires, 24}, %hour
{max_queue, 1000},
{store_qos0, false}
]},
%% Retain messages
{retained, [
{max_message_num, 100000},
{max_playload_size, 16#ffff}
{max_playload_size, 4096}
]},
%% PubSub
{pubsub, []},
@ -91,6 +93,7 @@
{listen, [
{mqtt, 1883, [
{backlog, 512},
%{buffer, 4096},
{acceptors, 16},
{max_clients, 1024},
{access, [{allow, all}]}

View File

@ -271,6 +271,28 @@ case "$1" in
$NODETOOL rpc emqttd_ctl listeners $@
;;
trace)
# Make sure the local node IS running
RES=`$NODETOOL ping`
if [ "$RES" != "pong" ]; then
echo "emqttd is not running!"
exit 1
fi
if [ $# -eq 2 -a $2 = "list" ]; then
$NODETOOL rpc emqttd_ctl trace list
elif [ $# -eq 4 ]; then
shift
$NODETOOL rpc emqttd_ctl trace $@
else
echo "Usage: "
echo "$SCRIPT trace list"
echo "$SCRIPT trace client <ClientId> <LogFile>"
echo "$SCRIPT trace client <ClientId> off"
echo "$SCRIPT trace topic <Topic> <LogFile>"
echo "$SCRIPT trace topic <Topic> off"
exit 1
fi
;;
*)
echo "Usage: $SCRIPT"
@ -280,15 +302,25 @@ case "$1" in
echo " stats #query broker statistics of clients, topics, subscribers"
echo " metrics #query broker metrics"
echo " cluster [<Node>] #query or cluster nodes"
echo " ----------------------------------------------------------------"
echo " plugins list #query loaded plugins"
echo " plugins load <Plugin> #load plugin"
echo " plugins unload <Plugin> #unload plugin"
echo " ----------------------------------------------------------------"
echo " bridges list #query bridges"
echo " bridges start <Node> <Topic> #start bridge"
echo " bridges stop <Node> <Topic> #stop bridge"
echo " ----------------------------------------------------------------"
echo " useradd <Username> <Password> #add user"
echo " userdel <Username> #delete user"
echo " ----------------------------------------------------------------"
echo " listeners #query broker listeners"
echo " ----------------------------------------------------------------"
echo " trace list #query all traces"
echo " trace client <ClientId> <LogFile> #trace client with ClientId"
echo " trace client <ClientId> off #stop to trace client"
echo " trace topic <Topic> <LogFile> #trace topic with Topic"
echo " trace topic <Topic> off #stop to trace Topic"
exit 1
;;