feat(ds): Discard session when client connects with CleanStart=1
This commit is contained in:
parent
449bafc27e
commit
8dfcb69e52
|
@ -142,13 +142,19 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
|
|||
%% somehow isolate those idling not-yet-expired sessions into a separate process
|
||||
%% space, and move this call back into `emqx_cm` where it belongs.
|
||||
ok = emqx_cm:discard_session(ClientID),
|
||||
case session_open(ClientID) of
|
||||
Session0 = #{} ->
|
||||
ensure_timers(),
|
||||
ReceiveMaximum = receive_maximum(ConnInfo),
|
||||
Session = Session0#{receive_maximum => ReceiveMaximum},
|
||||
{true, Session, []};
|
||||
case maps:get(clean_start, ConnInfo, false) of
|
||||
false ->
|
||||
case session_open(ClientID) of
|
||||
Session0 = #{} ->
|
||||
ensure_timers(),
|
||||
ReceiveMaximum = receive_maximum(ConnInfo),
|
||||
Session = Session0#{receive_maximum => ReceiveMaximum},
|
||||
{true, Session, []};
|
||||
false ->
|
||||
false
|
||||
end;
|
||||
true ->
|
||||
session_drop(ClientID),
|
||||
false
|
||||
end.
|
||||
|
||||
|
|
|
@ -242,7 +242,9 @@ with_redispatch_to(Msg, Group, Topic) ->
|
|||
is_redispatch_needed(#message{qos = ?QOS_0}) ->
|
||||
false;
|
||||
is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) ->
|
||||
true.
|
||||
true;
|
||||
is_redispatch_needed(#message{}) ->
|
||||
false.
|
||||
|
||||
%% @doc Redispatch shared deliveries to other members in the group.
|
||||
redispatch(Messages0) ->
|
||||
|
|
|
@ -745,9 +745,6 @@ t_publish_while_client_is_gone(Config) ->
|
|||
|
||||
ok = emqtt:disconnect(Client2).
|
||||
|
||||
%% TODO: don't skip after QoS2 support is added to DS.
|
||||
t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config);
|
||||
t_clean_start_drops_subscriptions('end', _Config) -> ok.
|
||||
t_clean_start_drops_subscriptions(Config) ->
|
||||
%% 1. A persistent session is started and disconnected.
|
||||
%% 2. While disconnected, a message is published and persisted.
|
||||
|
@ -773,13 +770,13 @@ t_clean_start_drops_subscriptions(Config) ->
|
|||
| Config
|
||||
]),
|
||||
{ok, _} = emqtt:ConnFun(Client1),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2),
|
||||
{ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1),
|
||||
|
||||
ok = emqtt:disconnect(Client1),
|
||||
maybe_kill_connection_process(ClientId, Config),
|
||||
|
||||
%% 2.
|
||||
ok = publish(Topic, Payload1),
|
||||
ok = publish(Topic, Payload1, ?QOS_1),
|
||||
|
||||
%% 3.
|
||||
{ok, Client2} = emqtt:start_link([
|
||||
|
@ -791,9 +788,10 @@ t_clean_start_drops_subscriptions(Config) ->
|
|||
]),
|
||||
{ok, _} = emqtt:ConnFun(Client2),
|
||||
?assertEqual(0, client_info(session_present, Client2)),
|
||||
{ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2),
|
||||
{ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1),
|
||||
|
||||
ok = publish(Topic, Payload2),
|
||||
timer:sleep(100),
|
||||
ok = publish(Topic, Payload2, ?QOS_1),
|
||||
[Msg1] = receive_messages(1),
|
||||
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)),
|
||||
|
||||
|
@ -810,7 +808,7 @@ t_clean_start_drops_subscriptions(Config) ->
|
|||
]),
|
||||
{ok, _} = emqtt:ConnFun(Client3),
|
||||
|
||||
ok = publish(Topic, Payload3),
|
||||
ok = publish(Topic, Payload3, ?QOS_1),
|
||||
[Msg2] = receive_messages(1),
|
||||
?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),
|
||||
|
||||
|
|
Loading…
Reference in New Issue