Refactor send_fun

This commit is contained in:
Gilbert Wong 2018-10-22 09:04:03 +08:00
parent d10edfe025
commit 35460d8227
4 changed files with 6 additions and 7 deletions

View File

@ -36,7 +36,7 @@ EUNIT_OPTS = verbose
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_zone emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm \
emqx_mountpoint emqx_listeners emqx_protocol emqx_pool emqx_shared_sub
@ -138,4 +138,3 @@ dep-vsn-check:
{[], []} -> halt(0); \
{Rebar, Mk} -> erlang:error({deps_version_discrepancy, [{rebar, Rebar}, {mk, Mk}]}) \
end."

View File

@ -165,8 +165,8 @@ init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst).
send_fun(Transport, Socket, Peername) ->
fun(Serialize, Packet, Options) ->
Data = Serialize(Packet, Options),
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
try Transport:async_send(Socket, Data) of
ok ->
?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),

View File

@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) ->
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet, PState),
case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of
case SendFun(Packet, #{version => Ver}) of
ok ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};

View File

@ -144,8 +144,8 @@ websocket_init(#state{request = Req, options = Options}) ->
idle_timeout = IdleTimout}}.
send_fun(WsPid) ->
fun(Serialize, Packet, Options) ->
Data = Serialize(Packet, Options),
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize),