From 903b3863d1fe0e627f547b3db06db3bed9f8388b Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 5 Oct 2023 17:17:08 -0300 Subject: [PATCH] chore(ps_ds): make persistent session module use new `emqx_ds` APIs --- .../emqx_persistent_session_ds_SUITE.erl | 125 +++----- apps/emqx/src/emqx_persistent_message.erl | 2 +- ...ds.erl_ => emqx_persistent_session_ds.erl} | 286 +++++++++--------- .../test/emqx_persistent_messages_SUITE.erl | 66 ++-- .../src/emqx_ds_replication_layer.erl | 10 +- 5 files changed, 203 insertions(+), 286 deletions(-) rename apps/emqx/src/{emqx_persistent_session_ds.erl_ => emqx_persistent_session_ds.erl} (66%) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index d2d23e8cd..ee5d203e4 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -14,7 +14,6 @@ -define(DEFAULT_KEYSPACE, default). -define(DS_SHARD_ID, <<"local">>). -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). --define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). -import(emqx_common_test_helpers, [on_exit/1]). @@ -91,9 +90,6 @@ get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -get_all_iterator_refs(Node) -> - erpc:call(Node, mnesia, dirty_all_keys, [?ITERATOR_REF_TAB]). - get_all_iterator_ids(Node) -> Fn = fun(K, _V, Acc) -> [K | Acc] end, erpc:call(Node, fun() -> @@ -126,6 +122,32 @@ start_client(Opts0 = #{}) -> on_exit(fun() -> catch emqtt:stop(Client) end), Client. +restart_node(Node, NodeSpec) -> + ?tp(will_restart_node, #{}), + ?tp(notice, "restarting node", #{node => Node}), + true = monitor_node(Node, true), + ok = erpc:call(Node, init, restart, []), + receive + {nodedown, Node} -> + ok + after 10_000 -> + ct:fail("node ~p didn't stop", [Node]) + end, + ?tp(notice, "waiting for nodeup", #{node => Node}), + wait_nodeup(Node), + wait_gen_rpc_down(NodeSpec), + ?tp(notice, "restarting apps", #{node => Node}), + Apps = maps:get(apps, NodeSpec), + ok = erpc:call(Node, emqx_cth_suite, load_apps, [Apps]), + _ = erpc:call(Node, emqx_cth_suite, start_apps, [Apps, NodeSpec]), + %% have to re-inject this so that we may stop the node succesfully at the + %% end.... + ok = emqx_cth_cluster:set_node_opts(Node, NodeSpec), + ok = snabbkaffe:forward_trace(Node), + ?tp(notice, "node restarted", #{node => Node}), + ?tp(restarted_node, #{}), + ok. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -143,24 +165,14 @@ t_non_persistent_session_subscription(_Config) -> {ok, _} = emqtt:connect(Client), ?tp(notice, "subscribing", #{}), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), - IteratorRefs = get_all_iterator_refs(node()), - IteratorIds = get_all_iterator_ids(node()), ok = emqtt:stop(Client), - #{ - iterator_refs => IteratorRefs, - iterator_ids => IteratorIds - } + ok end, - fun(Res, Trace) -> + fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - #{ - iterator_refs := IteratorRefs, - iterator_ids := IteratorIds - } = Res, - ?assertEqual([], IteratorRefs), - ?assertEqual({ok, []}, IteratorIds), + ?assertEqual([], ?of_kind(ds_session_subscription_added, Trace)), ok end ), @@ -175,7 +187,7 @@ t_session_subscription_idempotency(Config) -> ?check_trace( begin ?force_ordering( - #{?snk_kind := persistent_session_ds_iterator_added}, + #{?snk_kind := persistent_session_ds_subscription_added}, _NEvents0 = 1, #{?snk_kind := will_restart_node}, _Guard0 = true @@ -187,32 +199,7 @@ t_session_subscription_idempotency(Config) -> _Guard1 = true ), - spawn_link(fun() -> - ?tp(will_restart_node, #{}), - ?tp(notice, "restarting node", #{node => Node1}), - true = monitor_node(Node1, true), - ok = erpc:call(Node1, init, restart, []), - receive - {nodedown, Node1} -> - ok - after 10_000 -> - ct:fail("node ~p didn't stop", [Node1]) - end, - ?tp(notice, "waiting for nodeup", #{node => Node1}), - wait_nodeup(Node1), - wait_gen_rpc_down(Node1Spec), - ?tp(notice, "restarting apps", #{node => Node1}), - Apps = maps:get(apps, Node1Spec), - ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), - _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), - %% have to re-inject this so that we may stop the node succesfully at the - %% end.... - ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), - ok = snabbkaffe:forward_trace(Node1), - ?tp(notice, "node restarted", #{node => Node1}), - ?tp(restarted_node, #{}), - ok - end), + spawn_link(fun() -> restart_node(Node1, Node1Spec) end), ?tp(notice, "starting 1", #{}), Client0 = start_client(#{port => Port, clientid => ClientId}), @@ -223,7 +210,7 @@ t_session_subscription_idempotency(Config) -> receive {'EXIT', {shutdown, _}} -> ok - after 0 -> ok + after 100 -> ok end, process_flag(trap_exit, false), @@ -240,10 +227,7 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - %% Exactly one iterator should have been opened. SubTopicFilterWords = emqx_topic:words(SubTopicFilter), - ?assertEqual([{ClientId, SubTopicFilterWords}], get_all_iterator_refs(Node1)), - ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( {ok, #{}, #{SubTopicFilterWords := #{}}}, erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) @@ -262,7 +246,10 @@ t_session_unsubscription_idempotency(Config) -> ?check_trace( begin ?force_ordering( - #{?snk_kind := persistent_session_ds_close_iterators, ?snk_span := {complete, _}}, + #{ + ?snk_kind := persistent_session_ds_subscription_delete, + ?snk_span := {complete, _} + }, _NEvents0 = 1, #{?snk_kind := will_restart_node}, _Guard0 = true @@ -270,36 +257,11 @@ t_session_unsubscription_idempotency(Config) -> ?force_ordering( #{?snk_kind := restarted_node}, _NEvents1 = 1, - #{?snk_kind := persistent_session_ds_iterator_delete, ?snk_span := start}, + #{?snk_kind := persistent_session_ds_subscription_route_delete, ?snk_span := start}, _Guard1 = true ), - spawn_link(fun() -> - ?tp(will_restart_node, #{}), - ?tp(notice, "restarting node", #{node => Node1}), - true = monitor_node(Node1, true), - ok = erpc:call(Node1, init, restart, []), - receive - {nodedown, Node1} -> - ok - after 10_000 -> - ct:fail("node ~p didn't stop", [Node1]) - end, - ?tp(notice, "waiting for nodeup", #{node => Node1}), - wait_nodeup(Node1), - wait_gen_rpc_down(Node1Spec), - ?tp(notice, "restarting apps", #{node => Node1}), - Apps = maps:get(apps, Node1Spec), - ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), - _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), - %% have to re-inject this so that we may stop the node succesfully at the - %% end.... - ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), - ok = snabbkaffe:forward_trace(Node1), - ?tp(notice, "node restarted", #{node => Node1}), - ?tp(restarted_node, #{}), - ok - end), + spawn_link(fun() -> restart_node(Node1, Node1Spec) end), ?tp(notice, "starting 1", #{}), Client0 = start_client(#{port => Port, clientid => ClientId}), @@ -312,7 +274,7 @@ t_session_unsubscription_idempotency(Config) -> receive {'EXIT', {shutdown, _}} -> ok - after 0 -> ok + after 100 -> ok end, process_flag(trap_exit, false), @@ -327,7 +289,7 @@ t_session_unsubscription_idempotency(Config) -> ?wait_async_action( emqtt:unsubscribe(Client1, SubTopicFilter), #{ - ?snk_kind := persistent_session_ds_iterator_delete, + ?snk_kind := persistent_session_ds_subscription_route_delete, ?snk_span := {complete, _} }, 15_000 @@ -339,9 +301,10 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - %% No iterators remaining - ?assertEqual([], get_all_iterator_refs(Node1)), - ?assertEqual({ok, []}, get_all_iterator_ids(Node1)), + ?assertMatch( + {ok, #{}, Subs = #{}} when map_size(Subs) =:= 0, + erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) + ), ok end ), diff --git a/apps/emqx/src/emqx_persistent_message.erl b/apps/emqx/src/emqx_persistent_message.erl index 82717cd01..f3ec9def5 100644 --- a/apps/emqx/src/emqx_persistent_message.erl +++ b/apps/emqx/src/emqx_persistent_message.erl @@ -42,7 +42,7 @@ init() -> ?WHEN_ENABLED(begin ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{}), ok = emqx_persistent_session_ds_router:init_tables(), - %ok = emqx_persistent_session_ds:create_tables(), + ok = emqx_persistent_session_ds:create_tables(), ok end). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl_ b/apps/emqx/src/emqx_persistent_session_ds.erl similarity index 66% rename from apps/emqx/src/emqx_persistent_session_ds.erl_ rename to apps/emqx/src/emqx_persistent_session_ds.erl index 3fff5f7ba..9bc9e0b91 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl_ +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -65,10 +65,13 @@ %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% an atom, in theory (?). -type id() :: binary(). --type iterator() :: emqx_ds:iterator(). --type iterator_id() :: emqx_ds:iterator_id(). -type topic_filter() :: emqx_ds:topic_filter(). --type iterators() :: #{topic_filter() => iterator()}. +-type subscription_id() :: {id(), topic_filter()}. +-type subscription() :: #{ + start_time := emqx_ds:time(), + propts := map(), + extra := map() +}. -type session() :: #{ %% Client ID id := id(), @@ -77,7 +80,7 @@ %% When the session should expire expires_at := timestamp() | never, %% Client’s Subscriptions. - iterators := #{topic() => iterator()}, + iterators := #{topic() => subscription()}, %% props := map() }. @@ -90,6 +93,8 @@ -export_type([id/0]). +-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). + %% -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> @@ -121,17 +126,17 @@ ensure_session(ClientID, Conf) -> open_session(ClientID) -> case session_open(ClientID) of - {ok, Session, Iterators} -> - Session#{iterators => prep_iterators(Iterators)}; + {ok, Session, Subscriptions} -> + Session#{iterators => prep_subscriptions(Subscriptions)}; false -> false end. -prep_iterators(Iterators) -> +prep_subscriptions(Subscriptions) -> maps:fold( - fun(Topic, Iterator, Acc) -> Acc#{emqx_topic:join(Topic) => Iterator} end, + fun(Topic, Subscription, Acc) -> Acc#{emqx_topic:join(Topic) => Subscription} end, #{}, - Iterators + Subscriptions ). -spec destroy(session() | clientinfo()) -> ok. @@ -228,7 +233,7 @@ unsubscribe( ) when is_map_key(TopicFilter, Iters) -> Iterator = maps:get(TopicFilter, Iters), SubOpts = maps:get(props, Iterator), - ok = del_subscription(TopicFilter, Iterator, ID), + ok = del_subscription(TopicFilter, ID), {ok, Session#{iterators := maps:remove(TopicFilter, Iters)}, SubOpts}; unsubscribe( _TopicFilter, @@ -327,91 +332,67 @@ terminate(_Reason, _Session = #{}) -> %%-------------------------------------------------------------------- -spec add_subscription(topic(), emqx_types:subopts(), id()) -> - emqx_ds:iterator(). + subscription(). add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> - % N.B.: we chose to update the router before adding the subscription to the - % session/iterator table. The reasoning for this is as follows: - % - % Messages matching this topic filter should start to be persisted as soon as - % possible to avoid missing messages. If this is the first such persistent - % session subscription, it's important to do so early on. - % - % This could, in turn, lead to some inconsistency: if such a route gets - % created but the session/iterator data fails to be updated accordingly, we - % have a dangling route. To remove such dangling routes, we may have a - % periodic GC process that removes routes that do not have a matching - % persistent subscription. Also, route operations use dirty mnesia - % operations, which inherently have room for inconsistencies. - % - % In practice, we use the iterator reference table as a source of truth, - % since it is guarded by a transaction context: we consider a subscription - % operation to be successful if it ended up changing this table. Both router - % and iterator information can be reconstructed from this table, if needed. + %% N.B.: we chose to update the router before adding the subscription to the + %% session/iterator table. The reasoning for this is as follows: + %% + %% Messages matching this topic filter should start to be persisted as soon as + %% possible to avoid missing messages. If this is the first such persistent + %% session subscription, it's important to do so early on. + %% + %% This could, in turn, lead to some inconsistency: if such a route gets + %% created but the session/iterator data fails to be updated accordingly, we + %% have a dangling route. To remove such dangling routes, we may have a + %% periodic GC process that removes routes that do not have a matching + %% persistent subscription. Also, route operations use dirty mnesia + %% operations, which inherently have room for inconsistencies. + %% + %% In practice, we use the iterator reference table as a source of truth, + %% since it is guarded by a transaction context: we consider a subscription + %% operation to be successful if it ended up changing this table. Both router + %% and iterator information can be reconstructed from this table, if needed. ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), TopicFilter = emqx_topic:words(TopicFilterBin), - {ok, Iterator, IsNew} = session_add_iterator( + {ok, DSSubExt, IsNew} = session_add_subscription( DSSessionID, TopicFilter, SubOpts ), - Ctx = #{iterator => Iterator, is_new => IsNew}, - ?tp(persistent_session_ds_iterator_added, Ctx), + ?tp(persistent_session_ds_subscription_added, #{sub => DSSubExt, is_new => IsNew}), + %% we'll list streams and open iterators when implementing message replay. + DSSubExt. + +-spec update_subscription(topic(), subscription(), emqx_types:subopts(), id()) -> + subscription(). +update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> + TopicFilter = emqx_topic:words(TopicFilterBin), + {ok, NDSSubExt, false} = session_add_subscription( + DSSessionID, TopicFilter, SubOpts + ), + ok = ?tp(persistent_session_ds_iterator_updated, #{sub => DSSubExt}), + NDSSubExt. + +-spec del_subscription(topic(), id()) -> + ok. +del_subscription(TopicFilterBin, DSSessionId) -> + TopicFilter = emqx_topic:words(TopicFilterBin), ?tp_span( - persistent_session_ds_open_iterators, - Ctx, - ok = open_iterator_on_all_shards(TopicFilter, Iterator) + persistent_session_ds_subscription_delete, + #{session_id => DSSessionId}, + ok = session_del_subscription(DSSessionId, TopicFilter) ), - Iterator. - --spec update_subscription(topic(), iterator(), emqx_types:subopts(), id()) -> - iterator(). -update_subscription(TopicFilterBin, Iterator, SubOpts, DSSessionID) -> - TopicFilter = emqx_topic:words(TopicFilterBin), - {ok, NIterator, false} = session_add_iterator( - DSSessionID, TopicFilter, SubOpts - ), - ok = ?tp(persistent_session_ds_iterator_updated, #{iterator => Iterator}), - NIterator. - --spec open_iterator_on_all_shards(emqx_types:words(), emqx_ds:iterator()) -> ok. -open_iterator_on_all_shards(TopicFilter, Iterator) -> - ?tp(persistent_session_ds_will_open_iterators, #{iterator => Iterator}), - %% Note: currently, shards map 1:1 to nodes, but this will change in the future. - Nodes = emqx:running_nodes(), - Results = emqx_persistent_session_ds_proto_v1:open_iterator( - Nodes, - TopicFilter, - maps:get(start_time, Iterator), - maps:get(id, Iterator) - ), - %% TODO - %% 1. Handle errors. - %% 2. Iterator handles are rocksdb resources, it's doubtful they survive RPC. - %% Even if they do, we throw them away here anyway. All in all, we probably should - %% hold each of them in a process on the respective node. - true = lists:all(fun(Res) -> element(1, Res) =:= ok end, Results), - ok. - -%% RPC target. --spec do_open_iterator(emqx_types:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> - {ok, emqx_ds_storage_layer:iterator()} | {error, _Reason}. -do_open_iterator(TopicFilter, StartMS, _IteratorID) -> - %% TODO: wrong - {ok, emqx_ds:make_iterator(TopicFilter, StartMS)}. - --spec del_subscription(topic(), iterator(), id()) -> - ok. -del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) -> - % N.B.: see comments in `?MODULE:add_subscription' for a discussion about the - % order of operations here. - TopicFilter = emqx_topic:words(TopicFilterBin), - ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionID). + ?tp_span( + persistent_session_ds_subscription_route_delete, + #{session_id => DSSessionId}, + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId) + ). %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- -define(SESSION_TAB, emqx_ds_session). --define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). --define(DS_MRIA_SHARD, emqx_ds_shard). +-define(SESSION_SUBSCRIPTIONS_TAB, emqx_ds_session_subscriptions). +-define(DS_MRIA_SHARD, emqx_ds_session_shard). -record(session, { %% same as clientid @@ -423,12 +404,13 @@ del_subscription(TopicFilterBin, #{id := IteratorID}, DSSessionID) -> props = #{} :: map() }). --record(iterator_ref, { - ref_id :: {id(), emqx_ds:topic_filter()}, - it_id :: emqx_ds:iterator_id(), +-record(ds_sub, { + id :: subscription_id(), start_time :: emqx_ds:time(), - props = #{} :: map() + props = #{} :: map(), + extra = #{} :: map() }). +-type ds_sub() :: #ds_sub{}. create_tables() -> ok = mria:create_table( @@ -442,15 +424,16 @@ create_tables() -> ] ), ok = mria:create_table( - ?ITERATOR_REF_TAB, + ?SESSION_SUBSCRIPTIONS_TAB, [ {rlog_shard, ?DS_MRIA_SHARD}, {type, ordered_set}, {storage, storage()}, - {record_name, iterator_ref}, - {attributes, record_info(fields, iterator_ref)} + {record_name, ds_sub}, + {attributes, record_info(fields, ds_sub)} ] ), + ok = mria:wait_for_tables([?SESSION_TAB, ?SESSION_SUBSCRIPTIONS_TAB]), ok. -dialyzer({nowarn_function, storage/0}). @@ -471,26 +454,26 @@ storage() -> %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. -spec session_open(id()) -> - {ok, session(), iterators()} | false. + {ok, session(), #{topic() => subscription()}} | false. session_open(SessionId) -> transaction(fun() -> case mnesia:read(?SESSION_TAB, SessionId, write) of [Record = #session{}] -> - Session = export_record(Record), - IteratorRefs = session_read_iterators(SessionId), - Iterators = export_iterators(IteratorRefs), - {ok, Session, Iterators}; + Session = export_session(Record), + DSSubs = session_read_subscriptions(SessionId), + Subscriptions = export_subscriptions(DSSubs), + {ok, Session, Subscriptions}; [] -> false end end). -spec session_ensure_new(id(), _Props :: map()) -> - {ok, session(), iterators()}. + {ok, session(), #{topic() => subscription()}}. session_ensure_new(SessionId, Props) -> transaction(fun() -> - ok = session_drop_iterators(SessionId), - Session = export_record(session_create(SessionId, Props)), + ok = session_drop_subscriptions(SessionId), + Session = export_session(session_create(SessionId, Props)), {ok, Session, #{}} end). @@ -510,80 +493,80 @@ session_create(SessionId, Props) -> session_drop(DSSessionId) -> transaction(fun() -> %% TODO: ensure all iterators from this clientid are closed? - ok = session_drop_iterators(DSSessionId), + ok = session_drop_subscriptions(DSSessionId), ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) end). -session_drop_iterators(DSSessionId) -> - IteratorRefs = session_read_iterators(DSSessionId), - ok = lists:foreach(fun session_del_iterator/1, IteratorRefs). +session_drop_subscriptions(DSSessionId) -> + IteratorRefs = session_read_subscriptions(DSSessionId), + ok = lists:foreach(fun session_del_subscription/1, IteratorRefs). %% @doc Called when a client subscribes to a topic. Idempotent. --spec session_add_iterator(id(), topic_filter(), _Props :: map()) -> - {ok, iterator(), _IsNew :: boolean()}. -session_add_iterator(DSSessionId, TopicFilter, Props) -> - IteratorRefId = {DSSessionId, TopicFilter}, +-spec session_add_subscription(id(), topic_filter(), _Props :: map()) -> + {ok, subscription(), _IsNew :: boolean()}. +session_add_subscription(DSSessionId, TopicFilter, Props) -> + DSSubId = {DSSessionId, TopicFilter}, transaction(fun() -> - case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of + case mnesia:read(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write) of [] -> - IteratorRef = session_insert_iterator(DSSessionId, TopicFilter, Props), - Iterator = export_record(IteratorRef), + DSSub = session_insert_subscription(DSSessionId, TopicFilter, Props), + DSSubExt = export_subscription(DSSub), ?tp( ds_session_subscription_added, - #{iterator => Iterator, session_id => DSSessionId} + #{sub => DSSubExt, session_id => DSSessionId} ), - {ok, Iterator, _IsNew = true}; - [#iterator_ref{} = IteratorRef] -> - NIteratorRef = session_update_iterator(IteratorRef, Props), - NIterator = export_record(NIteratorRef), + {ok, DSSubExt, _IsNew = true}; + [#ds_sub{} = DSSub] -> + NDSSub = session_update_subscription(DSSub, Props), + NDSSubExt = export_subscription(NDSSub), ?tp( ds_session_subscription_present, - #{iterator => NIterator, session_id => DSSessionId} + #{sub => NDSSubExt, session_id => DSSessionId} ), - {ok, NIterator, _IsNew = false} + {ok, NDSSubExt, _IsNew = false} end end). -session_insert_iterator(DSSessionId, TopicFilter, Props) -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - IteratorRef = #iterator_ref{ - ref_id = {DSSessionId, TopicFilter}, - it_id = IteratorId, +-spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub(). +session_insert_subscription(DSSessionId, TopicFilter, Props) -> + {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), + DSSub = #ds_sub{ + id = DSSubId, start_time = StartMS, - props = Props + props = Props, + extra = #{} }, - ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), - IteratorRef. + ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, DSSub, write), + DSSub. -session_update_iterator(IteratorRef, Props) -> - NIteratorRef = IteratorRef#iterator_ref{props = Props}, - ok = mnesia:write(?ITERATOR_REF_TAB, NIteratorRef, write), - NIteratorRef. +-spec session_update_subscription(ds_sub(), map()) -> ds_sub(). +session_update_subscription(DSSub, Props) -> + NDSSub = DSSub#ds_sub{props = Props}, + ok = mnesia:write(?SESSION_SUBSCRIPTIONS_TAB, NDSSub, write), + NDSSub. -%% @doc Called when a client unsubscribes from a topic. --spec session_del_iterator(id(), topic_filter()) -> ok. -session_del_iterator(DSSessionId, TopicFilter) -> - IteratorRefId = {DSSessionId, TopicFilter}, +session_del_subscription(DSSessionId, TopicFilter) -> + DSSubId = {DSSessionId, TopicFilter}, transaction(fun() -> - mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write) + mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write) end). -session_del_iterator(#iterator_ref{ref_id = IteratorRefId}) -> - mnesia:delete(?ITERATOR_REF_TAB, IteratorRefId, write). +session_del_subscription(#ds_sub{id = DSSubId}) -> + mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write). -session_read_iterators(DSSessionId) -> +session_read_subscriptions(DSSessionId) -> % NOTE: somewhat convoluted way to trick dialyzer - Pat = erlang:make_tuple(record_info(size, iterator_ref), '_', [ - {1, iterator_ref}, - {#iterator_ref.ref_id, {DSSessionId, '_'}} + Pat = erlang:make_tuple(record_info(size, ds_sub), '_', [ + {1, ds_sub}, + {#ds_sub.id, {DSSessionId, '_'}} ]), - mnesia:match_object(?ITERATOR_REF_TAB, Pat, read). + mnesia:match_object(?SESSION_SUBSCRIPTIONS_TAB, Pat, read). --spec new_iterator_id(id()) -> {iterator_id(), emqx_ds:time()}. -new_iterator_id(DSSessionId) -> +-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), emqx_ds:time()}. +new_subscription_id(DSSessionId, TopicFilter) -> NowMS = erlang:system_time(microsecond), - IteratorId = <>, - {IteratorId, NowMS}. + DSSubId = {DSSessionId, TopicFilter}, + {DSSubId, NowMS}. %%-------------------------------------------------------------------------------- @@ -593,19 +576,20 @@ transaction(Fun) -> %%-------------------------------------------------------------------------------- -export_iterators(IteratorRefs) -> +export_subscriptions(DSSubs) -> lists:foldl( - fun(IteratorRef = #iterator_ref{ref_id = {_DSSessionId, TopicFilter}}, Acc) -> - Acc#{TopicFilter => export_record(IteratorRef)} + fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) -> + Acc#{TopicFilter => export_subscription(DSSub)} end, #{}, - IteratorRefs + DSSubs ). -export_record(#session{} = Record) -> - export_record(Record, #session.id, [id, created_at, expires_at, props], #{}); -export_record(#iterator_ref{} = Record) -> - export_record(Record, #iterator_ref.it_id, [id, start_time, props], #{}). +export_session(#session{} = Record) -> + export_record(Record, #session.id, [id, created_at, expires_at, props], #{}). + +export_subscription(#ds_sub{} = Record) -> + export_record(Record, #ds_sub.start_time, [start_time, props, extra], #{}). export_record(Record, I, [Field | Rest], Acc) -> export_record(Record, I + 1, Rest, Acc#{Field => element(I, Record)}); diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 2d8768e65..32e59a114 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -29,6 +29,7 @@ -define(DEFAULT_KEYSPACE, default). -define(DS_SHARD_ID, <<"local">>). -define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). +-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). all() -> emqx_common_test_helpers:all(?MODULE). @@ -62,6 +63,7 @@ end_per_testcase(t_session_subscription_iterators, Config) -> end_per_testcase(_TestCase, Config) -> Apps = ?config(apps, Config), emqx_common_test_helpers:call_janitor(60_000), + clear_db(), emqx_cth_suite:stop(Apps), ok. @@ -96,7 +98,7 @@ t_messages_persisted(_Config) -> ct:pal("Results = ~p", [Results]), - Persisted = consume(?DS_SHARD, {['#'], 0}), + Persisted = consume(['#'], 0), ct:pal("Persisted = ~p", [Persisted]), @@ -139,7 +141,7 @@ t_messages_persisted_2(_Config) -> {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1), - Persisted = consume(?DS_SHARD, {['#'], 0}), + Persisted = consume(['#'], 0), ct:pal("Persisted = ~p", [Persisted]), @@ -155,7 +157,7 @@ t_messages_persisted_2(_Config) -> %% TODO: test quic and ws too t_session_subscription_iterators(Config) -> - [Node1, Node2] = ?config(nodes, Config), + [Node1, _Node2] = ?config(nodes, Config), Port = get_mqtt_port(Node1, tcp), Topic = <<"t/topic">>, SubTopicFilter = <<"t/+">>, @@ -202,11 +204,8 @@ t_session_subscription_iterators(Config) -> messages => [Message1, Message2, Message3, Message4] } end, - fun(Results, Trace) -> + fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - #{ - messages := [_Message1, Message2, Message3 | _] - } = Results, case ?of_kind(ds_session_subscription_added, Trace) of [] -> %% Since `emqx_durable_storage' is a dependency of `emqx', it gets @@ -228,17 +227,6 @@ t_session_subscription_iterators(Config) -> ), ok end, - ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), - {ok, [IteratorId]} = get_all_iterator_ids(Node1), - ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)), - ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), - ExpectedMessages = [Message2, Message3], - %% Note: it is expected that this will break after replayers are in place. - %% They might have consumed all the messages by this time. - ?assertEqual(ExpectedMessages, ReplayMessages1), - %% Different DS shard - ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end), - ?assertEqual([], ReplayMessages2), ok end ), @@ -263,33 +251,21 @@ connect(Opts0 = #{}) -> {ok, _} = emqtt:connect(Client), Client. -consume(Shard, Replay = {_TopicFiler, _StartMS}) -> - {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay), - consume(It); -consume(Shard, IteratorId) when is_binary(IteratorId) -> - {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId), +consume(TopicFiler, StartMS) -> + [{_, Stream}] = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS), + {ok, It} = emqx_ds:make_iterator(Stream, StartMS), consume(It). consume(It) -> - case emqx_ds_storage_layer:next(It) of - {ok, NIt, [Msg]} -> - [emqx_persistent_message:deserialize(Msg) | consume(NIt)]; - end_of_stream -> + case emqx_ds:next(It, 100) of + {ok, _NIt, _Msgs = []} -> + []; + {ok, NIt, Msgs} -> + Msgs ++ consume(NIt); + {ok, end_of_stream} -> [] end. -delete_all_messages() -> - Persisted = consume(?DS_SHARD, {['#'], 0}), - lists:foreach( - fun(Msg) -> - GUID = emqx_message:id(Msg), - Topic = emqx_topic:words(emqx_message:topic(Msg)), - Timestamp = emqx_guid:timestamp(GUID), - ok = emqx_ds_storage_layer:delete(?DS_SHARD, GUID, Timestamp, Topic) - end, - Persisted - ). - receive_messages(Count) -> receive_messages(Count, []). @@ -306,13 +282,6 @@ receive_messages(Count, Msgs) -> publish(Node, Message) -> erpc:call(Node, emqx, publish, [Message]). -get_iterator_ids(Node, ClientId) -> - Channel = erpc:call(Node, fun() -> - [ConnPid] = emqx_cm:lookup_channels(ClientId), - sys:get_state(ConnPid) - end), - emqx_connection:info({channel, {session, iterators}}, Channel). - app_specs() -> [ emqx_durable_storage, @@ -330,5 +299,6 @@ get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. -get_all_iterator_ids(Node) -> - erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]). +clear_db() -> + ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), + ok. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index b43604469..a28c9de52 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -48,19 +48,19 @@ %% level. %% %% TODO: currently the stream is hardwired to only support the -%% internal rocksdb storage. In t he future we want to add another +%% internal rocksdb storage. In the future we want to add another %% implementations for emqx_ds, so this type has to take this into %% account. -record(stream, { shard :: emqx_ds_replication_layer:shard_id(), - enc :: emqx_ds_replication_layer:stream() + enc :: emqx_ds_storage_layer:stream() }). --opaque stream() :: stream(). +-opaque stream() :: #stream{}. -record(iterator, { shard :: emqx_ds_replication_layer:shard_id(), - enc :: enqx_ds_replication_layer:iterator() + enc :: enqx_ds_storage_layer:iterator() }). -opaque iterator() :: #iterator{}. @@ -154,7 +154,7 @@ next(Iter0, BatchSize) -> %% messages on the receiving node, hence saving some network. %% %% This kind of trickery should be probably done here in the - %% replication layer. Or, perhaps, in the logic lary. + %% replication layer. Or, perhaps, in the logic layer. case emqx_ds_proto_v1:next(Node, Shard, StorageIter0, BatchSize) of {ok, StorageIter, Batch} -> Iter = #iterator{shard = Shard, enc = StorageIter},