feat(ds): Implement a function for dumping persistent session state

This commit is contained in:
ieQu1 2023-11-24 04:18:29 +01:00
parent 8dfcb69e52
commit e616e0746a
1 changed files with 39 additions and 9 deletions

View File

@ -70,6 +70,8 @@
do_ensure_all_iterators_closed/1 do_ensure_all_iterators_closed/1
]). ]).
-export([print_session/1]).
-ifdef(TEST). -ifdef(TEST).
-export([ -export([
session_open/1, session_open/1,
@ -226,6 +228,25 @@ info(await_rel_timeout, #{props := Conf}) ->
stats(Session) -> stats(Session) ->
info(?STATS_KEYS, Session). info(?STATS_KEYS, Session).
%% Debug/troubleshooting
-spec print_session(emqx_types:client_id()) -> map() | undefined.
print_session(ClientId) ->
catch ro_transaction(
fun() ->
case mnesia:read(?SESSION_TAB, ClientId) of
[Session] ->
#{
session => Session,
streams => mnesia:read(?SESSION_STREAM_TAB, ClientId),
pubranges => session_read_pubranges(ClientId),
subscriptions => session_read_subscriptions(ClientId)
};
[] ->
undefined
end
end
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE %% Client -> Broker: SUBSCRIBE / UNSUBSCRIBE
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -563,7 +584,7 @@ session_drop(DSSessionId) ->
-spec session_drop_subscriptions(id()) -> ok. -spec session_drop_subscriptions(id()) -> ok.
session_drop_subscriptions(DSSessionId) -> session_drop_subscriptions(DSSessionId) ->
Subscriptions = session_read_subscriptions(DSSessionId), Subscriptions = session_read_subscriptions(DSSessionId, write),
lists:foreach( lists:foreach(
fun(#ds_sub{id = DSSubId} = DSSub) -> fun(#ds_sub{id = DSSubId} = DSSub) ->
TopicFilter = subscription_id_to_topic_filter(DSSubId), TopicFilter = subscription_id_to_topic_filter(DSSubId),
@ -626,13 +647,27 @@ session_del_subscription(DSSessionId, TopicFilter) ->
session_del_subscription(#ds_sub{id = DSSubId}) -> session_del_subscription(#ds_sub{id = DSSubId}) ->
mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write). mnesia:delete(?SESSION_SUBSCRIPTIONS_TAB, DSSubId, write).
session_read_subscriptions(DSSessionId) -> session_read_subscriptions(DSSessionID) ->
session_read_subscriptions(DSSessionID, read).
session_read_subscriptions(DSSessionId, LockKind) ->
MS = ets:fun2ms( MS = ets:fun2ms(
fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId -> fun(Sub = #ds_sub{id = {Sess, _}}) when Sess =:= DSSessionId ->
Sub Sub
end end
), ),
mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, LockKind).
session_read_pubranges(DSSessionID) ->
session_read_pubranges(DSSessionID, read).
session_read_pubranges(DSSessionId, LockKind) ->
MS = ets:fun2ms(
fun(#ds_pubrange{id = {Sess, First}}) when Sess =:= DSSessionId ->
{DSSessionId, First}
end
),
mnesia:select(?SESSION_PUBRANGE_TAB, MS, LockKind).
-spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}. -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}.
new_subscription_id(DSSessionId, TopicFilter) -> new_subscription_id(DSSessionId, TopicFilter) ->
@ -735,12 +770,7 @@ session_drop_streams(DSSessionId) ->
%% must be called inside a transaction %% must be called inside a transaction
-spec session_drop_pubranges(id()) -> ok. -spec session_drop_pubranges(id()) -> ok.
session_drop_pubranges(DSSessionId) -> session_drop_pubranges(DSSessionId) ->
MS = ets:fun2ms( RangeIds = session_read_pubranges(DSSessionId, write),
fun(#ds_pubrange{id = {DSSessionId0, First}}) when DSSessionId0 =:= DSSessionId ->
{DSSessionId, First}
end
),
RangeIds = mnesia:select(?SESSION_PUBRANGE_TAB, MS, write),
lists:foreach( lists:foreach(
fun(RangeId) -> fun(RangeId) ->
mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write) mnesia:delete(?SESSION_PUBRANGE_TAB, RangeId, write)