diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 39764af30..a5260f780 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -184,7 +184,7 @@ list_all_pubranges(Node) -> session_open(Node, ClientId) -> ClientInfo = #{}, - ConnInfo = #{peername => {undefined, undefined}}, + ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5}, WillMsg = undefined, erpc:call( Node, @@ -252,7 +252,6 @@ t_session_subscription_idempotency(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), Session = session_open(Node1, ClientId), ?assertMatch( #{SubTopicFilter := #{}}, @@ -326,7 +325,6 @@ t_session_unsubscription_idempotency(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), Session = session_open(Node1, ClientId), ?assertEqual( #{}, @@ -415,10 +413,7 @@ do_t_session_discard(Params) -> ok end, - fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), - ok - end + [] ), ok. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 83ed5d465..4bfefe5b6 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -75,7 +75,8 @@ %% Managment APIs: -export([ - list_client_subscriptions/1 + list_client_subscriptions/1, + get_client_subscription/2 ]). %% session table operations @@ -116,15 +117,42 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type topic_filter() :: emqx_types:topic(). +-type topic_filter() :: emqx_types:topic() | #share{}. + +%% Subscription and subscription states: +%% +%% Persistent sessions cannot simply update or delete subscriptions, +%% since subscription parameters must be exactly the same during +%% replay. +%% +%% To solve this problem, we store subscriptions in a twofold manner: +%% +%% - `subscription' is an object that holds up-to-date information +%% about the client's subscription and a reference to the latest +%% subscription state id +%% +%% - `subscription_state' is an immutable object that holds +%% information about the subcription parameters at a certain point of +%% time +%% +%% New subscription states are created whenever the client subscribes +%% to a topics, or updates an existing subscription. +%% +%% Stream replay states contain references to the subscription states. +%% +%% Outdated subscription states are discarded when they are not +%% referenced by either subscription or stream replay state objects. -type subscription_id() :: integer(). +%% This type is a result of merging +%% `emqx_persistent_session_ds_subs:subscription()' with its current +%% state. -type subscription() :: #{ id := subscription_id(), start_time := emqx_ds:time(), - props := map(), - deleted := boolean() + current_state := emqx_persistent_session_ds_subs:subscription_state_id(), + subopts := map() }. -define(TIMER_PULL, timer_pull). @@ -184,7 +212,9 @@ seqno_q2_dup, seqno_q2_rec, seqno_q2_next, - n_streams + n_streams, + awaiting_rel_cnt, + awaiting_rel_max ]). %% @@ -206,7 +236,8 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ok = emqx_cm:takeover_kick(ClientID), case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of Session0 = #{} -> - Session = Session0#{props => Conf}, + Session1 = Session0#{props => Conf}, + Session = do_expire(ClientInfo, Session1), {true, ensure_timers(Session), []}; false -> false @@ -249,7 +280,7 @@ info(is_persistent, #{}) -> info(subscriptions, #{s := S}) -> emqx_persistent_session_ds_subs:to_map(S); info(subscriptions_cnt, #{s := S}) -> - emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); + emqx_persistent_session_ds_state:n_subscriptions(S); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); info(upgrade_qos, #{props := Conf}) -> @@ -262,21 +293,21 @@ info(inflight_max, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:receive_maximum(Inflight); info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); -% info(mqueue, #sessmem{mqueue = MQueue}) -> -% MQueue; info(mqueue_len, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:n_buffered(all, Inflight); -% info(mqueue_max, #sessmem{mqueue = MQueue}) -> -% emqx_mqueue:max_len(MQueue); info(mqueue_dropped, _Session) -> 0; %% info(next_pkt_id, #{s := S}) -> %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), %% PacketId; -% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> -% AwaitingRel; -%% info(awaiting_rel_cnt, #{s := S}) -> -%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); +info(awaiting_rel, #{s := S}) -> + emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, S); +info(awaiting_rel_max, #{props := Conf}) -> + maps:get(max_awaiting_rel, Conf); +info(awaiting_rel_cnt, #{s := S}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S); +info(await_rel_timeout, #{props := Conf}) -> + maps:get(await_rel_timeout, Conf); info(seqno_q1_comm, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S); info(seqno_q1_dup, #{s := S}) -> @@ -292,17 +323,7 @@ info(seqno_q2_rec, #{s := S}) -> info(seqno_q2_next, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S); info(n_streams, #{s := S}) -> - emqx_persistent_session_ds_state:fold_streams( - fun(_, _, Acc) -> Acc + 1 end, - 0, - S - ); -info(awaiting_rel_max, #{props := Conf}) -> - maps:get(max_awaiting_rel, Conf); -info(await_rel_timeout, #{props := _Conf}) -> - %% TODO: currently this setting is ignored: - %% maps:get(await_rel_timeout, Conf). - 0; + emqx_persistent_session_ds_state:n_streams(S); info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs -> {error, not_implemented}. @@ -337,93 +358,49 @@ print_session(ClientId) -> -spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. +subscribe( + #share{}, + _SubOpts, + _Session +) -> + %% TODO: Shared subscriptions are not supported yet: + {error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}; subscribe( TopicFilter, SubOpts, - Session = #{id := ID, s := S0} + Session ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - %% TODO: max subscriptions - - %% N.B.: we chose to update the router before adding the - %% subscription to the session/iterator table. The - %% reasoning for this is as follows: - %% - %% Messages matching this topic filter should start to be - %% persisted as soon as possible to avoid missing - %% messages. If this is the first such persistent session - %% subscription, it's important to do so early on. - %% - %% This could, in turn, lead to some inconsistency: if - %% such a route gets created but the session/iterator data - %% fails to be updated accordingly, we have a dangling - %% route. To remove such dangling routes, we may have a - %% periodic GC process that removes routes that do not - %% have a matching persistent subscription. Also, route - %% operations use dirty mnesia operations, which - %% inherently have room for inconsistencies. - %% - %% In practice, we use the iterator reference table as a - %% source of truth, since it is guarded by a transaction - %% context: we consider a subscription operation to be - %% successful if it ended up changing this table. Both - %% router and iterator information can be reconstructed - %% from this table, if needed. - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID), - {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), - Subscription = #{ - start_time => now_ms(), - props => SubOpts, - id => SubId, - deleted => false - }, - IsNew = true; - Subscription0 = #{} -> - Subscription = Subscription0#{props => SubOpts}, - IsNew = false, - S1 = S0 - end, - S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1), - ?tp(persistent_session_ds_subscription_added, #{ - topic_filter => TopicFilter, sub => Subscription, is_new => IsNew - }), - {ok, Session#{s => S}}. + case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of + {ok, S1} -> + S = emqx_persistent_session_ds_state:commit(S1), + {ok, Session#{s => S}}; + Error = {error, _} -> + Error + end. -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, - Session = #{id := ID, s := S0} + Session = #{id := SessionId, s := S0} ) -> - case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of - undefined -> - {error, ?RC_NO_SUBSCRIPTION_EXISTED}; - Subscription = #{props := SubOpts} -> - S = do_unsubscribe(ID, TopicFilter, Subscription, S0), - {ok, Session#{s => S}, SubOpts} + case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of + {ok, S1, #{id := SubId, subopts := SubOpts}} -> + S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), + S = emqx_persistent_session_ds_state:commit(S2), + {ok, Session#{s => S}, SubOpts}; + Error = {error, _} -> + Error end. --spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) -> - emqx_persistent_session_ds_state:t(). -do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) -> - S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0), - ?tp(persistent_session_ds_subscription_delete, #{ - session_id => SessionId, topic_filter => TopicFilter - }), - S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1), - ?tp_span( - persistent_session_ds_subscription_route_delete, - #{session_id => SessionId, topic_filter => TopicFilter}, - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) - ), - S. - -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. +get_subscription(#share{}, _) -> + %% TODO: shared subscriptions are not supported yet: + undefined; get_subscription(TopicFilter, #{s := S}) -> case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of - _Subscription = #{props := SubOpts} -> + #{subopts := SubOpts} -> SubOpts; undefined -> undefined @@ -436,11 +413,72 @@ get_subscription(TopicFilter, #{s := S}) -> -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}. +publish( + PacketId, + Msg = #message{qos = ?QOS_2, timestamp = Ts}, + Session = #{s := S0} +) -> + case is_awaiting_full(Session) of + false -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + Results = emqx_broker:publish(Msg), + S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0), + {ok, Results, Session#{s => S}}; + _Ts -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end; + true -> + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + end; publish(_PacketId, Msg, Session) -> - %% TODO: QoS2 Result = emqx_broker:publish(Msg), {ok, Result, Session}. +is_awaiting_full(#{s := S, props := Props}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S) >= + maps:get(max_awaiting_rel, Props, infinity). + +-spec expire(emqx_types:clientinfo(), session()) -> + {ok, [], timeout(), session()} | {ok, [], session()}. +expire(ClientInfo, Session0 = #{props := Props}) -> + Session = #{s := S} = do_expire(ClientInfo, Session0), + case emqx_persistent_session_ds_state:n_awaiting_rel(S) of + 0 -> + {ok, [], Session}; + _ -> + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + {ok, [], AwaitRelTimeout, Session} + end. + +do_expire(ClientInfo, Session = #{s := S0, props := Props}) -> + %% 1. Find expired packet IDs: + Now = erlang:system_time(millisecond), + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + ExpiredPacketIds = + emqx_persistent_session_ds_state:fold_awaiting_rel( + fun(PacketId, Ts, Acc) -> + Age = Now - Ts, + case Age > AwaitRelTimeout of + true -> + [PacketId | Acc]; + false -> + Acc + end + end, + [], + S0 + ), + %% 2. Perform side effects: + _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(ExpiredPacketIds)}), + %% 3. Update state: + S = lists:foldl( + fun emqx_persistent_session_ds_state:del_awaiting_rel/2, + S0, + ExpiredPacketIds + ), + Session#{s => S}. + %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- @@ -477,9 +515,14 @@ pubrec(PacketId, Session0) -> -spec pubrel(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. -pubrel(_PacketId, Session = #{}) -> - % TODO: stub - {ok, Session}. +pubrel(PacketId, Session = #{s := S0}) -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}; + _TS -> + S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0), + {ok, Session#{s => S}} + end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBCOMP @@ -552,6 +595,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S = emqx_persistent_session_ds_state:commit(S0), From ! Ref, {ok, [], Session#{s => S}}; +handle_timeout(ClientInfo, expire_awaiting_rel, Session) -> + expire(ClientInfo, Session); handle_timeout(_ClientInfo, Timeout, Session) -> ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. @@ -645,7 +690,7 @@ list_client_subscriptions(ClientId) -> %% TODO: this is not the most optimal implementation, since it %% should be possible to avoid reading extra data (streams, etc.) case print_session(ClientId) of - Sess = #{s := #{subscriptions := Subs}} -> + Sess = #{s := #{subscriptions := Subs, subscription_states := SStates}} -> Node = case Sess of #{'_alive' := {true, Pid}} -> @@ -655,8 +700,9 @@ list_client_subscriptions(ClientId) -> end, SubList = maps:fold( - fun(Topic, #{props := SubProps}, Acc) -> - Elem = {Topic, SubProps}, + fun(Topic, #{current_state := CS}, Acc) -> + #{subopts := SubOpts} = maps:get(CS, SStates), + Elem = {Topic, SubOpts}, [Elem | Acc] end, [], @@ -670,6 +716,11 @@ list_client_subscriptions(ClientId) -> {error, not_found} end. +-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) -> + subscription() | undefined. +get_client_subscription(ClientId, Topic) -> + emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic). + %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- @@ -701,7 +752,12 @@ sync(ClientId) -> %% the broker. -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> session() | false. -session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> +session_open( + SessionId, + ClientInfo, + NewConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, + MaybeWillMsg +) -> NowMS = now_ms(), case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> @@ -720,8 +776,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> maps:get(peername, NewConnInfo), S2 ), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), - S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4), - S = emqx_persistent_session_ds_state:commit(S5), + S5 = set_clientinfo(ClientInfo, S4), + S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5), + S = emqx_persistent_session_ds_state:commit(S6), Inflight = emqx_persistent_session_ds_inflight:new( receive_maximum(NewConnInfo) ), @@ -744,7 +801,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> emqx_session:conf() ) -> session(). -session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> +session_ensure_new( + Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf +) -> ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), Now = now_ms(), S0 = emqx_persistent_session_ds_state:create_new(Id), @@ -767,8 +826,9 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ] ), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), - S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5), - S = emqx_persistent_session_ds_state:commit(S6), + S6 = set_clientinfo(ClientInfo, S5), + S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6), + S = emqx_persistent_session_ds_state:commit(S7), #{ id => Id, props => Conf, @@ -779,18 +839,12 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(id(), _Reason) -> ok. -session_drop(ID, Reason) -> - case emqx_persistent_session_ds_state:open(ID) of +session_drop(SessionId, Reason) -> + case emqx_persistent_session_ds_state:open(SessionId) of {ok, S0} -> - ?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), - _S = emqx_persistent_session_ds_subs:fold( - fun(TopicFilter, Subscription, S) -> - do_unsubscribe(ID, TopicFilter, Subscription, S) - end, - S0, - S0 - ), - emqx_persistent_session_ds_state:delete(ID); + ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}), + emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0), + emqx_persistent_session_ds_state:delete(SessionId); undefined -> ok end. @@ -798,6 +852,11 @@ session_drop(ID, Reason) -> now_ms() -> erlang:system_time(millisecond). +set_clientinfo(ClientInfo0, S) -> + %% Remove unnecessary fields from the clientinfo: + ClientInfo = maps:without([cn, dn, auth_result], ClientInfo0), + emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S). + %%-------------------------------------------------------------------- %% RPC targets (v1) %%-------------------------------------------------------------------- @@ -874,22 +933,31 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) -> Session0 end. -enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> +enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) -> #srs{ it_begin = ItBegin0, it_end = ItEnd0, first_seqno_qos1 = FirstSeqnoQos1, - first_seqno_qos2 = FirstSeqnoQos2 + first_seqno_qos2 = FirstSeqnoQos2, + sub_state_id = SubStateId } = Srs0, ItBegin = case IsReplay of true -> ItBegin0; false -> ItEnd0 end, + SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S), case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of {ok, ItEnd, Messages} -> {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( - IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqnoQos1, + FirstSeqnoQos2, + Messages, + Inflight0 ), Srs = Srs0#srs{ it_begin = ItBegin, @@ -913,27 +981,29 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli %% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> %% K. -process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) -> +process_batch( + _IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight +) -> {Inflight, LastSeqNoQos1, LastSeqNoQos2}; process_batch( - IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0 + IsReplay, + Session, + SubState, + ClientInfo, + FirstSeqNoQos1, + FirstSeqNoQos2, + [KV | Messages], + Inflight0 ) -> - #{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session, - {_DsMsgKey, Msg0 = #message{topic = Topic}} = KV, + #{s := S} = Session, + #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState, + {_DsMsgKey, Msg0} = KV, Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S), Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S), Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S), - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], + Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS), {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl( fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) -> case Qos of @@ -989,14 +1059,16 @@ process_batch( Msgs ), process_batch( - IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight + IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight ). %%-------------------------------------------------------------------- %% Transient messages %%-------------------------------------------------------------------- -enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> +enqueue_transient( + _ClientInfo, Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0} +) -> %% TODO: Such messages won't be retransmitted, should the session %% reconnect before transient messages are acked. %% @@ -1006,18 +1078,6 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos : %% queued messages. Since streams in this DB are exclusive to the %% session, messages from the queue can be dropped as soon as they %% are acked. - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - Msgs = [ - Msg - || SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []), - Msg <- begin - #{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS) - end - ], - lists:foldl(fun do_enqueue_transient/2, Session, Msgs). - -do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) -> case Qos of ?QOS_0 -> S = S0, diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 56862dfa5..79920629a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -65,17 +65,21 @@ last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), %% This stream belongs to an unsubscribed topic-filter, and is %% marked for deletion: - unsubscribed = false :: boolean() + unsubscribed = false :: boolean(), + %% Reference to the subscription state: + sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id() }). %% Session metadata keys: -define(created_at, created_at). -define(last_alive_at, last_alive_at). -define(expiry_interval, expiry_interval). -%% Unique integer used to create unique identities +%% Unique integer used to create unique identities: -define(last_id, last_id). +%% Connection info (relevent for the dashboard): -define(peername, peername). -define(will_message, will_message). -define(clientinfo, clientinfo). +-define(protocol, protocol). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 28297964d..9efffc7ff 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -22,6 +22,9 @@ %% It is responsible for saving, caching, and restoring session state. %% It is completely devoid of business logic. Not even the default %% values should be set in this module. +%% +%% Session process MUST NOT use `cold_*' functions! They are reserved +%% for use in the management APIs. -module(emqx_persistent_session_ds_state). -export([create_tables/0]). @@ -33,22 +36,44 @@ -export([get_clientinfo/1, set_clientinfo/2]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). +-export([get_protocol/1, set_protocol/2]). -export([new_id/1]). --export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). +-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). --export([get_subscriptions/1, put_subscription/4, del_subscription/3]). +-export([ + get_subscription_state/2, + cold_get_subscription_state/2, + fold_subscription_states/3, + put_subscription_state/3, + del_subscription_state/2 +]). +-export([ + get_subscription/2, + cold_get_subscription/2, + fold_subscriptions/3, + n_subscriptions/1, + put_subscription/3, + del_subscription/2 +]). +-export([ + get_awaiting_rel/2, + put_awaiting_rel/3, + del_awaiting_rel/2, + fold_awaiting_rel/3, + n_awaiting_rel/1 +]). -export([make_session_iterator/0, session_iterator_next/2]). -export_type([ t/0, metadata/0, - subscriptions/0, seqno_type/0, stream_key/0, rank_key/0, - session_iterator/0 + session_iterator/0, + protocol/0 ]). -include("emqx_mqtt.hrl"). @@ -62,8 +87,6 @@ -type message() :: emqx_types:message(). --type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()). - -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. %% Generic key-value wrapper that is used for exporting arbitrary @@ -92,13 +115,16 @@ dirty :: #{K => dirty | del} }. +-type protocol() :: {binary(), emqx_types:proto_ver()}. + -type metadata() :: #{ ?created_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(), ?expiry_interval => non_neg_integer(), ?last_id => integer(), - ?peername => emqx_types:peername() + ?peername => emqx_types:peername(), + ?protocol => protocol() }. -type seqno_type() :: @@ -110,22 +136,49 @@ | ?rec | ?committed(?QOS_2). +-define(id, id). +-define(dirty, dirty). +-define(metadata, metadata). +-define(subscriptions, subscriptions). +-define(subscription_states, subscription_states). +-define(seqnos, seqnos). +-define(streams, streams). +-define(ranks, ranks). +-define(awaiting_rel, awaiting_rel). + -opaque t() :: #{ - id := emqx_persistent_session_ds:id(), - dirty := boolean(), - metadata := metadata(), - subscriptions := subscriptions(), - seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), - streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), - ranks := pmap(term(), integer()) + ?id := emqx_persistent_session_ds:id(), + ?dirty := boolean(), + ?metadata := metadata(), + ?subscriptions := pmap( + emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription() + ), + ?subscription_states := pmap( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state() + ), + ?seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), + ?streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), + ?ranks := pmap(term(), integer()), + ?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) }. -define(session_tab, emqx_ds_session_tab). -define(subscription_tab, emqx_ds_session_subscriptions). +-define(subscription_states_tab, emqx_ds_session_subscription_states). -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). --define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). +-define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). + +-define(pmaps, [ + {?subscriptions, ?subscription_tab}, + {?subscription_states, ?subscription_states_tab}, + {?streams, ?stream_tab}, + {?seqnos, ?seqno_tab}, + {?ranks, ?rank_tab}, + {?awaiting_rel, ?awaiting_rel_tab} +]). %% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). @@ -152,23 +205,25 @@ create_tables() -> {attributes, record_info(fields, kv)} ] ), - [create_kv_pmap_table(Table) || Table <- ?pmap_tables], - mria:wait_for_tables([?session_tab | ?pmap_tables]). + {_, PmapTables} = lists:unzip(?pmaps), + [create_kv_pmap_table(Table) || Table <- PmapTables], + mria:wait_for_tables([?session_tab | PmapTables]). -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. open(SessionId) -> ro_transaction(fun() -> case kv_restore(?session_tab, SessionId) of [Metadata] -> - Rec = #{ - id => SessionId, - metadata => Metadata, - subscriptions => read_subscriptions(SessionId), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - ?unset_dirty - }, + Rec = update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => Metadata, + ?unset_dirty + } + ), {ok, Rec}; [] -> undefined @@ -185,27 +240,13 @@ print_session(SessionId) -> end. -spec format(t()) -> map(). -format(#{ - metadata := Metadata, - subscriptions := SubsGBT, - streams := Streams, - seqnos := Seqnos, - ranks := Ranks -}) -> - Subs = emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> - maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc) +format(Rec) -> + update_pmaps( + fun(Pmap, _Table) -> + pmap_format(Pmap) end, - #{}, - SubsGBT - ), - #{ - metadata => Metadata, - subscriptions => Subs, - streams => pmap_format(Streams), - seqnos => pmap_format(Seqnos), - ranks => pmap_format(Ranks) - }. + maps:without([id, dirty], Rec) + ). -spec list_sessions() -> [emqx_persistent_session_ds:id()]. list_sessions() -> @@ -215,7 +256,7 @@ list_sessions() -> delete(Id) -> transaction( fun() -> - [kv_pmap_delete(Table, Id) || Table <- ?pmap_tables], + [kv_pmap_delete(Table, Id) || {_, Table} <- ?pmaps], mnesia:delete(?session_tab, Id, write) end ). @@ -226,36 +267,34 @@ commit(Rec = #{dirty := false}) -> commit( Rec = #{ id := SessionId, - metadata := Metadata, - streams := Streams, - seqnos := SeqNos, - ranks := Ranks + metadata := Metadata } ) -> check_sequence(Rec), transaction(fun() -> kv_persist(?session_tab, SessionId, Metadata), - Rec#{ - streams => pmap_commit(SessionId, Streams), - seqnos => pmap_commit(SessionId, SeqNos), - ranks => pmap_commit(SessionId, Ranks), - ?unset_dirty - } + update_pmaps( + fun(Pmap, _Table) -> + pmap_commit(SessionId, Pmap) + end, + Rec#{?unset_dirty} + ) end). -spec create_new(emqx_persistent_session_ds:id()) -> t(). create_new(SessionId) -> transaction(fun() -> delete(SessionId), - #{ - id => SessionId, - metadata => #{}, - subscriptions => emqx_topic_gbt:new(), - streams => pmap_open(?stream_tab, SessionId), - seqnos => pmap_open(?seqno_tab, SessionId), - ranks => pmap_open(?rank_tab, SessionId), - ?set_dirty - } + update_pmaps( + fun(_Pmap, Table) -> + pmap_open(Table, SessionId) + end, + #{ + id => SessionId, + metadata => #{}, + ?set_dirty + } + ) end). %% @@ -292,6 +331,14 @@ get_peername(Rec) -> set_peername(Val, Rec) -> set_meta(?peername, Val, Rec). +-spec get_protocol(t()) -> protocol() | undefined. +get_protocol(Rec) -> + get_meta(?protocol, Rec). + +-spec set_protocol(protocol(), t()) -> t(). +set_protocol(Val, Rec) -> + set_meta(?protocol, Val, Rec). + -spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). get_clientinfo(Rec) -> get_meta(?clientinfo, Rec). @@ -336,30 +383,65 @@ new_id(Rec) -> %% --spec get_subscriptions(t()) -> subscriptions(). -get_subscriptions(#{subscriptions := Subs}) -> - Subs. +-spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> + emqx_persistent_session_ds_subs:subscription() | undefined. +get_subscription(TopicFilter, Rec) -> + gen_get(?subscriptions, TopicFilter, Rec). + +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + [emqx_persistent_session_ds_subs:subscription()]. +cold_get_subscription(SessionId, Topic) -> + kv_pmap_read(?subscription_tab, SessionId, Topic). + +-spec fold_subscriptions(fun(), Acc, t()) -> Acc. +fold_subscriptions(Fun, Acc, Rec) -> + gen_fold(?subscriptions, Fun, Acc, Rec). + +-spec n_subscriptions(t()) -> non_neg_integer(). +n_subscriptions(Rec) -> + gen_size(?subscriptions, Rec). -spec put_subscription( emqx_persistent_session_ds:topic_filter(), - _SubId, - emqx_persistent_session_ds:subscription(), + emqx_persistent_session_ds_subs:subscription(), t() ) -> t(). -put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently changes to the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end), - Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0), - Rec#{subscriptions => Subs}. +put_subscription(TopicFilter, Subscription, Rec) -> + gen_put(?subscriptions, TopicFilter, Subscription, Rec). --spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t(). -del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -> - %% Note: currently the subscriptions are persisted immediately. - Key = {TopicFilter, SubId}, - transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end), - Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0), - Rec#{subscriptions => Subs}. +-spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t(). +del_subscription(TopicFilter, Rec) -> + gen_del(?subscriptions, TopicFilter, Rec). + +%% + +-spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> + emqx_persistent_session_ds_subs:subscription_state() | undefined. +get_subscription_state(SStateId, Rec) -> + gen_get(?subscription_states, SStateId, Rec). + +-spec cold_get_subscription_state( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id() +) -> + [emqx_persistent_session_ds_subs:subscription_state()]. +cold_get_subscription_state(SessionId, SStateId) -> + kv_pmap_read(?subscription_states_tab, SessionId, SStateId). + +-spec fold_subscription_states(fun(), Acc, t()) -> Acc. +fold_subscription_states(Fun, Acc, Rec) -> + gen_fold(?subscription_states, Fun, Acc, Rec). + +-spec put_subscription_state( + emqx_persistent_session_ds_subs:subscription_state_id(), + emqx_persistent_session_ds_subs:subscription_state(), + t() +) -> t(). +put_subscription_state(SStateId, SState, Rec) -> + gen_put(?subscription_states, SStateId, SState, Rec). + +-spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t(). +del_subscription_state(SStateId, Rec) -> + gen_del(?subscription_states, SStateId, Rec). %% @@ -368,29 +450,33 @@ del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -spec get_stream(stream_key(), t()) -> emqx_persistent_session_ds:stream_state() | undefined. get_stream(Key, Rec) -> - gen_get(streams, Key, Rec). + gen_get(?streams, Key, Rec). -spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). put_stream(Key, Val, Rec) -> - gen_put(streams, Key, Val, Rec). + gen_put(?streams, Key, Val, Rec). -spec del_stream(stream_key(), t()) -> t(). del_stream(Key, Rec) -> - gen_del(streams, Key, Rec). + gen_del(?streams, Key, Rec). -spec fold_streams(fun(), Acc, t()) -> Acc. fold_streams(Fun, Acc, Rec) -> - gen_fold(streams, Fun, Acc, Rec). + gen_fold(?streams, Fun, Acc, Rec). + +-spec n_streams(t()) -> non_neg_integer(). +n_streams(Rec) -> + gen_size(?streams, Rec). %% -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. get_seqno(Key, Rec) -> - gen_get(seqnos, Key, Rec). + gen_get(?seqnos, Key, Rec). -spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). put_seqno(Key, Val, Rec) -> - gen_put(seqnos, Key, Val, Rec). + gen_put(?seqnos, Key, Val, Rec). %% @@ -398,19 +484,43 @@ put_seqno(Key, Val, Rec) -> -spec get_rank(rank_key(), t()) -> integer() | undefined. get_rank(Key, Rec) -> - gen_get(ranks, Key, Rec). + gen_get(?ranks, Key, Rec). -spec put_rank(rank_key(), integer(), t()) -> t(). put_rank(Key, Val, Rec) -> - gen_put(ranks, Key, Val, Rec). + gen_put(?ranks, Key, Val, Rec). -spec del_rank(rank_key(), t()) -> t(). del_rank(Key, Rec) -> - gen_del(ranks, Key, Rec). + gen_del(?ranks, Key, Rec). -spec fold_ranks(fun(), Acc, t()) -> Acc. fold_ranks(Fun, Acc, Rec) -> - gen_fold(ranks, Fun, Acc, Rec). + gen_fold(?ranks, Fun, Acc, Rec). + +%% + +-spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined. +get_awaiting_rel(Key, Rec) -> + gen_get(?awaiting_rel, Key, Rec). + +-spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t(). +put_awaiting_rel(Key, Val, Rec) -> + gen_put(?awaiting_rel, Key, Val, Rec). + +-spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t(). +del_awaiting_rel(Key, Rec) -> + gen_del(?awaiting_rel, Key, Rec). + +-spec fold_awaiting_rel(fun(), Acc, t()) -> Acc. +fold_awaiting_rel(Fun, Acc, Rec) -> + gen_fold(?awaiting_rel, Fun, Acc, Rec). + +-spec n_awaiting_rel(t()) -> non_neg_integer(). +n_awaiting_rel(Rec) -> + gen_size(?awaiting_rel, Rec). + +%% -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> @@ -475,16 +585,20 @@ gen_del(Field, Key, Rec) -> Rec#{?set_dirty} ). -%% +gen_size(Field, Rec) -> + check_sequence(Rec), + pmap_size(maps:get(Field, Rec)). -read_subscriptions(SessionId) -> - Records = kv_pmap_restore(?subscription_tab, SessionId), +-spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map(). +update_pmaps(Fun, Map) -> lists:foldl( - fun({{TopicFilter, SubId}, Subscription}, Acc) -> - emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc) + fun({MapKey, Table}, Acc) -> + OldVal = maps:get(MapKey, Map, undefined), + Val = Fun(OldVal, Table), + maps:put(MapKey, Val, Acc) end, - emqx_topic_gbt:new(), - Records + Map, + ?pmaps ). %% @@ -547,6 +661,10 @@ pmap_commit( pmap_format(#pmap{cache = Cache}) -> Cache. +-spec pmap_size(pmap(_K, _V)) -> non_neg_integer(). +pmap_size(#pmap{cache = Cache}) -> + maps:size(Cache). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) -> @@ -574,6 +692,14 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) -> Val = encoder(encode, Tab, Val0), mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). +kv_pmap_read(Table, SessionId, Key) -> + lists:map( + fun(#kv{v = Val}) -> + encoder(decode, Table, Val) + end, + mnesia:dirty_read(Table, {SessionId, Key}) + ). + kv_pmap_restore(Table, SessionId) -> MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], Objs = mnesia:select(Table, MS, read), diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index 154f59b44..1be0bdf4a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -126,9 +126,10 @@ find_new_streams(S) -> renew_streams(S0) -> S1 = remove_unsubscribed_streams(S0), S2 = remove_fully_replayed_streams(S1), + S3 = update_stream_subscription_state_ids(S2), emqx_persistent_session_ds_subs:fold( fun - (Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> + (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) -> TopicFilter = emqx_topic:words(Key), Streams = select_streams( SubId, @@ -137,7 +138,7 @@ renew_streams(S0) -> ), lists:foldl( fun(I, Acc1) -> - ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) + ensure_iterator(TopicFilter, StartTime, SubId, SStateId, I, Acc1) end, Acc, Streams @@ -145,8 +146,8 @@ renew_streams(S0) -> (_Key, _DeletedSubscription, Acc) -> Acc end, - S2, - S2 + S3, + S3 ). -spec on_unsubscribe( @@ -201,7 +202,7 @@ is_fully_acked(Srs, S) -> %% Internal functions %%================================================================================ -ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> +ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream}, S) -> Key = {SubId, Stream}, case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> @@ -214,7 +215,8 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> rank_x = RankX, rank_y = RankY, it_begin = Iterator, - it_end = Iterator + it_end = Iterator, + sub_state_id = SStateId }, emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); {error, recoverable, Reason} -> @@ -350,6 +352,38 @@ remove_fully_replayed_streams(S0) -> S1 ). +%% @doc Update subscription state IDs for all streams that don't have unacked messages +-spec update_stream_subscription_state_ids(emqx_persistent_session_ds_state:t()) -> + emqx_persistent_session_ds_state:t(). +update_stream_subscription_state_ids(S0) -> + CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0), + CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0), + %% Find the latest state IDs for each subscription: + LastSStateIds = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_, #{id := SubId, current_state := SStateId}, Acc) -> + Acc#{SubId => SStateId} + end, + #{}, + S0 + ), + %% Update subscription state IDs for fully acked streams: + emqx_persistent_session_ds_state:fold_streams( + fun + (_, #srs{unsubscribed = true}, S) -> + S; + (Key = {SubId, _Stream}, SRS0, S) -> + case is_fully_acked(CommQos1, CommQos2, SRS0) of + true -> + SRS = SRS0#srs{sub_state_id = maps:get(SubId, LastSStateIds)}, + emqx_persistent_session_ds_state:put_stream(Key, SRS, S); + false -> + S + end + end, + S0, + S0 + ). + %% @doc Compare the streams by the order in which they were replayed. compare_streams( {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 92f17b108..8b4f70a69 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -24,14 +24,56 @@ -module(emqx_persistent_session_ds_subs). %% API: --export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). +-export([ + on_subscribe/3, + on_unsubscribe/3, + on_session_drop/2, + gc/1, + lookup/2, + to_map/1, + fold/3 +]). --export_type([]). +%% Management API: +-export([ + cold_get_subscription/2 +]). + +-export_type([subscription_state_id/0, subscription/0, subscription_state/0]). + +-include("emqx_persistent_session_ds.hrl"). +-include("emqx_mqtt.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). %%================================================================================ %% Type declarations %%================================================================================ +-type subscription() :: #{ + %% Session-unique identifier of the subscription. Other objects + %% can use it as a compact reference: + id := emqx_persistent_session_ds:subscription_id(), + %% Reference to the current subscription state: + current_state := subscription_state_id(), + %% Time when the subscription was added: + start_time := emqx_ds:time() +}. + +-type subscription_state_id() :: integer(). + +-type subscription_state() :: #{ + parent_subscription := emqx_persistent_session_ds:subscription_id(), + upgrade_qos := boolean(), + %% SubOpts: + subopts := #{ + nl => _, + qos => _, + rap => _, + subid => _, + _ => _ + } +}. + %%================================================================================ %% API functions %%================================================================================ @@ -39,41 +81,131 @@ %% @doc Process a new subscription -spec on_subscribe( emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), - emqx_persistent_session_ds_state:t() + emqx_types:subopts(), + emqx_persistent_session_ds:session() ) -> - emqx_persistent_session_ds_state:t(). -on_subscribe(TopicFilter, Subscription, S) -> - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). + {ok, emqx_persistent_session_ds_state:t()} | {error, ?RC_QUOTA_EXCEEDED}. +on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) -> + #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props, + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + undefined -> + %% This is a new subscription: + case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of + true -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), + {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), + {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), + SState = #{ + parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts + }, + S3 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S2 + ), + Subscription = #{ + id => SubId, + current_state => SStateId, + start_time => now_ms() + }, + S = emqx_persistent_session_ds_state:put_subscription( + TopicFilter, Subscription, S3 + ), + ?tp(persistent_session_ds_subscription_added, #{ + topic_filter => TopicFilter, session => SessionId + }), + {ok, S}; + false -> + {error, ?RC_QUOTA_EXCEEDED} + end; + Sub0 = #{current_state := SStateId0, id := SubId} -> + SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, + case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of + SState -> + %% Client resubscribed with the same parameters: + {ok, S0}; + _ -> + %% Subsription parameters changed: + {SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0), + S2 = emqx_persistent_session_ds_state:put_subscription_state( + SStateId, SState, S1 + ), + Sub = Sub0#{current_state => SStateId}, + S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), + {ok, S} + end + end. %% @doc Process UNSUBSCRIBE -spec on_unsubscribe( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds:topic_filter(), - emqx_persistent_session_ds:subscription(), emqx_persistent_session_ds_state:t() ) -> - emqx_persistent_session_ds_state:t(). -on_unsubscribe(TopicFilter, Subscription0, S0) -> - %% Note: we cannot delete the subscription immediately, since its - %% metadata can be used during replay (see `process_batch'). We - %% instead mark it as deleted, and let `subscription_gc' function - %% dispatch it later: - Subscription = Subscription0#{deleted => true}, - emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). + {ok, emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()} + | {error, ?RC_NO_SUBSCRIPTION_EXISTED}. +on_unsubscribe(SessionId, TopicFilter, S0) -> + case lookup(TopicFilter, S0) of + undefined -> + {error, ?RC_NO_SUBSCRIPTION_EXISTED}; + Subscription -> + ?tp(persistent_session_ds_subscription_delete, #{ + session_id => SessionId, topic_filter => TopicFilter + }), + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => SessionId, topic_filter => TopicFilter}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId) + ), + {ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription} + end. -%% @doc Remove subscriptions that have been marked for deletion, and -%% that don't have any unacked messages: +-spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok. +on_session_drop(SessionId, S0) -> + fold( + fun(TopicFilter, _Subscription, S) -> + case on_unsubscribe(SessionId, TopicFilter, S) of + {ok, S1, _} -> S1; + _ -> S + end + end, + S0, + S0 + ). + +%% @doc Remove subscription states that don't have a parent, and that +%% don't have any unacked messages: -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). gc(S0) -> - fold_all( - fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> - case Deleted andalso has_no_unacked_streams(SubId, S0) of - true -> - emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); + %% Create a set of subscription states IDs referenced either by a + %% subscription or a stream replay state: + AliveSet0 = emqx_persistent_session_ds_state:fold_subscriptions( + fun(_TopicFilter, #{current_state := SStateId}, Acc) -> + Acc#{SStateId => true} + end, + #{}, + S0 + ), + AliveSet = emqx_persistent_session_ds_state:fold_streams( + fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) -> + case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of false -> + Acc#{SStateId => true}; + true -> Acc end end, + AliveSet0, + S0 + ), + %% Delete dangling subscription states: + emqx_persistent_session_ds_state:fold_subscription_states( + fun(SStateId, _, S) -> + case maps:is_key(SStateId, AliveSet) of + true -> + S; + false -> + emqx_persistent_session_ds_state:del_subscription_state(SStateId, S) + end + end, S0, S0 ). @@ -82,12 +214,16 @@ gc(S0) -> -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds:subscription() | undefined. lookup(TopicFilter, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of - #{deleted := true} -> - undefined; - Sub -> - Sub + case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of + Sub = #{current_state := SStateId} -> + case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of + #{subopts := SubOpts} -> + Sub#{subopts => SubOpts}; + undefined -> + undefined + end; + undefined -> + undefined end. %% @doc Convert active subscriptions to a map, for information @@ -95,7 +231,7 @@ lookup(TopicFilter, S) -> -spec to_map(emqx_persistent_session_ds_state:t()) -> map(). to_map(S) -> fold( - fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end, #{}, S ). @@ -107,48 +243,29 @@ to_map(S) -> emqx_persistent_session_ds_state:t() ) -> Acc. -fold(Fun, AccIn, S) -> - fold_all( - fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) -> - case Deleted of - true -> Acc; - false -> Fun(TopicFilter, Sub, Acc) - end - end, - AccIn, - S - ). +fold(Fun, Acc, S) -> + emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). -%% @doc Fold over all subscriptions, including inactive ones: --spec fold_all( - fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), - Acc, - emqx_persistent_session_ds_state:t() -) -> - Acc. -fold_all(Fun, AccIn, S) -> - Subs = emqx_persistent_session_ds_state:get_subscriptions(S), - emqx_topic_gbt:fold( - fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, - AccIn, - Subs - ). +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + emqx_persistent_session_ds:subscription() | undefined. +cold_get_subscription(SessionId, Topic) -> + case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, Topic) of + [Sub = #{current_state := SStateId}] -> + case + emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId) + of + [#{subopts := Subopts}] -> + Sub#{subopts => Subopts}; + _ -> + undefined + end; + _ -> + undefined + end. %%================================================================================ %% Internal functions %%================================================================================ --spec has_no_unacked_streams( - emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() -) -> boolean(). -has_no_unacked_streams(SubId, S) -> - emqx_persistent_session_ds_state:fold_streams( - fun - ({SID, _Stream}, Srs, Acc) when SID =:= SubId -> - emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc; - (_StreamKey, _Srs, Acc) -> - Acc - end, - true, - S - ). +now_ms() -> + erlang:system_time(millisecond). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 37a86bda6..3892740a6 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -429,6 +429,11 @@ enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) -> end, enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). +%% Caution: updating this function _may_ break consistency of replay +%% for persistent sessions. Persistent sessions expect it to return +%% the same result during replay. If it changes the behavior between +%% releases, sessions restored from the cold storage may end up +%% replaying messages with different QoS, etc. enrich_message( ClientInfo = #{clientid := ClientId}, Msg = #message{from = ClientId}, diff --git a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl index 61e0575a8..375b4f4b1 100644 --- a/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl +++ b/apps/emqx/test/emqx_persistent_session_ds_state_tests.erl @@ -74,9 +74,6 @@ session_id() -> topic() -> oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]). -subid() -> - oneof([[]]). - subscription() -> oneof([#{}]). @@ -129,18 +126,25 @@ put_req() -> {Track, Seqno}, {seqno_track(), seqno()}, {#s.seqno, put_seqno, Track, Seqno} + ), + ?LET( + {Topic, Subscription}, + {topic(), subscription()}, + {#s.subs, put_subscription, Topic, Subscription} ) ]). get_req() -> oneof([ {#s.streams, get_stream, stream_id()}, - {#s.seqno, get_seqno, seqno_track()} + {#s.seqno, get_seqno, seqno_track()}, + {#s.subs, get_subscription, topic()} ]). del_req() -> oneof([ - {#s.streams, del_stream, stream_id()} + {#s.streams, del_stream, stream_id()}, + {#s.subs, del_subscription, topic()} ]). command(S) -> @@ -153,13 +157,6 @@ command(S) -> {2, {call, ?MODULE, reopen, [session_id(S)]}}, {2, {call, ?MODULE, commit, [session_id(S)]}}, - %% Subscriptions: - {3, - {call, ?MODULE, put_subscription, [ - session_id(S), topic(), subid(), subscription() - ]}}, - {3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}}, - %% Metadata: {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}}, {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}}, @@ -170,7 +167,6 @@ command(S) -> {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}}, %% Getters: - {4, {call, ?MODULE, get_subscriptions, [session_id(S)]}}, {1, {call, ?MODULE, iterate_sessions, [batch_size()]}} ]); false -> @@ -207,19 +203,6 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result) #{session_id => SessionId, key => Key, 'fun' => Fun} ), true; -postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) -> - #{SessionId := #s{subs = Subs}} = S, - ?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)), - maps:foreach( - fun({TopicFilter, Id}, Expected) -> - ?assertEqual( - Expected, - emqx_topic_gbt:lookup(TopicFilter, Id, Result, default) - ) - end, - Subs - ), - true; postcondition(_, _, _) -> true. @@ -227,22 +210,6 @@ next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) -> S#{SessionId => #s{}}; next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) -> maps:remove(SessionId, S); -next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> Subs#{Key => Subscription} end, - S - ); -next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) -> - Key = {TopicFilter, SubId}, - update( - SessionId, - #s.subs, - fun(Subs) -> maps:remove(Key, Subs) end, - S - ); next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) -> update( SessionId, @@ -296,19 +263,6 @@ reopen(SessionId) -> {ok, S} = emqx_persistent_session_ds_state:open(SessionId), put_state(SessionId, S). -put_subscription(SessionId, TopicFilter, SubId, Subscription) -> - S = emqx_persistent_session_ds_state:put_subscription( - TopicFilter, SubId, Subscription, get_state(SessionId) - ), - put_state(SessionId, S). - -del_subscription(SessionId, TopicFilter, SubId) -> - S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)), - put_state(SessionId, S). - -get_subscriptions(SessionId) -> - emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)). - put_metadata(SessionId, {_MetaKey, Fun, Value}) -> S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]), put_state(SessionId, S). diff --git a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl index 868d0191e..449d1fa51 100644 --- a/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl +++ b/apps/emqx_bridge_cassandra/test/emqx_bridge_cassandra_SUITE.erl @@ -581,7 +581,6 @@ t_write_failure(Config) -> ) end), fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind( [buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0 ), diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index 6666a3fd0..d96157f8c 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -1929,7 +1929,6 @@ t_bad_attributes(Config) -> ok end, fun(Trace) -> - ct:pal("trace:\n ~p", [Trace]), ?assertMatch( [ #{placeholder := [<<"payload">>, <<"ok">>], value := #{}}, diff --git a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl index 8b719de9a..9ad2fbc5a 100644 --- a/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl +++ b/apps/emqx_bridge_mysql/test/emqx_bridge_mysql_SUITE.erl @@ -517,7 +517,6 @@ t_write_failure(Config) -> ok end, fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, diff --git a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl index 3e9428c88..f4917f387 100644 --- a/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl +++ b/apps/emqx_bridge_pgsql/test/emqx_bridge_pgsql_SUITE.erl @@ -520,7 +520,6 @@ t_write_failure(Config) -> ) end), fun(Trace0) -> - ct:pal("trace: ~p", [Trace0]), Trace = ?of_kind(buffer_worker_flush_nack, Trace0), ?assertMatch([#{result := {error, _}} | _], Trace), [#{result := {error, Error}} | _] = Trace, diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f613085bb..1e87f3907 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -13,7 +13,7 @@ This makes the storage disk requirements very predictable: only the number of _p DS _backend_ is a callback module that implements `emqx_ds` behavior. -EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses RocksDB as the main storage. +EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses Raft algorithm for data replication, and RocksDB as the main storage. Note that builtin backend introduces the concept of **site** to alleviate the problem of changing node names. Site IDs are persistent, and they are randomly generated at the first startup of the node. @@ -95,10 +95,10 @@ Consumption of messages is done in several stages: # Limitation -- Builtin backend currently doesn't replicate data across different sites - There is no local cache of messages, which may result in transferring the same data multiple times # Documentation links + TBD # Usage diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 7128f1c3a..38320780d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -1747,6 +1747,7 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) -> format_persistent_session_info(ClientId, PSInfo0) -> Metadata = maps:get(metadata, PSInfo0, #{}), + {ProtoName, ProtoVer} = maps:get(protocol, Metadata), PSInfo1 = maps:with([created_at, expiry_interval], Metadata), CreatedAt = maps:get(created_at, PSInfo1), case Metadata of @@ -1763,7 +1764,12 @@ format_persistent_session_info(ClientId, PSInfo0) -> connected_at => CreatedAt, ip_address => IpAddress, is_persistent => true, - port => Port + port => Port, + heap_size => 0, + mqueue_len => 0, + proto_name => ProtoName, + proto_ver => ProtoVer, + subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo0, #{})) }, PSInfo = lists:foldl( fun result_format_time_fun/2, diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index cb8421211..b1a8fbce2 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -86,7 +86,8 @@ fields(subscription) -> {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>, example => 0})}, {nl, hoconsc:mk(integer(), #{desc => <<"No Local">>, example => 0})}, {rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>, example => 0})}, - {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})} + {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})}, + {durable, hoconsc:mk(boolean(), #{desc => <<"Durable subscription">>, example => false})} ]. parameters() -> @@ -141,6 +142,14 @@ parameters() -> required => false, desc => <<"Shared subscription group name">> }) + }, + { + durable, + hoconsc:mk(boolean(), #{ + in => query, + required => false, + desc => <<"Filter subscriptions by durability">> + }) } ]. @@ -167,7 +176,8 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) -> #{ topic => emqx_topic:maybe_format_share(Topic), clientid => maps:get(subid, SubOpts, null), - node => WhichNode + node => WhichNode, + durable => false }, maps:with([qos, nl, rap, rh], SubOpts) ). @@ -187,7 +197,22 @@ check_match_topic(#{<<"match_topic">> := MatchTopic}) -> check_match_topic(_) -> ok. -do_subscriptions_query(QString) -> +do_subscriptions_query(QString0) -> + {IsDurable, QString} = maps:take( + <<"durable">>, maps:merge(#{<<"durable">> => undefined}, QString0) + ), + case emqx_persistent_message:is_persistence_enabled() andalso IsDurable of + false -> + do_subscriptions_query_mem(QString); + true -> + do_subscriptions_query_persistent(QString); + undefined -> + merge_queries( + QString, fun do_subscriptions_query_mem/1, fun do_subscriptions_query_persistent/1 + ) + end. + +do_subscriptions_query_mem(QString) -> Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], case maps:get(<<"node">>, QString, undefined) of undefined -> @@ -201,8 +226,196 @@ do_subscriptions_query(QString) -> end end. +do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) -> + Count = emqx_persistent_session_ds_router:stats(n_routes), + %% TODO: filtering by client ID can be implemented more efficiently: + FilterTopic = maps:get(<<"topic">>, QString, '_'), + Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic), + SubPred = fun(Sub) -> + compare_optional(<<"topic">>, QString, topic, Sub) andalso + compare_optional(<<"clientid">>, QString, clientid, Sub) andalso + compare_optional(<<"qos">>, QString, qos, Sub) andalso + compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub) + end, + NDropped = (Page - 1) * Limit, + {_, Stream} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0 + ), + {Subscriptions, Stream1} = consume_n_matching( + fun persistent_route_to_subscription/1, SubPred, Limit, Stream + ), + HasNext = Stream1 =/= [], + Meta = + case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of + true -> + %% Fuzzy searches shouldn't return count: + #{ + limit => Limit, + page => Page, + hasnext => HasNext + }; + false -> + #{ + count => Count, + limit => Limit, + page => Page, + hasnext => HasNext + } + end, + + #{ + meta => Meta, + data => Subscriptions + }. + +compare_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := Expected} -> + maps:get(SField, Subscription) =:= Expected; + _ -> + true + end. + +compare_match_topic_optional(QField, Query, SField, Subscription) -> + case Query of + #{QField := TopicFilter} -> + Topic = maps:get(SField, Subscription), + emqx_topic:match(Topic, TopicFilter); + _ -> + true + end. + +%% @doc Drop elements from the stream until encountered N elements +%% matching the predicate function. +-spec consume_n_matching( + fun((T) -> Q), + fun((Q) -> boolean()), + non_neg_integer(), + emqx_utils_stream:stream(T) +) -> {[Q], emqx_utils_stream:stream(T) | empty}. +consume_n_matching(Map, Pred, N, S) -> + consume_n_matching(Map, Pred, N, S, []). + +consume_n_matching(_Map, _Pred, _N, [], Acc) -> + {lists:reverse(Acc), []}; +consume_n_matching(_Map, _Pred, 0, S, Acc) -> + {lists:reverse(Acc), S}; +consume_n_matching(Map, Pred, N, S0, Acc) -> + case emqx_utils_stream:next(S0) of + [] -> + consume_n_matching(Map, Pred, N, [], Acc); + [Elem | S] -> + Mapped = Map(Elem), + case Pred(Mapped) of + true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]); + false -> consume_n_matching(Map, Pred, N, S, Acc) + end + end. + +persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) -> + case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of + #{subopts := SubOpts} -> + #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, + #{ + topic => Topic, + clientid => SessionId, + node => all, + + qos => Qos, + nl => Nl, + rh => Rh, + rap => Rap, + durable => true + }; + undefined -> + #{ + topic => Topic, + clientid => SessionId, + node => all, + durable => true + } + end. + +%% @private This function merges paginated results from two sources. +%% +%% Note: this implementation is far from ideal: `count' for the +%% queries may be missing, it may be larger than the actual number of +%% elements. This may lead to empty pages that can confuse the user. +%% +%% Not much can be done to mitigate that, though: since the count may +%% be incorrect, we cannot run simple math to determine when one +%% stream begins and another ends: it requires actual iteration. +%% +%% Ideally, the dashboard must be split between durable and mem +%% subscriptions, and this function should be removed for good. +merge_queries(QString0, Q1, Q2) -> + #{<<"limit">> := Limit, <<"page">> := Page} = QString0, + C1 = resp_count(QString0, Q1), + C2 = resp_count(QString0, Q2), + Meta = + case is_number(C1) andalso is_number(C2) of + true -> + #{ + count => C1 + C2, + limit => Limit, + page => Page + }; + false -> + #{ + limit => Limit, + page => Page + } + end, + case {C1, C2} of + {_, 0} -> + %% The second query is empty. Just return the result of Q1 as usual: + Q1(QString0); + {0, _} -> + %% The first query is empty. Just return the result of Q2 as usual: + Q2(QString0); + _ when is_number(C1) -> + %% Both queries are potentially non-empty, but we at least + %% have the page number for the first query. We try to + %% stich the pages together and thus respect the limit + %% (except for the page where the results switch from Q1 + %% to Q2). + + %% Page where data from the second query is estimated to + %% begin: + Q2Page = ceil(C1 / Limit), + case Page =< Q2Page of + true -> + #{data := Data, meta := #{hasnext := HN}} = Q1(QString0), + #{ + data => Data, + meta => Meta#{hasnext => HN orelse C2 > 0} + }; + false -> + QString = QString0#{<<"page">> => Page - Q2Page}, + #{data := Data, meta := #{hasnext := HN}} = Q2(QString), + #{data => Data, meta => Meta#{hasnext => HN}} + end; + _ -> + %% We don't know how many items is there in the first + %% query, and the second query is not empty (this includes + %% the case where `C2' is `undefined'). Best we can do is + %% to interleave the queries. This may produce less + %% results per page than `Limit'. + QString = QString0#{<<"limit">> => ceil(Limit / 2)}, + #{data := D1, meta := #{hasnext := HN1}} = Q1(QString), + #{data := D2, meta := #{hasnext := HN2}} = Q2(QString), + #{ + meta => Meta#{hasnext => HN1 or HN2}, + data => D1 ++ D2 + } + end. + +resp_count(Query, QFun) -> + #{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}), + maps:get(count, Meta, undefined). + %%-------------------------------------------------------------------- -%% QueryString to MatchSpec +%% QueryString to MatchSpec (mem sessions) %%-------------------------------------------------------------------- -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). diff --git a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl index 356ae97e4..435a837e3 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_subscription_SUITE.erl @@ -36,17 +36,72 @@ -define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}). all() -> - emqx_common_test_helpers:all(?MODULE). + [ + {group, mem}, + {group, persistent} + ]. + +groups() -> + CommonTCs = emqx_common_test_helpers:all(?MODULE), + [ + {mem, CommonTCs}, + %% Shared subscriptions are currently not supported: + {persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]} + ]. init_per_suite(Config) -> - emqx_mgmt_api_test_util:init_suite(), + Apps = emqx_cth_suite:start( + [ + {emqx, + "session_persistence {\n" + " enable = true\n" + " renew_streams_interval = 10ms\n" + "}"}, + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard() + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)). + +init_per_group(persistent, Config) -> + ClientConfig = #{ + username => ?USERNAME, + clientid => ?CLIENTID, + proto_ver => v5, + clean_start => true, + properties => #{'Session-Expiry-Interval' => 300} + }, + [{client_config, ClientConfig}, {durable, true} | Config]; +init_per_group(mem, Config) -> + ClientConfig = #{ + username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5, clean_start => true + }, + [{client_config, ClientConfig}, {durable, false} | Config]. + +end_per_group(_, Config) -> Config. -end_per_suite(_) -> - emqx_mgmt_api_test_util:end_suite(). +init_per_testcase(_TC, Config) -> + case ?config(client_config, Config) of + ClientConfig when is_map(ClientConfig) -> + {ok, Client} = emqtt:start_link(ClientConfig), + {ok, _} = emqtt:connect(Client), + [{client, Client} | Config]; + _ -> + Config + end. + +end_per_testcase(_TC, Config) -> + Client = proplists:get_value(client, Config), + emqtt:disconnect(Client). t_subscription_api(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), {ok, _, _} = emqtt:subscribe( Client, [ {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} @@ -54,12 +109,13 @@ t_subscription_api(Config) -> ), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), + timer:sleep(100), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), Data = emqx_utils_json:decode(Response, [return_maps]), Meta = maps:get(<<"meta">>, Data), ?assertEqual(1, maps:get(<<"page">>, Meta)), ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), - ?assertEqual(2, maps:get(<<"count">>, Meta)), + ?assertEqual(2, maps:get(<<"count">>, Meta), Data), Subscriptions = maps:get(<<"data">>, Data), ?assertEqual(length(Subscriptions), 2), Sort = @@ -90,7 +146,8 @@ t_subscription_api(Config) -> {"node", atom_to_list(node())}, {"qos", "0"}, {"share_group", "test_group"}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], Headers = emqx_mgmt_api_test_util:auth_header_(), @@ -103,6 +160,7 @@ t_subscription_api(Config) -> t_subscription_fuzzy_search(Config) -> Client = proplists:get_value(client, Config), + Durable = atom_to_list(?config(durable, Config)), Topics = [ <<"t/foo">>, <<"t/foo/bar">>, @@ -116,7 +174,8 @@ t_subscription_fuzzy_search(Config) -> MatchQs = [ {"clientid", ?CLIENTID}, {"node", atom_to_list(node())}, - {"match_topic", "t/#"} + {"match_topic", "t/#"}, + {"durable", Durable} ], MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), @@ -130,12 +189,13 @@ t_subscription_fuzzy_search(Config) -> LimitMatchQuery = [ {"clientid", ?CLIENTID}, {"match_topic", "+/+/+"}, - {"limit", "3"} + {"limit", "3"}, + {"durable", Durable} ], MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers), ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2), - ?assertEqual(3, length(maps:get(<<"data">>, MatchData2))), + ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2), MatchData2P2 = #{<<"meta">> := MatchMeta2P2} = @@ -176,8 +236,8 @@ t_list_with_shared_sub(_Config) -> ok. -t_list_with_invalid_match_topic(_Config) -> - Client = proplists:get_value(client, _Config), +t_list_with_invalid_match_topic(Config) -> + Client = proplists:get_value(client, Config), RealTopic = <<"t/+">>, Topic = <<"$share/g1/", RealTopic/binary>>, @@ -212,12 +272,3 @@ request_json(Method, Query, Headers) when is_list(Query) -> path() -> emqx_mgmt_api_test_util:api_path(["subscriptions"]). - -init_per_testcase(_TC, Config) -> - {ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}), - {ok, _} = emqtt:connect(Client), - [{client, Client} | Config]. - -end_per_testcase(_TC, Config) -> - Client = proplists:get_value(client, Config), - emqtt:disconnect(Client). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index d018ce709..05a2f711d 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -3345,7 +3345,6 @@ wait_n_events(NEvents, Timeout, EventName) -> end. assert_sync_retry_fail_then_succeed_inflight(Trace) -> - ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( #{?snk_kind := buffer_worker_flush_nack, ref := _Ref}, @@ -3365,7 +3364,6 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) -> ok. assert_async_retry_fail_then_succeed_inflight(Trace) -> - ct:pal(" ~p", [Trace]), ?assert( ?strict_causality( #{?snk_kind := handle_async_reply, action := nack}, diff --git a/changes/ce/fix-12874.en.md b/changes/ce/fix-12874.en.md new file mode 100644 index 000000000..1a5814b07 --- /dev/null +++ b/changes/ce/fix-12874.en.md @@ -0,0 +1,7 @@ +- Ensure consistency of the durable message replay when the subscriptions are modified before session reconnects + +- Persistent sessions save inflight packet IDs for the received QoS2 messages + +- Make behavior of the persistent sessions consistent with the non-persistent sessions in regard to overlapping subscriptions + +- List persistent subscriptions in the REST API