Fix a bug that session terminates immediately when received DISCONNECT packet

This commit is contained in:
zhouzb 2019-08-23 16:08:26 +08:00
parent 970d243d94
commit 9c171f5d9c
1 changed files with 17 additions and 12 deletions

View File

@ -73,6 +73,8 @@
oom_policy :: emqx_oom:oom_policy(), oom_policy :: emqx_oom:oom_policy(),
%% Connected %% Connected
connected :: boolean(), connected :: boolean(),
%% Disonnected
disconnected :: boolean(),
%% Connected at %% Connected at
connected_at :: erlang:timestamp(), connected_at :: erlang:timestamp(),
disconnected_at :: erlang:timestamp(), disconnected_at :: erlang:timestamp(),
@ -123,16 +125,17 @@ init(ConnInfo, Options) ->
end, end,
GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)), GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)),
OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)), OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)),
#channel{client = Client, #channel{client = Client,
session = undefined, session = undefined,
protocol = undefined, protocol = undefined,
gc_state = GcState, gc_state = GcState,
oom_policy = OomPolicy, oom_policy = OomPolicy,
timers = #{stats_timer => StatsTimer}, timers = #{stats_timer => StatsTimer},
connected = false, connected = false,
takeover = false, disconnected = false,
resuming = false, takeover = false,
pendings = [] resuming = false,
pendings = []
}. }.
peer_cert_as_username(Options) -> peer_cert_as_username(Options) ->
@ -634,6 +637,8 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) ->
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel}; {ok, NChannel};
handle_info(sock_closed, Channel = #channel{disconnected = true}) ->
{ok, Channel};
handle_info(sock_closed, Channel = #channel{connected = false}) -> handle_info(sock_closed, Channel = #channel{connected = false}) ->
shutdown(closed, Channel); shutdown(closed, Channel);
handle_info(sock_closed, Channel = #channel{protocol = Protocol, handle_info(sock_closed, Channel = #channel{protocol = Protocol,
@ -1111,10 +1116,10 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId},
end. end.
ensure_connected(Channel) -> ensure_connected(Channel) ->
Channel#channel{connected = true, connected_at = os:timestamp()}. Channel#channel{connected = true, connected_at = os:timestamp(), disconnected = false}.
ensure_disconnected(Channel) -> ensure_disconnected(Channel) ->
Channel#channel{connected = false, disconnected_at = os:timestamp()}. Channel#channel{connected = false, disconnected_at = os:timestamp(), disconnected = true}.
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) ->
ensure_keepalive_timer(Interval, Channel); ensure_keepalive_timer(Interval, Channel);