refactor: rm iterators from DS `#session{}` record
This commit is contained in:
parent
e4e88ebf36
commit
021755b82b
|
@ -49,9 +49,7 @@
|
|||
%% Awaiting PUBREL Timeout (Unit: millisecond)
|
||||
await_rel_timeout :: timeout(),
|
||||
%% Created at
|
||||
created_at :: pos_integer(),
|
||||
%% Durable storage iterators for existing subscriptions
|
||||
iterators = [] :: [emqx_ds_replay:replay_id()]
|
||||
created_at :: pos_integer()
|
||||
}).
|
||||
|
||||
-endif.
|
||||
|
|
|
@ -85,7 +85,7 @@ open_session(ClientID) ->
|
|||
?WHEN_ENABLED(emqx_ds:session_open(ClientID)).
|
||||
|
||||
-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) ->
|
||||
{ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}.
|
||||
{ok, emqx_ds:iterator_id(), IsNew :: boolean()} | {skipped, disabled}.
|
||||
add_subscription(TopicFilterBin, DSSessionID) ->
|
||||
?WHEN_ENABLED(
|
||||
begin
|
||||
|
|
|
@ -169,14 +169,8 @@
|
|||
-spec init_and_open(emqx_types:clientid(), options()) -> session().
|
||||
init_and_open(ClientID, Options) ->
|
||||
Session0 = emqx_session:init(Options),
|
||||
IteratorIDs =
|
||||
case emqx_persistent_session_ds:open_session(ClientID) of
|
||||
{skipped, disabled} ->
|
||||
[];
|
||||
{_IsNew, _DSSessionID, Iterators0} ->
|
||||
Iterators0
|
||||
end,
|
||||
Session0#session{iterators = IteratorIDs}.
|
||||
_ = emqx_persistent_session_ds:open_session(ClientID),
|
||||
Session0.
|
||||
|
||||
-spec init(options()) -> session().
|
||||
init(Opts) ->
|
||||
|
@ -274,9 +268,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
|
|||
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
||||
Timeout;
|
||||
info(created_at, #session{created_at = CreatedAt}) ->
|
||||
CreatedAt;
|
||||
info(iterators, #session{iterators = IteratorIds}) ->
|
||||
IteratorIds.
|
||||
CreatedAt.
|
||||
|
||||
%% @doc Get stats of the session.
|
||||
-spec stats(session()) -> emqx_types:stats().
|
||||
|
@ -325,15 +317,8 @@ is_subscriptions_full(#session{
|
|||
-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) ->
|
||||
session().
|
||||
add_persistent_subscription(TopicFilterBin, ClientId, Session) ->
|
||||
case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of
|
||||
{skipped, disabled} ->
|
||||
Session;
|
||||
{ok, IteratorID, _IsNew = true} ->
|
||||
Iterators = Session#session.iterators,
|
||||
Session#session{iterators = [IteratorID | Iterators]};
|
||||
{ok, _IteratorID, _IsNew = false} ->
|
||||
Session
|
||||
end.
|
||||
_ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId),
|
||||
Session.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Client -> Broker: UNSUBSCRIBE
|
||||
|
|
|
@ -148,18 +148,15 @@ t_session_subscription_iterators(Config) ->
|
|||
ct:pal("publishing 4"),
|
||||
Message4 = emqx_message:make(AnotherTopic, Payload4),
|
||||
publish(Node1, Message4),
|
||||
IteratorIds = get_iterator_ids(Node1, ClientId),
|
||||
emqtt:stop(Client),
|
||||
#{
|
||||
messages => [Message1, Message2, Message3, Message4],
|
||||
iterator_ids => IteratorIds
|
||||
messages => [Message1, Message2, Message3, Message4]
|
||||
}
|
||||
end,
|
||||
fun(Results, Trace) ->
|
||||
ct:pal("trace:\n ~p", [Trace]),
|
||||
#{
|
||||
messages := [_Message1, Message2, Message3 | _],
|
||||
iterator_ids := IteratorIds
|
||||
messages := [_Message1, Message2, Message3 | _]
|
||||
} = Results,
|
||||
case ?of_kind(ds_session_subscription_added, Trace) of
|
||||
[] ->
|
||||
|
@ -182,10 +179,9 @@ t_session_subscription_iterators(Config) ->
|
|||
),
|
||||
ok
|
||||
end,
|
||||
?assertMatch([_], IteratorIds),
|
||||
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
||||
?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)),
|
||||
[IteratorId] = IteratorIds,
|
||||
{ok, [IteratorId]} = get_all_iterator_ids(Node1),
|
||||
?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)),
|
||||
ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end),
|
||||
ExpectedMessages = [Message2, Message3],
|
||||
?assertEqual(ExpectedMessages, ReplayMessages1),
|
||||
|
@ -284,7 +280,4 @@ get_mqtt_port(Node, Type) ->
|
|||
Port.
|
||||
|
||||
get_all_iterator_ids(Node) ->
|
||||
Fn = fun(K, _V, Acc) -> [K | Acc] end,
|
||||
erpc:call(Node, fun() ->
|
||||
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
|
||||
end).
|
||||
erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]).
|
||||
|
|
|
@ -147,8 +147,7 @@ sessioninfo() ->
|
|||
awaiting_rel = awaiting_rel(),
|
||||
max_awaiting_rel = non_neg_integer(),
|
||||
await_rel_timeout = safty_timeout(),
|
||||
created_at = timestamp(),
|
||||
iterators = []
|
||||
created_at = timestamp()
|
||||
},
|
||||
emqx_session:info(Session)
|
||||
).
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-module(emqx_ds).
|
||||
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
%% API:
|
||||
|
@ -124,17 +125,15 @@ message_stats() ->
|
|||
%%
|
||||
%% Note: session API doesn't handle session takeovers, it's the job of
|
||||
%% the broker.
|
||||
-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}.
|
||||
-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}.
|
||||
session_open(ClientID) ->
|
||||
case mnesia:dirty_read(?SESSION_TAB, ClientID) of
|
||||
[#session{iterators = Iterators}] ->
|
||||
IteratorIDs = maps:values(Iterators),
|
||||
{false, ClientID, IteratorIDs};
|
||||
[#session{}] ->
|
||||
{false, ClientID};
|
||||
[] ->
|
||||
Iterators = #{},
|
||||
Session = #session{id = ClientID, iterators = Iterators},
|
||||
Session = #session{id = ClientID},
|
||||
mria:dirty_write(?SESSION_TAB, Session),
|
||||
{true, ClientID, _IteratorIDs = []}
|
||||
{true, ClientID}
|
||||
end.
|
||||
|
||||
%% @doc Called when a client reconnects with `clean session=true' or
|
||||
|
|
|
@ -21,8 +21,10 @@
|
|||
-define(DS_SHARD, emqx_ds_shard).
|
||||
|
||||
-record(session, {
|
||||
%% same as clientid
|
||||
id :: emqx_ds:session_id(),
|
||||
iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()}
|
||||
%% for future usage
|
||||
props = #{} :: map()
|
||||
}).
|
||||
|
||||
-record(iterator_ref, {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
discard_iterator/2,
|
||||
is_iterator_present/2,
|
||||
discard_iterator_prefix/2,
|
||||
list_iterator_prefix/2,
|
||||
foldl_iterator_prefix/4
|
||||
]).
|
||||
|
||||
|
@ -203,7 +204,17 @@ discard_iterator(Shard, ReplayID) ->
|
|||
-spec discard_iterator_prefix(emqx_ds:shard(), binary()) ->
|
||||
ok | {error, _TODO}.
|
||||
discard_iterator_prefix(Shard, KeyPrefix) ->
|
||||
do_discard_iterator_prefix(Shard, KeyPrefix).
|
||||
case do_discard_iterator_prefix(Shard, KeyPrefix) of
|
||||
{ok, _} -> ok;
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
-spec list_iterator_prefix(
|
||||
emqx_ds:shard(),
|
||||
binary()
|
||||
) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}.
|
||||
list_iterator_prefix(Shard, KeyPrefix) ->
|
||||
do_list_iterator_prefix(Shard, KeyPrefix).
|
||||
|
||||
-spec foldl_iterator_prefix(
|
||||
emqx_ds:shard(),
|
||||
|
@ -377,7 +388,11 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay},
|
|||
|
||||
%%
|
||||
|
||||
-define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>).
|
||||
-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>).
|
||||
-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin
|
||||
<<IteratorId:(size(KeyReplayState) - 2)/binary, "rs">> = (KeyReplayState),
|
||||
IteratorId
|
||||
end).
|
||||
|
||||
-define(ITERATION_WRITE_OPTS, []).
|
||||
-define(ITERATION_READ_OPTS, []).
|
||||
|
@ -424,6 +439,13 @@ restore_iterator_state(
|
|||
It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}},
|
||||
open_restore_iterator(meta_get_gen(Shard, Gen), It, State).
|
||||
|
||||
do_list_iterator_prefix(Shard, KeyPrefix) ->
|
||||
Fn = fun(K0, _V, Acc) ->
|
||||
K = ?KEY_REPLAY_STATE_PAT(K0),
|
||||
[K | Acc]
|
||||
end,
|
||||
do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []).
|
||||
|
||||
do_discard_iterator_prefix(Shard, KeyPrefix) ->
|
||||
#db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db),
|
||||
Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end,
|
||||
|
|
|
@ -169,7 +169,7 @@ t_session_subscription_idempotency(Config) ->
|
|||
%% Exactly one iterator should have been opened.
|
||||
?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)),
|
||||
?assertMatch(
|
||||
{_IsNew = false, ClientId, _},
|
||||
{_IsNew = false, ClientId},
|
||||
erpc:call(Node1, emqx_ds, session_open, [ClientId])
|
||||
),
|
||||
ok
|
||||
|
|
Loading…
Reference in New Issue