diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index bbaf3fd10..3bf24407a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -160,7 +160,8 @@ on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) -> update_subscription(Subscription, ShareTopicFilter, SubOpts, Session). -dialyzer({nowarn_function, create_new_subscription/3}). -create_new_subscription(ShareTopicFilter, SubOpts, #{ +create_new_subscription(#share{topic = TopicFilter} = ShareTopicFilter, SubOpts, #{ + id := SessionId, s := S0, shared_sub_s := #{agent := Agent} = SharedSubS0, props := Props @@ -171,6 +172,9 @@ create_new_subscription(ShareTopicFilter, SubOpts, #{ ) of ok -> + ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId), + _ = emqx_external_broker:add_persistent_route(TopicFilter, SessionId), + #{upgrade_qos := UpgradeQoS} = Props, {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0), {SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1), @@ -188,6 +192,7 @@ create_new_subscription(ShareTopicFilter, SubOpts, #{ S = emqx_persistent_session_ds_state:put_subscription( ShareTopicFilter, Subscription, S3 ), + SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts), {ok, S, SharedSubS}; {error, _} = Error -> @@ -254,7 +259,7 @@ schedule_subscribe( ) -> {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} | {error, emqx_types:reason_code()}. -on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) -> +on_unsubscribe(SessionId, #share{topic = TopicFilter} = ShareTopicFilter, S0, SharedSubS0) -> case lookup(ShareTopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; @@ -262,6 +267,8 @@ on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) -> ?tp(persistent_session_ds_subscription_delete, #{ session_id => SessionId, share_topic_filter => ShareTopicFilter }), + ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId), + _ = emqx_external_broker:delete_persistent_route(TopicFilter, SessionId), S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0), SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter), {ok, S, SharedSubS, Subscription} diff --git a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl index 4fcd43e8a..ea2d41def 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds/shared_subs_agent.hrl @@ -21,6 +21,7 @@ %% Till full implementation we need to dispach to the null agent. %% It will report "not implemented" error for attempts to use shared subscriptions. -define(shared_subs_agent, emqx_persistent_session_ds_shared_subs_null_agent). +% -define(shared_subs_agent, emqx_ds_shared_sub_agent). %% end of -ifdef(TEST). -endif. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 24b78155f..912253205 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -68,8 +68,6 @@ %% group_id := group_id(), topic := emqx_types:topic(), - %% For ds router, not an actual session_id - router_id := binary(), %% TODO https://emqx.atlassian.net/browse/EMQX-12575 %% Implement some stats to assign evenly? stream_states := #{ @@ -108,10 +106,6 @@ -record(renew_leases, {}). -record(drop_timeout, {}). -%% Constants - --define(START_TIME_THRESHOLD, 5000). - %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -148,8 +142,7 @@ init([#{share_topic_filter := #share{topic = Topic} = ShareTopicFilter} = _Optio Data = #{ group_id => ShareTopicFilter, topic => Topic, - router_id => gen_router_id(), - start_time => now_ms() - ?START_TIME_THRESHOLD, + start_time => now_ms(), stream_states => #{}, stream_owners => #{}, agents => #{}, @@ -170,9 +163,8 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist end; %%-------------------------------------------------------------------- %% repalying state -handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := RouterId} = _Data) -> - ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic, router_id => RouterId}), - ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId), +handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) -> + ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}), {keep_state_and_data, [ {{timeout, #renew_streams{}}, 0, #renew_streams{}}, {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}, @@ -251,8 +243,7 @@ handle_event(Event, Content, State, _Data) -> }), keep_state_and_data. -terminate(_Reason, _State, #{topic := Topic, router_id := RouterId} = _Data) -> - ok = emqx_persistent_session_ds_router:do_delete_route(Topic, RouterId), +terminate(_Reason, _State, _Data) -> ok. %%-------------------------------------------------------------------- @@ -889,9 +880,6 @@ agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) %% Helper functions %%-------------------------------------------------------------------- -gen_router_id() -> - emqx_guid:to_hexstr(emqx_guid:gen()). - now_ms() -> erlang:system_time(millisecond).