diff --git a/etc/emqtt.config b/etc/emqtt.config index d36330143..eca6ec071 100644 --- a/etc/emqtt.config +++ b/etc/emqtt.config @@ -12,12 +12,13 @@ ]}, {emqtt, [ {tcp_listeners, [1883]}, - {tcp_listen_options, [ - binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 128}, - {nodelay, true}]} + {tcp_listen_options, [binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 128}, + {nodelay, true}, + {linger, {true, 0}}, + {exit_on_close, false}]} ]} ]. diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index 9059173b0..6629914ad 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -23,8 +23,12 @@ init([]) -> {ok, undefined, hibernate, {backoff, 1000, 1000, 10000}}. handle_call({go, Sock}, _From, State) -> - error_logger:info_msg("go.... sock: ~p", [Sock]), - inet:setopts(Sock, [{active, true}]), + process_flag(trap_exit, true), + ok = throw_on_error( + inet_error, fun () -> emqtt_net:tune_buffer_size(Sock) end), + {ok, ConnStr} = emqtt_net:connection_string(Sock, inbound), + error_logger:info_msg("accepting MQTT connection (~s)~n", [ConnStr]), + %inet:setopts(Sock, [{active, once}]), {reply, ok, State}. handle_cast(Msg, State) -> @@ -39,3 +43,11 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + + diff --git a/src/emqtt_net.erl b/src/emqtt_net.erl new file mode 100644 index 000000000..68d322747 --- /dev/null +++ b/src/emqtt_net.erl @@ -0,0 +1,86 @@ +-module(emqtt_net). + +-export([tune_buffer_size/1, connection_string/2]). + +-include_lib("kernel/include/inet.hrl"). + +tune_buffer_size(Sock) -> + case getopts(Sock, [sndbuf, recbuf, buffer]) of + {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), + setopts(Sock, [{buffer, BufSz}]); + Err -> Err + end. + +connection_string(Sock, Direction) -> + case socket_ends(Sock, Direction) of + {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> + {ok, format( + "~s:~p -> ~s:~p", + [maybe_ntoab(FromAddress), FromPort, + maybe_ntoab(ToAddress), ToPort])}; + Error -> + Error + end. + +format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). + +socket_ends(Sock, Direction) -> + {From, To} = sock_funs(Direction), + case {From(Sock), To(Sock)} of + {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> + {ok, {rdns(FromAddress), FromPort, + rdns(ToAddress), ToPort}}; + {{error, _Reason} = Error, _} -> + Error; + {_, {error, _Reason} = Error} -> + Error + end. + +maybe_ntoab(Addr) when is_tuple(Addr) -> ntoab(Addr); +maybe_ntoab(Host) -> Host. + +rdns(Addr) -> Addr. + +sock_funs(inbound) -> {fun peername/1, fun sockname/1}; +sock_funs(outbound) -> {fun sockname/1, fun peername/1}. + +getopts(Sock, Options) when is_port(Sock) -> + inet:getopts(Sock, Options). + +setopts(Sock, Options) when is_port(Sock) -> + inet:setopts(Sock, Options). + +sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). + +peername(Sock) when is_port(Sock) -> inet:peername(Sock). + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +ntoab(IP) -> + Str = ntoa(IP), + case string:str(Str, ":") of + 0 -> Str; + _ -> "[" ++ Str ++ "]" + end. + +tcp_host({0,0,0,0}) -> + hostname(); + +tcp_host({0,0,0,0,0,0,0,0}) -> + hostname(); + +tcp_host(IPAddress) -> + case inet:gethostbyaddr(IPAddress) of + {ok, #hostent{h_name = Name}} -> Name; + {error, _Reason} -> ntoa(IPAddress) + end. + +hostname() -> + {ok, Hostname} = inet:gethostname(), + case inet:gethostbyname(Hostname) of + {ok, #hostent{h_name = Name}} -> Name; + {error, _Reason} -> Hostname + end.