chore(sessds): simplify subscriptions handling

There's currently no point in storing parsed topic filters in the
subscriptions table.
This commit is contained in:
Andrew Mayorov 2023-11-20 13:25:24 +07:00
parent 648b6ac63e
commit 5b40304d1f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 28 additions and 58 deletions

View File

@ -11,12 +11,6 @@
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/src/emqx_persistent_session_ds.hrl").
-define(DEFAULT_KEYSPACE, default).
-define(DS_SHARD_ID, <<"local">>).
-define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}).
-import(emqx_common_test_helpers, [on_exit/1]). -import(emqx_common_test_helpers, [on_exit/1]).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -92,12 +86,6 @@ get_mqtt_port(Node, Type) ->
{_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]),
Port. Port.
get_all_iterator_ids(Node) ->
Fn = fun(K, _V, Acc) -> [K | Acc] end,
erpc:call(Node, fun() ->
emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, [])
end).
wait_nodeup(Node) -> wait_nodeup(Node) ->
?retry( ?retry(
_Sleep0 = 500, _Sleep0 = 500,
@ -233,9 +221,8 @@ t_session_subscription_idempotency(Config) ->
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
SubTopicFilterWords = emqx_topic:words(SubTopicFilter),
?assertMatch( ?assertMatch(
{ok, #{}, #{SubTopicFilterWords := #{}}}, #{subscriptions := #{SubTopicFilter := #{}}},
erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId])
) )
end end
@ -308,7 +295,7 @@ t_session_unsubscription_idempotency(Config) ->
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
?assertMatch( ?assertMatch(
{ok, #{}, Subs = #{}} when map_size(Subs) =:= 0, #{subscriptions := Subs = #{}} when map_size(Subs) =:= 0,
erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId]) erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId])
), ),
ok ok

View File

