diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index ee7fb3eb9..20153f4a7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -93,6 +93,7 @@ seqno/0, timestamp/0, topic_filter/0, + subscription_id/0, subscription/0, session/0, stream_state/0 @@ -105,7 +106,10 @@ -type id() :: binary(). -type topic_filter() :: emqx_types:topic(). +-type subscription_id() :: integer(). + -type subscription() :: #{ + id := subscription_id(), start_time := emqx_ds:time(), props := map(), extra := map() @@ -286,16 +290,19 @@ subscribe( %% router and iterator information can be reconstructed %% from this table, if needed. ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), + {SubId, S1} = emqx_persistent_session_ds_state:new_subid(S0), Subscription = #{ start_time => now_ms(), - props => SubOpts + props => SubOpts, + id => SubId }, IsNew = true; Subscription0 = #{} -> Subscription = Subscription0#{props => SubOpts}, - IsNew = false + IsNew = false, + S1 = S0 end, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0), + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S1), ?tp(persistent_session_ds_subscription_added, #{ topic_filter => TopicFilter, sub => Subscription, is_new => IsNew }), @@ -309,7 +316,7 @@ unsubscribe( ) -> %% TODO: drop streams and messages from the buffer case subs_lookup(TopicFilter, S0) of - #{props := SubOpts} -> + #{props := SubOpts, id := _SubId} -> S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], S0), ?tp_span( persistent_session_ds_subscription_route_delete, @@ -477,7 +484,7 @@ disconnect(Session = #{s := S0}, _ConnInfo) -> -spec terminate(Reason :: term(), session()) -> ok. terminate(_Reason, _Session = #{s := S}) -> - emqx_persistent_session_ds_state:commit(S), + _ = emqx_persistent_session_ds_state:commit(S), ok. %%-------------------------------------------------------------------- @@ -584,7 +591,9 @@ do_ensure_all_iterators_closed(_DSSessionID) -> %%-------------------------------------------------------------------- fill_buffer(Session = #{s := S}, ClientInfo) -> - fill_buffer(shuffle(find_new_streams(S)), Session, ClientInfo). + Streams = shuffle(find_new_streams(S)), + ?SLOG(error, #{msg => "fill_buffer", streams => Streams}), + fill_buffer(Streams, Session, ClientInfo). -spec shuffle([A]) -> [A]. shuffle(L0) -> @@ -827,82 +836,124 @@ find_new_streams(S) -> -spec renew_streams(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). renew_streams(S0) -> - CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), - CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + S1 = remove_old_streams(S0), subs_fold( - fun(TopicFilterBin, _Subscription = #{start_time := StartTime}, S1) -> - SubId = [], + fun(TopicFilterBin, _Subscription = #{start_time := StartTime, id := SubId}, S2) -> TopicFilter = emqx_topic:words(TopicFilterBin), - TopicStreams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), - TopicStreamGroups = maps:groups_from_list(fun({{X, _}, _}) -> X end, TopicStreams), - %% Iterate over groups of streams with the same rank X, - %% finding the first eligible stream to replay: - maps:fold( - fun(RankX, Streams, S2) -> - Key = {RankX, SubId}, - case emqx_persistent_session_ds_state:get_stream(Key, S2) of - undefined -> - MinRankY = emqx_persistent_session_ds_state:get_rank(RankX, S2), - start_stream_replay( - TopicFilter, StartTime, Key, MinRankY, Streams, S2 - ); - Stream = #ifs{it_end = end_of_stream, rank_y = MinRankY} when - ?fully_replayed(Stream, CommQos1, CommQos2) - -> - %% We have fully replayed the stream with - %% the given rank X, and the client acked - %% all messages: - S3 = emqx_persistent_session_ds_state:del_stream(Key, S2), - S4 = emqx_persistent_session_ds_state:put_rank(RankX, MinRankY, S3), - start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S4); - #ifs{} -> - %% Stream replay is currently in progress, leave it as is: - S2 - end + Streams = select_streams( + SubId, + emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartTime), + S2 + ), + lists:foldl( + fun(I, Acc) -> + ensure_iterator(TopicFilter, StartTime, SubId, I, Acc) end, - S1, - TopicStreamGroups + S2, + Streams ) end, - S0, - S0 + S1, + S1 ). -start_stream_replay(TopicFilter, StartTime, Key, MinRankY, Streams, S0) -> - case find_first_stream(MinRankY, Streams) of - {RankY, Stream} -> +ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> + Key = {SubId, Stream}, + case emqx_persistent_session_ds_state:get_stream(Key, S) of + undefined -> {ok, Iterator} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime ), NewStreamState = #ifs{ + rank_x = RankX, rank_y = RankY, it_end = Iterator }, - emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S0); - undefined -> - S0 + emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); + #ifs{} -> + S end. -%% @doc Find the first stream with rank Y greater than the one given as the first argument. --spec find_first_stream(emqx_ds:rank_y() | undefined, [ - {emqx_ds:stream_rank(), emqx_ds:ds_specific_stream()} -]) -> - {emqx_ds:rank_y(), emqx_ds:ds_specific_stream()} | undefined. -find_first_stream(MinRankY, Streams) -> - lists:foldl( +select_streams(SubId, Streams0, S) -> + TopicStreamGroups = maps:groups_from_list(fun({{X, _}, _}) -> X end, Streams0), + maps:fold( + fun(RankX, Streams, Acc) -> + select_streams(SubId, RankX, Streams, S) ++ Acc + end, + [], + TopicStreamGroups + ). + +select_streams(SubId, RankX, Streams0, S) -> + %% 1. Find the streams with the rank Y greater than the recorded one: + Streams1 = + case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, S) of + undefined -> + Streams0; + ReplayedY -> + [I || I = {{_, Y}, _} <- Streams0, Y > ReplayedY] + end, + %% 2. Sort streams by rank Y: + Streams = lists:sort( + fun({{_, Y1}, _}, {{_, Y2}, _}) -> + Y1 =< Y2 + end, + Streams1 + ), + %% 3. Select streams with the least rank Y: + case Streams of + [] -> + []; + [{{_, MinRankY}, _} | _] -> + lists:takewhile(fun({{_, Y}, _}) -> Y =:= MinRankY end, Streams) + end. + +-spec remove_old_streams(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +remove_old_streams(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + %% 1. For each subscription, find the X ranks that were fully replayed: + Groups = emqx_persistent_session_ds_state:fold_streams( + fun({SubId, _Stream}, StreamState = #ifs{rank_x = RankX, rank_y = RankY, it_end = It}, Acc) -> + Key = {SubId, RankX}, + IsComplete = + It =:= end_of_stream andalso ?fully_replayed(StreamState, CommQos1, CommQos2), + case {maps:get(Key, Acc, undefined), IsComplete} of + {undefined, true} -> + Acc#{Key => {true, RankY}}; + {_, false} -> + Acc#{Key => false}; + _ -> + Acc + end + end, + #{}, + S0 + ), + %% 2. Advance rank y for each fully replayed set of streams: + S1 = maps:fold( fun - ({{_RankX, RankY}, Stream}, Acc) when RankY > MinRankY; MinRankY =:= undefined -> - case Acc of - {AccY, _} when AccY < RankY -> - Acc; - _ -> - {RankY, Stream} - end; - (_, Acc) -> + (Key, {true, RankY}, Acc) -> + emqx_persistent_session_ds_state:put_rank(Key, RankY, Acc); + (_, _, Acc) -> Acc end, - undefined, - Streams + S0, + Groups + ), + %% 3. Remove the fully replayed streams: + emqx_persistent_session_ds_state:fold_streams( + fun(Key = {SubId, _Stream}, #ifs{rank_x = RankX, rank_y = RankY}, Acc) -> + case emqx_persistent_session_ds_state:get_rank({SubId, RankX}, Acc) of + MinRankY when RankY < MinRankY -> + emqx_persistent_session_ds_state:del_stream(Key, Acc); + _ -> + Acc + end + end, + S1, + S1 ). %%-------------------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 936b36841..4cb6eb596 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -27,6 +27,7 @@ %% State of the stream: -record(ifs, { + rank_x :: emqx_ds:rank_x(), rank_y :: emqx_ds:rank_y(), %% Iterator at the end of the last batch: it_end :: emqx_ds:iterator() | undefined | end_of_stream, diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 39fd7eeb7..d3dc70e2d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -26,10 +26,11 @@ -export([create_tables/0]). --export([open/1, create_new/1, delete/1, commit/1, print_session/1, list_sessions/0]). +-export([open/1, create_new/1, delete/1, commit/1, format/1, print_session/1, list_sessions/0]). -export([get_created_at/1, set_created_at/2]). -export([get_last_alive_at/1, set_last_alive_at/2]). -export([get_conninfo/1, set_conninfo/2]). +-export([new_subid/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). @@ -38,7 +39,7 @@ %% internal exports: -export([]). --export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0]). +-export_type([t/0, subscriptions/0, seqno_type/0, stream_key/0, rank_key/0]). -include("emqx_persistent_session_ds.hrl"). @@ -78,18 +79,18 @@ -define(created_at, created_at). -define(last_alive_at, last_alive_at). -define(conninfo, conninfo). +-define(last_subid, last_subid). -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), - ?conninfo => emqx_types:conninfo() + ?conninfo => emqx_types:conninfo(), + ?last_subid => integer() }. -type seqno_type() :: term(). --type stream_key() :: {emqx_ds:rank_x(), _SubId}. - -opaque t() :: #{ id := emqx_persistent_session_ds:id(), dirty := boolean(), @@ -151,27 +152,31 @@ print_session(SessionId) -> case open(SessionId) of undefined -> undefined; - {ok, #{ - metadata := Metadata, - subscriptions := SubsGBT, - streams := Streams, - seqnos := Seqnos, - ranks := Ranks - }} -> - Subs = emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end, - #{}, - SubsGBT - ), - #{ - session => Metadata, - subscriptions => Subs, - streams => Streams#pmap.clean, - seqnos => Seqnos#pmap.clean, - ranks => Ranks#pmap.clean - } + {ok, Session} -> + format(Session) end. +-spec format(t()) -> map(). +format(#{ + metadata := Metadata, + subscriptions := SubsGBT, + streams := Streams, + seqnos := Seqnos, + ranks := Ranks +}) -> + Subs = emqx_topic_gbt:fold( + fun(Key, Sub, Acc) -> maps:put(Key, Sub, Acc) end, + #{}, + SubsGBT + ), + #{ + metadata => Metadata, + subscriptions => Subs, + streams => pmap_format(Streams), + seqnos => pmap_format(Seqnos), + ranks => pmap_format(Ranks) + }. + -spec list_sessions() -> [emqx_persistent_session_ds:id()]. list_sessions() -> mnesia:dirty_all_keys(?session_tab). @@ -248,52 +253,14 @@ get_conninfo(Rec) -> set_conninfo(Val, Rec) -> set_meta(?conninfo, Val, Rec). -%% - --spec get_stream(stream_key(), t()) -> - emqx_persistent_session_ds:stream_state() | undefined. -get_stream(Key, Rec) -> - gen_get(streams, Key, Rec). - --spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). -put_stream(Key, Val, Rec) -> - gen_put(streams, Key, Val, Rec). - --spec del_stream(stream_key(), t()) -> t(). -del_stream(Key, Rec) -> - gen_del(stream, Key, Rec). - --spec fold_streams(fun(), Acc, t()) -> Acc. -fold_streams(Fun, Acc, Rec) -> - gen_fold(streams, Fun, Acc, Rec). - -%% - --spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. -get_seqno(Key, Rec) -> - gen_get(seqnos, Key, Rec). - --spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). -put_seqno(Key, Val, Rec) -> - gen_put(seqnos, Key, Val, Rec). - -%% - --spec get_rank(term(), t()) -> integer() | undefined. -get_rank(Key, Rec) -> - gen_get(ranks, Key, Rec). - --spec put_rank(term(), integer(), t()) -> t(). -put_rank(Key, Val, Rec) -> - gen_put(ranks, Key, Val, Rec). - --spec del_rank(term(), t()) -> t(). -del_rank(Key, Rec) -> - gen_del(ranks, Key, Rec). - --spec fold_ranks(fun(), Acc, t()) -> Acc. -fold_ranks(Fun, Acc, Rec) -> - gen_fold(ranks, Fun, Acc, Rec). +-spec new_subid(t()) -> {emqx_persistent_session_ds:subscription_id(), t()}. +new_subid(Rec) -> + LastSubId = + case get_meta(?last_subid, Rec) of + undefined -> 0; + N when is_integer(N) -> N + end, + {LastSubId, set_meta(?last_subid, LastSubId + 1, Rec)}. %% @@ -322,6 +289,57 @@ del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0), Rec#{subscriptions => Subs}. +%% + +-type stream_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:stream()}. + +-spec get_stream(stream_key(), t()) -> + emqx_persistent_session_ds:stream_state() | undefined. +get_stream(Key, Rec) -> + gen_get(streams, Key, Rec). + +-spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). +put_stream(Key, Val, Rec) -> + gen_put(streams, Key, Val, Rec). + +-spec del_stream(stream_key(), t()) -> t(). +del_stream(Key, Rec) -> + gen_del(streams, Key, Rec). + +-spec fold_streams(fun(), Acc, t()) -> Acc. +fold_streams(Fun, Acc, Rec) -> + gen_fold(streams, Fun, Acc, Rec). + +%% + +-spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. +get_seqno(Key, Rec) -> + gen_get(seqnos, Key, Rec). + +-spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). +put_seqno(Key, Val, Rec) -> + gen_put(seqnos, Key, Val, Rec). + +%% + +-type rank_key() :: {emqx_persistent_session_ds:subscription_id(), emqx_ds:rank_x()}. + +-spec get_rank(rank_key(), t()) -> integer() | undefined. +get_rank(Key, Rec) -> + gen_get(ranks, Key, Rec). + +-spec put_rank(rank_key(), integer(), t()) -> t(). +put_rank(Key, Val, Rec) -> + gen_put(ranks, Key, Val, Rec). + +-spec del_rank(rank_key(), t()) -> t(). +del_rank(Key, Rec) -> + gen_del(ranks, Key, Rec). + +-spec fold_ranks(fun(), Acc, t()) -> Acc. +fold_ranks(Fun, Acc, Rec) -> + gen_fold(ranks, Fun, Acc, Rec). + %%================================================================================ %% Internal functions %%================================================================================ @@ -445,6 +463,10 @@ pmap_commit( clean = maps:merge(Clean, Dirty) }. +-spec pmap_format(pmap(_K, _V)) -> map(). +pmap_format(#pmap{clean = Clean, dirty = Dirty}) -> + maps:merge(Clean, Dirty). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 6c9da71e0..2c6f0e46f 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -811,7 +811,8 @@ t_publish_many_while_client_is_gone(Config) -> Msgs1 = receive_messages(NPubs1), ct:pal("Msgs1 = ~p", [Msgs1]), NMsgs1 = length(Msgs1), - ?assertEqual(NPubs1, NMsgs1, debug_info(ClientId)), + NPubs1 =:= NMsgs1 orelse + throw_with_debug_info({NPubs1, '==', NMsgs1}, ClientId), ?assertEqual( get_topicwise_order(Pubs1), @@ -1086,11 +1087,15 @@ skip_ds_tc(Config) -> Config end. -fail_with_debug_info(Exception, ClientId) -> - case emqx_cm:lookup_channels(ClientId) of - [Chan] -> - sys:get_state(Chan, 1000); - [] -> - no_channel - end, - exit(Exception). +throw_with_debug_info(Error, ClientId) -> + Info = + case emqx_cm:lookup_channels(ClientId) of + [Pid] -> + #{channel := ChanState} = emqx_connection:get_state(Pid), + SessionState = emqx_channel:info(session_state, ChanState), + maps:update_with(s, fun emqx_persistent_session_ds_state:format/1, SessionState); + [] -> + no_channel + end, + ct:pal("!!! Assertion failed: ~p~nState:~n~p", [Error, Info]), + exit(Error).