feat(sessds): Add API for getting session data from the cold storage

This commit is contained in:
ieQu1 2024-04-16 23:38:01 +02:00
parent 38a2e8add9
commit 124c5047d0
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
3 changed files with 54 additions and 1 deletions

View File

@ -75,7 +75,8 @@
%% Managment APIs: %% Managment APIs:
-export([ -export([
list_client_subscriptions/1 list_client_subscriptions/1,
get_client_subscription/2
]). ]).
%% session table operations %% session table operations
@ -736,6 +737,11 @@ list_client_subscriptions(ClientId) ->
{error, not_found} {error, not_found}
end. end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Session tables operations %% Session tables operations
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -22,6 +22,9 @@
%% It is responsible for saving, caching, and restoring session state. %% It is responsible for saving, caching, and restoring session state.
%% It is completely devoid of business logic. Not even the default %% It is completely devoid of business logic. Not even the default
%% values should be set in this module. %% values should be set in this module.
%%
%% Session process MUST NOT use `cold_*' functions! They are reserved
%% for use in the management APIs.
-module(emqx_persistent_session_ds_state). -module(emqx_persistent_session_ds_state).
-export([create_tables/0]). -export([create_tables/0]).
@ -40,12 +43,14 @@
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
-export([ -export([
get_subscription_state/2, get_subscription_state/2,
cold_get_subscription_state/2,
fold_subscription_states/3, fold_subscription_states/3,
put_subscription_state/3, put_subscription_state/3,
del_subscription_state/2 del_subscription_state/2
]). ]).
-export([ -export([
get_subscription/2, get_subscription/2,
cold_get_subscription/2,
fold_subscriptions/3, fold_subscriptions/3,
n_subscriptions/1, n_subscriptions/1,
put_subscription/3, put_subscription/3,
@ -383,6 +388,11 @@ new_id(Rec) ->
get_subscription(TopicFilter, Rec) -> get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec). gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).
-spec fold_subscriptions(fun(), Acc, t()) -> Acc. -spec fold_subscriptions(fun(), Acc, t()) -> Acc.
fold_subscriptions(Fun, Acc, Rec) -> fold_subscriptions(Fun, Acc, Rec) ->
gen_fold(?subscriptions, Fun, Acc, Rec). gen_fold(?subscriptions, Fun, Acc, Rec).
@ -410,6 +420,13 @@ del_subscription(TopicFilter, Rec) ->
get_subscription_state(SStateId, Rec) -> get_subscription_state(SStateId, Rec) ->
gen_get(?subscription_states, SStateId, Rec). gen_get(?subscription_states, SStateId, Rec).
-spec cold_get_subscription_state(
emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id()
) ->
[emqx_persistent_session_ds_subs:subscription_state()].
cold_get_subscription_state(SessionId, SStateId) ->
kv_pmap_read(?subscription_states_tab, SessionId, SStateId).
-spec fold_subscription_states(fun(), Acc, t()) -> Acc. -spec fold_subscription_states(fun(), Acc, t()) -> Acc.
fold_subscription_states(Fun, Acc, Rec) -> fold_subscription_states(Fun, Acc, Rec) ->
gen_fold(?subscription_states, Fun, Acc, Rec). gen_fold(?subscription_states, Fun, Acc, Rec).
@ -675,6 +692,14 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) ->
Val = encoder(encode, Tab, Val0), Val = encoder(encode, Tab, Val0),
mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write).
kv_pmap_read(Table, SessionId, Key) ->
lists:map(
fun(#kv{v = Val}) ->
encoder(decode, Table, Val)
end,
mnesia:dirty_read(Table, {SessionId, Key})
).
kv_pmap_restore(Table, SessionId) -> kv_pmap_restore(Table, SessionId) ->
MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}],
Objs = mnesia:select(Table, MS, read), Objs = mnesia:select(Table, MS, read),

View File

@ -33,6 +33,11 @@
fold/3 fold/3
]). ]).
%% Management API:
-export([
cold_get_subscription/2
]).
-export_type([subscription_state_id/0, subscription/0, subscription_state/0]). -export_type([subscription_state_id/0, subscription/0, subscription_state/0]).
-include("emqx_persistent_session_ds.hrl"). -include("emqx_persistent_session_ds.hrl").
@ -206,6 +211,23 @@ to_map(S) ->
fold(Fun, Acc, S) -> fold(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
emqx_persistent_session_ds:subscription() | undefined.
cold_get_subscription(SessionId, Topic) ->
case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, Topic) of
[Sub = #{current_state := SStateId}] ->
case
emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
of
[#{subopts := Subopts}] ->
Sub#{subopts => Subopts};
_ ->
undefined
end;
_ ->
undefined
end.
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================