From fe2a72c66417cb41dd5392c9b23be2bafb4baeca Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 29 Sep 2019 11:47:31 +0800 Subject: [PATCH] Add 'state' field to channel info --- src/emqx_channel.erl | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index e741dd7cb..01e32410d 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -76,7 +76,7 @@ %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% Fsm State - fsm_state :: fsm_state(), + state :: fsm_state(), %% GC State gc_state :: maybe(emqx_gc:gc_state()), %% Takeover @@ -94,7 +94,7 @@ | connected | disconnected, connected_at := pos_integer(), - disconnected := pos_integer() + disconnected_at := pos_integer() }). -define(TIMER_TABLE, #{ @@ -106,10 +106,10 @@ will_timer => will_message }). --define(ATTR_KEYS, [conninfo, clientinfo, session]). +-define(ATTR_KEYS, [conninfo, clientinfo, state, session]). --define(INFO_KEYS, ?ATTR_KEYS ++ [conninfo, clientinfo, session, keepalive, - will_msg, topic_aliases, alias_maximum, gc_state]). +-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases, + alias_maximum, gc_state]). %%-------------------------------------------------------------------- %% Info, Attrs and Caps @@ -129,6 +129,8 @@ info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> maybe_apply(fun emqx_session:info/1, Session); +info(state, #channel{state = State}) -> + State; info(keepalive, #channel{keepalive = Keepalive}) -> maybe_apply(fun emqx_keepalive:info/1, Keepalive); info(topic_aliases, #channel{topic_aliases = Aliases}) -> @@ -202,7 +204,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) -> clientinfo = ClientInfo, pub_stats = #{}, timers = #{stats_timer => StatsTimer}, - fsm_state = #{state_name => initialized}, + state = #{state_name => initialized}, gc_state = init_gc_state(Zone), takeover = false, resuming = false, @@ -227,7 +229,7 @@ init_gc_state(Zone) -> | {close, emqx_types:packet(), channel()} | {stop, Error :: term(), channel()} | {stop, Error :: term(), emqx_types:packet(), channel()}). -handle_in(?CONNECT_PACKET(_), Channel = #channel{fsm_state = #{state_name := connected}}) -> +handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> @@ -364,7 +366,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf handle_in(?AUTH_PACKET(), Channel) -> handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel); -handle_in({frame_error, Reason}, Channel = #channel{fsm_state = FsmState}) -> +handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) -> case FsmState of #{state_name := initialized} -> {stop, {shutdown, Reason}, Channel}; @@ -528,16 +530,16 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = %%TODO: RunFold or Pipeline handle_out({connack, ?RC_SUCCESS, SP, ConnPkt}, - Channel = #channel{conninfo = ConnInfo, + Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo, - fsm_state = FsmState}) -> + state = FsmState}) -> AckProps = run_fold([fun enrich_caps/2, fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2], #{}, Channel), FsmState1 = FsmState#{state_name => connected, connected_at => erlang:system_time(second) }, - Channel1 = Channel#channel{fsm_state = FsmState1, + Channel1 = Channel#channel{state = FsmState1, will_msg = emqx_packet:will_msg(ConnPkt), alias_maximum = init_alias_maximum(ConnPkt, ClientInfo) }, @@ -564,8 +566,8 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; -handle_out({deliver, Delivers}, Channel = #channel{fsm_state = #{state_name := disconnected}, - session = Session}) -> +handle_out({deliver, Delivers}, Channel = #channel{state = #{state_name := disconnected}, + session = Session}) -> NSession = emqx_session:enqueue(Delivers, Session), {ok, Channel#channel{session = NSession}}; @@ -669,10 +671,10 @@ handle_out({Type, Data}, Channel) -> handle_call(kick, Channel) -> {stop, {shutdown, kicked}, ok, Channel}; -handle_call(discard, Channel = #channel{fsm_state = #{state_name := connected}}) -> +handle_call(discard, Channel = #channel{state = #{state_name := connected}}) -> Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), {stop, {shutdown, discarded}, Packet, ok, Channel}; -handle_call(discard, Channel = #channel{fsm_state = #{state_name := disconnected}}) -> +handle_call(discard, Channel = #channel{state = #{state_name := disconnected}}) -> {stop, {shutdown, discarded}, ok, Channel}; %% Session Takeover @@ -718,7 +720,7 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client %%handle_info(disconnected, Channel = #channel{connected = undefined}) -> %% shutdown(closed, Channel); -handle_info(disconnected, Channel = #channel{fsm_state = #{state_name := disconnected}}) -> +handle_info(disconnected, Channel = #channel{state = #{state_name := disconnected}}) -> {ok, Channel}; handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval}, @@ -864,7 +866,7 @@ disconnect(_Reason, Channel) -> {ok, Channel}. %% Terminate %%-------------------------------------------------------------------- -terminate(_, #channel{fsm_state = #{state_name := initialized}}) -> +terminate(_, #channel{state = #{state_name := initialized}}) -> ok; terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]); @@ -1140,10 +1142,10 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5, inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)}; init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined. -ensure_disconnected(Channel = #channel{fsm_state = FsmState}) -> - Channel#channel{fsm_state = FsmState#{state_name := disconnected, - disconnected_at => erlang:system_time(second) - }}. +ensure_disconnected(Channel = #channel{state = FsmState}) -> + Channel#channel{state = FsmState#{state_name := disconnected, + disconnected_at => erlang:system_time(second) + }}. ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel);