From 1d429dad8d64ad7cc8ff0b767270b16877f4406e Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 18 Sep 2019 20:01:22 +0800 Subject: [PATCH] Update the 'attrs/1' and 'handle_timeout/3' functions --- src/emqx_connection.erl | 16 ++++++++-------- src/emqx_ws_connection.erl | 12 ++++++------ 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 302a1b4c5..74ff9df82 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -102,9 +102,9 @@ start_link(Transport, Socket, Options) -> info(CPid) when is_pid(CPid) -> call(CPid, info); info(Conn = #connection{chan_state = ChanState}) -> - ConnInfo = info(?INFO_KEYS, Conn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}). + SockInfo = maps:from_list(info(?INFO_KEYS, Conn)), + maps:merge(ChanInfo, #{sockinfo => SockInfo}). info(Keys, Conn) when is_list(Keys) -> [{Key, info(Key, Conn)} || Key <- Keys]; @@ -133,9 +133,9 @@ limit_info(Limit) -> attrs(CPid) when is_pid(CPid) -> call(CPid, attrs); attrs(Conn = #connection{chan_state = ChanState}) -> - ConnAttrs = info(?ATTR_KEYS, Conn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). + SockAttrs = maps:from_list(info(?ATTR_KEYS, Conn)), + maps:merge(ChanAttrs, #{sockinfo => SockAttrs}). %% @doc Get stats of the channel. -spec(stats(pid()|connection()) -> emqx_types:stats()). @@ -219,7 +219,7 @@ idle(timeout, _Timeout, State) -> idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> #mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt, - MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined), + MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined), NState = State#connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)}, SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, handle_incoming(Packet, SuccFun, NState); @@ -422,7 +422,7 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState, catch error:Reason:Stk -> ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]), - Result = + Result = case emqx_channel:info(connected, ChanState) of undefined -> emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); @@ -508,7 +508,7 @@ serialize_fun(ProtoVer, MaxPacketSize) -> false -> ?LOG(warning, "DROP ~s due to oversize packet size", [emqx_packet:format(Packet)]), <<"">> - end + end end. %%-------------------------------------------------------------------- @@ -530,7 +530,7 @@ send(IoData, SuccFun, State = #connection{transport = Transport, %% Handle timeout handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) -> - case emqx_channel:timeout(TRef, Msg, ChanState) of + case emqx_channel:handle_timeout(TRef, Msg, ChanState) of {ok, NChanState} -> keep_state(State#connection{chan_state = NChanState}); {ok, Packets, NChanState} -> diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index f9527634c..9e2f27a49 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -74,9 +74,9 @@ info(WsPid) when is_pid(WsPid) -> call(WsPid, info); info(WsConn = #ws_connection{chan_state = ChanState}) -> - ConnInfo = info(?INFO_KEYS, WsConn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)), + maps:merge(ChanInfo, #{sockinfo => SockInfo}). info(Keys, WsConn) when is_list(Keys) -> [{Key, info(Key, WsConn)} || Key <- Keys]; @@ -95,9 +95,9 @@ info(chan_state, #ws_connection{chan_state = ChanState}) -> attrs(WsPid) when is_pid(WsPid) -> call(WsPid, attrs); attrs(WsConn = #ws_connection{chan_state = ChanState}) -> - ConnAttrs = info(?ATTR_KEYS, WsConn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}). + SockAttrs = maps:from_list(info(?ATTR_KEYS, WsConn)), + maps:merge(ChanAttrs, #{sockinfo => SockAttrs}). -spec(stats(pid()|ws_connection()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> @@ -257,7 +257,7 @@ websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) -> websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #ws_connection{fsm_state = idle}) -> #mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt, - MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined), + MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined), NState = State#ws_connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)}, handle_incoming(Packet, fun connected/1, NState); @@ -322,7 +322,7 @@ connected(State = #ws_connection{chan_state = ChanState}) -> %% Handle timeout handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> - case emqx_channel:timeout(TRef, Msg, ChanState) of + case emqx_channel:handle_timeout(TRef, Msg, ChanState) of {ok, NChanState} -> {ok, State#ws_connection{chan_state = NChanState}}; {ok, Packets, NChanState} ->