feat(queue): move route registration to sessions

This commit is contained in:
Ilya Averyanov 2024-07-11 11:36:20 +03:00
parent 81f4103d60
commit cae27293a5
3 changed files with 14 additions and 18 deletions

View File

@ -160,7 +160,8 @@ on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(ShareTopicFilter, SubOpts, #{
create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts, #{
id := SessionId,
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
@ -171,6 +172,9 @@ create_new_subscription(ShareTopicFilter, SubOpts, #{
)
of
ok ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
_ = emqx_external_broker:add_persistent_route(TopicFilter, SessionId),
#{upgrade_qos := UpgradeQoS} = Props,
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
@ -188,6 +192,7 @@ create_new_subscription(ShareTopicFilter, SubOpts, #{
S = emqx_persistent_session_ds_state:put_subscription(
ShareTopicFilter, Subscription, S3
),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
@ -254,7 +259,7 @@ schedule_subscribe(
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) ->
on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, SharedSubS0) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
@ -262,6 +267,8 @@ on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId),
_ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}

View File

@ -21,6 +21,7 @@
%% Till full implementation we need to dispach to the null agent.
%% It will report "not implemented" error for attempts to use shared subscriptions.
-define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent).
% -define(shared_subs_agent, emqx_ds_shared_sub_agent).
%% end of -ifdef(TEST).
-endif.

View File

@ -68,8 +68,6 @@
%%
group_id := group_id(),
topic := emqx_types:topic(),
%% For ds router, not an actual session_id
router_id := binary(),
%% TODO https://emqx.atlassian.net/browse/EMQX-12575
%% Implement some stats to assign evenly?
stream_states := #{
@ -108,10 +106,6 @@
-record(renew_leases, {}).
-record(drop_timeout, {}).
%% Constants
-define(START_TIME_THRESHOLD, 5000).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -148,8 +142,7 @@ init([#{share_topic_filter := #share{topic = Topic} = ShareTopicFilter} = _Optio
Data = #{
group_id => ShareTopicFilter,
topic => Topic,
router_id => gen_router_id(),
start_time => now_ms() - ?START_TIME_THRESHOLD,
start_time => now_ms(),
stream_states => #{},
stream_owners => #{},
agents => #{},
@ -170,9 +163,8 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist
end;
%%--------------------------------------------------------------------
%% repalying state
handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := RouterId} = _Data) ->
?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic, router_id => RouterId}),
ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) ->
?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}),
{keep_state_and_data, [
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
@ -251,8 +243,7 @@ handle_event(Event, Content, State, _Data) ->
}),
keep_state_and_data.
terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) ->
ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId),
terminate(_Reason, _State, _Data) ->
ok.
%%--------------------------------------------------------------------
@ -889,9 +880,6 @@ agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0)
%% Helper functions
%%--------------------------------------------------------------------
gen_router_id() ->
emqx_guid:to_hexstr(emqx_guid:gen()).
now_ms() ->
erlang:system_time(millisecond).