diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ccb5f59fa..37665ac10 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), @@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa7aad064..075f0a11e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> WillQoS : 2, WillFlag : 1, CleanStart : 1, - 0 : 1, + 0 : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, @@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1fb80c0ce..93a1a8b64 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -407,13 +407,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> case Interval =/= 0 andalso OldInterval =:= 0 of - true -> + true -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), {error, protocol_error, PState#pstate{will_msg = undefined}}; - false -> + false -> emqx_session:update_expiry_interval(SPid, Interval), %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}} @@ -495,13 +495,13 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, - Props1 = if - MaxQoS =:= ?QOS_2 -> + Props1 = if + MaxQoS =:= ?QOS_2 -> Props; true -> maps:put('Maximum-QoS', MaxQoS, Props) end, - + Props2 = if IsAssigned -> Props1#{'Assigned-Client-Identifier' => ClientId}; true -> Props1 @@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of + case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; @@ -601,17 +601,17 @@ set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Receive-Maximum', ConnProps, 65535); - true -> + true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Session-Expiry-Interval', ConnProps, 0); - true -> + true -> case CleanStart of true -> 0; - false -> + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end end, SessAttrs); @@ -619,7 +619,7 @@ set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVe maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Topic-Alias-Maximum', ConnProps, 0); - true -> + true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); set_session_attrs({_, #pstate{}}, SessAttrs) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..1907695af 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -144,7 +144,8 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), @@ -299,4 +300,3 @@ stop(Error, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. -