diff --git a/apps/emqtt/src/emqtt_client.erl b/apps/emqtt/src/emqtt_client.erl index e20c33b72..f665c3c1d 100644 --- a/apps/emqtt/src/emqtt_client.erl +++ b/apps/emqtt/src/emqtt_client.erl @@ -24,7 +24,7 @@ -behaviour(gen_server). --export([start_link/1, info/1]). +-export([start_link/1, info/1, go/2]). -export([init/1, handle_call/3, @@ -63,24 +63,22 @@ Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). start_link(Sock) -> - Res = gen_server:start_link(?MODULE, [Sock], []), - ?INFO("~p", [Res]). + gen_server:start_link(?MODULE, [Sock], []). info(Pid) -> gen_server:call(Pid, info). +go(Pid, Sock) -> + gen_server:call(Pid, {go, Sock}). + init([Sock]) -> - process_flag(trap_exit, true), - esockd_client:ack(Sock), - %%TODO: Move to esockd... - ok = throw_on_error( - inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end), + {ok, #state{socket = Sock}}. + +handle_call({go, Sock}, _From, State=#state{socket = Sock}) -> {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), - %FIXME: merge to registry - %%emqtt_client_monitor:mon(self()), - {ok, - control_throttle( - #state{ socket = Sock, + {reply, ok, + control_throttle( + #state{ socket = Sock, conn_name = ConnStr, await_recv = false, connection_state = running, @@ -89,9 +87,7 @@ init([Sock]) -> message_id = 1, subtopics = [], awaiting_ack = gb_trees:empty(), - awaiting_rel = gb_trees:empty()})}. - - %{ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}. + awaiting_rel = gb_trees:empty()})}; handle_call(duplicate_id, _From, State=#state{conn_name=ConnName, client_id=ClientId}) -> ?ERROR("Shutdown for duplicate clientid:~s, conn:~s", [ClientId, ConnName]), diff --git a/apps/emqtt/src/emqtt_net.erl b/apps/emqtt/src/emqtt_net.erl index 97f7e2a28..545e31061 100644 --- a/apps/emqtt/src/emqtt_net.erl +++ b/apps/emqtt/src/emqtt_net.erl @@ -24,7 +24,7 @@ -export([tcp_name/3, tcp_host/1, getaddr/2, port_to_listeners/1]). --export([tune_buffer_size/1, connection_string/2]). +-export([connection_string/2]). -include_lib("kernel/include/inet.hrl"). @@ -154,13 +154,6 @@ tcp_name(Prefix, IPAddress, Port) io_lib:format( "~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port]))). -tune_buffer_size(Sock) -> - case getopts(Sock, [sndbuf, recbuf, buffer]) of - {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - setopts(Sock, [{buffer, BufSz}]); - Err -> Err - end. - connection_string(Sock, Direction) -> case socket_ends(Sock, Direction) of {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->