From c645cfa5d63196441f8edab48fcd23d79498697f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 12 Apr 2024 18:04:27 +0200 Subject: [PATCH 01/16] fix(sessds): Graceful handling of shared subscription error --- apps/emqx/src/emqx_persistent_session_ds.erl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 83ed5d465..0f1e77370 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -337,6 +337,13 @@ print_session(ClientId) -> -spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. +subscribe( + #share{}, + _SubOpts, + _Session +) -> + %% TODO: Shared subscriptions are not supported yet: + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; subscribe( TopicFilter, SubOpts, @@ -421,6 +428,9 @@ do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. +get_subscription(#share{}, _) -> + %% TODO: shared subscriptions are not supported yet: + undefined; get_subscription(TopicFilter, #{s := S}) -> case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of _Subscription = #{props := SubOpts} -> From 3e0c649e8e5cd0810469771d0b21ef2e95631066 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 12 Apr 2024 16:33:28 +0200 Subject: [PATCH 02/16] feat(sessds): Store awaiting rel --- apps/emqx/src/emqx_persistent_session_ds.erl | 111 ++++++++++++++---- .../src/emqx_persistent_session_ds_state.erl | 63 +++++++++- .../src/emqx_persistent_session_ds_subs.erl | 10 +- 3 files changed, 152 insertions(+), 32 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0f1e77370..4517fa1b7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -184,7 +184,9 @@ seqno_q2_dup, seqno_q2_rec, seqno_q2_next, - n_streams + n_streams, + awaiting_rel_cnt, + awaiting_rel_max ]). %% @@ -206,7 +208,8 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ok = emqx_cm:takeover_kick(ClientID), case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of Session0 = #{} -> - Session = Session0#{props => Conf}, + Session1 = Session0#{props => Conf}, + Session = do_expire(ClientInfo, Session1), {true, ensure_timers(Session), []}; false -> false @@ -262,21 +265,21 @@ info(inflight_max, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:receive_maximum(Inflight); info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); -% info(mqueue, #sessmem{mqueue = MQueue}) -> -% MQueue; info(mqueue_len, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:n_buffered(all, Inflight); -% info(mqueue_max, #sessmem{mqueue = MQueue}) -> -% emqx_mqueue:max_len(MQueue); info(mqueue_dropped, _Session) -> 0; %% info(next_pkt_id, #{s := S}) -> %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), %% PacketId; -% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> -% AwaitingRel; -%% info(awaiting_rel_cnt, #{s := S}) -> -%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); +info(awaiting_rel, #{s := S}) -> + emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, S); +info(awaiting_rel_max, #{props := Conf}) -> + maps:get(max_awaiting_rel, Conf); +info(awaiting_rel_cnt, #{s := S}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S); +info(await_rel_timeout, #{props := Conf}) -> + maps:get(await_rel_timeout, Conf); info(seqno_q1_comm, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S); info(seqno_q1_dup, #{s := S}) -> @@ -292,17 +295,7 @@ info(seqno_q2_rec, #{s := S}) -> info(seqno_q2_next, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S); info(n_streams, #{s := S}) -> - emqx_persistent_session_ds_state:fold_streams( - fun(_, _, Acc) -> Acc + 1 end, - 0, - S - ); -info(awaiting_rel_max, #{props := Conf}) -> - maps:get(max_awaiting_rel, Conf); -info(await_rel_timeout, #{props := _Conf}) -> - %% TODO: currently this setting is ignored: - %% maps:get(await_rel_timeout, Conf). - 0; + emqx_persistent_session_ds_state:n_streams(S); info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs -> {error, not_implemented}. @@ -446,11 +439,72 @@ get_subscription(TopicFilter, #{s := S}) -> -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}. +publish( + PacketId, + Msg = #message{qos = ?QOS_2, timestamp = Ts}, + Session = #{s := S0} +) -> + case is_awaiting_full(Session) of + false -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + Results = emqx_broker:publish(Msg), + S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0), + {ok, Results, Session#{s => S}}; + _Ts -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end; + true -> + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + end; publish(_PacketId, Msg, Session) -> - %% TODO: QoS2 Result = emqx_broker:publish(Msg), {ok, Result, Session}. +is_awaiting_full(#{s := S, props := Props}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S) >= + maps:get(max_awaiting_rel, Props, infinity). + +-spec expire(emqx_types:clientinfo(), session()) -> + {ok, [], timeout(), session()} | {ok, [], session()}. +expire(ClientInfo, Session0 = #{props := Props}) -> + Session = #{s := S} = do_expire(ClientInfo, Session0), + case emqx_persistent_session_ds_state:n_awaiting_rel(S) of + 0 -> + {ok, [], Session}; + _ -> + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + {ok, [], AwaitRelTimeout, Session} + end. + +do_expire(ClientInfo, Session = #{s := S0, props := Props}) -> + %% 1. Find expired packet IDs: + Now = erlang:system_time(millisecond), + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + ExpiredPacketIds = + emqx_persistent_session_ds_state:fold_awaiting_rel( + fun(PacketId, Ts, Acc) -> + Age = Now - Ts, + case Age > AwaitRelTimeout of + true -> + [PacketId | Acc]; + false -> + Acc + end + end, + [], + S0 + ), + %% 2. Perform side effects: + _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(ExpiredPacketIds)}), + %% 3. Update state: + S = lists:foldl( + fun emqx_persistent_session_ds_state:del_awaiting_rel/2, + S0, + ExpiredPacketIds + ), + Session#{s => S}. + %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- @@ -487,9 +541,14 @@ pubrec(PacketId, Session0) -> -spec pubrel(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. -pubrel(_PacketId, Session = #{}) -> - % TODO: stub - {ok, Session}. +pubrel(PacketId, Session = #{s := S0}) -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}; + _TS -> + S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0), + {ok, Session#{s => S}} + end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBCOMP @@ -562,6 +621,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S = emqx_persistent_session_ds_state:commit(S0), From ! Ref, {ok, [], Session#{s => S}}; +handle_timeout(ClientInfo, expire_awaiting_rel, Session) -> + expire(ClientInfo, Session); handle_timeout(_ClientInfo, Timeout, Session) -> ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 28297964d..fc2da1317 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -34,10 +34,17 @@ -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). --export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). +-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([get_subscriptions/1, put_subscription/4, del_subscription/3]). +-export([ + get_awaiting_rel/2, + put_awaiting_rel/3, + del_awaiting_rel/2, + fold_awaiting_rel/3, + n_awaiting_rel/1 +]). -export([make_session_iterator/0, session_iterator_next/2]). @@ -117,7 +124,8 @@ subscriptions := subscriptions(), seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), - ranks := pmap(term(), integer()) + ranks := pmap(term(), integer()), + awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) }. -define(session_tab, emqx_ds_session_tab). @@ -125,7 +133,8 @@ -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). --define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). +-define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). +-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]). %% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). @@ -167,6 +176,7 @@ open(SessionId) -> streams => pmap_open(?stream_tab, SessionId), seqnos => pmap_open(?seqno_tab, SessionId), ranks => pmap_open(?rank_tab, SessionId), + awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), ?unset_dirty }, {ok, Rec}; @@ -190,7 +200,8 @@ format(#{ subscriptions := SubsGBT, streams := Streams, seqnos := Seqnos, - ranks := Ranks + ranks := Ranks, + awaiting_rel := AwaitingRel }) -> Subs = emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> @@ -204,7 +215,8 @@ format(#{ subscriptions => Subs, streams => pmap_format(Streams), seqnos => pmap_format(Seqnos), - ranks => pmap_format(Ranks) + ranks => pmap_format(Ranks), + awaiting_rel => pmap_format(AwaitingRel) }. -spec list_sessions() -> [emqx_persistent_session_ds:id()]. @@ -229,7 +241,8 @@ commit( metadata := Metadata, streams := Streams, seqnos := SeqNos, - ranks := Ranks + ranks := Ranks, + awaiting_rel := AwaitingRel } ) -> check_sequence(Rec), @@ -239,6 +252,7 @@ commit( streams => pmap_commit(SessionId, Streams), seqnos => pmap_commit(SessionId, SeqNos), ranks => pmap_commit(SessionId, Ranks), + awaiting_rel => pmap_commit(SessionId, AwaitingRel), ?unset_dirty } end). @@ -254,6 +268,7 @@ create_new(SessionId) -> streams => pmap_open(?stream_tab, SessionId), seqnos => pmap_open(?seqno_tab, SessionId), ranks => pmap_open(?rank_tab, SessionId), + awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), ?set_dirty } end). @@ -382,6 +397,10 @@ del_stream(Key, Rec) -> fold_streams(Fun, Acc, Rec) -> gen_fold(streams, Fun, Acc, Rec). +-spec n_streams(t()) -> non_neg_integer(). +n_streams(Rec) -> + gen_size(streams, Rec). + %% -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. @@ -412,6 +431,30 @@ del_rank(Key, Rec) -> fold_ranks(Fun, Acc, Rec) -> gen_fold(ranks, Fun, Acc, Rec). +%% + +-spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined. +get_awaiting_rel(Key, Rec) -> + gen_get(awaiting_rel, Key, Rec). + +-spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t(). +put_awaiting_rel(Key, Val, Rec) -> + gen_put(awaiting_rel, Key, Val, Rec). + +-spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t(). +del_awaiting_rel(Key, Rec) -> + gen_del(awaiting_rel, Key, Rec). + +-spec fold_awaiting_rel(fun(), Acc, t()) -> Acc. +fold_awaiting_rel(Fun, Acc, Rec) -> + gen_fold(awaiting_rel, Fun, Acc, Rec). + +-spec n_awaiting_rel(t()) -> non_neg_integer(). +n_awaiting_rel(Rec) -> + gen_size(awaiting_rel, Rec). + +%% + -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> mnesia:dirty_first(?session_tab). @@ -475,6 +518,10 @@ gen_del(Field, Key, Rec) -> Rec#{?set_dirty} ). +gen_size(Field, Rec) -> + check_sequence(Rec), + pmap_size(maps:get(Field, Rec)). + %% read_subscriptions(SessionId) -> @@ -547,6 +594,10 @@ pmap_commit( pmap_format(#pmap{cache = Cache}) -> Cache. +-spec pmap_size(pmap(_K, _V)) -> non_neg_integer(). +pmap_size(#pmap{cache = Cache}) -> + maps:size(Cache). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 92f17b108..9071ad9d9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -24,7 +24,15 @@ -module(emqx_persistent_session_ds_subs). %% API: --export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). +-export([ + on_subscribe/3, + on_unsubscribe/3, + gc/1, + lookup/2, + to_map/1, + fold/3, + fold_all/3 +]). -export_type([]). From b30ddc206e9244050764f4bff32151b07b83ea00 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 01:17:32 +0200 Subject: [PATCH 03/16] fix(sessds): Immutable subscriptions This commit fixes two issues: - Behavior of overlapping subscriptions has been aligned with the in-memory session. - Fixed handling of replays when subscription changes (either by client or EMQX configuration) --- apps/emqx/src/emqx_persistent_session_ds.erl | 166 +++++++------- apps/emqx/src/emqx_persistent_session_ds.hrl | 4 +- .../src/emqx_persistent_session_ds_state.erl | 205 ++++++++++-------- ...persistent_session_ds_stream_scheduler.erl | 46 +++- .../src/emqx_persistent_session_ds_subs.erl | 183 ++++++++++------ apps/emqx/src/emqx_session.erl | 5 + ...emqx_persistent_session_ds_state_tests.erl | 64 +----- 7 files changed, 370 insertions(+), 303 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 4517fa1b7..0829b3fd3 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -116,15 +116,42 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type topic_filter() :: emqx_types:topic(). +-type topic_filter() :: emqx_types:topic() | #share{}. + +%% Subscription and subscription states: +%% +%% Persistent sessions cannot simply update or delete subscriptions, +%% since subscription parameters must be exactly the same during +%% replay. +%% +%% To solve this problem, we store subscriptions in a twofold manner: +%% +%% - `subscription' is an object that holds up-to-date information +%% about the client's subscription and a reference to the latest +%% subscription state id +%% +%% - `subscription_state' is an immutable object that holds +%% information about the subcription parameters at a certain point of +%% time +%% +%% New subscription states are created whenever the client subscribes +%% to a topics, or updates an existing subscription. +%% +%% Stream replay states contain references to the subscription states. +%% +%% Outdated subscription states are discarded when they are not +%% referenced by either subscription or stream replay state objects. -type subscription_id() :: integer(). +%% This type is a result of merging +%% `emqx_persistent_session_ds_subs:subscription()' with its current +%% state. -type subscription() :: #{ id := subscription_id(), start_time := emqx_ds:time(), - props := map(), - deleted := boolean() + current_state := emqx_persistent_session_ds_subs:subscription_state_id(), + subopts := map() }. -define(TIMER_PULL, timer_pull). @@ -252,7 +279,7 @@ info(is_persistent, #{}) -> info(subscriptions, #{s := S}) -> emqx_persistent_session_ds_subs:to_map(S); info(subscriptions_cnt, #{s := S}) -> - emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); + emqx_persistent_session_ds_state:n_subscriptions(S); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); info(upgrade_qos, #{props := Conf}) -> @@ -340,53 +367,20 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0} + Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}} ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - %% TODO: max subscriptions - - %% N.B.: we chose to update the router before adding the - %% subscription to the session/iterator table. The - %% reasoning for this is as follows: - %% - %% Messages matching this topic filter should start to be - %% persisted as soon as possible to avoid missing - %% messages. If this is the first such persistent session - %% subscription, it's important to do so early on. - %% - %% This could, in turn, lead to some inconsistency: if - %% such a route gets created but the session/iterator data - %% fails to be updated accordingly, we have a dangling - %% route. To remove such dangling routes, we may have a - %% periodic GC process that removes routes that do not - %% have a matching persistent subscription. Also, route - %% operations use dirty mnesia operations, which - %% inherently have room for inconsistencies. - %% - %% In practice, we use the iterator reference table as a - %% source of truth, since it is guarded by a transaction - %% context: we consider a subscription operation to be - %% successful if it ended up changing this table. Both - %% router and iterator information can be reconstructed - %% from this table, if needed. - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - Subscription = #{ - start_time => now_ms(), - props => SubOpts, - id => SubId, - deleted => false - }, - IsNew = true; - Subscription0 = #{} -> - Subscription = Subscription0#{props => SubOpts}, - IsNew = false, - S1 = S0 + {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( + TopicFilter, UpgradeQoS, SubOpts, S0 + ), + case UpdateRouter of + true -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID); + false -> + ok end, - S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1), + S = emqx_persistent_session_ds_state:commit(S1), ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, sub => Subscription, is_new => IsNew + topic_filter => TopicFilter, is_new => UpdateRouter }), {ok, Session#{s => S}}. @@ -399,15 +393,15 @@ unsubscribe( case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription = #{props := SubOpts} -> + Subscription = #{subopts := SubOpts} -> S = do_unsubscribe(ID, TopicFilter, Subscription, S0), {ok, Session#{s => S}, SubOpts} end. -spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0), +do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> + S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0), ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, topic_filter => TopicFilter }), @@ -426,7 +420,7 @@ get_subscription(#share{}, _) -> undefined; get_subscription(TopicFilter, #{s := S}) -> case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of - _Subscription = #{props := SubOpts} -> + #{subopts := SubOpts} -> SubOpts; undefined -> undefined @@ -716,7 +710,7 @@ list_client_subscriptions(ClientId) -> %% TODO: this is not the most optimal implementation, since it %% should be possible to avoid reading extra data (streams, etc.) case print_session(ClientId) of - Sess = #{s := #{subscriptions := Subs}} -> + Sess = #{s := #{subscriptions := Subs, subscription_states := SStates}} -> Node = case Sess of #{'_alive' := {true, Pid}} -> @@ -726,8 +720,9 @@ list_client_subscriptions(ClientId) -> end, SubList = maps:fold( - fun(Topic, #{props := SubProps}, Acc) -> - Elem = {Topic, SubProps}, + fun(Topic, #{current_state := CS}, Acc) -> + #{subopts := SubOpts} = maps:get(CS, SStates), + Elem = {Topic, SubOpts}, [Elem | Acc] end, [], @@ -945,22 +940,31 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> Session0 end. -enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> +enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) -> #srs{ it_begin = ItBegin0, it_end = ItEnd0, first_seqno_qos1 = FirstSeqnoQos1, - first_seqno_qos2 = FirstSeqnoQos2 + first_seqno_qos2 = FirstSeqnoQos2, + sub_state_id = SubStateId } = Srs0, ItBegin = case IsReplay of true -> ItBegin0; false -> ItEnd0 end, + SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S), case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of {ok, ItEnd, Messages} -> {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( - IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqnoQos1, + FirstSeqnoQos2, + Messages, + Inflight0 ), Srs = Srs0#srs{ it_begin = ItBegin, @@ -984,27 +988,29 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli %% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> %% K. -process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) -> +process_batch( + _IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight +) -> {Inflight, LastSeqNoQos1, LastSeqNoQos2}; process_batch( - IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqNoQos1, + FirstSeqNoQos2, + [KV | Messages], + Inflight0 ) -> - #{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session, - {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV, + #{s := S} = Session, + #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState, + {_DsMsgKey, Msg0} = KV, Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S), Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S), Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S), - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], + Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl( fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) -> case Qos of @@ -1060,7 +1066,7 @@ process_batch( Msgs ), process_batch( - IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight + IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight ). %%-------------------------------------------------------------------- @@ -1077,15 +1083,13 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos : %% queued messages. Since streams in this DB are exclusive to the %% session, messages from the queue can be dropped as soon as they %% are acked. - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], + case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of + #{current_state := CS} -> + #{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S); + undefined -> + SubOpts = undefined + end, + Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), lists:foldl(fun do_enqueue_transient/2, Session, Msgs). do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 56862dfa5..e2b52e36d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -65,7 +65,9 @@ last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), %% This stream belongs to an unsubscribed topic-filter, and is %% marked for deletion: - unsubscribed = false :: boolean() + unsubscribed = false :: boolean(), + %% Reference to the subscription state: + sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id() }). %% Session metadata keys: diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index fc2da1317..90d86bb1d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -37,7 +37,19 @@ -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). --export([get_subscriptions/1, put_subscription/4, del_subscription/3]). +-export([ + get_subscription_state/2, + fold_subscription_states/3, + put_subscription_state/3, + del_subscription_state/2 +]). +-export([ + get_subscription/2, + fold_subscriptions/3, + n_subscriptions/1, + put_subscription/3, + del_subscription/2 +]). -export([ get_awaiting_rel/2, put_awaiting_rel/3, @@ -51,7 +63,6 @@ -export_type([ t/0, metadata/0, - subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, @@ -69,8 +80,6 @@ -type message() :: emqx_types:message(). --type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). - -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary @@ -121,7 +130,13 @@ id := emqx_persistent_session_ds:id(), dirty := boolean(), metadata := metadata(), - subscriptions := subscriptions(), + subscriptions := pmap( + emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription() + ), + subscription_states := pmap( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state() + ), seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), ranks := pmap(term(), integer()), @@ -130,11 +145,20 @@ -define(session_tab, emqx_ds_session_tab). -define(subscription_tab, emqx_ds_session_subscriptions). +-define(subscription_states_tab, emqx_ds_session_subscription_states). -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). --define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]). + +-define(pmaps, [ + {subscriptions, ?subscription_tab}, + {subscription_states, ?subscription_states_tab}, + {streams, ?stream_tab}, + {seqnos, ?seqno_tab}, + {ranks, ?rank_tab}, + {awaiting_rel, ?awaiting_rel_tab} +]). %% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). @@ -161,24 +185,25 @@ create_tables() -> {attributes, record_info(fields, kv)} ] ), - [create_kv_pmap_table(Table) || Table <- ?pmap_tables], - mria:wait_for_tables([?session_tab | ?pmap_tables]). + {_, PmapTables} = lists:unzip(?pmaps), + [create_kv_pmap_table(Table) || Table <- PmapTables], + mria:wait_for_tables([?session_tab | PmapTables]). -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. open(SessionId) -> ro_transaction(fun() -> case kv_restore(?session_tab, SessionId) of [Metadata] -> - Rec = #{ - id => SessionId, - metadata => Metadata, - subscriptions => read_subscriptions(SessionId), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), - ?unset_dirty - }, + Rec = update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => Metadata, + ?unset_dirty + } + ), {ok, Rec}; [] -> undefined @@ -195,29 +220,13 @@ print_session(SessionId) -> end. -spec format(t()) -> map(). -format(#{ - metadata := Metadata, - subscriptions := SubsGBT, - streams := Streams, - seqnos := Seqnos, - ranks := Ranks, - awaiting_rel := AwaitingRel -}) -> - Subs = emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> - maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc) +format(Rec) -> + update_pmaps( + fun(Pmap, _Table) -> + pmap_format(Pmap) end, - #{}, - SubsGBT - ), - #{ - metadata => Metadata, - subscriptions => Subs, - streams => pmap_format(Streams), - seqnos => pmap_format(Seqnos), - ranks => pmap_format(Ranks), - awaiting_rel => pmap_format(AwaitingRel) - }. + maps:without([id, dirty], Rec) + ). -spec list_sessions() -> [emqx_persistent_session_ds:id()]. list_sessions() -> @@ -227,7 +236,7 @@ list_sessions() -> delete(Id) -> transaction( fun() -> - [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables], + [kv_pmap_delete(Table, Id) || {_, Table} <- ?pmaps], mnesia:delete(?session_tab, Id, write) end ). @@ -238,39 +247,34 @@ commit(Rec = #{dirty := false}) -> commit( Rec = #{ id := SessionId, - metadata := Metadata, - streams := Streams, - seqnos := SeqNos, - ranks := Ranks, - awaiting_rel := AwaitingRel + metadata := Metadata } ) -> check_sequence(Rec), transaction(fun() -> kv_persist(?session_tab, SessionId, Metadata), - Rec#{ - streams => pmap_commit(SessionId, Streams), - seqnos => pmap_commit(SessionId, SeqNos), - ranks => pmap_commit(SessionId, Ranks), - awaiting_rel => pmap_commit(SessionId, AwaitingRel), - ?unset_dirty - } + update_pmaps( + fun(Pmap, _Table) -> + pmap_commit(SessionId, Pmap) + end, + Rec#{?unset_dirty} + ) end). -spec create_new(emqx_persistent_session_ds:id()) -> t(). create_new(SessionId) -> transaction(fun() -> delete(SessionId), - #{ - id => SessionId, - metadata => #{}, - subscriptions => emqx_topic_gbt:new(), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), - ?set_dirty - } + update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => #{}, + ?set_dirty + } + ) end). %% @@ -351,30 +355,53 @@ new_id(Rec) -> %% --spec get_subscriptions(t()) -> subscriptions(). -get_subscriptions(#{subscriptions := Subs}) -> - Subs. +-spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> + emqx_persistent_session_ds_subs:subscription() | undefined. +get_subscription(TopicFilter, Rec) -> + gen_get(subscriptions, TopicFilter, Rec). + +-spec fold_subscriptions(fun(), Acc, t()) -> Acc. +fold_subscriptions(Fun, Acc, Rec) -> + gen_fold(subscriptions, Fun, Acc, Rec). + +-spec n_subscriptions(t()) -> non_neg_integer(). +n_subscriptions(Rec) -> + gen_size(subscriptions, Rec). -spec put_subscription( emqx_persistent_session_ds:topic_filter(), - _SubId, - emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_subs:subscription(), t() ) -> t(). -put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently changes to the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end), - Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0), - Rec#{subscriptions => Subs}. +put_subscription(TopicFilter, Subscription, Rec) -> + gen_put(subscriptions, TopicFilter, Subscription, Rec). --spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t(). -del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end), - Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0), - Rec#{subscriptions => Subs}. +-spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t(). +del_subscription(TopicFilter, Rec) -> + gen_del(subscriptions, TopicFilter, Rec). + +%% + +-spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> + emqx_persistent_session_ds_subs:subscription_state() | undefined. +get_subscription_state(SStateId, Rec) -> + gen_get(subscription_states, SStateId, Rec). + +-spec fold_subscription_states(fun(), Acc, t()) -> Acc. +fold_subscription_states(Fun, Acc, Rec) -> + gen_fold(subscription_states, Fun, Acc, Rec). + +-spec put_subscription_state( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state(), + t() +) -> t(). +put_subscription_state(SStateId, SState, Rec) -> + gen_put(subscription_states, SStateId, SState, Rec). + +-spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t(). +del_subscription_state(SStateId, Rec) -> + gen_del(subscription_states, SStateId, Rec). %% @@ -522,16 +549,16 @@ gen_size(Field, Rec) -> check_sequence(Rec), pmap_size(maps:get(Field, Rec)). -%% - -read_subscriptions(SessionId) -> - Records = kv_pmap_restore(?subscription_tab, SessionId), +-spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map(). +update_pmaps(Fun, Map) -> lists:foldl( - fun({{TopicFilter, SubId}, Subscription}, Acc) -> - emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc) + fun({MapKey, Table}, Acc) -> + OldVal = maps:get(MapKey, Map, undefined), + Val = Fun(OldVal, Table), + maps:put(MapKey, Val, Acc) end, - emqx_topic_gbt:new(), - Records + Map, + ?pmaps ). %% diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 154f59b44..1be0bdf4a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -126,9 +126,10 @@ find_new_streams(S) -> renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), + S3 = update_stream_subscription_state_ids(S2), emqx_persistent_session_ds_subs:fold( fun - (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) -> TopicFilter = emqx_topic:words(Key), Streams = select_streams( SubId, @@ -137,7 +138,7 @@ renew_streams(S0) -> ), lists:foldl( fun(I, Acc1) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) + ensure_iterator(TopicFilter, StartTime, SubId, SStateId, I, Acc1) end, Acc, Streams @@ -145,8 +146,8 @@ renew_streams(S0) -> (_Key, _DeletedSubscription, Acc) -> Acc end, - S2, - S2 + S3, + S3 ). -spec on_unsubscribe( @@ -201,7 +202,7 @@ is_fully_acked(Srs, S) -> %% Internal functions %%================================================================================ -ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> +ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream}, S) -> Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> @@ -214,7 +215,8 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> rank_x = RankX, rank_y = RankY, it_begin = Iterator, - it_end = Iterator + it_end = Iterator, + sub_state_id = SStateId }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} -> @@ -350,6 +352,38 @@ remove_fully_replayed_streams(S0) -> S1 ). +%% @doc Update subscription state IDs for all streams that don't have unacked messages +-spec update_stream_subscription_state_ids(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +update_stream_subscription_state_ids(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + %% Find the latest state IDs for each subscription: + LastSStateIds = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_, #{id := SubId, current_state := SStateId}, Acc) -> + Acc#{SubId => SStateId} + end, + #{}, + S0 + ), + %% Update subscription state IDs for fully acked streams: + emqx_persistent_session_ds_state:fold_streams( + fun + (_, #srs{unsubscribed = true}, S) -> + S; + (Key = {SubId, _Stream}, SRS0, S) -> + case is_fully_acked(CommQos1, CommQos2, SRS0) of + true -> + SRS = SRS0#srs{sub_state_id = maps:get(SubId, LastSStateIds)}, + emqx_persistent_session_ds_state:put_stream(Key, SRS, S); + false -> + S + end + end, + S0, + S0 + ). + %% @doc Compare the streams by the order in which they were replayed. compare_streams( {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 9071ad9d9..e9e2a97ee 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -25,21 +25,47 @@ %% API: -export([ - on_subscribe/3, - on_unsubscribe/3, + on_subscribe/4, + on_unsubscribe/2, gc/1, lookup/2, to_map/1, - fold/3, - fold_all/3 + fold/3 ]). --export_type([]). +-export_type([subscription_state_id/0, subscription/0, subscription_state/0]). + +-include("emqx_persistent_session_ds.hrl"). %%================================================================================ %% Type declarations %%================================================================================ +-type subscription() :: #{ + %% Session-unique identifier of the subscription. Other objects + %% can use it as a compact reference: + id := emqx_persistent_session_ds:subscription_id(), + %% Reference to the current subscription state: + current_state := subscription_state_id(), + %% Time when the subscription was added: + start_time := emqx_ds:time() +}. + +-type subscription_state_id() :: integer(). + +-type subscription_state() :: #{ + parent_subscription := emqx_persistent_session_ds:subscription_id(), + upgrade_qos := boolean(), + %% SubOpts: + subopts := #{ + nl => _, + qos => _, + rap => _, + subid => _, + _ => _ + } +}. + %%================================================================================ %% API functions %%================================================================================ @@ -47,41 +73,88 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), + boolean(), + emqx_types:subopts(), emqx_persistent_session_ds_state:t() ) -> - emqx_persistent_session_ds_state:t(). -on_subscribe(TopicFilter, Subscription, S) -> - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). + {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. +on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) -> + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + undefined -> + %% This is a new subscription: + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3), + {true, S}; + Sub0 = #{current_state := SStateId0, id := SubId} -> + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of + SState -> + %% Client resubscribed with the same parameters: + {false, S0}; + _ -> + %% Subsription parameters changed: + {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), + S2 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S1 + ), + Sub = Sub0#{current_state => SStateId}, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), + {false, S} + end + end. %% @doc Process UNSUBSCRIBE -spec on_unsubscribe( emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), emqx_persistent_session_ds_state:t() ) -> emqx_persistent_session_ds_state:t(). -on_unsubscribe(TopicFilter, Subscription0, S0) -> - %% Note: we cannot delete the subscription immediately, since its - %% metadata can be used during replay (see `process_batch'). We - %% instead mark it as deleted, and let `subscription_gc' function - %% dispatch it later: - Subscription = Subscription0#{deleted => true}, - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). +on_unsubscribe(TopicFilter, S0) -> + emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0). -%% @doc Remove subscriptions that have been marked for deletion, and -%% that don't have any unacked messages: +%% @doc Remove subscription states that don't have a parent, and that +%% don't have any unacked messages: -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). gc(S0) -> - fold_all( - fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> - case Deleted andalso has_no_unacked_streams(SubId, S0) of - true -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + %% Create a set of subscription states IDs referenced either by a + %% subscription or a stream replay state: + AliveSet0 = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_TopicFilter, #{current_state := SStateId}, Acc) -> + Acc#{SStateId => true} + end, + #{}, + S0 + ), + AliveSet = emqx_persistent_session_ds_state:fold_streams( + fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) -> + case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of false -> + Acc#{SStateId => true}; + true -> Acc end end, + AliveSet0, + S0 + ), + %% Delete dangling subscription states: + emqx_persistent_session_ds_state:fold_subscription_states( + fun(SStateId, _, S) -> + case maps:is_key(SStateId, AliveSet) of + true -> + S; + false -> + emqx_persistent_session_ds_state:del_subscription_state(SStateId, S) + end + end, S0, S0 ). @@ -90,12 +163,16 @@ gc(S0) -> -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds:subscription() | undefined. lookup(TopicFilter, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of - #{deleted := true} -> - undefined; - Sub -> - Sub + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of + Sub = #{current_state := SStateId} -> + case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of + #{subopts := SubOpts} -> + Sub#{subopts => SubOpts}; + undefined -> + undefined + end; + undefined -> + undefined end. %% @doc Convert active subscriptions to a map, for information @@ -103,7 +180,7 @@ lookup(TopicFilter, S) -> -spec to_map(emqx_persistent_session_ds_state:t()) -> map(). to_map(S) -> fold( - fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end, #{}, S ). @@ -115,48 +192,12 @@ to_map(S) -> emqx_persistent_session_ds_state:t() ) -> Acc. -fold(Fun, AccIn, S) -> - fold_all( - fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> - case Deleted of - true -> Acc; - false -> Fun(TopicFilter, Sub, Acc) - end - end, - AccIn, - S - ). - -%% @doc Fold over all subscriptions, including inactive ones: --spec fold_all( - fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), - Acc, - emqx_persistent_session_ds_state:t() -) -> - Acc. -fold_all(Fun, AccIn, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, - AccIn, - Subs - ). +fold(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). %%================================================================================ %% Internal functions %%================================================================================ --spec has_no_unacked_streams( - emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() -) -> boolean(). -has_no_unacked_streams(SubId, S) -> - emqx_persistent_session_ds_state:fold_streams( - fun - ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; - (_StreamKey, _Srs, Acc) -> - Acc - end, - true, - S - ). +now_ms() -> + erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 37a86bda6..3892740a6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -429,6 +429,11 @@ enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> end, enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). +%% Caution: updating this function _may_ break consistency of replay +%% for persistent sessions. Persistent sessions expect it to return +%% the same result during replay. If it changes the behavior between +%% releases, sessions restored from the cold storage may end up +%% replaying messages with different QoS, etc. enrich_message( ClientInfo = #{clientid := ClientId}, Msg = #message{from = ClientId}, diff --git a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl index 61e0575a8..375b4f4b1 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl @@ -74,9 +74,6 @@ session_id() -> topic() -> oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]). -subid() -> - oneof([[]]). - subscription() -> oneof([#{}]). @@ -129,18 +126,25 @@ put_req() -> {Track, Seqno}, {seqno_track(), seqno()}, {#s.seqno, put_seqno, Track, Seqno} + ), + ?LET( + {Topic, Subscription}, + {topic(), subscription()}, + {#s.subs, put_subscription, Topic, Subscription} ) ]). get_req() -> oneof([ {#s.streams, get_stream, stream_id()}, - {#s.seqno, get_seqno, seqno_track()} + {#s.seqno, get_seqno, seqno_track()}, + {#s.subs, get_subscription, topic()} ]). del_req() -> oneof([ - {#s.streams, del_stream, stream_id()} + {#s.streams, del_stream, stream_id()}, + {#s.subs, del_subscription, topic()} ]). command(S) -> @@ -153,13 +157,6 @@ command(S) -> {2, {call, ?MODULE, reopen, [session_id(S)]}}, {2, {call, ?MODULE, commit, [session_id(S)]}}, - %% Subscriptions: - {3, - {call, ?MODULE, put_subscription, [ - session_id(S), topic(), subid(), subscription() - ]}}, - {3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}}, - %% Metadata: {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}}, {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}}, @@ -170,7 +167,6 @@ command(S) -> {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}}, %% Getters: - {4, {call, ?MODULE, get_subscriptions, [session_id(S)]}}, {1, {call, ?MODULE, iterate_sessions, [batch_size()]}} ]); false -> @@ -207,19 +203,6 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) #{session_id => SessionId, key => Key, 'fun' => Fun} ), true; -postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) -> - #{SessionId := #s{subs = Subs}} = S, - ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)), - maps:foreach( - fun({TopicFilter, Id}, Expected) -> - ?assertEqual( - Expected, - emqx_topic_gbt:lookup(TopicFilter, Id, Result, default) - ) - end, - Subs - ), - true; postcondition(_, _, _) -> true. @@ -227,22 +210,6 @@ next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) -> S#{SessionId => #s{}}; next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) -> maps:remove(SessionId, S); -next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> Subs#{Key => Subscription} end, - S - ); -next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> maps:remove(Key, Subs) end, - S - ); next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) -> update( SessionId, @@ -296,19 +263,6 @@ reopen(SessionId) -> {ok, S} = emqx_persistent_session_ds_state:open(SessionId), put_state(SessionId, S). -put_subscription(SessionId, TopicFilter, SubId, Subscription) -> - S = emqx_persistent_session_ds_state:put_subscription( - TopicFilter, SubId, Subscription, get_state(SessionId) - ), - put_state(SessionId, S). - -del_subscription(SessionId, TopicFilter, SubId) -> - S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)), - put_state(SessionId, S). - -get_subscriptions(SessionId) -> - emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)). - put_metadata(SessionId, {_MetaKey, Fun, Value}) -> S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]), put_state(SessionId, S). From 6c83bbe10bdd472e9e820b0c90d1aa709104e7eb Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 7 Apr 2024 16:47:24 +0200 Subject: [PATCH 04/16] feat(mgmt): Filter subscriptions by durability --- .../src/emqx_mgmt_api_subscriptions.erl | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index cb8421211..9976bf881 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -86,7 +86,8 @@ fields(subscription) -> {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>, example => 0})}, {nl, hoconsc:mk(integer(), #{desc => <<"No Local">>, example => 0})}, {rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>, example => 0})}, - {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})} + {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})}, + {durable, hoconsc:mk(boolean(), #{desc => <<"Durable subscription">>, example => false})} ]. parameters() -> @@ -141,6 +142,14 @@ parameters() -> required => false, desc => <<"Shared subscription group name">> }) + }, + { + durable, + hoconsc:mk(boolean(), #{ + in => query, + required => false, + desc => <<"Filter subscriptions by durability">> + }) } ]. From 6c897c26aedf2843b203181f66e2ec3c94195b01 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 10:23:52 +0200 Subject: [PATCH 05/16] fix(sessds): Commit session on unsubscribe --- apps/emqx/src/emqx_persistent_session_ds.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0829b3fd3..5cae8487d 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -394,7 +394,8 @@ unsubscribe( undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; Subscription = #{subopts := SubOpts} -> - S = do_unsubscribe(ID, TopicFilter, Subscription, S0), + S1 = do_unsubscribe(ID, TopicFilter, Subscription, S0), + S = emqx_persistent_session_ds_state:commit(S1), {ok, Session#{s => S}, SubOpts} end. From 113a990482bbddb61d46fd75c4da57914aada27f Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 11:13:17 +0200 Subject: [PATCH 06/16] feat(sessds): Support max subscriptions --- apps/emqx/src/emqx_persistent_session_ds.erl | 9 ++-- .../src/emqx_persistent_session_ds_subs.erl | 41 ++++++++++++------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5cae8487d..c25e3c813 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -367,10 +367,10 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0, props := #{upgrade_qos := UpgradeQoS}} + Session = #{id := ID} ) -> {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( - TopicFilter, UpgradeQoS, SubOpts, S0 + TopicFilter, SubOpts, Session ), case UpdateRouter of true -> @@ -379,9 +379,8 @@ subscribe( ok end, S = emqx_persistent_session_ds_state:commit(S1), - ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, is_new => UpdateRouter - }), + UpdateRouter andalso + ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}), {ok, Session#{s => S}}. -spec unsubscribe(topic_filter(), session()) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index e9e2a97ee..1993370ed 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -25,7 +25,7 @@ %% API: -export([ - on_subscribe/4, + on_subscribe/3, on_unsubscribe/2, gc/1, lookup/2, @@ -73,26 +73,37 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - boolean(), emqx_types:subopts(), - emqx_persistent_session_ds_state:t() + emqx_persistent_session_ds:session() ) -> {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. -on_subscribe(TopicFilter, UpgradeQoS, SubOpts, S0) -> +on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> + #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> %% This is a new subscription: - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), - SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, - S3 = emqx_persistent_session_ds_state:put_subscription_state(SStateId, SState, S2), - Subscription = #{ - id => SubId, - current_state => SStateId, - start_time => now_ms() - }, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Subscription, S3), - {true, S}; + case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of + true -> + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{ + parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts + }, + S3 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S2 + ), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, Subscription, S3 + ), + {true, S}; + false -> + {false, S0} + end; Sub0 = #{current_state := SStateId0, id := SubId} -> SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of From 87ffaf89e5e98c32a83ef6d20b0e8278f0bb6c57 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 12:09:21 +0200 Subject: [PATCH 07/16] refactor(sessds_state): Use macros for map keys --- .../src/emqx_persistent_session_ds_state.erl | 90 ++++++++++--------- 1 file changed, 50 insertions(+), 40 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 90d86bb1d..bad8352c8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -126,21 +126,31 @@ | ?rec | ?committed(?QOS_2). +-define(id, id). +-define(dirty, dirty). +-define(metadata, metadata). +-define(subscriptions, subscriptions). +-define(subscription_states, subscription_states). +-define(seqnos, seqnos). +-define(streams, streams). +-define(ranks, ranks). +-define(awaiting_rel, awaiting_rel). + -opaque t() :: #{ - id := emqx_persistent_session_ds:id(), - dirty := boolean(), - metadata := metadata(), - subscriptions := pmap( + ?id := emqx_persistent_session_ds:id(), + ?dirty := boolean(), + ?metadata := metadata(), + ?subscriptions := pmap( emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription() ), - subscription_states := pmap( + ?subscription_states := pmap( emqx_persistent_session_ds_subs:subscription_state_id(), emqx_persistent_session_ds_subs:subscription_state() ), - seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), - streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), - ranks := pmap(term(), integer()), - awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) + ?seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), + ?streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), + ?ranks := pmap(term(), integer()), + ?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) }. -define(session_tab, emqx_ds_session_tab). @@ -152,12 +162,12 @@ -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). -define(pmaps, [ - {subscriptions, ?subscription_tab}, - {subscription_states, ?subscription_states_tab}, - {streams, ?stream_tab}, - {seqnos, ?seqno_tab}, - {ranks, ?rank_tab}, - {awaiting_rel, ?awaiting_rel_tab} + {?subscriptions, ?subscription_tab}, + {?subscription_states, ?subscription_states_tab}, + {?streams, ?stream_tab}, + {?seqnos, ?seqno_tab}, + {?ranks, ?rank_tab}, + {?awaiting_rel, ?awaiting_rel_tab} ]). %% Enable this flag if you suspect some code breaks the sequence: @@ -358,15 +368,15 @@ new_id(Rec) -> -spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> emqx_persistent_session_ds_subs:subscription() | undefined. get_subscription(TopicFilter, Rec) -> - gen_get(subscriptions, TopicFilter, Rec). + gen_get(?subscriptions, TopicFilter, Rec). -spec fold_subscriptions(fun(), Acc, t()) -> Acc. fold_subscriptions(Fun, Acc, Rec) -> - gen_fold(subscriptions, Fun, Acc, Rec). + gen_fold(?subscriptions, Fun, Acc, Rec). -spec n_subscriptions(t()) -> non_neg_integer(). n_subscriptions(Rec) -> - gen_size(subscriptions, Rec). + gen_size(?subscriptions, Rec). -spec put_subscription( emqx_persistent_session_ds:topic_filter(), @@ -374,22 +384,22 @@ n_subscriptions(Rec) -> t() ) -> t(). put_subscription(TopicFilter, Subscription, Rec) -> - gen_put(subscriptions, TopicFilter, Subscription, Rec). + gen_put(?subscriptions, TopicFilter, Subscription, Rec). -spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t(). del_subscription(TopicFilter, Rec) -> - gen_del(subscriptions, TopicFilter, Rec). + gen_del(?subscriptions, TopicFilter, Rec). %% -spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> emqx_persistent_session_ds_subs:subscription_state() | undefined. get_subscription_state(SStateId, Rec) -> - gen_get(subscription_states, SStateId, Rec). + gen_get(?subscription_states, SStateId, Rec). -spec fold_subscription_states(fun(), Acc, t()) -> Acc. fold_subscription_states(Fun, Acc, Rec) -> - gen_fold(subscription_states, Fun, Acc, Rec). + gen_fold(?subscription_states, Fun, Acc, Rec). -spec put_subscription_state( emqx_persistent_session_ds_subs:subscription_state_id(), @@ -397,11 +407,11 @@ fold_subscription_states(Fun, Acc, Rec) -> t() ) -> t(). put_subscription_state(SStateId, SState, Rec) -> - gen_put(subscription_states, SStateId, SState, Rec). + gen_put(?subscription_states, SStateId, SState, Rec). -spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t(). del_subscription_state(SStateId, Rec) -> - gen_del(subscription_states, SStateId, Rec). + gen_del(?subscription_states, SStateId, Rec). %% @@ -410,33 +420,33 @@ del_subscription_state(SStateId, Rec) -> -spec get_stream(stream_key(), t()) -> emqx_persistent_session_ds:stream_state() | undefined. get_stream(Key, Rec) -> - gen_get(streams, Key, Rec). + gen_get(?streams, Key, Rec). -spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). put_stream(Key, Val, Rec) -> - gen_put(streams, Key, Val, Rec). + gen_put(?streams, Key, Val, Rec). -spec del_stream(stream_key(), t()) -> t(). del_stream(Key, Rec) -> - gen_del(streams, Key, Rec). + gen_del(?streams, Key, Rec). -spec fold_streams(fun(), Acc, t()) -> Acc. fold_streams(Fun, Acc, Rec) -> - gen_fold(streams, Fun, Acc, Rec). + gen_fold(?streams, Fun, Acc, Rec). -spec n_streams(t()) -> non_neg_integer(). n_streams(Rec) -> - gen_size(streams, Rec). + gen_size(?streams, Rec). %% -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. get_seqno(Key, Rec) -> - gen_get(seqnos, Key, Rec). + gen_get(?seqnos, Key, Rec). -spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). put_seqno(Key, Val, Rec) -> - gen_put(seqnos, Key, Val, Rec). + gen_put(?seqnos, Key, Val, Rec). %% @@ -444,41 +454,41 @@ put_seqno(Key, Val, Rec) -> -spec get_rank(rank_key(), t()) -> integer() | undefined. get_rank(Key, Rec) -> - gen_get(ranks, Key, Rec). + gen_get(?ranks, Key, Rec). -spec put_rank(rank_key(), integer(), t()) -> t(). put_rank(Key, Val, Rec) -> - gen_put(ranks, Key, Val, Rec). + gen_put(?ranks, Key, Val, Rec). -spec del_rank(rank_key(), t()) -> t(). del_rank(Key, Rec) -> - gen_del(ranks, Key, Rec). + gen_del(?ranks, Key, Rec). -spec fold_ranks(fun(), Acc, t()) -> Acc. fold_ranks(Fun, Acc, Rec) -> - gen_fold(ranks, Fun, Acc, Rec). + gen_fold(?ranks, Fun, Acc, Rec). %% -spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined. get_awaiting_rel(Key, Rec) -> - gen_get(awaiting_rel, Key, Rec). + gen_get(?awaiting_rel, Key, Rec). -spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t(). put_awaiting_rel(Key, Val, Rec) -> - gen_put(awaiting_rel, Key, Val, Rec). + gen_put(?awaiting_rel, Key, Val, Rec). -spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t(). del_awaiting_rel(Key, Rec) -> - gen_del(awaiting_rel, Key, Rec). + gen_del(?awaiting_rel, Key, Rec). -spec fold_awaiting_rel(fun(), Acc, t()) -> Acc. fold_awaiting_rel(Fun, Acc, Rec) -> - gen_fold(awaiting_rel, Fun, Acc, Rec). + gen_fold(?awaiting_rel, Fun, Acc, Rec). -spec n_awaiting_rel(t()) -> non_neg_integer(). n_awaiting_rel(Rec) -> - gen_size(awaiting_rel, Rec). + gen_size(?awaiting_rel, Rec). %% From 93bb8403654ea6cb945f32d9b799bca01adecc02 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 13 Apr 2024 21:38:45 +0200 Subject: [PATCH 08/16] docs(ds): Update README --- apps/emqx_durable_storage/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f613085bb..1e87f3907 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -13,7 +13,7 @@ This makes the storage disk requirements very predictable: only the number of _p DS _backend_ is a callback module that implements `emqx_ds` behavior. -EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses RocksDB as the main storage. +EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses Raft algorithm for data replication, and RocksDB as the main storage. Note that builtin backend introduces the concept of **site** to alleviate the problem of changing node names. Site IDs are persistent, and they are randomly generated at the first startup of the node. @@ -95,10 +95,10 @@ Consumption of messages is done in several stages: # Limitation -- Builtin backend currently doesn't replicate data across different sites - There is no local cache of messages, which may result in transferring the same data multiple times # Documentation links + TBD # Usage From 197a4c30bee2b1b76b25e6f4c601963584d8fdd1 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 11:51:53 +0200 Subject: [PATCH 09/16] fix(sessds): Strip unneccessary data from the durable session state --- apps/emqx/src/emqx_persistent_session_ds.erl | 9 +++++++-- apps/emqx_management/src/emqx_mgmt_api_clients.erl | 4 +++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index c25e3c813..d77372864 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -786,7 +786,7 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> maps:get(peername, NewConnInfo), S2 ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), - S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4), + S5 = set_clientinfo(ClientInfo, S4), S = emqx_persistent_session_ds_state:commit(S5), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) @@ -833,7 +833,7 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ] ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), - S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5), + S6 = set_clientinfo(ClientInfo, S5), S = emqx_persistent_session_ds_state:commit(S6), #{ id => Id, @@ -864,6 +864,11 @@ session_drop(ID, Reason) -> now_ms() -> erlang:system_time(millisecond). +set_clientinfo(ClientInfo0, S) -> + %% Remove unnecessary fields from the clientinfo: + ClientInfo = maps:without([cn, dn, auth_result], ClientInfo0), + emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S). + %%-------------------------------------------------------------------- %% RPC targets (v1) %%-------------------------------------------------------------------- diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7128f1c3a..4d08854af 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1763,7 +1763,9 @@ format_persistent_session_info(ClientId, PSInfo0) -> connected_at => CreatedAt, ip_address => IpAddress, is_persistent => true, - port => Port + port => Port, + heap_size => 0, + mqueue_len => 0 }, PSInfo = lists:foldl( fun result_format_time_fun/2, From e439a2e0f2513c005e3126c93a36a55b7db10d28 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 12:05:48 +0200 Subject: [PATCH 10/16] fix(sessds): Save protocol name and version in the session metadata --- .../emqx_persistent_session_ds_SUITE.erl | 2 +- apps/emqx/src/emqx_persistent_session_ds.erl | 17 +++++++++++++---- apps/emqx/src/emqx_persistent_session_ds.hrl | 4 +++- .../src/emqx_persistent_session_ds_state.erl | 17 +++++++++++++++-- .../src/emqx_mgmt_api_clients.erl | 5 ++++- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 39764af30..ab062bff7 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -184,7 +184,7 @@ list_all_pubranges(Node) -> session_open(Node, ClientId) -> ClientInfo = #{}, - ConnInfo = #{peername => {undefined, undefined}}, + ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5}, WillMsg = undefined, erpc:call( Node, diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index d77372864..908e71bb5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -767,7 +767,12 @@ sync(ClientId) -> %% the broker. -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> +session_open( + SessionId, + ClientInfo, + NewConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, + MaybeWillMsg +) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -787,7 +792,8 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), S5 = set_clientinfo(ClientInfo, S4), - S = emqx_persistent_session_ds_state:commit(S5), + S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5), + S = emqx_persistent_session_ds_state:commit(S6), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -810,7 +816,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> emqx_session:conf() ) -> session(). -session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> +session_ensure_new( + Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf +) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -834,7 +842,8 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), S6 = set_clientinfo(ClientInfo, S5), - S = emqx_persistent_session_ds_state:commit(S6), + S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6), + S = emqx_persistent_session_ds_state:commit(S7), #{ id => Id, props => Conf, diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index e2b52e36d..79920629a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -74,10 +74,12 @@ -define(created_at, created_at). -define(last_alive_at, last_alive_at). -define(expiry_interval, expiry_interval). -%% Unique integer used to create unique identities +%% Unique integer used to create unique identities: -define(last_id, last_id). +%% Connection info (relevent for the dashboard): -define(peername, peername). -define(will_message, will_message). -define(clientinfo, clientinfo). +-define(protocol, protocol). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index bad8352c8..bc603647a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -33,6 +33,7 @@ -export([get_clientinfo/1, set_clientinfo/2]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). +-export([get_protocol/1, set_protocol/2]). -export([new_id/1]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). @@ -66,7 +67,8 @@ seqno_type/0, stream_key/0, rank_key/0, - session_iterator/0 + session_iterator/0, + protocol/0 ]). -include("emqx_mqtt.hrl"). @@ -108,13 +110,16 @@ dirty :: #{K => dirty | del} }. +-type protocol() :: {binary(), emqx_types:proto_ver()}. + -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), ?expiry_interval => non_neg_integer(), ?last_id => integer(), - ?peername => emqx_types:peername() + ?peername => emqx_types:peername(), + ?protocol => protocol() }. -type seqno_type() :: @@ -321,6 +326,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_protocol(t()) -> protocol() | undefined. +get_protocol(Rec) -> + get_meta(?protocol, Rec). + +-spec set_protocol(protocol(), t()) -> t(). +set_protocol(Val, Rec) -> + set_meta(?protocol, Val, Rec). + -spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). get_clientinfo(Rec) -> get_meta(?clientinfo, Rec). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 4d08854af..301d4e47e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1747,6 +1747,7 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) -> format_persistent_session_info(ClientId, PSInfo0) -> Metadata = maps:get(metadata, PSInfo0, #{}), + {ProtoName, ProtoVer} = maps:get(protocol, Metadata), PSInfo1 = maps:with([created_at, expiry_interval], Metadata), CreatedAt = maps:get(created_at, PSInfo1), case Metadata of @@ -1765,7 +1766,9 @@ format_persistent_session_info(ClientId, PSInfo0) -> is_persistent => true, port => Port, heap_size => 0, - mqueue_len => 0 + mqueue_len => 0, + proto_name => ProtoName, + proto_ver => ProtoVer }, PSInfo = lists:foldl( fun result_format_time_fun/2, From 38a2e8add945b85eb7db0d9cff7e06c49306e3f0 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 14:55:48 +0200 Subject: [PATCH 11/16] fix(sessds): Return the number of subscriptions for offline sessions --- apps/emqx_management/src/emqx_mgmt_api_clients.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 301d4e47e..38320780d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1768,7 +1768,8 @@ format_persistent_session_info(ClientId, PSInfo0) -> heap_size => 0, mqueue_len => 0, proto_name => ProtoName, - proto_ver => ProtoVer + proto_ver => ProtoVer, + subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo0, #{})) }, PSInfo = lists:foldl( fun result_format_time_fun/2, From 124c5047d07c01576507cd49ddfe1304a440b04e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 23:38:01 +0200 Subject: [PATCH 12/16] feat(sessds): Add API for getting session data from the cold storage --- apps/emqx/src/emqx_persistent_session_ds.erl | 8 +++++- .../src/emqx_persistent_session_ds_state.erl | 25 +++++++++++++++++++ .../src/emqx_persistent_session_ds_subs.erl | 22 ++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 908e71bb5..b8c853431 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -75,7 +75,8 @@ %% Managment APIs: -export([ - list_client_subscriptions/1 + list_client_subscriptions/1, + get_client_subscription/2 ]). %% session table operations @@ -736,6 +737,11 @@ list_client_subscriptions(ClientId) -> {error, not_found} end. +-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) -> + subscription() | undefined. +get_client_subscription(ClientId, Topic) -> + emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic). + %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index bc603647a..9efffc7ff 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -22,6 +22,9 @@ %% It is responsible for saving, caching, and restoring session state. %% It is completely devoid of business logic. Not even the default %% values should be set in this module. +%% +%% Session process MUST NOT use `cold_*' functions! They are reserved +%% for use in the management APIs. -module(emqx_persistent_session_ds_state). -export([create_tables/0]). @@ -40,12 +43,14 @@ -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([ get_subscription_state/2, + cold_get_subscription_state/2, fold_subscription_states/3, put_subscription_state/3, del_subscription_state/2 ]). -export([ get_subscription/2, + cold_get_subscription/2, fold_subscriptions/3, n_subscriptions/1, put_subscription/3, @@ -383,6 +388,11 @@ new_id(Rec) -> get_subscription(TopicFilter, Rec) -> gen_get(?subscriptions, TopicFilter, Rec). +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + [emqx_persistent_session_ds_subs:subscription()]. +cold_get_subscription(SessionId, Topic) -> + kv_pmap_read(?subscription_tab, SessionId, Topic). + -spec fold_subscriptions(fun(), Acc, t()) -> Acc. fold_subscriptions(Fun, Acc, Rec) -> gen_fold(?subscriptions, Fun, Acc, Rec). @@ -410,6 +420,13 @@ del_subscription(TopicFilter, Rec) -> get_subscription_state(SStateId, Rec) -> gen_get(?subscription_states, SStateId, Rec). +-spec cold_get_subscription_state( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id() +) -> + [emqx_persistent_session_ds_subs:subscription_state()]. +cold_get_subscription_state(SessionId, SStateId) -> + kv_pmap_read(?subscription_states_tab, SessionId, SStateId). + -spec fold_subscription_states(fun(), Acc, t()) -> Acc. fold_subscription_states(Fun, Acc, Rec) -> gen_fold(?subscription_states, Fun, Acc, Rec). @@ -675,6 +692,14 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) -> Val = encoder(encode, Tab, Val0), mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). +kv_pmap_read(Table, SessionId, Key) -> + lists:map( + fun(#kv{v = Val}) -> + encoder(decode, Table, Val) + end, + mnesia:dirty_read(Table, {SessionId, Key}) + ). + kv_pmap_restore(Table, SessionId) -> MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], Objs = mnesia:select(Table, MS, read), diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 1993370ed..99ad9f9fc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -33,6 +33,11 @@ fold/3 ]). +%% Management API: +-export([ + cold_get_subscription/2 +]). + -export_type([subscription_state_id/0, subscription/0, subscription_state/0]). -include("emqx_persistent_session_ds.hrl"). @@ -206,6 +211,23 @@ to_map(S) -> fold(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + emqx_persistent_session_ds:subscription() | undefined. +cold_get_subscription(SessionId, Topic) -> + case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, Topic) of + [Sub = #{current_state := SStateId}] -> + case + emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId) + of + [#{subopts := Subopts}] -> + Sub#{subopts => Subopts}; + _ -> + undefined + end; + _ -> + undefined + end. + %%================================================================================ %% Internal functions %%================================================================================ From 180130d684af17cdf87e097a48cd3f983b0f7d36 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 16 Apr 2024 16:46:31 +0200 Subject: [PATCH 13/16] feat(sessds): List persistent subscriptions in the REST API --- .../src/emqx_mgmt_api_subscriptions.erl | 210 +++++++++++++++++- .../test/emqx_mgmt_api_subscription_SUITE.erl | 91 ++++++-- changes/ce/fix-12874.en.md | 7 + 3 files changed, 285 insertions(+), 23 deletions(-) create mode 100644 changes/ce/fix-12874.en.md diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 9976bf881..b1a8fbce2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -176,7 +176,8 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> #{ topic => emqx_topic:maybe_format_share(Topic), clientid => maps:get(subid, SubOpts, null), - node => WhichNode + node => WhichNode, + durable => false }, maps:with([qos, nl, rap, rh], SubOpts) ). @@ -196,7 +197,22 @@ check_match_topic(#{<<"match_topic">> := MatchTopic}) -> check_match_topic(_) -> ok. -do_subscriptions_query(QString) -> +do_subscriptions_query(QString0) -> + {IsDurable, QString} = maps:take( + <<"durable">>, maps:merge(#{<<"durable">> => undefined}, QString0) + ), + case emqx_persistent_message:is_persistence_enabled() andalso IsDurable of + false -> + do_subscriptions_query_mem(QString); + true -> + do_subscriptions_query_persistent(QString); + undefined -> + merge_queries( + QString, fun do_subscriptions_query_mem/1, fun do_subscriptions_query_persistent/1 + ) + end. + +do_subscriptions_query_mem(QString) -> Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], case maps:get(<<"node">>, QString, undefined) of undefined -> @@ -210,8 +226,196 @@ do_subscriptions_query(QString) -> end end. +do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) -> + Count = emqx_persistent_session_ds_router:stats(n_routes), + %% TODO: filtering by client ID can be implemented more efficiently: + FilterTopic = maps:get(<<"topic">>, QString, '_'), + Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic), + SubPred = fun(Sub) -> + compare_optional(<<"topic">>, QString, topic, Sub) andalso + compare_optional(<<"clientid">>, QString, clientid, Sub) andalso + compare_optional(<<"qos">>, QString, qos, Sub) andalso + compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub) + end, + NDropped = (Page - 1) * Limit, + {_, Stream} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0 + ), + {Subscriptions, Stream1} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, Limit, Stream + ), + HasNext = Stream1 =/= [], + Meta = + case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of + true -> + %% Fuzzy searches shouldn't return count: + #{ + limit => Limit, + page => Page, + hasnext => HasNext + }; + false -> + #{ + count => Count, + limit => Limit, + page => Page, + hasnext => HasNext + } + end, + + #{ + meta => Meta, + data => Subscriptions + }. + +compare_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := Expected} -> + maps:get(SField, Subscription) =:= Expected; + _ -> + true + end. + +compare_match_topic_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := TopicFilter} -> + Topic = maps:get(SField, Subscription), + emqx_topic:match(Topic, TopicFilter); + _ -> + true + end. + +%% @doc Drop elements from the stream until encountered N elements +%% matching the predicate function. +-spec consume_n_matching( + fun((T) -> Q), + fun((Q) -> boolean()), + non_neg_integer(), + emqx_utils_stream:stream(T) +) -> {[Q], emqx_utils_stream:stream(T) | empty}. +consume_n_matching(Map, Pred, N, S) -> + consume_n_matching(Map, Pred, N, S, []). + +consume_n_matching(_Map, _Pred, _N, [], Acc) -> + {lists:reverse(Acc), []}; +consume_n_matching(_Map, _Pred, 0, S, Acc) -> + {lists:reverse(Acc), S}; +consume_n_matching(Map, Pred, N, S0, Acc) -> + case emqx_utils_stream:next(S0) of + [] -> + consume_n_matching(Map, Pred, N, [], Acc); + [Elem | S] -> + Mapped = Map(Elem), + case Pred(Mapped) of + true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]); + false -> consume_n_matching(Map, Pred, N, S, Acc) + end + end. + +persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) -> + case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of + #{subopts := SubOpts} -> + #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, + #{ + topic => Topic, + clientid => SessionId, + node => all, + + qos => Qos, + nl => Nl, + rh => Rh, + rap => Rap, + durable => true + }; + undefined -> + #{ + topic => Topic, + clientid => SessionId, + node => all, + durable => true + } + end. + +%% @private This function merges paginated results from two sources. +%% +%% Note: this implementation is far from ideal: `count' for the +%% queries may be missing, it may be larger than the actual number of +%% elements. This may lead to empty pages that can confuse the user. +%% +%% Not much can be done to mitigate that, though: since the count may +%% be incorrect, we cannot run simple math to determine when one +%% stream begins and another ends: it requires actual iteration. +%% +%% Ideally, the dashboard must be split between durable and mem +%% subscriptions, and this function should be removed for good. +merge_queries(QString0, Q1, Q2) -> + #{<<"limit">> := Limit, <<"page">> := Page} = QString0, + C1 = resp_count(QString0, Q1), + C2 = resp_count(QString0, Q2), + Meta = + case is_number(C1) andalso is_number(C2) of + true -> + #{ + count => C1 + C2, + limit => Limit, + page => Page + }; + false -> + #{ + limit => Limit, + page => Page + } + end, + case {C1, C2} of + {_, 0} -> + %% The second query is empty. Just return the result of Q1 as usual: + Q1(QString0); + {0, _} -> + %% The first query is empty. Just return the result of Q2 as usual: + Q2(QString0); + _ when is_number(C1) -> + %% Both queries are potentially non-empty, but we at least + %% have the page number for the first query. We try to + %% stich the pages together and thus respect the limit + %% (except for the page where the results switch from Q1 + %% to Q2). + + %% Page where data from the second query is estimated to + %% begin: + Q2Page = ceil(C1 / Limit), + case Page =< Q2Page of + true -> + #{data := Data, meta := #{hasnext := HN}} = Q1(QString0), + #{ + data => Data, + meta => Meta#{hasnext => HN orelse C2 > 0} + }; + false -> + QString = QString0#{<<"page">> => Page - Q2Page}, + #{data := Data, meta := #{hasnext := HN}} = Q2(QString), + #{data => Data, meta => Meta#{hasnext => HN}} + end; + _ -> + %% We don't know how many items is there in the first + %% query, and the second query is not empty (this includes + %% the case where `C2' is `undefined'). Best we can do is + %% to interleave the queries. This may produce less + %% results per page than `Limit'. + QString = QString0#{<<"limit">> => ceil(Limit / 2)}, + #{data := D1, meta := #{hasnext := HN1}} = Q1(QString), + #{data := D2, meta := #{hasnext := HN2}} = Q2(QString), + #{ + meta => Meta#{hasnext => HN1 or HN2}, + data => D1 ++ D2 + } + end. + +resp_count(Query, QFun) -> + #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}), + maps:get(count, Meta, undefined). + %%-------------------------------------------------------------------- -%% QueryString to MatchSpec +%% QueryString to MatchSpec (mem sessions) %%-------------------------------------------------------------------- -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 356ae97e4..435a837e3 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -36,17 +36,72 @@ -define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, mem}, + {group, persistent} + ]. + +groups() -> + CommonTCs = emqx_common_test_helpers:all(?MODULE), + [ + {mem, CommonTCs}, + %% Shared subscriptions are currently not supported: + {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]} + ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence {\n" + " enable = true\n" + " renew_streams_interval = 10ms\n" + "}"}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +init_per_group(persistent, Config) -> + ClientConfig = #{ + username => ?USERNAME, + clientid => ?CLIENTID, + proto_ver => v5, + clean_start => true, + properties => #{'Session-Expiry-Interval' => 300} + }, + [{client_config, ClientConfig}, {durable, true} | Config]; +init_per_group(mem, Config) -> + ClientConfig = #{ + username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5, clean_start => true + }, + [{client_config, ClientConfig}, {durable, false} | Config]. + +end_per_group(_, Config) -> Config. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +init_per_testcase(_TC, Config) -> + case ?config(client_config, Config) of + ClientConfig when is_map(ClientConfig) -> + {ok, Client} = emqtt:start_link(ClientConfig), + {ok, _} = emqtt:connect(Client), + [{client, Client} | Config]; + _ -> + Config + end. + +end_per_testcase(_TC, Config) -> + Client = proplists:get_value(client, Config), + emqtt:disconnect(Client). t_subscription_api(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), {ok, _, _} = emqtt:subscribe( Client, [ {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} @@ -54,12 +109,13 @@ t_subscription_api(Config) -> ), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), + timer:sleep(100), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), Data = emqx_utils_json:decode(Response, [return_maps]), Meta = maps:get(<<"meta">>, Data), ?assertEqual(1, maps:get(<<"page">>, Meta)), ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), - ?assertEqual(2, maps:get(<<"count">>, Meta)), + ?assertEqual(2, maps:get(<<"count">>, Meta), Data), Subscriptions = maps:get(<<"data">>, Data), ?assertEqual(length(Subscriptions), 2), Sort = @@ -90,7 +146,8 @@ t_subscription_api(Config) -> {"node", atom_to_list(node())}, {"qos", "0"}, {"share_group", "test_group"}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], Headers = emqx_mgmt_api_test_util:auth_header_(), @@ -103,6 +160,7 @@ t_subscription_api(Config) -> t_subscription_fuzzy_search(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), Topics = [ <<"t/foo">>, <<"t/foo/bar">>, @@ -116,7 +174,8 @@ t_subscription_fuzzy_search(Config) -> MatchQs = [ {"clientid", ?CLIENTID}, {"node", atom_to_list(node())}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), @@ -130,12 +189,13 @@ t_subscription_fuzzy_search(Config) -> LimitMatchQuery = [ {"clientid", ?CLIENTID}, {"match_topic", "+/+/+"}, - {"limit", "3"} + {"limit", "3"}, + {"durable", Durable} ], MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers), ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2), - ?assertEqual(3, length(maps:get(<<"data">>, MatchData2))), + ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2), MatchData2P2 = #{<<"meta">> := MatchMeta2P2} = @@ -176,8 +236,8 @@ t_list_with_shared_sub(_Config) -> ok. -t_list_with_invalid_match_topic(_Config) -> - Client = proplists:get_value(client, _Config), +t_list_with_invalid_match_topic(Config) -> + Client = proplists:get_value(client, Config), RealTopic = <<"t/+">>, Topic = <<"$share/g1/", RealTopic/binary>>, @@ -212,12 +272,3 @@ request_json(Method, Query, Headers) when is_list(Query) -> path() -> emqx_mgmt_api_test_util:api_path(["subscriptions"]). - -init_per_testcase(_TC, Config) -> - {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), - {ok, _} = emqtt:connect(Client), - [{client, Client} | Config]. - -end_per_testcase(_TC, Config) -> - Client = proplists:get_value(client, Config), - emqtt:disconnect(Client). diff --git a/changes/ce/fix-12874.en.md b/changes/ce/fix-12874.en.md new file mode 100644 index 000000000..1a5814b07 --- /dev/null +++ b/changes/ce/fix-12874.en.md @@ -0,0 +1,7 @@ +- Ensure consistency of the durable message replay when the subscriptions are modified before session reconnects + +- Persistent sessions save inflight packet IDs for the received QoS2 messages + +- Make behavior of the persistent sessions consistent with the non-persistent sessions in regard to overlapping subscriptions + +- List persistent subscriptions in the REST API From d12966db5b51c278d8ae5c6fe699c80230362283 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 17 Apr 2024 17:00:06 +0200 Subject: [PATCH 14/16] test: Avoid dumping raw snabbkaffe traces to the console --- .../integration_test/emqx_persistent_session_ds_SUITE.erl | 7 +------ .../test/emqx_bridge_cassandra_SUITE.erl | 1 - .../test/emqx_bridge_gcp_pubsub_producer_SUITE.erl | 1 - apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl | 1 - apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl | 1 - apps/emqx_resource/test/emqx_resource_SUITE.erl | 2 -- 6 files changed, 1 insertion(+), 12 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index ab062bff7..a5260f780 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -252,7 +252,6 @@ t_session_subscription_idempotency(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), Session = session_open(Node1, ClientId), ?assertMatch( #{SubTopicFilter := #{}}, @@ -326,7 +325,6 @@ t_session_unsubscription_idempotency(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), Session = session_open(Node1, ClientId), ?assertEqual( #{}, @@ -415,10 +413,7 @@ do_t_session_discard(Params) -> ok end, - fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), - ok - end + [] ), ok. diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 868d0191e..449d1fa51 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -581,7 +581,6 @@ t_write_failure(Config) -> ) end), fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind( [buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0 ), diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 6666a3fd0..d96157f8c 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -1929,7 +1929,6 @@ t_bad_attributes(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), ?assertMatch( [ #{placeholder := [<<"payload">>, <<"ok">>], value := #{}}, diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 8b719de9a..9ad2fbc5a 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -517,7 +517,6 @@ t_write_failure(Config) -> ok end, fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 3e9428c88..f4917f387 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -520,7 +520,6 @@ t_write_failure(Config) -> ) end), fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 99e85424d..171baf4ad 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3346,7 +3346,6 @@ wait_n_events(NEvents, Timeout, EventName) -> end. assert_sync_retry_fail_then_succeed_inflight(Trace) -> - ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( #{?snk_kind := buffer_worker_flush_nack, ref := _Ref}, @@ -3366,7 +3365,6 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> ok. assert_async_retry_fail_then_succeed_inflight(Trace) -> - ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( #{?snk_kind := handle_async_reply, action := nack}, From f1e6565ddd03b61b19dc21eaecf8d0b0971388b7 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 19 Apr 2024 00:10:30 +0200 Subject: [PATCH 15/16] refactor(sessds): Move all subscription logic to the subs module --- apps/emqx/src/emqx_persistent_session_ds.erl | 69 ++++++------------- .../src/emqx_persistent_session_ds_subs.erl | 55 ++++++++++++--- 2 files changed, 66 insertions(+), 58 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b8c853431..20c382934 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -368,52 +368,31 @@ subscribe( subscribe( TopicFilter, SubOpts, - Session = #{id := ID} + Session ) -> - {UpdateRouter, S1} = emqx_persistent_session_ds_subs:on_subscribe( - TopicFilter, SubOpts, Session - ), - case UpdateRouter of - true -> - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID); - false -> - ok - end, - S = emqx_persistent_session_ds_state:commit(S1), - UpdateRouter andalso - ?tp(persistent_session_ds_subscription_added, #{topic_filter => TopicFilter, session => ID}), - {ok, Session#{s => S}}. + case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of + {ok, S1} -> + S = emqx_persistent_session_ds_state:commit(S1), + {ok, Session#{s => S}}; + Error = {error, _} -> + Error + end. -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, - Session = #{id := ID, s := S0} + Session = #{id := SessionId, s := S0} ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription = #{subopts := SubOpts} -> - S1 = do_unsubscribe(ID, TopicFilter, Subscription, S0), - S = emqx_persistent_session_ds_state:commit(S1), - {ok, Session#{s => S}, SubOpts} + case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of + {ok, S1, #{id := SubId, subopts := SubOpts}} -> + S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), + S = emqx_persistent_session_ds_state:commit(S2), + {ok, Session#{s => S}, SubOpts}; + Error = {error, _} -> + Error end. --spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> - emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, S0), - ?tp(persistent_session_ds_subscription_delete, #{ - session_id => SessionId, topic_filter => TopicFilter - }), - S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), - ?tp_span( - persistent_session_ds_subscription_route_delete, - #{session_id => SessionId, topic_filter => TopicFilter}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) - ), - S. - -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(#share{}, _) -> @@ -860,18 +839,12 @@ session_ensure_new( %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id(), _Reason) -> ok. -session_drop(ID, Reason) -> - case emqx_persistent_session_ds_state:open(ID) of +session_drop(SessionId, Reason) -> + case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> - ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), - _S = emqx_persistent_session_ds_subs:fold( - fun(TopicFilter, Subscription, S) -> - do_unsubscribe(ID, TopicFilter, Subscription, S) - end, - S0, - S0 - ), - emqx_persistent_session_ds_state:delete(ID); + ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}), + emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0), + emqx_persistent_session_ds_state:delete(SessionId); undefined -> ok end. diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 99ad9f9fc..8b4f70a69 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -26,7 +26,8 @@ %% API: -export([ on_subscribe/3, - on_unsubscribe/2, + on_unsubscribe/3, + on_session_drop/2, gc/1, lookup/2, to_map/1, @@ -41,6 +42,8 @@ -export_type([subscription_state_id/0, subscription/0, subscription_state/0]). -include("emqx_persistent_session_ds.hrl"). +-include("emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ %% Type declarations @@ -81,14 +84,15 @@ emqx_types:subopts(), emqx_persistent_session_ds:session() ) -> - {_UpdateRouter :: boolean(), emqx_persistent_session_ds_state:t()}. -on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> + {ok, emqx_persistent_session_ds_state:t()} | {error, ?RC_QUOTA_EXCEEDED}. +on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) -> #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of undefined -> %% This is a new subscription: case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of true -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), SState = #{ @@ -105,16 +109,19 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> S = emqx_persistent_session_ds_state:put_subscription( TopicFilter, Subscription, S3 ), - {true, S}; + ?tp(persistent_session_ds_subscription_added, #{ + topic_filter => TopicFilter, session => SessionId + }), + {ok, S}; false -> - {false, S0} + {error, ?RC_QUOTA_EXCEEDED} end; Sub0 = #{current_state := SStateId0, id := SubId} -> SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of SState -> %% Client resubscribed with the same parameters: - {false, S0}; + {ok, S0}; _ -> %% Subsription parameters changed: {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), @@ -123,18 +130,46 @@ on_subscribe(TopicFilter, SubOpts, #{s := S0, props := Props}) -> ), Sub = Sub0#{current_state => SStateId}, S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), - {false, S} + {ok, S} end end. %% @doc Process UNSUBSCRIBE -spec on_unsubscribe( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t() ) -> - emqx_persistent_session_ds_state:t(). -on_unsubscribe(TopicFilter, S0) -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0). + {ok, emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()} + | {error, ?RC_NO_SUBSCRIPTION_EXISTED}. +on_unsubscribe(SessionId, TopicFilter, S0) -> + case lookup(TopicFilter, S0) of + undefined -> + {error, ?RC_NO_SUBSCRIPTION_EXISTED}; + Subscription -> + ?tp(persistent_session_ds_subscription_delete, #{ + session_id => SessionId, topic_filter => TopicFilter + }), + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => SessionId, topic_filter => TopicFilter}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) + ), + {ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription} + end. + +-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok. +on_session_drop(SessionId, S0) -> + fold( + fun(TopicFilter, _Subscription, S) -> + case on_unsubscribe(SessionId, TopicFilter, S) of + {ok, S1, _} -> S1; + _ -> S + end + end, + S0, + S0 + ). %% @doc Remove subscription states that don't have a parent, and that %% don't have any unacked messages: From ede72468827ad3eb1f1086b1d923291add4c156b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 19 Apr 2024 13:39:04 +0200 Subject: [PATCH 16/16] fix(sessds): Avoid double-enriching transient messages --- apps/emqx/src/emqx_persistent_session_ds.erl | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 20c382934..4bfefe5b6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -1066,7 +1066,9 @@ process_batch( %% Transient messages %%-------------------------------------------------------------------- -enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> +enqueue_transient( + _ClientInfo, Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0} +) -> %% TODO: Such messages won't be retransmitted, should the session %% reconnect before transient messages are acked. %% @@ -1076,16 +1078,6 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos : %% queued messages. Since streams in this DB are exclusive to the %% session, messages from the queue can be dropped as soon as they %% are acked. - case emqx_persistent_session_ds_state:get_subscription(Msg0#message.topic, S) of - #{current_state := CS} -> - #{subopts := SubOpts} = emqx_persistent_session_ds_state:get_subscription_state(CS, S); - undefined -> - SubOpts = undefined - end, - Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), - lists:foldl(fun do_enqueue_transient/2, Session, Msgs). - -do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> case Qos of ?QOS_0 -> S = S0,