feat(sessds): use trees to hold streams in session state

This commit is contained in:
Andrew Mayorov 2024-07-02 15:39:02 +02:00
parent 532f04da9d
commit dc73b957b3
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 111 additions and 14 deletions

View File

@ -113,7 +113,7 @@
-type pmap(K, V) ::
#pmap{
table :: atom(),
cache :: #{K => V},
cache :: #{K => V} | gb_trees:tree(K, V),
dirty :: #{K => dirty | del}
}.
@ -619,7 +619,7 @@ update_pmaps(Fun, Map) ->
%% This functtion should be ran in a transaction.
-spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V).
pmap_open(Table, SessionId) ->
Clean = maps:from_list(kv_pmap_restore(Table, SessionId)),
Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)),
#pmap{
table = Table,
cache = Clean,
@ -627,29 +627,29 @@ pmap_open(Table, SessionId) ->
}.
-spec pmap_get(K, pmap(K, V)) -> V | undefined.
pmap_get(K, #pmap{cache = Cache}) ->
maps:get(K, Cache, undefined).
pmap_get(K, #pmap{table = Table, cache = Cache}) ->
cache_get(Table, K, Cache).
-spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V).
pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) ->
pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) ->
Pmap#pmap{
cache = maps:put(K, V, Cache),
cache = cache_put(Table, K, V, Cache),
dirty = Dirty#{K => dirty}
}.
-spec pmap_del(K, pmap(K, V)) -> pmap(K, V).
pmap_del(
Key,
Pmap = #pmap{dirty = Dirty, cache = Cache}
Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}
) ->
Pmap#pmap{
cache = maps:remove(Key, Cache),
cache = cache_remove(Table, Key, Cache),
dirty = Dirty#{Key => del}
}.
-spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A.
pmap_fold(Fun, Acc, #pmap{cache = Cache}) ->
maps:fold(Fun, Acc, Cache).
pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) ->
cache_fold(Table, Fun, Acc, Cache).
-spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V).
pmap_commit(
@ -660,7 +660,7 @@ pmap_commit(
(K, del) ->
kv_pmap_delete(Tab, SessionId, K);
(K, dirty) ->
V = maps:get(K, Cache),
V = cache_get(Tab, K, Cache),
kv_pmap_persist(Tab, SessionId, K, V)
end,
Dirty
@ -670,13 +670,110 @@ pmap_commit(
}.
-spec pmap_format(pmap(_K, _V)) -> map().
pmap_format(#pmap{cache = Cache}) ->
Cache.
pmap_format(#pmap{table = Table, cache = Cache}) ->
cache_format(Table, Cache).
-spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
pmap_size(#pmap{cache = Cache}) ->
pmap_size(#pmap{table = Table, cache = Cache}) ->
cache_size(Table, Cache).
pmap_iter_after(After, #pmap{table = Table, cache = Cache}) ->
%% NOTE: Only valid for gbt-backed PMAPs.
gbt = cache_data_type(Table),
gbt_iter_after(After, Cache).
%%
cache_data_type(?stream_tab) -> gbt;
cache_data_type(_Table) -> map.
cache_from_list(?stream_tab, L) ->
gbt_from_list(L);
cache_from_list(_Table, L) ->
maps:from_list(L).
cache_get(?stream_tab, K, Cache) ->
gbt_get(K, Cache, undefined);
cache_get(_Table, K, Cache) ->
maps:get(K, Cache, undefined).
cache_put(?stream_tab, K, V, Cache) ->
gbt_put(K, V, Cache);
cache_put(_Table, K, V, Cache) ->
maps:put(K, V, Cache).
cache_remove(?stream_tab, K, Cache) ->
gbt_remove(K, Cache);
cache_remove(_Table, K, Cache) ->
maps:remove(K, Cache).
cache_fold(?stream_tab, Fun, Acc, Cache) ->
gbt_fold(Fun, Acc, Cache);
cache_fold(_Table, Fun, Acc, Cache) ->
maps:fold(Fun, Acc, Cache).
cache_format(?stream_tab, Cache) ->
gbt_format(Cache);
cache_format(_Table, Cache) ->
Cache.
cache_size(?stream_tab, Cache) ->
gbt_size(Cache);
cache_size(_Table, Cache) ->
maps:size(Cache).
%% PMAP Cache implementation backed by `gb_trees'.
%% Supports iteration starting from specific key.
gbt_from_list(L) ->
lists:foldl(
fun({K, V}, Acc) -> gb_trees:insert(K, V, Acc) end,
gb_trees:empty(),
L
).
gbt_get(K, Cache, undefined) ->
case gb_trees:lookup(K, Cache) of
none -> undefined;
{_, V} -> V
end.
gbt_put(K, V, Cache) ->
gb_trees:enter(K, V, Cache).
gbt_remove(K, Cache) ->
gb_trees:delete(K, Cache).
gbt_format(Cache) ->
gb_trees:to_list(Cache).
gbt_fold(Fun, Acc, Cache) ->
It = gb_trees:iterator(Cache),
gbt_fold_iter(Fun, Acc, It).
gbt_fold_iter(Fun, Acc, It0) ->
case gb_trees:next(It0) of
{K, V, It} ->
gbt_fold_iter(Fun, Fun(K, V, Acc), It);
_ ->
Acc
end.
gbt_size(Cache) ->
gb_trees:size(Cache).
gbt_iter_after(After, Cache) ->
It0 = gb_trees:iterator_from(After, Cache),
case gb_trees:next(It0) of
{After, _, It} ->
It;
_ ->
It0
end.
gbt_iter_next(It) ->
gb_trees:next(It).
%% Functions dealing with set tables:
kv_persist(Tab, SessionId, Val0) ->