refactor client
This commit is contained in:
parent
8d9270a6ba
commit
f4cf90ae91
|
@ -29,8 +29,8 @@
|
|||
{deps, [
|
||||
{gproc, ".*", {git, "git://github.com/uwiger/gproc.git", {branch, "master"}}},
|
||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {branch, "master"}}},
|
||||
{esockd, "2.*", {git, "git://github.com/emqtt/esockd.git", {branch, "master"}}},
|
||||
{mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "master"}}}
|
||||
{esockd, "3.*", {git, "git://github.com/emqtt/esockd.git", {branch, "3.0"}}},
|
||||
{mochiweb, ".*", {git, "git://github.com/emqtt/mochiweb.git", {branch, "4.0"}}}
|
||||
]}.
|
||||
|
||||
{recursive_cmds, [ct, eunit, clean]}.
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_client).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
@ -33,40 +32,35 @@
|
|||
|
||||
-include("emqttd_protocol.hrl").
|
||||
|
||||
-include("emqttd_internel.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API Function Exports
|
||||
-export([start_link/2, session/1, info/1, kick/1]).
|
||||
|
||||
%% SUB/UNSUB Asynchronously
|
||||
%% SUB/UNSUB Asynchronously, called by plugins.
|
||||
-export([subscribe/2, unsubscribe/2]).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% gen_server Function Exports
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
code_change/3, terminate/2]).
|
||||
|
||||
%% Client State...
|
||||
-record(state, {transport,
|
||||
socket,
|
||||
peername,
|
||||
conn_name,
|
||||
await_recv,
|
||||
conn_state,
|
||||
rate_limiter,
|
||||
parser,
|
||||
proto_state,
|
||||
packet_opts,
|
||||
keepalive}).
|
||||
%% Client State
|
||||
-record(client_state, {connection, peername, peerhost, peerport,
|
||||
await_recv, conn_state, rate_limit,
|
||||
parser_fun, proto_state, packet_opts,
|
||||
keepalive}).
|
||||
|
||||
-define(DEBUG(Format, Args, State),
|
||||
lager:debug("Client(~s): " ++ Format,
|
||||
[emqttd_net:format(State#state.peername) | Args])).
|
||||
-define(ERROR(Format, Args, State),
|
||||
lager:error("Client(~s): " ++ Format,
|
||||
[emqttd_net:format(State#state.peername) | Args])).
|
||||
-define(INFO_KEYS, [peername, peerhost, peerport, await_recv, conn_state]).
|
||||
|
||||
start_link(SockArgs, MqttEnv) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [[SockArgs, MqttEnv]])}.
|
||||
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
|
||||
|
||||
-define(LOG(Level, Format, Args, State),
|
||||
lager:Level("Client(~s): " ++ Format, [State#client_state.peername | Args])).
|
||||
|
||||
start_link(Connection, MqttEnv) ->
|
||||
{ok, proc_lib:spawn_link(?MODULE, init, [[Connection, MqttEnv]])}.
|
||||
|
||||
session(CPid) ->
|
||||
gen_server:call(CPid, session, infinity).
|
||||
|
@ -83,140 +77,156 @@ subscribe(CPid, TopicTable) ->
|
|||
unsubscribe(CPid, Topics) ->
|
||||
gen_server:cast(CPid, {unsubscribe, Topics}).
|
||||
|
||||
init([SockArgs = {Transport, Sock, _SockFun}, MqttEnv]) ->
|
||||
% Transform if ssl.
|
||||
{ok, NewSock} = esockd_connection:accept(SockArgs),
|
||||
%%TODO:...
|
||||
{ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
|
||||
io:format("~p~n", [BufSizes]),
|
||||
{ok, Peername} = emqttd_net:peername(Sock),
|
||||
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
||||
SendFun = send_fun(Transport, NewSock),
|
||||
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||
ProtoState = emqttd_protocol:init(Peername, SendFun, PktOpts),
|
||||
Limiter = proplists:get_value(rate_limiter, MqttEnv),
|
||||
State = run_socket(#state{transport = Transport,
|
||||
socket = NewSock,
|
||||
peername = Peername,
|
||||
conn_name = ConnStr,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
rate_limiter = Limiter,
|
||||
packet_opts = PktOpts,
|
||||
parser = emqttd_parser:new(PktOpts),
|
||||
proto_state = ProtoState}),
|
||||
init([Connection0, MqttEnv]) ->
|
||||
{ok, Connection} = Connection0:wait(),
|
||||
{PeerHost, PeerPort, PeerName} =
|
||||
case Connection:peername() of
|
||||
{ok, {Host, Port}} ->
|
||||
{Host, Port, esockd_net:format({Host, Port})};
|
||||
{error, enotconn} ->
|
||||
Connection:fast_close(),
|
||||
exit(normal);
|
||||
{error, Reason} ->
|
||||
Connection:fast_close(),
|
||||
exit({shutdown, Reason})
|
||||
end,
|
||||
SendFun = fun(Data) ->
|
||||
try Connection:async_send(Data) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error -> exit({shutdown, Error})
|
||||
end
|
||||
end,
|
||||
PktOpts = proplists:get_value(packet, MqttEnv),
|
||||
ParserFun = emqttd_parser:new(PktOpts),
|
||||
ProtoState = emqttd_protocol:init(PeerName, SendFun, PktOpts),
|
||||
RateLimit = proplists:get_value(rate_limit, Connection:opts()),
|
||||
State = run_socket(#client_state{connection = Connection,
|
||||
peername = PeerName,
|
||||
peerhost = PeerHost,
|
||||
peerport = PeerPort,
|
||||
await_recv = false,
|
||||
conn_state = running,
|
||||
rate_limit = RateLimit,
|
||||
parser_fun = ParserFun,
|
||||
proto_state = ProtoState,
|
||||
packet_opts = PktOpts}),
|
||||
ClientOpts = proplists:get_value(client, MqttEnv),
|
||||
IdleTimout = proplists:get_value(idle_timeout, ClientOpts, 10),
|
||||
gen_server:enter_loop(?MODULE, [], State, timer:seconds(IdleTimout)).
|
||||
|
||||
handle_call(session, _From, State = #state{proto_state = ProtoState}) ->
|
||||
handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
|
||||
{reply, emqttd_protocol:session(ProtoState), State};
|
||||
|
||||
handle_call(info, _From, State = #state{conn_name = ConnName,
|
||||
proto_state = ProtoState}) ->
|
||||
{reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
|
||||
handle_call(info, _From, State = #client_state{connection = Connection,
|
||||
proto_state = ProtoState}) ->
|
||||
ClientInfo = [{Key, Val} || {Key, Val} <- ?record_to_proplist(client_state, State), lists:member(Key, ?INFO_KEYS)],
|
||||
ProtoInfo = emqttd_protocol:info(ProtoState),
|
||||
{ok, SockStats} = Connection:getstat(?SOCK_STATS),
|
||||
Info = lists:append([ClientInfo, [{proto_info, ProtoInfo}, {sock_stats, SockStats}]]),
|
||||
{reply, Info, State};
|
||||
|
||||
handle_call(kick, _From, State) ->
|
||||
{stop, {shutdown, kick}, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?ERROR("Unexpected request: ~p", [Req], State),
|
||||
{reply, {error, unsupported_request}, State}.
|
||||
?LOG(critical, "Unexpected request: ~p", [Req], State),
|
||||
{reply, {error, unsupported_request}, State}.
|
||||
|
||||
handle_cast({subscribe, TopicTable}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:subscribe(SessPid, TopicTable) end, State);
|
||||
with_session(fun(SessPid) ->
|
||||
emqttd_session:subscribe(SessPid, TopicTable)
|
||||
end, State);
|
||||
|
||||
handle_cast({unsubscribe, Topics}, State) ->
|
||||
with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
|
||||
with_session(fun(SessPid) ->
|
||||
emqttd_session:unsubscribe(SessPid, Topics)
|
||||
end, State);
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?ERROR("Unexpected msg: ~p",[Msg], State),
|
||||
{noreply, State}.
|
||||
?LOG(critical, "Unexpected msg: ~p", [Msg], State),
|
||||
noreply(State).
|
||||
|
||||
handle_info(timeout, State) ->
|
||||
stop({shutdown, timeout}, State);
|
||||
shutdown(idle_timeout, State);
|
||||
|
||||
%% Asynchronous SUBACK
|
||||
handle_info({suback, PacketId, GrantedQos}, State = #state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:send(?SUBACK_PACKET(PacketId, GrantedQos), ProtoState),
|
||||
noreply(State#state{proto_state = ProtoState1});
|
||||
handle_info({suback, PacketId, GrantedQos}, State) ->
|
||||
with_proto_state(fun(ProtoState) ->
|
||||
Packet = ?SUBACK_PACKET(PacketId, GrantedQos),
|
||||
emqttd_protocol:send(Packet, ProtoState)
|
||||
end, State);
|
||||
|
||||
handle_info({deliver, Message}, State = #state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
|
||||
noreply(State#state{proto_state = ProtoState1});
|
||||
handle_info({deliver, Message}, State) ->
|
||||
with_proto_state(fun(ProtoState) ->
|
||||
emqttd_protocol:send(Message, ProtoState)
|
||||
end, State);
|
||||
|
||||
handle_info({redeliver, {?PUBREL, PacketId}}, State = #state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
|
||||
noreply(State#state{proto_state = ProtoState1});
|
||||
handle_info({redeliver, {?PUBREL, PacketId}}, State) ->
|
||||
with_proto_state(fun(ProtoState) ->
|
||||
emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState)
|
||||
end, State);
|
||||
|
||||
handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState,
|
||||
conn_name = ConnName}) ->
|
||||
lager:warning("Shutdown for duplicate clientid: ~s, conn:~s",
|
||||
[emqttd_protocol:clientid(ProtoState), ConnName]),
|
||||
stop({shutdown, duplicate_id}, State);
|
||||
handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
||||
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
|
||||
shutdown(confict, State);
|
||||
|
||||
handle_info(activate_sock, State) ->
|
||||
noreply(run_socket(State#state{conn_state = running}));
|
||||
noreply(run_socket(State#client_state{conn_state = running}));
|
||||
|
||||
handle_info({inet_async, Sock, _Ref, {ok, Data}}, State = #state{peername = Peername, socket = Sock}) ->
|
||||
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
||||
Size = size(Data),
|
||||
lager:debug("RECV from ~s: ~p", [emqttd_net:format(Peername), Data]),
|
||||
?LOG(debug, "RECV: ~p", [Data], State),
|
||||
emqttd_metrics:inc('bytes/received', Size),
|
||||
received(Data, rate_limit(Size, State#state{await_recv = false}));
|
||||
received(Data, rate_limit(Size, State#client_state{await_recv = false}));
|
||||
|
||||
handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
||||
%%TODO: ...
|
||||
network_error(Reason, State);
|
||||
shutdown(Reason, State);
|
||||
|
||||
handle_info({inet_reply, _Ref, ok}, State) ->
|
||||
%%TODO: ok...
|
||||
io:format("inet_reply ok~n"),
|
||||
handle_info({inet_reply, _Sock, ok}, State) ->
|
||||
noreply(State);
|
||||
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
network_error(Reason, State);
|
||||
shutdown(Reason, State);
|
||||
|
||||
handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket}) ->
|
||||
?DEBUG("Start KeepAlive with ~p seconds", [TimeoutSec], State),
|
||||
handle_info({keepalive, start, Interval}, State = #client_state{connection = Connection}) ->
|
||||
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
|
||||
StatFun = fun() ->
|
||||
case Transport:getstat(Socket, [recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, TimeoutSec, {keepalive, check}),
|
||||
noreply(State#state{keepalive = KeepAlive});
|
||||
case Connection:getstat([recv_oct]) of
|
||||
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
|
||||
{error, Error} -> {error, Error}
|
||||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
||||
noreply(State#client_state{keepalive = KeepAlive});
|
||||
|
||||
handle_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
|
||||
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
noreply(State#state{keepalive = KeepAlive1});
|
||||
{error, timeout} ->
|
||||
?DEBUG("Keepalive Timeout!", [], State),
|
||||
stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
|
||||
?LOG(debug, "Keepalive timeout", [], State),
|
||||
shutdown(keepalive_timeout, State);
|
||||
{error, Error} ->
|
||||
?DEBUG("Keepalive Error - ~p", [Error], State),
|
||||
stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
|
||||
?LOG(warning, "Keepalive error - ~p", [Error], State),
|
||||
shutdown(Error, State)
|
||||
end;
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?ERROR("Unexpected info: ~p", [Info], State),
|
||||
{noreply, State}.
|
||||
?LOG(critical, "Unexpected info: ~p", [Info], State),
|
||||
noreply(State).
|
||||
|
||||
terminate(Reason, #state{transport = Transport,
|
||||
socket = Socket,
|
||||
keepalive = KeepAlive,
|
||||
proto_state = ProtoState}) ->
|
||||
terminate(Reason, #client_state{connection = Connection,
|
||||
keepalive = KeepAlive,
|
||||
proto_state = ProtoState}) ->
|
||||
Connection:fast_close(),
|
||||
emqttd_keepalive:cancel(KeepAlive),
|
||||
if
|
||||
Reason == {shutdown, conn_closed} -> ok;
|
||||
true -> Transport:fast_close(Socket)
|
||||
end,
|
||||
case {ProtoState, Reason} of
|
||||
{undefined, _} -> ok;
|
||||
{undefined, _} ->
|
||||
ok;
|
||||
{_, {shutdown, Error}} ->
|
||||
emqttd_protocol:shutdown(Error, ProtoState);
|
||||
{_, Reason} ->
|
||||
{_, Reason} ->
|
||||
emqttd_protocol:shutdown(Reason, ProtoState)
|
||||
end.
|
||||
|
||||
|
@ -227,79 +237,73 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = Fun(ProtoState),
|
||||
noreply(State#client_state{proto_state = ProtoState1}).
|
||||
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
with_session(Fun, State = #state{proto_state = ProtoState}) ->
|
||||
Fun(emqttd_protocol:session(ProtoState)), noreply(State).
|
||||
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
Fun(emqttd_protocol:session(ProtoState)),
|
||||
noreply(State).
|
||||
|
||||
%% receive and parse tcp data
|
||||
received(<<>>, State) ->
|
||||
noreply(State);
|
||||
|
||||
received(Bytes, State = #state{parser = Parser,
|
||||
packet_opts = PacketOpts,
|
||||
proto_state = ProtoState}) ->
|
||||
case catch Parser(Bytes) of
|
||||
received(Bytes, State = #client_state{parser_fun = ParserFun,
|
||||
packet_opts = PacketOpts,
|
||||
proto_state = ProtoState}) ->
|
||||
case catch ParserFun(Bytes) of
|
||||
{more, NewParser} ->
|
||||
noreply(run_socket(State#state{parser = NewParser}));
|
||||
noreply(run_socket(State#client_state{parser_fun = NewParser}));
|
||||
{ok, Packet, Rest} ->
|
||||
emqttd_metrics:received(Packet),
|
||||
case emqttd_protocol:received(Packet, ProtoState) of
|
||||
{ok, ProtoState1} ->
|
||||
received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
|
||||
proto_state = ProtoState1});
|
||||
received(Rest, State#client_state{parser_fun = emqttd_parser:new(PacketOpts),
|
||||
proto_state = ProtoState1});
|
||||
{error, Error} ->
|
||||
?ERROR("Protocol error - ~p", [Error], State),
|
||||
stop({shutdown, Error}, State);
|
||||
?LOG(error, "Protocol error - ~p", [Error], State),
|
||||
shutdown(Error, State);
|
||||
{error, Error, ProtoState1} ->
|
||||
stop({shutdown, Error}, State#state{proto_state = ProtoState1});
|
||||
shutdown(Error, State#client_state{proto_state = ProtoState1});
|
||||
{stop, Reason, ProtoState1} ->
|
||||
stop(Reason, State#state{proto_state = ProtoState1})
|
||||
stop(Reason, State#client_state{proto_state = ProtoState1})
|
||||
end;
|
||||
{error, Error} ->
|
||||
?ERROR("Framing error - ~p", [Error], State),
|
||||
stop({shutdown, Error}, State);
|
||||
?LOG(error, "Framing error - ~p", [Error], State),
|
||||
shutdown(Error, State);
|
||||
{'EXIT', Reason} ->
|
||||
?ERROR("Parser failed for ~p~nError Frame: ~p", [Reason, Bytes], State),
|
||||
{stop, {shutdown, frame_error}, State}
|
||||
?LOG(error, "Parser failed for ~p", [Reason], State),
|
||||
?LOG(error, "Error data: ~p", [Bytes], State),
|
||||
shutdown(parser_error, State)
|
||||
end.
|
||||
|
||||
network_error(Reason, State = #state{peername = Peername}) ->
|
||||
lager:warning("Client(~s): network error - ~p",
|
||||
[emqttd_net:format(Peername), Reason]),
|
||||
stop({shutdown, conn_closed}, State).
|
||||
|
||||
rate_limit(_Size, State = #state{rate_limiter = undefined}) ->
|
||||
rate_limit(_Size, State = #client_state{rate_limit = undefined}) ->
|
||||
run_socket(State);
|
||||
rate_limit(Size, State = #state{socket = Sock, rate_limiter = Limiter}) ->
|
||||
{ok, BufSizes} = inet:getopts(Sock, [sndbuf, recbuf, buffer]),
|
||||
io:format("~p~n", [BufSizes]),
|
||||
case esockd_rate_limiter:check(Limiter, Size) of
|
||||
rate_limit(Size, State = #client_state{rate_limit = Limiter}) ->
|
||||
case esockd_ratelimit:check(Limiter, Size) of
|
||||
{0, Limiter1} ->
|
||||
run_socket(State#state{conn_state = running, rate_limiter = Limiter1});
|
||||
run_socket(State#client_state{conn_state = running, rate_limit = Limiter1});
|
||||
{Pause, Limiter1} ->
|
||||
?ERROR("~p Received, Rate Limiter Pause for ~w", [Size, Pause], State),
|
||||
?LOG(error, "Rate limiter pause for ~p", [Size, Pause], State),
|
||||
erlang:send_after(Pause, self(), activate_sock),
|
||||
State#state{conn_state = blocked, rate_limiter = Limiter1}
|
||||
State#client_state{conn_state = blocked, rate_limit = Limiter1}
|
||||
end.
|
||||
|
||||
run_socket(State = #state{conn_state = blocked}) ->
|
||||
run_socket(State = #client_state{conn_state = blocked}) ->
|
||||
State;
|
||||
run_socket(State = #state{await_recv = true}) ->
|
||||
run_socket(State = #client_state{await_recv = true}) ->
|
||||
State;
|
||||
run_socket(State = #state{transport = Transport, socket = Sock}) ->
|
||||
Transport:async_recv(Sock, 0, infinity),
|
||||
State#state{await_recv = true}.
|
||||
run_socket(State = #client_state{connection = Connection}) ->
|
||||
Connection:async_recv(0, infinity),
|
||||
State#client_state{await_recv = true}.
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
stop({shutdown, Reason}, State).
|
||||
|
||||
stop(Reason, State) ->
|
||||
{stop, Reason, State}.
|
||||
|
||||
send_fun(Transport, Sock) ->
|
||||
fun(Data) ->
|
||||
try Transport:port_command(Sock, Data) of
|
||||
true -> ok
|
||||
catch
|
||||
error:Error -> exit({socket_error, Error})
|
||||
end
|
||||
end.
|
||||
|
|
Loading…
Reference in New Issue