From b30ddc206e9244050764f4bff32151b07b83ea00 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 01:17:32 +0200 Subject: [PATCH] fix(sessds): Immutable subscriptions This commit fixes two issues: - Behavior of overlapping subscriptions has been aligned with the in-memory session. - Fixed handling of replays when subscription changes (either by client or EMQX configuration) --- apps/emqx/src/emqx_persistent_session_ds.erl | 166 +++++++------- apps/emqx/src/emqx_persistent_session_ds.hrl | 4 +- .../src/emqx_persistent_session_ds_state.erl | 205 ++++++++++-------- ...persistent_session_ds_stream_scheduler.erl | 46 +++- .../src/emqx_persistent_session_ds_subs.erl | 183 ++++++++++------ apps/emqx/src/emqx_session.erl | 5 + ...emqx_persistent_session_ds_state_tests.erl | 64 +----- 7 files changed, 370 insertions(+), 303 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4517fa1b7..0829b3fd3 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -116,15 +116,42 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type topic_filter() :: emqx_types:topic(). +-type topic_filter() :: emqx_types:topic() | #share{}. + +%% Subscription and subscription states: +%% +%% Persistent sessions cannot simply update or delete subscriptions, +%% since subscription parameters must be exactly the same during +%% replay. +%% +%% To solve this problem, we store subscriptions in a twofold manner: +%% +%% - `subscription' is an object that holds up-to-date information +%% about the client's subscription and a reference to the latest +%% subscription state id +%% +%% - `subscription_state' is an immutable object that holds +%% information about the subcription parameters at a certain point of +%% time +%% +%% New subscription states are created whenever the client subscribes +%% to a topics, or updates an existing subscription. +%% +%% Stream replay states contain references to the subscription states. +%% +%% Outdated subscription states are discarded when they are not +%% referenced by either subscription or stream replay state objects. -type subscription_id() :: integer(). +%% This type is a result of merging +%% `emqx_persistent_session_ds_subs:subscription()' with its current +%% state. -type subscription() :: #{ id := subscription_id(), start_time := emqx_ds:time(), - props := map(), - deleted := boolean() + current_state := emqx_persistent_session_ds_subs:subscription_state_id(), + subopts := map() }. -define(TIMER_PULL, timer_pull). @@ -252,7 +279,7 @@ info(is_persistent, #{}) -> info(subscriptions, #{s := S}) -> emqx_persistent_session_ds_subs:to_map(S); info(subscriptions_cnt, #{s := S}) -> - emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); + emqx_persistent_session_ds_state:n_subscriptions(S); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); info(upgrade_qos, #{props := Conf}) -> @@ -340,53 +367,20 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0} + Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}} ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - %% TODO: max subscriptions - - %% N.B.: we chose to update the router before adding the - %% subscription to the session/iterator table. The - %% reasoning for this is as follows: - %% - %% Messages matching this topic filter should start to be - %% persisted as soon as possible to avoid missing - %% messages. If this is the first such persistent session - %% subscription, it's important to do so early on. - %% - %% This could, in turn, lead to some inconsistency: if - %% such a route gets created but the session/iterator data - %% fails to be updated accordingly, we have a dangling - %% route. To remove such dangling routes, we may have a - %% periodic GC process that removes routes that do not - %% have a matching persistent subscription. Also, route - %% operations use dirty mnesia operations, which - %% inherently have room for inconsistencies. - %% - %% In practice, we use the iterator reference table as a - %% source of truth, since it is guarded by a transaction - %% context: we consider a subscription operation to be - %% successful if it ended up changing this table. Both - %% router and iterator information can be reconstructed - %% from this table, if needed. - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - Subscription = #{ - start_time => now_ms(), - props => SubOpts, - id => SubId, - deleted => false - }, - IsNew = true; - Subscription0 = #{} -> - Subscription = Subscription0#{props => SubOpts}, - IsNew = false, - S1 = S0 + {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( + TopicFilter, UpgradeQoS, SubOpts, S0 + ), + case UpdateRouter of + true -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID); + false -> + ok end, - S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1), + S = emqx_persistent_session_ds_state:commit(S1), ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, sub => Subscription, is_new => IsNew + topic_filter => TopicFilter, is_new => UpdateRouter }), {ok, Session#{s => S}}. @@ -399,15 +393,15 @@ unsubscribe( case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription = #{props := SubOpts} -> + Subscription = #{subopts := SubOpts} -> S = do_unsubscribe(ID, TopicFilter, Subscription, S0), {ok, Session#{s => S}, SubOpts} end. -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0), +do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> + S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), @@ -426,7 +420,7 @@ get_subscription(#share{}, _) -> undefined; get_subscription(TopicFilter, #{s := S}) -> case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of - _Subscription = #{props := SubOpts} -> + #{subopts := SubOpts} -> SubOpts; undefined -> undefined @@ -716,7 +710,7 @@ list_client_subscriptions(ClientId) -> %% TODO: this is not the most optimal implementation, since it %% should be possible to avoid reading extra data (streams, etc.) case print_session(ClientId) of - Sess = #{s := #{subscriptions := Subs}} -> + Sess = #{s := #{subscriptions := Subs, subscription_states := SStates}} -> Node = case Sess of #{'_alive' := {true, Pid}} -> @@ -726,8 +720,9 @@ list_client_subscriptions(ClientId) -> end, SubList = maps:fold( - fun(Topic, #{props := SubProps}, Acc) -> - Elem = {Topic, SubProps}, + fun(Topic, #{current_state := CS}, Acc) -> + #{subopts := SubOpts} = maps:get(CS, SStates), + Elem = {Topic, SubOpts}, [Elem | Acc] end, [], @@ -945,22 +940,31 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> Session0 end. -enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> +enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) -> #srs{ it_begin = ItBegin0, it_end = ItEnd0, first_seqno_qos1 = FirstSeqnoQos1, - first_seqno_qos2 = FirstSeqnoQos2 + first_seqno_qos2 = FirstSeqnoQos2, + sub_state_id = SubStateId } = Srs0, ItBegin = case IsReplay of true -> ItBegin0; false -> ItEnd0 end, + SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S), case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of {ok, ItEnd, Messages} -> {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( - IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqnoQos1, + FirstSeqnoQos2, + Messages, + Inflight0 ), Srs = Srs0#srs{ it_begin = ItBegin, @@ -984,27 +988,29 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli %% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> %% K. -process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) -> +process_batch( + _IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight +) -> {Inflight, LastSeqNoQos1, LastSeqNoQos2}; process_batch( - IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqNoQos1, + FirstSeqNoQos2, + [KV | Messages], + Inflight0 ) -> - #{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session, - {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV, + #{s := S} = Session, + #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState, + {_DsMsgKey, Msg0} = KV, Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S), Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S), Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S), - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], + Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl( fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) -> case Qos of @@ -1060,7 +1066,7 @@ process_batch( Msgs ), process_batch( - IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight + IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight ). %%-------------------------------------------------------------------- @@ -1077,15 +1083,13 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos : %% queued messages. Since streams in this DB are exclusive to the %% session, messages from the queue can be dropped as soon as they %% are acked. - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], + case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of + #{current_state := CS} -> + #{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S); + undefined -> + SubOpts = undefined + end, + Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), lists:foldl(fun do_enqueue_transient/2, Session, Msgs). do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 56862dfa5..e2b52e36d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -65,7 +65,9 @@ last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), %% This stream belongs to an unsubscribed topic-filter, and is %% marked for deletion: - unsubscribed = false :: boolean() + unsubscribed = false :: boolean(), + %% Reference to the subscription state: + sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id() }). %% Session metadata keys: diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index fc2da1317..90d86bb1d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -37,7 +37,19 @@ -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). --export([get_subscriptions/1, put_subscription/4, del_subscription/3]). +-export([ + get_subscription_state/2, + fold_subscription_states/3, + put_subscription_state/3, + del_subscription_state/2 +]). +-export([ + get_subscription/2, + fold_subscriptions/3, + n_subscriptions/1, + put_subscription/3, + del_subscription/2 +]). -export([ get_awaiting_rel/2, put_awaiting_rel/3, @@ -51,7 +63,6 @@ -export_type([ t/0, metadata/0, - subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, @@ -69,8 +80,6 @@ -type message() :: emqx_types:message(). --type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). - -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary @@ -121,7 +130,13 @@ id := emqx_persistent_session_ds:id(), dirty := boolean(), metadata := metadata(), - subscriptions := subscriptions(), + subscriptions := pmap( + emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription() + ), + subscription_states := pmap( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state() + ), seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), ranks := pmap(term(), integer()), @@ -130,11 +145,20 @@ -define(session_tab, emqx_ds_session_tab). -define(subscription_tab, emqx_ds_session_subscriptions). +-define(subscription_states_tab, emqx_ds_session_subscription_states). -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). --define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]). + +-define(pmaps, [ + {subscriptions, ?subscription_tab}, + {subscription_states, ?subscription_states_tab}, + {streams, ?stream_tab}, + {seqnos, ?seqno_tab}, + {ranks, ?rank_tab}, + {awaiting_rel, ?awaiting_rel_tab} +]). %% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). @@ -161,24 +185,25 @@ create_tables() -> {attributes, record_info(fields, kv)} ] ), - [create_kv_pmap_table(Table) || Table <- ?pmap_tables], - mria:wait_for_tables([?session_tab | ?pmap_tables]). + {_, PmapTables} = lists:unzip(?pmaps), + [create_kv_pmap_table(Table) || Table <- PmapTables], + mria:wait_for_tables([?session_tab | PmapTables]). -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. open(SessionId) -> ro_transaction(fun() -> case kv_restore(?session_tab, SessionId) of [Metadata] -> - Rec = #{ - id => SessionId, - metadata => Metadata, - subscriptions => read_subscriptions(SessionId), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), - ?unset_dirty - }, + Rec = update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => Metadata, + ?unset_dirty + } + ), {ok, Rec}; [] -> undefined @@ -195,29 +220,13 @@ print_session(SessionId) -> end. -spec format(t()) -> map(). -format(#{ - metadata := Metadata, - subscriptions := SubsGBT, - streams := Streams, - seqnos := Seqnos, - ranks := Ranks, - awaiting_rel := AwaitingRel -}) -> - Subs = emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> - maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc) +format(Rec) -> + update_pmaps( + fun(Pmap, _Table) -> + pmap_format(Pmap) end, - #{}, - SubsGBT - ), - #{ - metadata => Metadata, - subscriptions => Subs, - streams => pmap_format(Streams), - seqnos => pmap_format(Seqnos), - ranks => pmap_format(Ranks), - awaiting_rel => pmap_format(AwaitingRel) - }. + maps:without([id, dirty], Rec) + ). -spec list_sessions() -> [emqx_persistent_session_ds:id()]. list_sessions() -> @@ -227,7 +236,7 @@ list_sessions() -> delete(Id) -> transaction( fun() -> - [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables], + [kv_pmap_delete(Table, Id) || {_, Table} <- ?pmaps], mnesia:delete(?session_tab, Id, write) end ). @@ -238,39 +247,34 @@ commit(Rec = #{dirty := false}) -> commit( Rec = #{ id := SessionId, - metadata := Metadata, - streams := Streams, - seqnos := SeqNos, - ranks := Ranks, - awaiting_rel := AwaitingRel + metadata := Metadata } ) -> check_sequence(Rec), transaction(fun() -> kv_persist(?session_tab, SessionId, Metadata), - Rec#{ - streams => pmap_commit(SessionId, Streams), - seqnos => pmap_commit(SessionId, SeqNos), - ranks => pmap_commit(SessionId, Ranks), - awaiting_rel => pmap_commit(SessionId, AwaitingRel), - ?unset_dirty - } + update_pmaps( + fun(Pmap, _Table) -> + pmap_commit(SessionId, Pmap) + end, + Rec#{?unset_dirty} + ) end). -spec create_new(emqx_persistent_session_ds:id()) -> t(). create_new(SessionId) -> transaction(fun() -> delete(SessionId), - #{ - id => SessionId, - metadata => #{}, - subscriptions => emqx_topic_gbt:new(), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), - ?set_dirty - } + update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => #{}, + ?set_dirty + } + ) end). %% @@ -351,30 +355,53 @@ new_id(Rec) -> %% --spec get_subscriptions(t()) -> subscriptions(). -get_subscriptions(#{subscriptions := Subs}) -> - Subs. +-spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> + emqx_persistent_session_ds_subs:subscription() | undefined. +get_subscription(TopicFilter, Rec) -> + gen_get(subscriptions, TopicFilter, Rec). + +-spec fold_subscriptions(fun(), Acc, t()) -> Acc. +fold_subscriptions(Fun, Acc, Rec) -> + gen_fold(subscriptions, Fun, Acc, Rec). + +-spec n_subscriptions(t()) -> non_neg_integer(). +n_subscriptions(Rec) -> + gen_size(subscriptions, Rec). -spec put_subscription( emqx_persistent_session_ds:topic_filter(), - _SubId, - emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_subs:subscription(), t() ) -> t(). -put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently changes to the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end), - Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0), - Rec#{subscriptions => Subs}. +put_subscription(TopicFilter, Subscription, Rec) -> + gen_put(subscriptions, TopicFilter, Subscription, Rec). --spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t(). -del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end), - Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0), - Rec#{subscriptions => Subs}. +-spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t(). +del_subscription(TopicFilter, Rec) -> + gen_del(subscriptions, TopicFilter, Rec). + +%% + +-spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> + emqx_persistent_session_ds_subs:subscription_state() | undefined. +get_subscription_state(SStateId, Rec) -> + gen_get(subscription_states, SStateId, Rec). + +-spec fold_subscription_states(fun(), Acc, t()) -> Acc. +fold_subscription_states(Fun, Acc, Rec) -> + gen_fold(subscription_states, Fun, Acc, Rec). + +-spec put_subscription_state( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state(), + t() +) -> t(). +put_subscription_state(SStateId, SState, Rec) -> + gen_put(subscription_states, SStateId, SState, Rec). + +-spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t(). +del_subscription_state(SStateId, Rec) -> + gen_del(subscription_states, SStateId, Rec). %% @@ -522,16 +549,16 @@ gen_size(Field, Rec) -> check_sequence(Rec), pmap_size(maps:get(Field, Rec)). -%% - -read_subscriptions(SessionId) -> - Records = kv_pmap_restore(?subscription_tab, SessionId), +-spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map(). +update_pmaps(Fun, Map) -> lists:foldl( - fun({{TopicFilter, SubId}, Subscription}, Acc) -> - emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc) + fun({MapKey, Table}, Acc) -> + OldVal = maps:get(MapKey, Map, undefined), + Val = Fun(OldVal, Table), + maps:put(MapKey, Val, Acc) end, - emqx_topic_gbt:new(), - Records + Map, + ?pmaps ). %% diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 154f59b44..1be0bdf4a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -126,9 +126,10 @@ find_new_streams(S) -> renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), + S3 = update_stream_subscription_state_ids(S2), emqx_persistent_session_ds_subs:fold( fun - (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) -> TopicFilter = emqx_topic:words(Key), Streams = select_streams( SubId, @@ -137,7 +138,7 @@ renew_streams(S0) -> ), lists:foldl( fun(I, Acc1) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) + ensure_iterator(TopicFilter, StartTime, SubId, SStateId, I, Acc1) end, Acc, Streams @@ -145,8 +146,8 @@ renew_streams(S0) -> (_Key, _DeletedSubscription, Acc) -> Acc end, - S2, - S2 + S3, + S3 ). -spec on_unsubscribe( @@ -201,7 +202,7 @@ is_fully_acked(Srs, S) -> %% Internal functions %%================================================================================ -ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> +ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream}, S) -> Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> @@ -214,7 +215,8 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> rank_x = RankX, rank_y = RankY, it_begin = Iterator, - it_end = Iterator + it_end = Iterator, + sub_state_id = SStateId }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} -> @@ -350,6 +352,38 @@ remove_fully_replayed_streams(S0) -> S1 ). +%% @doc Update subscription state IDs for all streams that don't have unacked messages +-spec update_stream_subscription_state_ids(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +update_stream_subscription_state_ids(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + %% Find the latest state IDs for each subscription: + LastSStateIds = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_, #{id := SubId, current_state := SStateId}, Acc) -> + Acc#{SubId => SStateId} + end, + #{}, + S0 + ), + %% Update subscription state IDs for fully acked streams: + emqx_persistent_session_ds_state:fold_streams( + fun + (_, #srs{unsubscribed = true}, S) -> + S; + (Key = {SubId, _Stream}, SRS0, S) -> + case is_fully_acked(CommQos1, CommQos2, SRS0) of + true -> + SRS = SRS0#srs{sub_state_id = maps:get(SubId, LastSStateIds)}, + emqx_persistent_session_ds_state:put_stream(Key, SRS, S); + false -> + S + end + end, + S0, + S0 + ). + %% @doc Compare the streams by the order in which they were replayed. compare_streams( {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 9071ad9d9..e9e2a97ee 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -25,21 +25,47 @@ %% API: -export([ - on_subscribe/3, - on_unsubscribe/3, + on_subscribe/4, + on_unsubscribe/2, gc/1, lookup/2, to_map/1, - fold/3, - fold_all/3 + fold/3 ]). --export_type([]). +-export_type([subscription_state_id/0, subscription/0, subscription_state/0]). + +-include("emqx_persistent_session_ds.hrl"). %%================================================================================ %% Type declarations %%================================================================================ +-type subscription() :: #{ + %% Session-unique identifier of the subscription. Other objects + %% can use it as a compact reference: + id := emqx_persistent_session_ds:subscription_id(), + %% Reference to the current subscription state: + current_state := subscription_state_id(), + %% Time when the subscription was added: + start_time := emqx_ds:time() +}. + +-type subscription_state_id() :: integer(). + +-type subscription_state() :: #{ + parent_subscription := emqx_persistent_session_ds:subscription_id(), + upgrade_qos := boolean(), + %% SubOpts: + subopts := #{ + nl => _, + qos => _, + rap => _, + subid => _, + _ => _ + } +}. + %%================================================================================ %% API functions %%================================================================================ @@ -47,41 +73,88 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), + boolean(), + emqx_types:subopts(), emqx_persistent_session_ds_state:t() ) -> - emqx_persistent_session_ds_state:t(). -on_subscribe(TopicFilter, Subscription, S) -> - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). + {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. +on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) -> + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + undefined -> + %% This is a new subscription: + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3), + {true, S}; + Sub0 = #{current_state := SStateId0, id := SubId} -> + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of + SState -> + %% Client resubscribed with the same parameters: + {false, S0}; + _ -> + %% Subsription parameters changed: + {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), + S2 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S1 + ), + Sub = Sub0#{current_state => SStateId}, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), + {false, S} + end + end. %% @doc Process UNSUBSCRIBE -spec on_unsubscribe( emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), emqx_persistent_session_ds_state:t() ) -> emqx_persistent_session_ds_state:t(). -on_unsubscribe(TopicFilter, Subscription0, S0) -> - %% Note: we cannot delete the subscription immediately, since its - %% metadata can be used during replay (see `process_batch'). We - %% instead mark it as deleted, and let `subscription_gc' function - %% dispatch it later: - Subscription = Subscription0#{deleted => true}, - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). +on_unsubscribe(TopicFilter, S0) -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0). -%% @doc Remove subscriptions that have been marked for deletion, and -%% that don't have any unacked messages: +%% @doc Remove subscription states that don't have a parent, and that +%% don't have any unacked messages: -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). gc(S0) -> - fold_all( - fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> - case Deleted andalso has_no_unacked_streams(SubId, S0) of - true -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + %% Create a set of subscription states IDs referenced either by a + %% subscription or a stream replay state: + AliveSet0 = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_TopicFilter, #{current_state := SStateId}, Acc) -> + Acc#{SStateId => true} + end, + #{}, + S0 + ), + AliveSet = emqx_persistent_session_ds_state:fold_streams( + fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) -> + case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of false -> + Acc#{SStateId => true}; + true -> Acc end end, + AliveSet0, + S0 + ), + %% Delete dangling subscription states: + emqx_persistent_session_ds_state:fold_subscription_states( + fun(SStateId, _, S) -> + case maps:is_key(SStateId, AliveSet) of + true -> + S; + false -> + emqx_persistent_session_ds_state:del_subscription_state(SStateId, S) + end + end, S0, S0 ). @@ -90,12 +163,16 @@ gc(S0) -> -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds:subscription() | undefined. lookup(TopicFilter, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of - #{deleted := true} -> - undefined; - Sub -> - Sub + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of + Sub = #{current_state := SStateId} -> + case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of + #{subopts := SubOpts} -> + Sub#{subopts => SubOpts}; + undefined -> + undefined + end; + undefined -> + undefined end. %% @doc Convert active subscriptions to a map, for information @@ -103,7 +180,7 @@ lookup(TopicFilter, S) -> -spec to_map(emqx_persistent_session_ds_state:t()) -> map(). to_map(S) -> fold( - fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end, #{}, S ). @@ -115,48 +192,12 @@ to_map(S) -> emqx_persistent_session_ds_state:t() ) -> Acc. -fold(Fun, AccIn, S) -> - fold_all( - fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> - case Deleted of - true -> Acc; - false -> Fun(TopicFilter, Sub, Acc) - end - end, - AccIn, - S - ). - -%% @doc Fold over all subscriptions, including inactive ones: --spec fold_all( - fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), - Acc, - emqx_persistent_session_ds_state:t() -) -> - Acc. -fold_all(Fun, AccIn, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, - AccIn, - Subs - ). +fold(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). %%================================================================================ %% Internal functions %%================================================================================ --spec has_no_unacked_streams( - emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() -) -> boolean(). -has_no_unacked_streams(SubId, S) -> - emqx_persistent_session_ds_state:fold_streams( - fun - ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; - (_StreamKey, _Srs, Acc) -> - Acc - end, - true, - S - ). +now_ms() -> + erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 37a86bda6..3892740a6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -429,6 +429,11 @@ enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> end, enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). +%% Caution: updating this function _may_ break consistency of replay +%% for persistent sessions. Persistent sessions expect it to return +%% the same result during replay. If it changes the behavior between +%% releases, sessions restored from the cold storage may end up +%% replaying messages with different QoS, etc. enrich_message( ClientInfo = #{clientid := ClientId}, Msg = #message{from = ClientId}, diff --git a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl index 61e0575a8..375b4f4b1 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl @@ -74,9 +74,6 @@ session_id() -> topic() -> oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]). -subid() -> - oneof([[]]). - subscription() -> oneof([#{}]). @@ -129,18 +126,25 @@ put_req() -> {Track, Seqno}, {seqno_track(), seqno()}, {#s.seqno, put_seqno, Track, Seqno} + ), + ?LET( + {Topic, Subscription}, + {topic(), subscription()}, + {#s.subs, put_subscription, Topic, Subscription} ) ]). get_req() -> oneof([ {#s.streams, get_stream, stream_id()}, - {#s.seqno, get_seqno, seqno_track()} + {#s.seqno, get_seqno, seqno_track()}, + {#s.subs, get_subscription, topic()} ]). del_req() -> oneof([ - {#s.streams, del_stream, stream_id()} + {#s.streams, del_stream, stream_id()}, + {#s.subs, del_subscription, topic()} ]). command(S) -> @@ -153,13 +157,6 @@ command(S) -> {2, {call, ?MODULE, reopen, [session_id(S)]}}, {2, {call, ?MODULE, commit, [session_id(S)]}}, - %% Subscriptions: - {3, - {call, ?MODULE, put_subscription, [ - session_id(S), topic(), subid(), subscription() - ]}}, - {3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}}, - %% Metadata: {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}}, {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}}, @@ -170,7 +167,6 @@ command(S) -> {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}}, %% Getters: - {4, {call, ?MODULE, get_subscriptions, [session_id(S)]}}, {1, {call, ?MODULE, iterate_sessions, [batch_size()]}} ]); false -> @@ -207,19 +203,6 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) #{session_id => SessionId, key => Key, 'fun' => Fun} ), true; -postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) -> - #{SessionId := #s{subs = Subs}} = S, - ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)), - maps:foreach( - fun({TopicFilter, Id}, Expected) -> - ?assertEqual( - Expected, - emqx_topic_gbt:lookup(TopicFilter, Id, Result, default) - ) - end, - Subs - ), - true; postcondition(_, _, _) -> true. @@ -227,22 +210,6 @@ next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) -> S#{SessionId => #s{}}; next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) -> maps:remove(SessionId, S); -next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> Subs#{Key => Subscription} end, - S - ); -next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> maps:remove(Key, Subs) end, - S - ); next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) -> update( SessionId, @@ -296,19 +263,6 @@ reopen(SessionId) -> {ok, S} = emqx_persistent_session_ds_state:open(SessionId), put_state(SessionId, S). -put_subscription(SessionId, TopicFilter, SubId, Subscription) -> - S = emqx_persistent_session_ds_state:put_subscription( - TopicFilter, SubId, Subscription, get_state(SessionId) - ), - put_state(SessionId, S). - -del_subscription(SessionId, TopicFilter, SubId) -> - S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)), - put_state(SessionId, S). - -get_subscriptions(SessionId) -> - emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)). - put_metadata(SessionId, {_MetaKey, Fun, Value}) -> S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]), put_state(SessionId, S).