diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl index 95f6ee375..aa520b252 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_state.erl @@ -113,7 +113,7 @@ -type pmap(K, V) :: #pmap{ table :: atom(), - cache :: #{K => V}, + cache :: #{K => V} | gb_trees:tree(K, V), dirty :: #{K => dirty | del} }. @@ -619,7 +619,7 @@ update_pmaps(Fun, Map) -> %% This functtion should be ran in a transaction. -spec pmap_open(atom(), emqx_persistent_session_ds:id()) -> pmap(_K, _V). pmap_open(Table, SessionId) -> - Clean = maps:from_list(kv_pmap_restore(Table, SessionId)), + Clean = cache_from_list(Table, kv_pmap_restore(Table, SessionId)), #pmap{ table = Table, cache = Clean, @@ -627,29 +627,29 @@ pmap_open(Table, SessionId) -> }. -spec pmap_get(K, pmap(K, V)) -> V | undefined. -pmap_get(K, #pmap{cache = Cache}) -> - maps:get(K, Cache, undefined). +pmap_get(K, #pmap{table = Table, cache = Cache}) -> + cache_get(Table, K, Cache). -spec pmap_put(K, V, pmap(K, V)) -> pmap(K, V). -pmap_put(K, V, Pmap = #pmap{dirty = Dirty, cache = Cache}) -> +pmap_put(K, V, Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache}) -> Pmap#pmap{ - cache = maps:put(K, V, Cache), + cache = cache_put(Table, K, V, Cache), dirty = Dirty#{K => dirty} }. -spec pmap_del(K, pmap(K, V)) -> pmap(K, V). pmap_del( Key, - Pmap = #pmap{dirty = Dirty, cache = Cache} + Pmap = #pmap{table = Table, dirty = Dirty, cache = Cache} ) -> Pmap#pmap{ - cache = maps:remove(Key, Cache), + cache = cache_remove(Table, Key, Cache), dirty = Dirty#{Key => del} }. -spec pmap_fold(fun((K, V, A) -> A), A, pmap(K, V)) -> A. -pmap_fold(Fun, Acc, #pmap{cache = Cache}) -> - maps:fold(Fun, Acc, Cache). +pmap_fold(Fun, Acc, #pmap{table = Table, cache = Cache}) -> + cache_fold(Table, Fun, Acc, Cache). -spec pmap_commit(emqx_persistent_session_ds:id(), pmap(K, V)) -> pmap(K, V). pmap_commit( @@ -660,7 +660,7 @@ pmap_commit( (K, del) -> kv_pmap_delete(Tab, SessionId, K); (K, dirty) -> - V = maps:get(K, Cache), + V = cache_get(Tab, K, Cache), kv_pmap_persist(Tab, SessionId, K, V) end, Dirty @@ -670,13 +670,110 @@ pmap_commit( }. -spec pmap_format(pmap(_K, _V)) -> map(). -pmap_format(#pmap{cache = Cache}) -> - Cache. +pmap_format(#pmap{table = Table, cache = Cache}) -> + cache_format(Table, Cache). -spec pmap_size(pmap(_K, _V)) -> non_neg_integer(). -pmap_size(#pmap{cache = Cache}) -> +pmap_size(#pmap{table = Table, cache = Cache}) -> + cache_size(Table, Cache). + +pmap_iter_after(After, #pmap{table = Table, cache = Cache}) -> + %% NOTE: Only valid for gbt-backed PMAPs. + gbt = cache_data_type(Table), + gbt_iter_after(After, Cache). + +%% + +cache_data_type(?stream_tab) -> gbt; +cache_data_type(_Table) -> map. + +cache_from_list(?stream_tab, L) -> + gbt_from_list(L); +cache_from_list(_Table, L) -> + maps:from_list(L). + +cache_get(?stream_tab, K, Cache) -> + gbt_get(K, Cache, undefined); +cache_get(_Table, K, Cache) -> + maps:get(K, Cache, undefined). + +cache_put(?stream_tab, K, V, Cache) -> + gbt_put(K, V, Cache); +cache_put(_Table, K, V, Cache) -> + maps:put(K, V, Cache). + +cache_remove(?stream_tab, K, Cache) -> + gbt_remove(K, Cache); +cache_remove(_Table, K, Cache) -> + maps:remove(K, Cache). + +cache_fold(?stream_tab, Fun, Acc, Cache) -> + gbt_fold(Fun, Acc, Cache); +cache_fold(_Table, Fun, Acc, Cache) -> + maps:fold(Fun, Acc, Cache). + +cache_format(?stream_tab, Cache) -> + gbt_format(Cache); +cache_format(_Table, Cache) -> + Cache. + +cache_size(?stream_tab, Cache) -> + gbt_size(Cache); +cache_size(_Table, Cache) -> maps:size(Cache). +%% PMAP Cache implementation backed by `gb_trees'. +%% Supports iteration starting from specific key. + +gbt_from_list(L) -> + lists:foldl( + fun({K, V}, Acc) -> gb_trees:insert(K, V, Acc) end, + gb_trees:empty(), + L + ). + +gbt_get(K, Cache, undefined) -> + case gb_trees:lookup(K, Cache) of + none -> undefined; + {_, V} -> V + end. + +gbt_put(K, V, Cache) -> + gb_trees:enter(K, V, Cache). + +gbt_remove(K, Cache) -> + gb_trees:delete(K, Cache). + +gbt_format(Cache) -> + gb_trees:to_list(Cache). + +gbt_fold(Fun, Acc, Cache) -> + It = gb_trees:iterator(Cache), + gbt_fold_iter(Fun, Acc, It). + +gbt_fold_iter(Fun, Acc, It0) -> + case gb_trees:next(It0) of + {K, V, It} -> + gbt_fold_iter(Fun, Fun(K, V, Acc), It); + _ -> + Acc + end. + +gbt_size(Cache) -> + gb_trees:size(Cache). + +gbt_iter_after(After, Cache) -> + It0 = gb_trees:iterator_from(After, Cache), + case gb_trees:next(It0) of + {After, _, It} -> + It; + _ -> + It0 + end. + +gbt_iter_next(It) -> + gb_trees:next(It). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) ->