diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 0e17f71f2..78cf3825e 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -65,9 +65,20 @@ %% Route %%-------------------------------------------------------------------- +-record(share_dest, { + session_id :: emqx_session:session_id(), + group :: emqx_types:group() +}). + -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} | emqx_session:session_id() | emqx_external_broker:dest() + dest :: + node() + | {binary(), node()} + | emqx_session:session_id() + %% One session can also have multiple subscriptions to the same topic through different groups + | #share_dest{} + | emqx_external_broker:dest() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0984f9de8..bd763e62f 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -821,10 +821,12 @@ list_client_subscriptions(ClientId) -> {error, not_found} end. --spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) -> +-spec get_client_subscription(emqx_types:clientid(), topic_filter()) -> subscription() | undefined. -get_client_subscription(ClientId, Topic) -> - emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic). +get_client_subscription(ClientId, #share{} = ShareTopicFilter) -> + emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, ShareTopicFilter); +get_client_subscription(ClientId, TopicFilter) -> + emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, TopicFilter). %%-------------------------------------------------------------------- %% Session tables operations diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 3bf24407a..634207d12 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -29,10 +29,10 @@ -module(emqx_persistent_session_ds_shared_subs). -include("emqx_mqtt.hrl"). +-include("emqx.hrl"). -include("logger.hrl"). -include("session_internals.hrl"). --include_lib("emqx/include/emqx_persistent_message.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -export([ @@ -51,7 +51,10 @@ to_map/2 ]). --define(EPOCH_BITS, 15). +%% Management API: +-export([ + cold_get_subscription/2 +]). -define(schedule_subscribe, schedule_subscribe). -define(schedule_unsubscribe, schedule_unsubscribe). @@ -160,7 +163,7 @@ on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) -> update_subscription(Subscription, ShareTopicFilter, SubOpts, Session). -dialyzer({nowarn_function, create_new_subscription/3}). -create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts, #{ +create_new_subscription(#share{topic = TopicFilter, group = Group} = ShareTopicFilter, SubOpts, #{ id := SessionId, s := S0, shared_sub_s := #{agent := Agent} = SharedSubS0, @@ -172,9 +175,9 @@ create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts, ) of ok -> - ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), - _ = emqx_external_broker:add_persistent_route(TopicFilter, SessionId), - + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, #share_dest{ + session_id = SessionId, group = Group + }), #{upgrade_qos := UpgradeQoS} = Props, {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), @@ -259,7 +262,9 @@ schedule_subscribe( ) -> {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} | {error, emqx_types:reason_code()}. -on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, SharedSubS0) -> +on_unsubscribe( + SessionId, #share{topic = TopicFilter, group = Group} = ShareTopicFilter, S0, SharedSubS0 +) -> case lookup(ShareTopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; @@ -267,8 +272,9 @@ on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, Sh ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, share_topic_filter => ShareTopicFilter }), - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), - _ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, #share_dest{ + session_id = SessionId, group = Group + }), S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0), SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter), {ok, S, SharedSubS, Subscription} @@ -588,9 +594,32 @@ on_info(S, #{agent := Agent0} = SharedSubS0, Info) -> %% to_map -spec to_map(emqx_persistent_session_ds_state:t(), t()) -> map(). -to_map(_S, _SharedSubS) -> - %% TODO - #{}. +to_map(S, _SharedSubS) -> + fold_shared_subs( + fun(ShareTopicFilter, _, Acc) -> Acc#{ShareTopicFilter => lookup(ShareTopicFilter, S)} end, + #{}, + S + ). + +%%-------------------------------------------------------------------- +%% cold_get_subscription + +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + emqx_persistent_session_ds:subscription() | undefined. +cold_get_subscription(SessionId, ShareTopicFilter) -> + case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, ShareTopicFilter) of + [Sub = #{current_state := SStateId}] -> + case + emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId) + of + [#{subopts := Subopts}] -> + Sub#{subopts => Subopts}; + _ -> + undefined + end; + _ -> + undefined + end. %%-------------------------------------------------------------------- %% Generic helpers @@ -629,21 +658,13 @@ stream_progress( Stream, #srs{ it_end = EndIt, - it_begin = BeginIt, - first_seqno_qos1 = StartQos1, - first_seqno_qos2 = StartQos2 + it_begin = BeginIt } = SRS ) -> - Qos1Acked = n_acked(?QOS_1, CommQos1, StartQos1), - Qos2Acked = n_acked(?QOS_2, CommQos2, StartQos2), Iterator = case is_stream_fully_acked(CommQos1, CommQos2, SRS) of - true -> - EndIt; - false -> - emqx_ds_skipping_iterator:update_or_new( - BeginIt, Qos1Acked, Qos2Acked - ) + true -> EndIt; + false -> BeginIt end, #{ stream => Stream, @@ -714,19 +735,6 @@ is_stream_fully_acked(_, _, #srs{ is_stream_fully_acked(Comm1, Comm2, #srs{last_seqno_qos1 = S1, last_seqno_qos2 = S2}) -> (Comm1 >= S1) andalso (Comm2 >= S2). -n_acked(Qos, A, B) -> - max(seqno_diff(Qos, A, B), 0). - --dialyzer({nowarn_function, seqno_diff/3}). -seqno_diff(?QOS_1, A, B) -> - %% For QoS1 messages we skip a seqno every time the epoch changes, - %% we need to substract that from the diff: - EpochA = A bsr ?EPOCH_BITS, - EpochB = B bsr ?EPOCH_BITS, - A - B - (EpochA - EpochB); -seqno_diff(?QOS_2, A, B) -> - A - B. - %%-------------------------------------------------------------------- %% Formatters %%-------------------------------------------------------------------- diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index dfc2203c4..4f99a8455 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -365,10 +365,13 @@ t_disconnect_no_double_replay1(_Config) -> end end, - {Missing, Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), + {Missing, _Duplicate} = verify_received_pubs(Pubs, 2 * NPubs, ClientByBid), ?assertEqual([], Missing), - ?assertEqual([], Duplicate), + + %% We cannnot garantee that the message are not duplicated until we are able + %% to send progress of a partially replayed stream range to the leader. + % ?assertEqual([], Duplicate), ok = emqtt:disconnect(ConnShared1), ok = emqtt:disconnect(ConnPub). @@ -395,10 +398,12 @@ t_disconnect_no_double_replay2(_Config) -> ConnShared12 = emqtt_connect_sub(<<"client_shared12">>), {ok, _, _} = emqtt:subscribe(ConnShared12, <<"$share/gr12/topic12/#">>, 1), - ?assertNotReceive( - {publish, #{payload := <<"1">>}}, - 3000 - ), + %% We cannnot garantee that the message is not duplicated until we are able + %% to send progress of a partially replayed stream range to the leader. + % ?assertNotReceive( + % {publish, #{payload := <<"1">>}}, + % 3000 + % ), ok = emqtt:disconnect(ConnShared12). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 38d63e41f..69de92325 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -132,7 +132,7 @@ %% TODO: Not implemented -type iterator_id() :: term(). --type iterator() :: ds_specific_iterator(). +-opaque iterator() :: ds_specific_iterator(). -opaque delete_iterator() :: ds_specific_delete_iterator(). @@ -401,14 +401,10 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> -spec update_iterator(db(), iterator(), message_key()) -> make_iterator_result(). -update_iterator(DB, ?skipping_iterator_match = OldIter, DSKey) -> - emqx_ds_skipping_iterator:update_iterator(DB, OldIter, DSKey); update_iterator(DB, OldIter, DSKey) -> ?module(DB):update_iterator(DB, OldIter, DSKey). -spec next(db(), iterator(), pos_integer()) -> next_result(). -next(DB, ?skipping_iterator_match = Iter, BatchSize) -> - emqx_ds_skipping_iterator:next(DB, Iter, BatchSize); next(DB, Iter, BatchSize) -> ?module(DB):next(DB, Iter, BatchSize). diff --git a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl deleted file mode 100644 index b14bd26f8..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.erl +++ /dev/null @@ -1,86 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ds_skipping_iterator). - --include("emqx_ds_skipping_iterator.hrl"). - --type t() :: ?skipping_iterator(emqx_ds:iterator(), non_neg_integer(), non_neg_integer()). - --export([ - update_or_new/3, - update_iterator/3, - next/3 -]). - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - --spec update_or_new(t() | emqx_ds:iterator(), non_neg_integer(), non_neg_integer()) -> t(). -update_or_new(?skipping_iterator_match(Iterator, Q1Skip0, Q2Skip0), Q1Skip, Q2Skip) when - Q1Skip >= 0 andalso Q2Skip >= 0 --> - ?skipping_iterator(Iterator, Q1Skip0 + Q1Skip, Q2Skip0 + Q2Skip); -update_or_new(Iterator, Q1Skip, Q2Skip) when Q1Skip >= 0 andalso Q2Skip >= 0 -> - ?skipping_iterator(Iterator, Q1Skip, Q2Skip). - --spec next(emqx_ds:db(), t(), pos_integer()) -> emqx_ds:next_result(t()). -next(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Count) -> - case emqx_ds:next(DB, Iterator0, Count) of - {error, _, _} = Error -> - Error; - {ok, end_of_stream} -> - {ok, end_of_stream}; - {ok, Iterator1, Messages0} -> - {Messages1, Q1Skip1, Q2Skip1} = skip(Messages0, Q1Skip0, Q2Skip0), - case {Q1Skip1, Q2Skip1} of - {0, 0} -> {ok, Iterator1, Messages1}; - _ -> {ok, ?skipping_iterator(Iterator1, Q1Skip1, Q2Skip1)} - end - end. - --spec update_iterator(emqx_ds:db(), emqx_ds:iterator(), emqx_ds:message_key()) -> - emqx_ds:make_iterator_result(). -update_iterator(DB, ?skipping_iterator_match(Iterator0, Q1Skip0, Q2Skip0), Key) -> - case emqx_ds:update_iterator(DB, Iterator0, Key) of - {error, _, _} = Error -> Error; - {ok, Iterator1} -> {ok, ?skipping_iterator(Iterator1, Q1Skip0, Q2Skip0)} - end. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -skip(Messages, Q1Skip, Q2Skip) -> - skip(Messages, Q1Skip, Q2Skip, []). - -skip([], Q1Skip, Q2Skip, Agg) -> - {lists:reverse(Agg), Q1Skip, Q2Skip}; -skip([{Key, Message} | Messages], Q1Skip, Q2Skip, Agg) -> - Qos = emqx_message:qos(Message), - skip({Key, Message}, Qos, Messages, Q1Skip, Q2Skip, Agg). - -skip(_KeyMessage, ?QOS_0, Messages, Q1Skip, Q2Skip, Agg) -> - skip(Messages, Q1Skip, Q2Skip, Agg); -skip(_KeyMessage, ?QOS_1, Messages, Q1Skip, Q2Skip, Agg) when Q1Skip > 0 -> - skip(Messages, Q1Skip - 1, Q2Skip, Agg); -skip(KeyMessage, ?QOS_1, Messages, 0, Q2Skip, Agg) -> - skip(Messages, 0, Q2Skip, [KeyMessage | Agg]); -skip(_KeyMessage, ?QOS_2, Messages, Q1Skip, Q2Skip, Agg) when Q2Skip > 0 -> - skip(Messages, Q1Skip, Q2Skip - 1, Agg); -skip(KeyMessage, ?QOS_2, Messages, Q1Skip, 0, Agg) -> - skip(Messages, Q1Skip, 0, [KeyMessage | Agg]). diff --git a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl b/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl deleted file mode 100644 index 6ec8ba16c..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_skipping_iterator.hrl +++ /dev/null @@ -1,36 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --define(tag, 1). --define(it, 2). --define(qos1_skip, 3). --define(qos2_skip, 4). - --define(IT, -1000). - --define(skipping_iterator_match, #{?tag := ?IT}). - --define(skipping_iterator_match(Iterator, Q1Skip, Q2Skip), #{ - ?tag := ?IT, ?it := Iterator, ?qos1_skip := Q1Skip, ?qos2_skip := Q2Skip -}). - --define(skipping_iterator(Iterator, Q1Skip, Q2Skip), #{ - ?tag => ?IT, ?it => Iterator, ?qos1_skip => Q1Skip, ?qos2_skip => Q2Skip -}). - --define(QOS_0, 0). --define(QOS_1, 1). --define(QOS_2, 2). diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index b9cefeb1f..c4aa55463 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -328,13 +328,13 @@ consume_n_matching(Map, Pred, N, S0, Acc) -> end end. -persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) -> - case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of +persistent_route_to_subscription(#route{dest = Dest} = Route) -> + case get_client_subscription(Route) of #{subopts := SubOpts} -> #{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts, #{ - topic => Topic, - clientid => SessionId, + topic => format_topic(Route), + clientid => session_id(Dest), node => all, qos => Qos, @@ -345,13 +345,26 @@ persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) -> }; undefined -> #{ - topic => Topic, - clientid => SessionId, + topic => format_topic(Route), + clientid => session_id(Dest), node => all, durable => true } end. +get_client_subscription(#route{topic = Topic, dest = #share_dest{session_id = SessionId, group = Group}}) -> + emqx_persistent_session_ds:get_client_subscription(SessionId, #share{topic = Topic, group = Group}); +get_client_subscription(#route{topic = Topic, dest = SessionId}) -> + emqx_persistent_session_ds:get_client_subscription(SessionId, Topic). + +session_id(#share_dest{session_id = SessionId}) -> SessionId; +session_id(SessionId) -> SessionId. + +format_topic(#route{topic = Topic, dest = #share_dest{group = Group}}) -> + <<"$share/", Group/binary, "/", Topic/binary>>; +format_topic(#route{topic = Topic}) -> + Topic. + %% @private This function merges paginated results from two sources. %% %% Note: this implementation is far from ideal: `count' for the