Update the 'attrs/1' and 'handle_timeout/3' functions
This commit is contained in:
parent
981afd38e3
commit
1d429dad8d
|
@ -102,9 +102,9 @@ start_link(Transport, Socket, Options) ->
|
|||
info(CPid) when is_pid(CPid) ->
|
||||
call(CPid, info);
|
||||
info(Conn = #connection{chan_state = ChanState}) ->
|
||||
ConnInfo = info(?INFO_KEYS, Conn),
|
||||
ChanInfo = emqx_channel:info(ChanState),
|
||||
maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}).
|
||||
SockInfo = maps:from_list(info(?INFO_KEYS, Conn)),
|
||||
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
|
||||
|
||||
info(Keys, Conn) when is_list(Keys) ->
|
||||
[{Key, info(Key, Conn)} || Key <- Keys];
|
||||
|
@ -133,9 +133,9 @@ limit_info(Limit) ->
|
|||
attrs(CPid) when is_pid(CPid) ->
|
||||
call(CPid, attrs);
|
||||
attrs(Conn = #connection{chan_state = ChanState}) ->
|
||||
ConnAttrs = info(?ATTR_KEYS, Conn),
|
||||
ChanAttrs = emqx_channel:attrs(ChanState),
|
||||
maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}).
|
||||
SockAttrs = maps:from_list(info(?ATTR_KEYS, Conn)),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
%% @doc Get stats of the channel.
|
||||
-spec(stats(pid()|connection()) -> emqx_types:stats()).
|
||||
|
@ -219,7 +219,7 @@ idle(timeout, _Timeout, State) ->
|
|||
|
||||
idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
|
||||
#mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt,
|
||||
MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined),
|
||||
MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined),
|
||||
NState = State#connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)},
|
||||
SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end,
|
||||
handle_incoming(Packet, SuccFun, NState);
|
||||
|
@ -422,7 +422,7 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState,
|
|||
catch
|
||||
error:Reason:Stk ->
|
||||
?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]),
|
||||
Result =
|
||||
Result =
|
||||
case emqx_channel:info(connected, ChanState) of
|
||||
undefined ->
|
||||
emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState);
|
||||
|
@ -508,7 +508,7 @@ serialize_fun(ProtoVer, MaxPacketSize) ->
|
|||
false ->
|
||||
?LOG(warning, "DROP ~s due to oversize packet size", [emqx_packet:format(Packet)]),
|
||||
<<"">>
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -530,7 +530,7 @@ send(IoData, SuccFun, State = #connection{transport = Transport,
|
|||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) ->
|
||||
case emqx_channel:timeout(TRef, Msg, ChanState) of
|
||||
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
|
||||
{ok, NChanState} ->
|
||||
keep_state(State#connection{chan_state = NChanState});
|
||||
{ok, Packets, NChanState} ->
|
||||
|
|
|
@ -74,9 +74,9 @@
|
|||
info(WsPid) when is_pid(WsPid) ->
|
||||
call(WsPid, info);
|
||||
info(WsConn = #ws_connection{chan_state = ChanState}) ->
|
||||
ConnInfo = info(?INFO_KEYS, WsConn),
|
||||
ChanInfo = emqx_channel:info(ChanState),
|
||||
maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}).
|
||||
SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
|
||||
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
|
||||
|
||||
info(Keys, WsConn) when is_list(Keys) ->
|
||||
[{Key, info(Key, WsConn)} || Key <- Keys];
|
||||
|
@ -95,9 +95,9 @@ info(chan_state, #ws_connection{chan_state = ChanState}) ->
|
|||
attrs(WsPid) when is_pid(WsPid) ->
|
||||
call(WsPid, attrs);
|
||||
attrs(WsConn = #ws_connection{chan_state = ChanState}) ->
|
||||
ConnAttrs = info(?ATTR_KEYS, WsConn),
|
||||
ChanAttrs = emqx_channel:attrs(ChanState),
|
||||
maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}).
|
||||
SockAttrs = maps:from_list(info(?ATTR_KEYS, WsConn)),
|
||||
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
|
||||
|
||||
-spec(stats(pid()|ws_connection()) -> emqx_types:stats()).
|
||||
stats(WsPid) when is_pid(WsPid) ->
|
||||
|
@ -257,7 +257,7 @@ websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) ->
|
|||
websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
|
||||
State = #ws_connection{fsm_state = idle}) ->
|
||||
#mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt,
|
||||
MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined),
|
||||
MaxPacketSize = emqx_mqtt_props:get('Maximum-Packet-Size', Properties, undefined),
|
||||
NState = State#ws_connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)},
|
||||
handle_incoming(Packet, fun connected/1, NState);
|
||||
|
||||
|
@ -322,7 +322,7 @@ connected(State = #ws_connection{chan_state = ChanState}) ->
|
|||
%% Handle timeout
|
||||
|
||||
handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) ->
|
||||
case emqx_channel:timeout(TRef, Msg, ChanState) of
|
||||
case emqx_channel:handle_timeout(TRef, Msg, ChanState) of
|
||||
{ok, NChanState} ->
|
||||
{ok, State#ws_connection{chan_state = NChanState}};
|
||||
{ok, Packets, NChanState} ->
|
||||
|
|
Loading…
Reference in New Issue