Add 'state' field to channel info
This commit is contained in:
parent
e8491b69a9
commit
fe2a72c664
|
@ -76,7 +76,7 @@
|
||||||
%% Timers
|
%% Timers
|
||||||
timers :: #{atom() => disabled | maybe(reference())},
|
timers :: #{atom() => disabled | maybe(reference())},
|
||||||
%% Fsm State
|
%% Fsm State
|
||||||
fsm_state :: fsm_state(),
|
state :: fsm_state(),
|
||||||
%% GC State
|
%% GC State
|
||||||
gc_state :: maybe(emqx_gc:gc_state()),
|
gc_state :: maybe(emqx_gc:gc_state()),
|
||||||
%% Takeover
|
%% Takeover
|
||||||
|
@ -94,7 +94,7 @@
|
||||||
| connected
|
| connected
|
||||||
| disconnected,
|
| disconnected,
|
||||||
connected_at := pos_integer(),
|
connected_at := pos_integer(),
|
||||||
disconnected := pos_integer()
|
disconnected_at := pos_integer()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(TIMER_TABLE, #{
|
-define(TIMER_TABLE, #{
|
||||||
|
@ -106,10 +106,10 @@
|
||||||
will_timer => will_message
|
will_timer => will_message
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(ATTR_KEYS, [conninfo, clientinfo, session]).
|
-define(ATTR_KEYS, [conninfo, clientinfo, state, session]).
|
||||||
|
|
||||||
-define(INFO_KEYS, ?ATTR_KEYS ++ [conninfo, clientinfo, session, keepalive,
|
-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, will_msg, topic_aliases,
|
||||||
will_msg, topic_aliases, alias_maximum, gc_state]).
|
alias_maximum, gc_state]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Attrs and Caps
|
%% Info, Attrs and Caps
|
||||||
|
@ -129,6 +129,8 @@ info(clientinfo, #channel{clientinfo = ClientInfo}) ->
|
||||||
ClientInfo;
|
ClientInfo;
|
||||||
info(session, #channel{session = Session}) ->
|
info(session, #channel{session = Session}) ->
|
||||||
maybe_apply(fun emqx_session:info/1, Session);
|
maybe_apply(fun emqx_session:info/1, Session);
|
||||||
|
info(state, #channel{state = State}) ->
|
||||||
|
State;
|
||||||
info(keepalive, #channel{keepalive = Keepalive}) ->
|
info(keepalive, #channel{keepalive = Keepalive}) ->
|
||||||
maybe_apply(fun emqx_keepalive:info/1, Keepalive);
|
maybe_apply(fun emqx_keepalive:info/1, Keepalive);
|
||||||
info(topic_aliases, #channel{topic_aliases = Aliases}) ->
|
info(topic_aliases, #channel{topic_aliases = Aliases}) ->
|
||||||
|
@ -202,7 +204,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}}, Options) ->
|
||||||
clientinfo = ClientInfo,
|
clientinfo = ClientInfo,
|
||||||
pub_stats = #{},
|
pub_stats = #{},
|
||||||
timers = #{stats_timer => StatsTimer},
|
timers = #{stats_timer => StatsTimer},
|
||||||
fsm_state = #{state_name => initialized},
|
state = #{state_name => initialized},
|
||||||
gc_state = init_gc_state(Zone),
|
gc_state = init_gc_state(Zone),
|
||||||
takeover = false,
|
takeover = false,
|
||||||
resuming = false,
|
resuming = false,
|
||||||
|
@ -227,7 +229,7 @@ init_gc_state(Zone) ->
|
||||||
| {close, emqx_types:packet(), channel()}
|
| {close, emqx_types:packet(), channel()}
|
||||||
| {stop, Error :: term(), channel()}
|
| {stop, Error :: term(), channel()}
|
||||||
| {stop, Error :: term(), emqx_types:packet(), channel()}).
|
| {stop, Error :: term(), emqx_types:packet(), channel()}).
|
||||||
handle_in(?CONNECT_PACKET(_), Channel = #channel{fsm_state = #{state_name := connected}}) ->
|
handle_in(?CONNECT_PACKET(_), Channel = #channel{state = #{state_name := connected}}) ->
|
||||||
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel);
|
||||||
|
|
||||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||||
|
@ -364,7 +366,7 @@ handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninf
|
||||||
handle_in(?AUTH_PACKET(), Channel) ->
|
handle_in(?AUTH_PACKET(), Channel) ->
|
||||||
handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel);
|
handle_out({disconnect, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}, Channel);
|
||||||
|
|
||||||
handle_in({frame_error, Reason}, Channel = #channel{fsm_state = FsmState}) ->
|
handle_in({frame_error, Reason}, Channel = #channel{state = FsmState}) ->
|
||||||
case FsmState of
|
case FsmState of
|
||||||
#{state_name := initialized} ->
|
#{state_name := initialized} ->
|
||||||
{stop, {shutdown, Reason}, Channel};
|
{stop, {shutdown, Reason}, Channel};
|
||||||
|
@ -528,16 +530,16 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
|
||||||
|
|
||||||
%%TODO: RunFold or Pipeline
|
%%TODO: RunFold or Pipeline
|
||||||
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
handle_out({connack, ?RC_SUCCESS, SP, ConnPkt},
|
||||||
Channel = #channel{conninfo = ConnInfo,
|
Channel = #channel{conninfo = ConnInfo,
|
||||||
clientinfo = ClientInfo,
|
clientinfo = ClientInfo,
|
||||||
fsm_state = FsmState}) ->
|
state = FsmState}) ->
|
||||||
AckProps = run_fold([fun enrich_caps/2,
|
AckProps = run_fold([fun enrich_caps/2,
|
||||||
fun enrich_server_keepalive/2,
|
fun enrich_server_keepalive/2,
|
||||||
fun enrich_assigned_clientid/2], #{}, Channel),
|
fun enrich_assigned_clientid/2], #{}, Channel),
|
||||||
FsmState1 = FsmState#{state_name => connected,
|
FsmState1 = FsmState#{state_name => connected,
|
||||||
connected_at => erlang:system_time(second)
|
connected_at => erlang:system_time(second)
|
||||||
},
|
},
|
||||||
Channel1 = Channel#channel{fsm_state = FsmState1,
|
Channel1 = Channel#channel{state = FsmState1,
|
||||||
will_msg = emqx_packet:will_msg(ConnPkt),
|
will_msg = emqx_packet:will_msg(ConnPkt),
|
||||||
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
alias_maximum = init_alias_maximum(ConnPkt, ClientInfo)
|
||||||
},
|
},
|
||||||
|
@ -564,8 +566,8 @@ handle_out({connack, ReasonCode, _ConnPkt}, Channel = #channel{conninfo = ConnIn
|
||||||
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
|
Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer),
|
||||||
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
|
{stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel};
|
||||||
|
|
||||||
handle_out({deliver, Delivers}, Channel = #channel{fsm_state = #{state_name := disconnected},
|
handle_out({deliver, Delivers}, Channel = #channel{state = #{state_name := disconnected},
|
||||||
session = Session}) ->
|
session = Session}) ->
|
||||||
NSession = emqx_session:enqueue(Delivers, Session),
|
NSession = emqx_session:enqueue(Delivers, Session),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, Channel#channel{session = NSession}};
|
||||||
|
|
||||||
|
@ -669,10 +671,10 @@ handle_out({Type, Data}, Channel) ->
|
||||||
handle_call(kick, Channel) ->
|
handle_call(kick, Channel) ->
|
||||||
{stop, {shutdown, kicked}, ok, Channel};
|
{stop, {shutdown, kicked}, ok, Channel};
|
||||||
|
|
||||||
handle_call(discard, Channel = #channel{fsm_state = #{state_name := connected}}) ->
|
handle_call(discard, Channel = #channel{state = #{state_name := connected}}) ->
|
||||||
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER),
|
||||||
{stop, {shutdown, discarded}, Packet, ok, Channel};
|
{stop, {shutdown, discarded}, Packet, ok, Channel};
|
||||||
handle_call(discard, Channel = #channel{fsm_state = #{state_name := disconnected}}) ->
|
handle_call(discard, Channel = #channel{state = #{state_name := disconnected}}) ->
|
||||||
{stop, {shutdown, discarded}, ok, Channel};
|
{stop, {shutdown, discarded}, ok, Channel};
|
||||||
|
|
||||||
%% Session Takeover
|
%% Session Takeover
|
||||||
|
@ -718,7 +720,7 @@ handle_info({register, Attrs, Stats}, #channel{clientinfo = #{clientid := Client
|
||||||
%%handle_info(disconnected, Channel = #channel{connected = undefined}) ->
|
%%handle_info(disconnected, Channel = #channel{connected = undefined}) ->
|
||||||
%% shutdown(closed, Channel);
|
%% shutdown(closed, Channel);
|
||||||
|
|
||||||
handle_info(disconnected, Channel = #channel{fsm_state = #{state_name := disconnected}}) ->
|
handle_info(disconnected, Channel = #channel{state = #{state_name := disconnected}}) ->
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
|
||||||
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval},
|
handle_info(disconnected, Channel = #channel{conninfo = #{expiry_interval := ExpiryInterval},
|
||||||
|
@ -864,7 +866,7 @@ disconnect(_Reason, Channel) -> {ok, Channel}.
|
||||||
%% Terminate
|
%% Terminate
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
terminate(_, #channel{fsm_state = #{state_name := initialized}}) ->
|
terminate(_, #channel{state = #{state_name := initialized}}) ->
|
||||||
ok;
|
ok;
|
||||||
terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
terminate(normal, #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) ->
|
||||||
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
ok = emqx_hooks:run('client.disconnected', [ClientInfo, normal, ConnInfo]);
|
||||||
|
@ -1140,10 +1142,10 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
|
||||||
inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)};
|
inbound => emqx_mqtt_caps:get_caps(Zone, max_topic_alias, 0)};
|
||||||
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
|
||||||
|
|
||||||
ensure_disconnected(Channel = #channel{fsm_state = FsmState}) ->
|
ensure_disconnected(Channel = #channel{state = FsmState}) ->
|
||||||
Channel#channel{fsm_state = FsmState#{state_name := disconnected,
|
Channel#channel{state = FsmState#{state_name := disconnected,
|
||||||
disconnected_at => erlang:system_time(second)
|
disconnected_at => erlang:system_time(second)
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
|
||||||
ensure_keepalive_timer(Interval, Channel);
|
ensure_keepalive_timer(Interval, Channel);
|
||||||
|
|
Loading…
Reference in New Issue