From 763115e14925d5563dd9f42eca4873f97fea1bc0 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Sat, 16 Mar 2019 20:33:24 +0800 Subject: [PATCH] Delegate serialize fun into sendfun --- src/emqx_connection.erl | 11 +++++++++-- src/emqx_protocol.erl | 5 ++--- src/emqx_ws_connection.erl | 6 +++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index e30958735..beacbb255 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -141,7 +141,15 @@ init({Transport, RawSocket, Options}) -> ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - SendFun = fun(Data) -> Transport:async_send(Socket, Data) end, + SendFun = fun(Packet, SeriaOpts) -> + Data = emqx_frame:serialize(Packet, SeriaOpts), + case Transport:async_send(Socket, Data) of + ok -> + {ok, Data}; + {error, Reason} -> + {error, Reason} + end + end, ProtoState = emqx_protocol:init(#{peername => Peername, sockname => Sockname, peercert => Peercert, @@ -484,4 +492,3 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index ecea0b54e..aa4ad5c8a 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -683,9 +683,8 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = Send}) -> - Data = emqx_frame:serialize(Packet, #{version => Ver}), - case Send(Data) of - ok -> + case Send(Packet, #{version => Ver}) of + {ok, Data} -> trace(send, Packet), emqx_metrics:sent(Packet), emqx_metrics:trans(inc, 'bytes/sent', iolist_size(Data)), diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 0c88751fc..27d52ab00 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -143,12 +143,13 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), emqx_pd:update_counter(send_cnt, 1), emqx_pd:update_counter(send_oct, BinSize), WsPid ! {binary, iolist_to_binary(Data)}, - ok + {ok, Data} end. stat_fun() -> @@ -305,4 +306,3 @@ shutdown(Reason, State) -> wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. -