Improve the module and support statistics

This commit is contained in:
Feng Lee 2017-02-16 11:25:10 +08:00
parent 23e49c317d
commit 90ff296ebe
1 changed files with 112 additions and 64 deletions

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2012-2017 Feng Lee <feng@emqtt.io>. %% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -18,50 +18,64 @@
-behaviour(gen_server). -behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl"). -include("emqttd.hrl").
-include("emqttd_protocol.hrl"). -include("emqttd_protocol.hrl").
%% API Exports %% API Exports
-export([start_link/4, session/1, info/1, kick/1]). -export([start_link/4]).
%% Management and Monitor API
-export([info/1, stats/1, kick/1]).
%% SUB/UNSUB Asynchronously %% SUB/UNSUB Asynchronously
-export([subscribe/2, unsubscribe/2]). -export([subscribe/2, unsubscribe/2]).
%% Get the session proc?
-export([session/1]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% WebSocket Client State %% WebSocket Client State
-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}). -record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive,
enable_stats, stats_timer}).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(WSLOG(Level, Peer, Format, Args), -define(WSLOG(Level, Peer, Format, Args),
lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). lager:Level("WsClient(~s): " ++ Format, [Peer | Args])).
%% @doc Start WebSocket Client. %% @doc Start WebSocket Client.
start_link(MqttEnv, WsPid, Req, ReplyChannel) -> start_link(Env, WsPid, Req, ReplyChannel) ->
gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []). gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []).
session(CPid) ->
gen_server:call(CPid, session, infinity).
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info, infinity). gen_server:call(CPid, info).
stats(CPid) ->
gen_server:call(CPid, stats).
kick(CPid) -> kick(CPid) ->
gen_server:call(CPid, kick). gen_server:call(CPid, kick).
subscribe(CPid, TopicTable) -> subscribe(CPid, TopicTable) ->
gen_server:cast(CPid, {subscribe, TopicTable}). CPid ! {subscribe, TopicTable}.
unsubscribe(CPid, Topics) -> unsubscribe(CPid, Topics) ->
gen_server:cast(CPid, {unsubscribe, Topics}). CPid ! {unsubscribe, Topics}.
session(CPid) ->
gen_server:call(CPid, session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([MqttEnv, WsPid, Req, ReplyChannel]) -> init([Env, WsPid, Req, ReplyChannel]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
true = link(WsPid), true = link(WsPid),
{ok, Peername} = Req:get(peername), {ok, Peername} = Req:get(peername),
@ -73,45 +87,41 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) ->
emqttd_metrics:inc('bytes/sent', iolist_size(Data)), emqttd_metrics:inc('bytes/sent', iolist_size(Data)),
ReplyChannel({binary, Data}) ReplyChannel({binary, Data})
end, end,
EnableStats = proplists:get_value(client_enable_stats, Env, false),
ProtoState = emqttd_protocol:init(Peername, SendFun, ProtoState = emqttd_protocol:init(Peername, SendFun,
[{ws_initial_headers, Headers} | MqttEnv]), [{ws_initial_headers, Headers} | Env]),
{ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), {ok, maybe_enable_stats(#wsclient_state{ws_pid = WsPid,
peer = Req:get(peer),
connection = Req:get(connection), connection = Req:get(connection),
proto_state = ProtoState}, idle_timeout(MqttEnv)}. proto_state = ProtoState,
enable_stats = EnableStats}),
proplists:get_value(client_idle_timeout, Env, 30000)}.
idle_timeout(MqttEnv) -> handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
timer:seconds(proplists:get_value(client_idle_timeout, MqttEnv, 10)). Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)],
{reply, Stats, _} = handle_call(stats, From, State),
{reply, lists:append(Info, Stats), State};
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State}; {reply, lists:append([emqttd_misc:proc_stats(),
wsock_stats(State),
handle_call(info, _From, State = #wsclient_state{peer = Peer, emqttd_protocol:stats(ProtoState)]), State};
proto_state = ProtoState}) ->
ProtoInfo = emqttd_protocol:info(ProtoState),
{reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State};
handle_call(kick, _From, State) -> handle_call(kick, _From, State) ->
{stop, {shutdown, kick}, ok, State}; {stop, {shutdown, kick}, ok, State};
handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) ->
{reply, emqttd_protocol:session(ProtoState), State};
handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> handle_call(Req, _From, State = #wsclient_state{peer = Peer}) ->
?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]), ?WSLOG(error, Peer, "Unexpected request: ~p", [Req]),
{reply, {error, unsupported_request}, State}. {reply, {error, unsupported_request}, State}.
handle_cast({subscribe, TopicTable}, State) ->
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({subscribe, TopicTable}, ProtoState)
end, State);
handle_cast({unsubscribe, Topics}, State) ->
with_proto_state(fun(ProtoState) ->
emqttd_protocol:handle({unsubscribe, Topics}, ProtoState)
end, State);
handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) ->
emqttd_metrics:received(Packet), emqttd_metrics:received(Packet),
case emqttd_protocol:received(Packet, ProtoState) of case emqttd_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} -> {ok, ProtoState1} ->
noreply(State#wsclient_state{proto_state = ProtoState1}); {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate};
{error, Error} -> {error, Error} ->
?WSLOG(error, Peer, "Protocol error - ~p", [Error]), ?WSLOG(error, Peer, "Protocol error - ~p", [Error]),
shutdown(Error, State); shutdown(Error, State);
@ -122,28 +132,46 @@ handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state
end; end;
handle_cast(Msg, State = #wsclient_state{peer = Peer}) -> handle_cast(Msg, State = #wsclient_state{peer = Peer}) ->
?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]), ?WSLOG(error, Peer, "Unexpected msg: ~p", [Msg]),
noreply(State). {noreply, State}.
handle_info(timeout, State) -> handle_info({subscribe, TopicTable}, State) ->
shutdown(idle_timeout, State); with_proto(
fun(ProtoState) ->
emqttd_protocol:subscribe(TopicTable, ProtoState)
end, State);
handle_info({unsubscribe, Topics}, State) ->
with_proto(
fun(ProtoState) ->
emqttd_protocol:unsubscribe(Topics, ProtoState)
end, State);
handle_info({suback, PacketId, GrantedQos}, State) -> handle_info({suback, PacketId, GrantedQos}, State) ->
with_proto_state(fun(ProtoState) -> with_proto(
fun(ProtoState) ->
Packet = ?SUBACK_PACKET(PacketId, GrantedQos), Packet = ?SUBACK_PACKET(PacketId, GrantedQos),
emqttd_protocol:send(Packet, ProtoState) emqttd_protocol:send(Packet, ProtoState)
end, State); end, State);
handle_info({deliver, Message}, State) -> handle_info({deliver, Message}, State) ->
with_proto_state(fun(ProtoState) -> with_proto(
fun(ProtoState) ->
emqttd_protocol:send(Message, ProtoState) emqttd_protocol:send(Message, ProtoState)
end, State); end, State);
handle_info({redeliver, {?PUBREL, PacketId}}, State) -> handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
with_proto_state(fun(ProtoState) -> with_proto(
emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) fun(ProtoState) ->
emqttd_protocol:pubrel(PacketId, ProtoState)
end, State); end, State);
handle_info({timeout, _Timer, emit_stats}, State) ->
{noreply, maybe_enable_stats(emit_stats(State)), hibernate};
handle_info(timeout, State) ->
shutdown(idle_timeout, State);
handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) ->
?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]), ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
shutdown(conflict, State); shutdown(conflict, State);
@ -157,13 +185,13 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, c
end end
end, end,
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
noreply(State#wsclient_state{keepalive = KeepAlive}); {noreply, stats_by_keepalive(State#wsclient_state{keepalive = KeepAlive})};
handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, handle_info({keepalive, check}, State = #wsclient_state{peer = Peer,
keepalive = KeepAlive}) -> keepalive = KeepAlive}) ->
case emqttd_keepalive:check(KeepAlive) of case emqttd_keepalive:check(KeepAlive) of
{ok, KeepAlive1} -> {ok, KeepAlive1} ->
noreply(State#wsclient_state{keepalive = KeepAlive1}); {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate};
{error, timeout} -> {error, timeout} ->
?WSLOG(debug, Peer, "Keepalive Timeout!", []), ?WSLOG(debug, Peer, "Keepalive Timeout!", []),
shutdown(keepalive_timeout, State); shutdown(keepalive_timeout, State);
@ -180,8 +208,8 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid
shutdown(Reason, State); shutdown(Reason, State);
handle_info(Info, State = #wsclient_state{peer = Peer}) -> handle_info(Info, State = #wsclient_state{peer = Peer}) ->
?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]), ?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]),
noreply(State). {noreply, State}.
terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) ->
emqttd_keepalive:cancel(KeepAlive), emqttd_keepalive:cancel(KeepAlive),
@ -199,12 +227,32 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) -> maybe_enable_stats(State = #wsclient_state{enable_stats = false}) ->
{ok, ProtoState1} = Fun(ProtoState), State;
noreply(State#wsclient_state{proto_state = ProtoState1}). maybe_enable_stats(State = #wsclient_state{enable_stats = keepalive}) ->
State;
maybe_enable_stats(State = #wsclient_state{enable_stats = Interval}) ->
State#wsclient_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}.
noreply(State) -> stats_by_keepalive(State) ->
{noreply, State, hibernate}. State#wsclient_state{enable_stats = keepalive}.
emit_stats(State = #wsclient_state{enable_stats = false}) ->
State;
emit_stats(State = #wsclient_state{proto_state = ProtoState}) ->
{reply, Stats, _} = handle_call(stats, undefined, State),
emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats),
State.
wsock_stats(#wsclient_state{connection = Conn}) ->
case Conn:getstat(?SOCK_STATS) of
{ok, Ss} -> Ss;
{error, _} -> []
end.
with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) ->
{ok, ProtoState1} = Fun(ProtoState),
{noreply, State#wsclient_state{proto_state = ProtoState1}}.
shutdown(Reason, State) -> shutdown(Reason, State) ->
stop({shutdown, Reason}, State). stop({shutdown, Reason}, State).