diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 908e71bb5..b8c853431 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -75,7 +75,8 @@ %% Managment APIs: -export([ - list_client_subscriptions/1 + list_client_subscriptions/1, + get_client_subscription/2 ]). %% session table operations @@ -736,6 +737,11 @@ list_client_subscriptions(ClientId) -> {error, not_found} end. +-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) -> + subscription() | undefined. +get_client_subscription(ClientId, Topic) -> + emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic). + %%-------------------------------------------------------------------- %% Session tables operations %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index bc603647a..9efffc7ff 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -22,6 +22,9 @@ %% It is responsible for saving, caching, and restoring session state. %% It is completely devoid of business logic. Not even the default %% values should be set in this module. +%% +%% Session process MUST NOT use `cold_*' functions! They are reserved +%% for use in the management APIs. -module(emqx_persistent_session_ds_state). -export([create_tables/0]). @@ -40,12 +43,14 @@ -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([ get_subscription_state/2, + cold_get_subscription_state/2, fold_subscription_states/3, put_subscription_state/3, del_subscription_state/2 ]). -export([ get_subscription/2, + cold_get_subscription/2, fold_subscriptions/3, n_subscriptions/1, put_subscription/3, @@ -383,6 +388,11 @@ new_id(Rec) -> get_subscription(TopicFilter, Rec) -> gen_get(?subscriptions, TopicFilter, Rec). +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + [emqx_persistent_session_ds_subs:subscription()]. +cold_get_subscription(SessionId, Topic) -> + kv_pmap_read(?subscription_tab, SessionId, Topic). + -spec fold_subscriptions(fun(), Acc, t()) -> Acc. fold_subscriptions(Fun, Acc, Rec) -> gen_fold(?subscriptions, Fun, Acc, Rec). @@ -410,6 +420,13 @@ del_subscription(TopicFilter, Rec) -> get_subscription_state(SStateId, Rec) -> gen_get(?subscription_states, SStateId, Rec). +-spec cold_get_subscription_state( + emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id() +) -> + [emqx_persistent_session_ds_subs:subscription_state()]. +cold_get_subscription_state(SessionId, SStateId) -> + kv_pmap_read(?subscription_states_tab, SessionId, SStateId). + -spec fold_subscription_states(fun(), Acc, t()) -> Acc. fold_subscription_states(Fun, Acc, Rec) -> gen_fold(?subscription_states, Fun, Acc, Rec). @@ -675,6 +692,14 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) -> Val = encoder(encode, Tab, Val0), mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). +kv_pmap_read(Table, SessionId, Key) -> + lists:map( + fun(#kv{v = Val}) -> + encoder(decode, Table, Val) + end, + mnesia:dirty_read(Table, {SessionId, Key}) + ). + kv_pmap_restore(Table, SessionId) -> MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], Objs = mnesia:select(Table, MS, read), diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 1993370ed..99ad9f9fc 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -33,6 +33,11 @@ fold/3 ]). +%% Management API: +-export([ + cold_get_subscription/2 +]). + -export_type([subscription_state_id/0, subscription/0, subscription_state/0]). -include("emqx_persistent_session_ds.hrl"). @@ -206,6 +211,23 @@ to_map(S) -> fold(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S). +-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) -> + emqx_persistent_session_ds:subscription() | undefined. +cold_get_subscription(SessionId, Topic) -> + case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, Topic) of + [Sub = #{current_state := SStateId}] -> + case + emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId) + of + [#{subopts := Subopts}] -> + Sub#{subopts => Subopts}; + _ -> + undefined + end; + _ -> + undefined + end. + %%================================================================================ %% Internal functions %%================================================================================