@ -142,7 +142,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
%% somehow isolate those idling not-yet-expired sessions into a separate process %% somehow isolate those idling not-yet-expired sessions into a separate process
%% space, and move this call back into `emqx_cm` where it belongs. %% space, and move this call back into `emqx_cm` where it belongs.
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
case open_session(ClientID) of case session_open(ClientID) of
Session0 = #{} -> Session0 = #{} ->
ensure_timers(), ensure_timers(),
ReceiveMaximum = receive_maximum(ConnInfo), ReceiveMaximum = receive_maximum(ConnInfo),
@ -153,24 +153,9 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
end. end.
ensure_session(ClientID, ConnInfo, Conf) -> ensure_session(ClientID, ConnInfo, Conf) ->
{ok, Session, #{}} = session_ensure_new(ClientID, Conf), Session = session_ensure_new(ClientID, Conf),
ReceiveMaximum = receive_maximum(ConnInfo), ReceiveMaximum = receive_maximum(ConnInfo),
Session#{iterators => #{}, receive_maximum => ReceiveMaximum}. Session#{subscriptions => #{}, receive_maximum => ReceiveMaximum}.
open_session(ClientID) ->
case session_open(ClientID) of
{ok, Session, Subscriptions} ->
Session#{iterators => prep_subscriptions(Subscriptions)};
false ->
false
end.
prep_subscriptions(Subscriptions) ->
maps:fold(
fun(Topic, Subscription, Acc) -> Acc#{emqx_topic:join(Topic) => Subscription} end,
#{},
Subscriptions
).
-spec destroy(session() | clientinfo()) -> ok. -spec destroy(session() | clientinfo()) -> ok.
destroy(#{id := ClientID}) -> destroy(#{id := ClientID}) ->
@ -392,14 +377,13 @@ disconnect(Session = #{}) ->
-spec terminate(Reason :: term(), session()) -> ok. -spec terminate(Reason :: term(), session()) -> ok.
terminate(_Reason, _Session = #{}) -> terminate(_Reason, _Session = #{}) ->
% TODO: close iterators
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) ->
subscription(). subscription().
add_subscription(TopicFilterBin, SubOpts, DSSessionID) -> add_subscription(TopicFilter, SubOpts, DSSessionID) ->
%% N.B.: we chose to update the router before adding the subscription to the %% N.B.: we chose to update the router before adding the subscription to the
%% session/iterator table. The reasoning for this is as follows: %% session/iterator table. The reasoning for this is as follows:
%% %%
@ -418,8 +402,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
%% since it is guarded by a transaction context: we consider a subscription %% since it is guarded by a transaction context: we consider a subscription
%% operation to be successful if it ended up changing this table. Both router %% operation to be successful if it ended up changing this table. Both router
%% and iterator information can be reconstructed from this table, if needed. %% and iterator information can be reconstructed from this table, if needed.
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilterBin, DSSessionID), ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, DSSessionID),
TopicFilter = emqx_topic:words(TopicFilterBin),
{ok, DSSubExt, IsNew} = session_add_subscription( {ok, DSSubExt, IsNew} = session_add_subscription(
DSSessionID, TopicFilter, SubOpts DSSessionID, TopicFilter, SubOpts
), ),
@ -429,8 +412,7 @@ add_subscription(TopicFilterBin, SubOpts, DSSessionID) ->
-spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) -> -spec update_subscription(topic_filter(), subscription(), emqx_types:subopts(), id()) ->
subscription(). subscription().
update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) -> update_subscription(TopicFilter, DSSubExt, SubOpts, DSSessionID) ->
TopicFilter = emqx_topic:words(TopicFilterBin),
{ok, NDSSubExt, false} = session_add_subscription( {ok, NDSSubExt, false} = session_add_subscription(
DSSessionID, TopicFilter, SubOpts DSSessionID, TopicFilter, SubOpts
), ),
@ -439,8 +421,8 @@ update_subscription(TopicFilterBin, DSSubExt, SubOpts, DSSessionID) ->
-spec del_subscription(topic_filter(), id()) -> -spec del_subscription(topic_filter(), id()) ->
ok. ok.
del_subscription(TopicFilterBin, DSSessionId) -> del_subscription(TopicFilter, DSSessionId) ->
TopicFilter = emqx_topic:words(TopicFilterBin), %% TODO: transaction?
?tp_span( ?tp_span(
persistent_session_ds_subscription_delete, persistent_session_ds_subscription_delete,
#{session_id => DSSessionId}, #{session_id => DSSessionId},
@ -449,7 +431,7 @@ del_subscription(TopicFilterBin, DSSessionId) ->
?tp_span( ?tp_span(
persistent_session_ds_subscription_route_delete, persistent_session_ds_subscription_route_delete,
#{session_id => DSSessionId}, #{session_id => DSSessionId},
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId) ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId)
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -522,27 +504,33 @@ storage() ->
%% Note: session API doesn't handle session takeovers, it's the job of %% Note: session API doesn't handle session takeovers, it's the job of
%% the broker. %% the broker.
-spec session_open(id()) -> -spec session_open(id()) ->
{ok, session(), #{topic_filter_words() => subscription()}} | false. session() | false.
session_open(SessionId) -> session_open(SessionId) ->
transaction(fun() -> ro_transaction(fun() ->
case mnesia:read(?SESSION_TAB, SessionId, write) of case mnesia:read(?SESSION_TAB, SessionId, write) of
[Record = #session{}] -> [Record = #session{}] ->
Session = export_session(Record), Session = export_session(Record),
DSSubs = session_read_subscriptions(SessionId), DSSubs = session_read_subscriptions(SessionId),
Subscriptions = export_subscriptions(DSSubs), Subscriptions = export_subscriptions(DSSubs),
{ok, Session, Subscriptions}; Session#{
subscriptions => Subscriptions,
inflight => emqx_persistent_message_ds_replayer:new()
};
[] -> [] ->
false false
end end
end). end).
-spec session_ensure_new(id(), _Props :: map()) -> -spec session_ensure_new(id(), _Props :: map()) ->
{ok, session(), #{topic_filter_words() => subscription()}}. session().
session_ensure_new(SessionId, Props) -> session_ensure_new(SessionId, Props) ->
transaction(fun() -> transaction(fun() ->
ok = session_drop_subscriptions(SessionId), ok = session_drop_subscriptions(SessionId),
Session = export_session(session_create(SessionId, Props)), Session = export_session(session_create(SessionId, Props)),
{ok, Session, #{}} Session#{
subscriptions => #{},
inflight => emqx_persistent_message_ds_replayer:new()
}
end). end).
session_create(SessionId, Props) -> session_create(SessionId, Props) ->
@ -550,8 +538,7 @@ session_create(SessionId, Props) ->
id = SessionId, id = SessionId,
created_at = erlang:system_time(millisecond), created_at = erlang:system_time(millisecond),
expires_at = never, expires_at = never,
props = Props, props = Props
inflight = emqx_persistent_message_ds_replayer:new()
}, },
ok = mnesia:write(?SESSION_TAB, Session, write), ok = mnesia:write(?SESSION_TAB, Session, write),
Session. Session.
@ -573,15 +560,14 @@ session_drop_subscriptions(DSSessionId) ->
lists:foreach( lists:foreach(
fun(#ds_sub{id = DSSubId} = DSSub) -> fun(#ds_sub{id = DSSubId} = DSSub) ->
TopicFilter = subscription_id_to_topic_filter(DSSubId), TopicFilter = subscription_id_to_topic_filter(DSSubId),
TopicFilterBin = emqx_topic:join(TopicFilter), ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, DSSessionId),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilterBin, DSSessionId),
ok = session_del_subscription(DSSub) ok = session_del_subscription(DSSub)
end, end,
Subscriptions Subscriptions
). ).
%% @doc Called when a client subscribes to a topic. Idempotent. %% @doc Called when a client subscribes to a topic. Idempotent.
-spec session_add_subscription(id(), topic_filter_words(), _Props :: map()) -> -spec session_add_subscription(id(), topic_filter(), _Props :: map()) ->
{ok, subscription(), _IsNew :: boolean()}. {ok, subscription(), _IsNew :: boolean()}.
session_add_subscription(DSSessionId, TopicFilter, Props) -> session_add_subscription(DSSessionId, TopicFilter, Props) ->
DSSubId = {DSSessionId, TopicFilter}, DSSubId = {DSSessionId, TopicFilter},
@ -606,7 +592,7 @@ session_add_subscription(DSSessionId, TopicFilter, Props) ->
end end
end). end).
-spec session_insert_subscription(id(), topic_filter_words(), map()) -> ds_sub(). -spec session_insert_subscription(id(), topic_filter(), map()) -> ds_sub().
session_insert_subscription(DSSessionId, TopicFilter, Props) -> session_insert_subscription(DSSessionId, TopicFilter, Props) ->
{DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter), {DSSubId, StartMS} = new_subscription_id(DSSessionId, TopicFilter),
DSSub = #ds_sub{ DSSub = #ds_sub{
@ -641,7 +627,7 @@ session_read_subscriptions(DSSessionId) ->
), ),
mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read). mnesia:select(?SESSION_SUBSCRIPTIONS_TAB, MS, read).
-spec new_subscription_id(id(), topic_filter_words()) -> {subscription_id(), integer()}. -spec new_subscription_id(id(), topic_filter()) -> {subscription_id(), integer()}.
new_subscription_id(DSSessionId, TopicFilter) -> new_subscription_id(DSSessionId, TopicFilter) ->
%% Note: here we use _milliseconds_ to match with the timestamp %% Note: here we use _milliseconds_ to match with the timestamp
%% field of `#message' record. %% field of `#message' record.
@ -808,10 +794,7 @@ receive_maximum(ConnInfo) ->
list_all_sessions() -> list_all_sessions() ->
DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB), DSSessionIds = mnesia:dirty_all_keys(?SESSION_TAB),
Sessions = lists:map( Sessions = lists:map(
fun(SessionID) -> fun(SessionID) -> {SessionID, session_open(SessionID)} end,
{ok, Session, Subscriptions} = session_open(SessionID),
{SessionID, #{session => Session, subscriptions => Subscriptions}}
end,
DSSessionIds DSSessionIds
), ),
maps:from_list(Sessions). maps:from_list(Sessions).