diff --git a/src/emqttd_ws_client.erl b/src/emqttd_ws_client.erl index bd416fa27..3641a98a8 100644 --- a/src/emqttd_ws_client.erl +++ b/src/emqttd_ws_client.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,50 +18,64 @@ -behaviour(gen_server). +-author("Feng Lee "). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). %% API Exports --export([start_link/4, session/1, info/1, kick/1]). +-export([start_link/4]). + +%% Management and Monitor API +-export([info/1, stats/1, kick/1]). %% SUB/UNSUB Asynchronously -export([subscribe/2, unsubscribe/2]). +%% Get the session proc? +-export([session/1]). + %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% WebSocket Client State --record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive}). +-record(wsclient_state, {ws_pid, peer, connection, proto_state, keepalive, + enable_stats, stats_timer}). + +-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(WSLOG(Level, Peer, Format, Args), lager:Level("WsClient(~s): " ++ Format, [Peer | Args])). %% @doc Start WebSocket Client. -start_link(MqttEnv, WsPid, Req, ReplyChannel) -> - gen_server:start_link(?MODULE, [MqttEnv, WsPid, Req, ReplyChannel], []). - -session(CPid) -> - gen_server:call(CPid, session, infinity). +start_link(Env, WsPid, Req, ReplyChannel) -> + gen_server:start_link(?MODULE, [Env, WsPid, Req, ReplyChannel], []). info(CPid) -> - gen_server:call(CPid, info, infinity). + gen_server:call(CPid, info). + +stats(CPid) -> + gen_server:call(CPid, stats). kick(CPid) -> gen_server:call(CPid, kick). subscribe(CPid, TopicTable) -> - gen_server:cast(CPid, {subscribe, TopicTable}). + CPid ! {subscribe, TopicTable}. unsubscribe(CPid, Topics) -> - gen_server:cast(CPid, {unsubscribe, Topics}). + CPid ! {unsubscribe, Topics}. + +session(CPid) -> + gen_server:call(CPid, session). %%-------------------------------------------------------------------- %% gen_server Callbacks %%-------------------------------------------------------------------- -init([MqttEnv, WsPid, Req, ReplyChannel]) -> +init([Env, WsPid, Req, ReplyChannel]) -> process_flag(trap_exit, true), true = link(WsPid), {ok, Peername} = Req:get(peername), @@ -73,45 +87,41 @@ init([MqttEnv, WsPid, Req, ReplyChannel]) -> emqttd_metrics:inc('bytes/sent', iolist_size(Data)), ReplyChannel({binary, Data}) end, + EnableStats = proplists:get_value(client_enable_stats, Env, false), ProtoState = emqttd_protocol:init(Peername, SendFun, - [{ws_initial_headers, Headers} | MqttEnv]), - {ok, #wsclient_state{ws_pid = WsPid, peer = Req:get(peer), - connection = Req:get(connection), - proto_state = ProtoState}, idle_timeout(MqttEnv)}. + [{ws_initial_headers, Headers} | Env]), + {ok, maybe_enable_stats(#wsclient_state{ws_pid = WsPid, + peer = Req:get(peer), + connection = Req:get(connection), + proto_state = ProtoState, + enable_stats = EnableStats}), + proplists:get_value(client_idle_timeout, Env, 30000)}. -idle_timeout(MqttEnv) -> - timer:seconds(proplists:get_value(client_idle_timeout, MqttEnv, 10)). +handle_call(info, From, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> + Info = [{websocket, true}, {peer, Peer} | emqttd_protocol:info(ProtoState)], + {reply, Stats, _} = handle_call(stats, From, State), + {reply, lists:append(Info, Stats), State}; -handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> - {reply, emqttd_protocol:session(ProtoState), State}; - -handle_call(info, _From, State = #wsclient_state{peer = Peer, - proto_state = ProtoState}) -> - ProtoInfo = emqttd_protocol:info(ProtoState), - {reply, [{websocket, true}, {peer, Peer}| ProtoInfo], State}; +handle_call(stats, _From, State = #wsclient_state{proto_state = ProtoState}) -> + {reply, lists:append([emqttd_misc:proc_stats(), + wsock_stats(State), + emqttd_protocol:stats(ProtoState)]), State}; handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; +handle_call(session, _From, State = #wsclient_state{proto_state = ProtoState}) -> + {reply, emqttd_protocol:session(ProtoState), State}; + handle_call(Req, _From, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(critical, Peer, "Unexpected request: ~p", [Req]), + ?WSLOG(error, Peer, "Unexpected request: ~p", [Req]), {reply, {error, unsupported_request}, State}. -handle_cast({subscribe, TopicTable}, State) -> - with_proto_state(fun(ProtoState) -> - emqttd_protocol:handle({subscribe, TopicTable}, ProtoState) - end, State); - -handle_cast({unsubscribe, Topics}, State) -> - with_proto_state(fun(ProtoState) -> - emqttd_protocol:handle({unsubscribe, Topics}, ProtoState) - end, State); - handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state = ProtoState}) -> emqttd_metrics:received(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - noreply(State#wsclient_state{proto_state = ProtoState1}); + {noreply, State#wsclient_state{proto_state = ProtoState1}, hibernate}; {error, Error} -> ?WSLOG(error, Peer, "Protocol error - ~p", [Error]), shutdown(Error, State); @@ -122,28 +132,46 @@ handle_cast({received, Packet}, State = #wsclient_state{peer = Peer, proto_state end; handle_cast(Msg, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(critical, Peer, "Unexpected msg: ~p", [Msg]), - noreply(State). + ?WSLOG(error, Peer, "Unexpected msg: ~p", [Msg]), + {noreply, State}. + +handle_info({subscribe, TopicTable}, State) -> + with_proto( + fun(ProtoState) -> + emqttd_protocol:subscribe(TopicTable, ProtoState) + end, State); + +handle_info({unsubscribe, Topics}, State) -> + with_proto( + fun(ProtoState) -> + emqttd_protocol:unsubscribe(Topics, ProtoState) + end, State); + +handle_info({suback, PacketId, GrantedQos}, State) -> + with_proto( + fun(ProtoState) -> + Packet = ?SUBACK_PACKET(PacketId, GrantedQos), + emqttd_protocol:send(Packet, ProtoState) + end, State); + +handle_info({deliver, Message}, State) -> + with_proto( + fun(ProtoState) -> + emqttd_protocol:send(Message, ProtoState) + end, State); + +handle_info({redeliver, {?PUBREL, PacketId}}, State) -> + with_proto( + fun(ProtoState) -> + emqttd_protocol:pubrel(PacketId, ProtoState) + end, State); + +handle_info({timeout, _Timer, emit_stats}, State) -> + {noreply, maybe_enable_stats(emit_stats(State)), hibernate}; handle_info(timeout, State) -> shutdown(idle_timeout, State); -handle_info({suback, PacketId, GrantedQos}, State) -> - with_proto_state(fun(ProtoState) -> - Packet = ?SUBACK_PACKET(PacketId, GrantedQos), - emqttd_protocol:send(Packet, ProtoState) - end, State); - -handle_info({deliver, Message}, State) -> - with_proto_state(fun(ProtoState) -> - emqttd_protocol:send(Message, ProtoState) - end, State); - -handle_info({redeliver, {?PUBREL, PacketId}}, State) -> - with_proto_state(fun(ProtoState) -> - emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState) - end, State); - handle_info({shutdown, conflict, {ClientId, NewPid}}, State = #wsclient_state{peer = Peer}) -> ?WSLOG(warning, Peer, "clientid '~s' conflict with ~p", [ClientId, NewPid]), shutdown(conflict, State); @@ -157,13 +185,13 @@ handle_info({keepalive, start, Interval}, State = #wsclient_state{peer = Peer, c end end, KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}), - noreply(State#wsclient_state{keepalive = KeepAlive}); + {noreply, stats_by_keepalive(State#wsclient_state{keepalive = KeepAlive})}; handle_info({keepalive, check}, State = #wsclient_state{peer = Peer, keepalive = KeepAlive}) -> case emqttd_keepalive:check(KeepAlive) of {ok, KeepAlive1} -> - noreply(State#wsclient_state{keepalive = KeepAlive1}); + {noreply, emit_stats(State#wsclient_state{keepalive = KeepAlive1}), hibernate}; {error, timeout} -> ?WSLOG(debug, Peer, "Keepalive Timeout!", []), shutdown(keepalive_timeout, State); @@ -180,8 +208,8 @@ handle_info({'EXIT', WsPid, Reason}, State = #wsclient_state{peer = Peer, ws_pid shutdown(Reason, State); handle_info(Info, State = #wsclient_state{peer = Peer}) -> - ?WSLOG(critical, Peer, "Unexpected Info: ~p", [Info]), - noreply(State). + ?WSLOG(error, Peer, "Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(Reason, #wsclient_state{proto_state = ProtoState, keepalive = KeepAlive}) -> emqttd_keepalive:cancel(KeepAlive), @@ -199,12 +227,32 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- -with_proto_state(Fun, State = #wsclient_state{proto_state = ProtoState}) -> - {ok, ProtoState1} = Fun(ProtoState), - noreply(State#wsclient_state{proto_state = ProtoState1}). +maybe_enable_stats(State = #wsclient_state{enable_stats = false}) -> + State; +maybe_enable_stats(State = #wsclient_state{enable_stats = keepalive}) -> + State; +maybe_enable_stats(State = #wsclient_state{enable_stats = Interval}) -> + State#wsclient_state{stats_timer = emqttd_misc:start_timer(Interval, self(), emit_stats)}. -noreply(State) -> - {noreply, State, hibernate}. +stats_by_keepalive(State) -> + State#wsclient_state{enable_stats = keepalive}. + +emit_stats(State = #wsclient_state{enable_stats = false}) -> + State; +emit_stats(State = #wsclient_state{proto_state = ProtoState}) -> + {reply, Stats, _} = handle_call(stats, undefined, State), + emqttd_stats:set_client_stats(emqttd_protocol:clientid(ProtoState), Stats), + State. + +wsock_stats(#wsclient_state{connection = Conn}) -> + case Conn:getstat(?SOCK_STATS) of + {ok, Ss} -> Ss; + {error, _} -> [] + end. + +with_proto(Fun, State = #wsclient_state{proto_state = ProtoState}) -> + {ok, ProtoState1} = Fun(ProtoState), + {noreply, State#wsclient_state{proto_state = ProtoState1}}. shutdown(Reason, State) -> stop({shutdown, Reason}, State).