This commit is contained in:
Feng Lee 2014-12-07 11:56:31 +08:00
parent f51b961ccc
commit eed75c5802
2 changed files with 13 additions and 24 deletions

View File

@ -24,7 +24,7 @@
-behaviour(gen_server).
-export([start_link/1, info/1]).
-export([start_link/1, info/1, go/2]).
-export([init/1,
handle_call/3,
@ -63,24 +63,22 @@
Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
start_link(Sock) ->
Res = gen_server:start_link(?MODULE, [Sock], []),
?INFO("~p", [Res]).
gen_server:start_link(?MODULE, [Sock], []).
info(Pid) ->
gen_server:call(Pid, info).
go(Pid, Sock) ->
gen_server:call(Pid, {go, Sock}).
init([Sock]) ->
process_flag(trap_exit, true),
esockd_client:ack(Sock),
%%TODO: Move to esockd...
ok = throw_on_error(
inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end),
{ok, #state{socket = Sock}}.
handle_call({go, Sock}, _From, State=#state{socket = Sock}) ->
{ok, ConnStr} = emqtt_net:connection_string(Sock, inbound),
%FIXME: merge to registry
%%emqtt_client_monitor:mon(self()),
{ok,
control_throttle(
#state{ socket = Sock,
{reply, ok,
control_throttle(
#state{ socket = Sock,
conn_name = ConnStr,
await_recv = false,
connection_state = running,
@ -89,9 +87,7 @@ init([Sock]) ->
message_id = 1,
subtopics = [],
awaiting_ack = gb_trees:empty(),
awaiting_rel = gb_trees:empty()})}.
%{ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}.
awaiting_rel = gb_trees:empty()})};
handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) ->
?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]),

View File

@ -24,7 +24,7 @@
-export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]).
-export([tune_buffer_size/1, connection_string/2]).
-export([connection_string/2]).
-include_lib("kernel/include/inet.hrl").
@ -154,13 +154,6 @@ tcp_name(Prefix, IPAddress, Port)
io_lib:format(
"~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))).
tune_buffer_size(Sock) ->
case getopts(Sock, [sndbuf, recbuf, buffer]) of
{ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
setopts(Sock, [{buffer, BufSz}]);
Err -> Err
end.
connection_string(Sock, Direction) ->
case socket_ends(Sock, Direction) of
{ok, {FromAddress, FromPort, ToAddress, ToPort}} ->