diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index eb4eb0b1b..76b54e34a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -70,6 +70,8 @@ do_ensure_all_iterators_closed/1 ]). +-export([print_session/1]). + -ifdef(TEST). -export([ session_open/1, @@ -142,13 +144,19 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - case session_open(ClientID) of - Session0 = #{} -> - ensure_timers(), - ReceiveMaximum = receive_maximum(ConnInfo), - Session = Session0#{receive_maximum => ReceiveMaximum}, - {true, Session, []}; + case maps:get(clean_start, ConnInfo, false) of false -> + case session_open(ClientID) of + Session0 = #{} -> + ensure_timers(), + ReceiveMaximum = receive_maximum(ConnInfo), + Session = Session0#{receive_maximum => ReceiveMaximum}, + {true, Session, []}; + false -> + false + end; + true -> + session_drop(ClientID), false end. @@ -220,6 +228,25 @@ info(await_rel_timeout, #{props := Conf}) -> stats(Session) -> info(?STATS_KEYS, Session). +%% Debug/troubleshooting +-spec print_session(emqx_types:client_id()) -> map() | undefined. +print_session(ClientId) -> + catch ro_transaction( + fun() -> + case mnesia:read(?SESSION_TAB, ClientId) of + [Session] -> + #{ + session => Session, + streams => mnesia:read(?SESSION_STREAM_TAB, ClientId), + pubranges => session_read_pubranges(ClientId), + subscriptions => session_read_subscriptions(ClientId) + }; + [] -> + undefined + end + end + ). + %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %%-------------------------------------------------------------------- @@ -557,7 +584,7 @@ session_drop(DSSessionId) -> -spec session_drop_subscriptions(id()) -> ok. session_drop_subscriptions(DSSessionId) -> - Subscriptions = session_read_subscriptions(DSSessionId), + Subscriptions = session_read_subscriptions(DSSessionId, write), lists:foreach( fun(#ds_sub{id = DSSubId} = DSSub) -> TopicFilter = subscription_id_to_topic_filter(DSSubId), @@ -620,13 +647,27 @@ session_del_subscription(DSSessionId, TopicFilter) -> session_del_subscription(#ds_sub{id = DSSubId}) -> mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write). -session_read_subscriptions(DSSessionId) -> +session_read_subscriptions(DSSessionID) -> + session_read_subscriptions(DSSessionID, read). + +session_read_subscriptions(DSSessionId, LockKind) -> MS = ets:fun2ms( fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId -> Sub end ), - mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). + mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, LockKind). + +session_read_pubranges(DSSessionID) -> + session_read_pubranges(DSSessionID, read). + +session_read_pubranges(DSSessionId, LockKind) -> + MS = ets:fun2ms( + fun(#ds_pubrange{id = {Sess, First}}) when Sess =:= DSSessionId -> + {DSSessionId, First} + end + ), + mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind). -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. new_subscription_id(DSSessionId, TopicFilter) -> @@ -729,12 +770,7 @@ session_drop_streams(DSSessionId) -> %% must be called inside a transaction -spec session_drop_pubranges(id()) -> ok. session_drop_pubranges(DSSessionId) -> - MS = ets:fun2ms( - fun(#ds_pubrange{id = {DSSessionId0, First}}) when DSSessionId0 =:= DSSessionId -> - {DSSessionId, First} - end - ), - RangeIds = mnesia:select(?SESSION_PUBRANGE_TAB, MS, write), + RangeIds = session_read_pubranges(DSSessionId, write), lists:foreach( fun(RangeId) -> mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write) diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 89a785590..0a6538282 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -242,7 +242,9 @@ with_redispatch_to(Msg, Group, Topic) -> is_redispatch_needed(#message{qos = ?QOS_0}) -> false; is_redispatch_needed(#message{headers = #{redispatch_to := ?REDISPATCH_TO(_, _)}}) -> - true. + true; +is_redispatch_needed(#message{}) -> + false. %% @doc Redispatch shared deliveries to other members in the group. redispatch(Messages0) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index f3af45fe0..1be929c7f 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -745,9 +745,6 @@ t_publish_while_client_is_gone(Config) -> ok = emqtt:disconnect(Client2). -%% TODO: don't skip after QoS2 support is added to DS. -t_clean_start_drops_subscriptions(init, Config) -> skip_ds_tc(Config); -t_clean_start_drops_subscriptions('end', _Config) -> ok. t_clean_start_drops_subscriptions(Config) -> %% 1. A persistent session is started and disconnected. %% 2. While disconnected, a message is published and persisted. @@ -773,13 +770,13 @@ t_clean_start_drops_subscriptions(Config) -> | Config ]), {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + {ok, _, [1]} = emqtt:subscribe(Client1, STopic, qos1), ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), %% 2. - ok = publish(Topic, Payload1), + ok = publish(Topic, Payload1, ?QOS_1), %% 3. {ok, Client2} = emqtt:start_link([ @@ -791,9 +788,10 @@ t_clean_start_drops_subscriptions(Config) -> ]), {ok, _} = emqtt:ConnFun(Client2), ?assertEqual(0, client_info(session_present, Client2)), - {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2), + {ok, _, [1]} = emqtt:subscribe(Client2, STopic, qos1), - ok = publish(Topic, Payload2), + timer:sleep(100), + ok = publish(Topic, Payload2, ?QOS_1), [Msg1] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)), @@ -810,7 +808,7 @@ t_clean_start_drops_subscriptions(Config) -> ]), {ok, _} = emqtt:ConnFun(Client3), - ok = publish(Topic, Payload3), + ok = publish(Topic, Payload3, ?QOS_1), [Msg2] = receive_messages(1), ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)),