Refactor send_fun in protocol and other connection module

Prior to this change, in the send function, the packet is forced to
use emqx:serialize to serialize packet, it is a wrong design because
other plugins which need to transform the mqtt packet to other packets
can not use their own serialize function to serialize packet.

This change solve the problem issued above.
This commit is contained in:
Gilbert Wong 2018-10-18 13:24:06 +08:00
parent 599121052a
commit a748e8f1d8
4 changed files with 16 additions and 17 deletions

View File

@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst). esockd_rate_limit:new(Rate, Burst).
send_fun(Transport, Socket, Peername) -> send_fun(Transport, Socket, Peername) ->
fun(Data) -> fun(Serialize, Packet, Options) ->
Data = Serialize(Packet, Options),
try Transport:async_send(Socket, Data) of try Transport:async_send(Socket, Data) of
ok -> ok ->
?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
ok = emqx_gc:inc(1, Oct); ok = emqx_gc:inc(1, Oct);
maybe_gc(_, _) -> maybe_gc(_, _) ->
ok. ok.

View File

@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) ->
WillQoS : 2, WillQoS : 2,
WillFlag : 1, WillFlag : 1,
CleanStart : 1, CleanStart : 1,
0 : 1, 0 : 1,
KeepAlive : 16/big, KeepAlive : 16/big,
Rest2/binary>> = Rest1, Rest2/binary>> = Rest1,
@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1;
fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1;
fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1;
fixqos(_Type, QoS) -> QoS. fixqos(_Type, QoS) -> QoS.

View File

@ -407,13 +407,13 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(?PACKET(?PINGREQ), PState) -> process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState); send(?PACKET(?PINGRESP), PState);
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}), process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := Interval}),
PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) -> PState = #pstate{session = SPid, conn_props = #{'Session-Expiry-Interval' := OldInterval}}) ->
case Interval =/= 0 andalso OldInterval =:= 0 of case Interval =/= 0 andalso OldInterval =:= 0 of
true -> true ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState), deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, protocol_error, PState#pstate{will_msg = undefined}}; {error, protocol_error, PState#pstate{will_msg = undefined}};
false -> false ->
emqx_session:update_expiry_interval(SPid, Interval), emqx_session:update_expiry_interval(SPid, Interval),
%% Clean willmsg %% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}} {stop, normal, PState#pstate{will_msg = undefined}}
@ -495,13 +495,13 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
'Subscription-Identifier-Available' => 1, 'Subscription-Identifier-Available' => 1,
'Shared-Subscription-Available' => flag(Shared)}, 'Shared-Subscription-Available' => flag(Shared)},
Props1 = if Props1 = if
MaxQoS =:= ?QOS_2 -> MaxQoS =:= ?QOS_2 ->
Props; Props;
true -> true ->
maps:put('Maximum-QoS', MaxQoS, Props) maps:put('Maximum-QoS', MaxQoS, Props)
end, end,
Props2 = if IsAssigned -> Props2 = if IsAssigned ->
Props1#{'Assigned-Client-Identifier' => ClientId}; Props1#{'Assigned-Client-Identifier' => ClientId};
true -> Props1 true -> Props1
@ -555,7 +555,7 @@ deliver({disconnect, _ReasonCode}, PState) ->
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet, PState), trace(send, Packet, PState),
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of case SendFun(fun emqx_frame:serialize/2, Packet, #{version => Ver}) of
ok -> ok ->
emqx_metrics:sent(Packet), emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)}; {ok, inc_stats(send, Type, PState)};
@ -601,17 +601,17 @@ set_session_attrs({max_inflight, #pstate{zone = Zone, proto_ver = ProtoVer, conn
maps:put(max_inflight, if maps:put(max_inflight, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Receive-Maximum', ConnProps, 65535); maps:get('Receive-Maximum', ConnProps, 65535);
true -> true ->
emqx_zone:get_env(Zone, max_inflight, 65535) emqx_zone:get_env(Zone, max_inflight, 65535)
end, SessAttrs); end, SessAttrs);
set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) -> set_session_attrs({expiry_interval, #pstate{zone = Zone, proto_ver = ProtoVer, conn_props = ConnProps, clean_start = CleanStart}}, SessAttrs) ->
maps:put(expiry_interval, if maps:put(expiry_interval, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Session-Expiry-Interval', ConnProps, 0); maps:get('Session-Expiry-Interval', ConnProps, 0);
true -> true ->
case CleanStart of case CleanStart of
true -> 0; true -> 0;
false -> false ->
emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff) emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end end
end, SessAttrs); end, SessAttrs);
@ -619,7 +619,7 @@ set_session_attrs({topic_alias_maximum, #pstate{zone = Zone, proto_ver = ProtoVe
maps:put(topic_alias_maximum, if maps:put(topic_alias_maximum, if
ProtoVer =:= ?MQTT_PROTO_V5 -> ProtoVer =:= ?MQTT_PROTO_V5 ->
maps:get('Topic-Alias-Maximum', ConnProps, 0); maps:get('Topic-Alias-Maximum', ConnProps, 0);
true -> true ->
emqx_zone:get_env(Zone, max_topic_alias, 0) emqx_zone:get_env(Zone, max_topic_alias, 0)
end, SessAttrs); end, SessAttrs);
set_session_attrs({_, #pstate{}}, SessAttrs) -> set_session_attrs({_, #pstate{}}, SessAttrs) ->

View File

@ -144,7 +144,8 @@ websocket_init(#state{request = Req, options = Options}) ->
idle_timeout = IdleTimout}}. idle_timeout = IdleTimout}}.
send_fun(WsPid) -> send_fun(WsPid) ->
fun(Data) -> fun(Serialize, Packet, Options) ->
Data = Serialize(Packet, Options),
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/sent', BinSize), emqx_metrics:inc('bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize), put(send_oct, get(send_oct) + BinSize),
@ -299,4 +300,3 @@ stop(Error, State) ->
wsock_stats() -> wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS]. [{Key, get(Key)} || Key <- ?SOCK_STATS].