diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ccb5f59fa..6c3d4c7e6 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Data) -> + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), @@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa7aad064..075f0a11e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> WillQoS : 2, WillFlag : 1, CleanStart : 1, - 0 : 1, + 0 : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, @@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 03c42510c..cf4a555ca 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -62,9 +62,11 @@ proc_stats(Pid) -> -define(DISABLED, 0). +init_proc_mng_policy(undefined) -> ok; init_proc_mng_policy(Zone) -> - #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy = - emqx_zone:get_env(Zone, force_shutdown_policy), + #{max_heap_size := MaxHeapSizeInBytes} + = ShutdownPolicy + = emqx_zone:get_env(Zone, force_shutdown_policy), MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded erlang:put(force_shutdown_policy, ShutdownPolicy), @@ -106,4 +108,3 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), Value. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 2e4ff7d77..51951ccda 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -586,13 +586,10 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of + case SendFun(Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; - {binary, _Data} -> - emqx_metrics:sent(Packet), - {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} end. diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..7ff0b55a8 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -144,12 +144,14 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), put(send_cnt, get(send_cnt) + 1), - WsPid ! {binary, iolist_to_binary(Data)} + WsPid ! {binary, iolist_to_binary(Data)}, + ok end. stat_fun() -> @@ -299,4 +301,3 @@ stop(Error, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. -