From a748e8f1d8a0ebcefcaccf736057fa0659a1be44 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 18 Oct 2018 13:24:06 +0800 Subject: [PATCH] Refactor send_fun in protocol and other connection module Prior to this change, in the send function, the packet is forced to use emqx:serialize to serialize packet, it is a wrong design because other plugins which need to transform the mqtt packet to other packets can not use their own serialize function to serialize packet. This change solve the problem issued above. --- src/emqx_connection.erl | 4 ++-- src/emqx_frame.erl | 3 +-- src/emqx_protocol.erl | 22 +++++++++++----------- src/emqx_ws_connection.erl | 4 ++-- 4 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ccb5f59fa..37665ac10 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), @@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa7aad064..075f0a11e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> WillQoS : 2, WillFlag : 1, CleanStart : 1, - 0 : 1, + 0 : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, @@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 1fb80c0ce..93a1a8b64 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -407,13 +407,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), process_packet(?PACKET(?PINGREQ), PState) -> send(?PACKET(?PINGRESP), PState); -process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), +process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> case Interval =/= 0 andalso OldInterval =:= 0 of - true -> + true -> deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), {error, protocol_error, PState#pstate{will_msg = undefined}}; - false -> + false -> emqx_session:update_expiry_interval(SPid, Interval), %% Clean willmsg {stop, normal, PState#pstate{will_msg = undefined}} @@ -495,13 +495,13 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone, 'Subscription-Identifier-Available' => 1, 'Shared-Subscription-Available' => flag(Shared)}, - Props1 = if - MaxQoS =:= ?QOS_2 -> + Props1 = if + MaxQoS =:= ?QOS_2 -> Props; true -> maps:put('Maximum-QoS', MaxQoS, Props) end, - + Props2 = if IsAssigned -> Props1#{'Assigned-Client-Identifier' => ClientId}; true -> Props1 @@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of + case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; @@ -601,17 +601,17 @@ set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn maps:put(max_inflight, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Receive-Maximum', ConnProps, 65535); - true -> + true -> emqx_zone:get_env(Zone, max_inflight, 65535) end, SessAttrs); set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> maps:put(expiry_interval, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Session-Expiry-Interval', ConnProps, 0); - true -> + true -> case CleanStart of true -> 0; - false -> + false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) end end, SessAttrs); @@ -619,7 +619,7 @@ set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVe maps:put(topic_alias_maximum, if ProtoVer =:= ?MQTT_PROTO_V5 -> maps:get('Topic-Alias-Maximum', ConnProps, 0); - true -> + true -> emqx_zone:get_env(Zone, max_topic_alias, 0) end, SessAttrs); set_session_attrs({_, #pstate{}}, SessAttrs) -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..1907695af 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -144,7 +144,8 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Serialize, Packet, Options) -> + Data = Serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), @@ -299,4 +300,3 @@ stop(Error, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. -