From f21356946089addbfd965cf39be3e3ecbd967568 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 10 Jul 2024 19:02:15 +0300 Subject: [PATCH] feat(queue): clarify naming; identify shared subs by full topic filter --- ...emqx_persistent_session_ds_shared_subs.erl | 137 ++++++++-------- ...ersistent_session_ds_shared_subs_agent.erl | 44 ++--- .../src/emqx_ds_shared_sub_agent.erl | 152 ++++++++++++------ .../src/emqx_ds_shared_sub_group_sm.erl | 61 +++---- .../src/emqx_ds_shared_sub_leader.erl | 60 +++---- .../src/emqx_ds_shared_sub_proto.erl | 14 +- .../src/emqx_ds_shared_sub_proto.hrl | 50 +++--- .../src/emqx_ds_shared_sub_registry.erl | 25 +-- .../src/proto/emqx_ds_shared_sub_proto_v1.erl | 4 +- 9 files changed, 303 insertions(+), 244 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 1f4d6f6e9..506114f35 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -119,8 +119,8 @@ new(Opts) -> {ok, emqx_persistent_session_ds_state:t(), t()}. open(S, Opts) -> SharedSubscriptions = fold_shared_subs( - fun(#share{} = TopicFilter, Sub, Acc) -> - [{TopicFilter, to_agent_subscription(S, Sub)} | Acc] + fun(#share{} = ShareTopicFilter, Sub, Acc) -> + [{ShareTopicFilter, to_agent_subscription(S, Sub)} | Acc] end, [], S @@ -139,33 +139,33 @@ open(S, Opts) -> emqx_types:subopts(), emqx_persistent_session_ds:session() ) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}. -on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) -> - Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S), - on_subscribe(Subscription, TopicFilter, SubOpts, Session). +on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) -> + Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S), + on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session). %%-------------------------------------------------------------------- %% on_subscribe internal functions -on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) -> +on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) -> #{max_subscriptions := MaxSubscriptions} = Props, case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of true -> - create_new_subscription(TopicFilter, SubOpts, Session); + create_new_subscription(ShareTopicFilter, SubOpts, Session); false -> {error, ?RC_QUOTA_EXCEEDED} end; -on_subscribe(Subscription, TopicFilter, SubOpts, Session) -> - update_subscription(Subscription, TopicFilter, SubOpts, Session). +on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) -> + update_subscription(Subscription, ShareTopicFilter, SubOpts, Session). -dialyzer({nowarn_function, create_new_subscription/3}). -create_new_subscription(TopicFilter, SubOpts, #{ +create_new_subscription(ShareTopicFilter, SubOpts, #{ s := S0, shared_sub_s := #{agent := Agent} = SharedSubS0, props := Props }) -> case emqx_persistent_session_ds_shared_subs_agent:can_subscribe( - Agent, TopicFilter, SubOpts + Agent, ShareTopicFilter, SubOpts ) of ok -> @@ -184,17 +184,19 @@ create_new_subscription(TopicFilter, SubOpts, #{ start_time => now_ms() }, S = emqx_persistent_session_ds_state:put_subscription( - TopicFilter, Subscription, S3 + ShareTopicFilter, Subscription, S3 ), - SharedSubS = schedule_subscribe(SharedSubS0, TopicFilter, SubOpts), + SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts), {ok, S, SharedSubS}; {error, _} = Error -> Error end. -update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{ - s := S0, shared_sub_s := SharedSubS, props := Props -}) -> +update_subscription( + #{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{ + s := S0, shared_sub_s := SharedSubS, props := Props + } +) -> #{upgrade_qos := UpgradeQoS} = Props, SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts}, case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of @@ -208,31 +210,33 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt SStateId, SState, S1 ), Sub = Sub0#{current_state => SStateId}, - S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2), + S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2), {ok, S, SharedSubS} end. -dialyzer({nowarn_function, schedule_subscribe/3}). schedule_subscribe( - #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts + #{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, + ShareTopicFilter, + SubOpts ) -> case ScheduledActions0 of - #{TopicFilter := ScheduledAction} -> + #{ShareTopicFilter := ScheduledAction} -> ScheduledActions1 = ScheduledActions0#{ - TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} + ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} }, ?tp(warning, shared_subs_schedule_subscribe_override, #{ - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, new_type => {?schedule_subscribe, SubOpts}, old_action => format_schedule_action(ScheduledAction) }), SharedSubS0#{scheduled_actions := ScheduledActions1}; _ -> ?tp(warning, shared_subs_schedule_subscribe_new, #{ - topic_filter => TopicFilter, subopts => SubOpts + share_topic_filter => ShareTopicFilter, subopts => SubOpts }), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe( - Agent0, TopicFilter, SubOpts + Agent0, ShareTopicFilter, SubOpts ), SharedSubS0#{agent => Agent1} end. @@ -242,22 +246,22 @@ schedule_subscribe( -spec on_unsubscribe( emqx_persistent_session_ds:id(), - emqx_persistent_session_ds:topic_filter(), + share_topic_filter(), emqx_persistent_session_ds_state:t(), t() ) -> {ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()} | {error, emqx_types:reason_code()}. -on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) -> - case lookup(TopicFilter, S0) of +on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) -> + case lookup(ShareTopicFilter, S0) of undefined -> {error, ?RC_NO_SUBSCRIPTION_EXISTED}; #{id := SubId} = Subscription -> ?tp(persistent_session_ds_subscription_delete, #{ - session_id => SessionId, topic_filter => TopicFilter + session_id => SessionId, share_topic_filter => ShareTopicFilter }), - S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), - SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, TopicFilter), + S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0), + SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter), {ok, S, SharedSubS, Subscription} end. @@ -265,16 +269,16 @@ on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) -> %% on_unsubscribe internal functions schedule_unsubscribe( - S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter + S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, ShareTopicFilter ) -> case ScheduledActions0 of - #{TopicFilter := ScheduledAction0} -> + #{ShareTopicFilter := ScheduledAction0} -> ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe}, ScheduledActions1 = ScheduledActions0#{ - TopicFilter => ScheduledAction1 + ShareTopicFilter => ScheduledAction1 }, ?tp(warning, shared_subs_schedule_unsubscribe_override, #{ - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, new_type => ?schedule_unsubscribe, old_action => format_schedule_action(ScheduledAction0) }), @@ -282,14 +286,14 @@ schedule_unsubscribe( _ -> StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId), ScheduledActions1 = ScheduledActions0#{ - TopicFilter => #{ + ShareTopicFilter => #{ type => ?schedule_unsubscribe, stream_keys_to_wait => StreamKeys, progresses => [] } }, ?tp(warning, shared_subs_schedule_unsubscribe_new, #{ - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, stream_keys => format_stream_keys(StreamKeys) }), SharedSubS0#{scheduled_actions := ScheduledActions1} @@ -322,13 +326,13 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh %%-------------------------------------------------------------------- %% renew_streams internal functions -accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) -> +accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) -> %% If we have a pending action (subscribe or unsubscribe) for this topic filter, %% we should not accept a stream and start replaying it. We won't use it anyway: %% * if subscribe is pending, we will reset agent obtain a new lease %% * if unsubscribe is pending, we will drop connection case ScheduledActions of - #{TopicFilter := _Action} -> + #{ShareTopicFilter := _Action} -> S; _ -> accept_stream(Event, S) @@ -336,13 +340,13 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) -> accept_stream( #{ - topic_filter := TopicFilter, + share_topic_filter := ShareTopicFilter, stream := Stream, progress := #{iterator := Iterator} = _Progress } = _Event, S0 ) -> - case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of undefined -> %% We unsubscribed S0; @@ -375,9 +379,9 @@ accept_stream( end. revoke_stream( - #{topic_filter := TopicFilter, stream := Stream}, S0 + #{share_topic_filter := ShareTopicFilter, stream := Stream}, S0 ) -> - case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of + case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of undefined -> %% This should not happen. %% Agent should have received unsubscribe callback @@ -427,12 +431,7 @@ all_stream_progresses(S, _Agent, NeedUnacked) -> CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), fold_shared_stream_states( - fun( - #share{group = Group}, - Stream, - SRS, - ProgressesAcc0 - ) -> + fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) -> case is_stream_started(CommQos1, CommQos2, SRS) and (NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS)) @@ -440,7 +439,7 @@ all_stream_progresses(S, _Agent, NeedUnacked) -> true -> StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS), maps:update_with( - Group, + ShareTopicFilter, fun(Progresses) -> [StreamProgress | Progresses] end, [StreamProgress], ProgressesAcc0 @@ -455,12 +454,12 @@ all_stream_progresses(S, _Agent, NeedUnacked) -> run_scheduled_actions(S, Agent, ScheduledActions) -> maps:fold( - fun(TopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) -> - case run_scheduled_action(S, AgentAcc0, TopicFilter, Action0) of + fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) -> + case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of {ok, AgentAcc1} -> - {AgentAcc1, maps:remove(TopicFilter, ScheduledActionsAcc)}; + {AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)}; {continue, Action1} -> - {AgentAcc0, ScheduledActionsAcc#{TopicFilter => Action1}} + {AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}} end end, {Agent, ScheduledActions}, @@ -470,7 +469,7 @@ run_scheduled_actions(S, Agent, ScheduledActions) -> run_scheduled_action( S, Agent0, - #share{group = Group} = TopicFilter, + ShareTopicFilter, #{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action ) -> StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0), @@ -478,31 +477,31 @@ run_scheduled_action( case StreamKeysToWait1 of [] -> ?tp(warning, shared_subs_schedule_action_complete, #{ - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, progresses => format_stream_progresses(Progresses1), type => Type }), %% Regular progress won't se unsubscribed streams, so we need to %% send the progress explicitly. Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( - Agent0, #{Group => Progresses1} + Agent0, #{ShareTopicFilter => Progresses1} ), case Type of {?schedule_subscribe, SubOpts} -> {ok, emqx_persistent_session_ds_shared_subs_agent:on_subscribe( - Agent1, TopicFilter, SubOpts + Agent1, ShareTopicFilter, SubOpts )}; ?schedule_unsubscribe -> {ok, emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe( - Agent1, TopicFilter, Progresses1 + Agent1, ShareTopicFilter, Progresses1 )} end; _ -> Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}, ?tp(warning, shared_subs_schedule_action_continue, #{ - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, new_action => format_schedule_action(Action1) }), {continue, Action1} @@ -551,8 +550,8 @@ on_disconnect(S0, #{agent := Agent0} = SharedSubS0) -> revoke_all_streams(S0) -> fold_shared_stream_states( - fun(TopicFilter, Stream, _SRS, S) -> - revoke_stream(#{topic_filter => TopicFilter, stream => Stream}, S) + fun(ShareTopicFilter, Stream, _SRS, S) -> + revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S) end, S0, S0 @@ -580,8 +579,8 @@ to_map(_S, _SharedSubS) -> %% Generic helpers %%-------------------------------------------------------------------- -lookup(TopicFilter, S) -> - case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of +lookup(ShareTopicFilter, S) -> + case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of Sub = #{current_state := SStateId} -> case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of #{subopts := SubOpts} -> @@ -640,7 +639,7 @@ stream_progress( fold_shared_subs(Fun, Acc, S) -> emqx_persistent_session_ds_state:fold_subscriptions( fun - (#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0); + (#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0); (_, _Sub, Acc0) -> Acc0 end, Acc, @@ -650,10 +649,10 @@ fold_shared_subs(Fun, Acc, S) -> fold_shared_stream_states(Fun, Acc, S) -> %% TODO %% Optimize or cache - TopicFilters = fold_shared_subs( + ShareTopicFilters = fold_shared_subs( fun - (#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) -> - Acc0#{Id => TopicFilter}; + (#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) -> + Acc0#{Id => ShareTopicFilter}; (_, _, Acc0) -> Acc0 end, @@ -662,9 +661,9 @@ fold_shared_stream_states(Fun, Acc, S) -> ), emqx_persistent_session_ds_state:fold_streams( fun({SubId, Stream}, SRS, Acc0) -> - case TopicFilters of - #{SubId := TopicFilter} -> - Fun(TopicFilter, Stream, SRS, Acc0); + case ShareTopicFilters of + #{SubId := ShareTopicFilter} -> + Fun(ShareTopicFilter, Stream, SRS, Acc0); _ -> Acc0 end diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl index b49ceabcf..022963ad9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs_agent.erl @@ -15,7 +15,7 @@ }. -type t() :: term(). --type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). +-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type opts() :: #{ session_id := session_id() @@ -28,21 +28,21 @@ -type stream_lease() :: #{ type => lease, %% Used as "external" subscription_id - topic_filter := topic_filter(), + share_topic_filter := share_topic_filter(), stream := emqx_ds:stream(), iterator := emqx_ds:iterator() }. -type stream_revoke() :: #{ type => revoke, - topic_filter := topic_filter(), + share_topic_filter := share_topic_filter(), stream := emqx_ds:stream() }. -type stream_lease_event() :: stream_lease() | stream_revoke(). -type stream_progress() :: #{ - topic_filter := topic_filter(), + share_topic_filter := share_topic_filter(), stream := emqx_ds:stream(), iterator := emqx_ds:iterator(), use_finished := boolean() @@ -80,13 +80,13 @@ %%-------------------------------------------------------------------- -callback new(opts()) -> t(). --callback open([{topic_filter(), subscription()}], opts()) -> t(). --callback can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. --callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t(). --callback on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t(). --callback on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). +-callback open([{share_topic_filter(), subscription()}], opts()) -> t(). +-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. +-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). +-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t(). +-callback on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). -callback renew_streams(t()) -> {[stream_lease_event()], t()}. --callback on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). +-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). -callback on_info(t(), term()) -> t(). %%-------------------------------------------------------------------- @@ -97,23 +97,23 @@ new(Opts) -> ?shared_subs_agent:new(Opts). --spec open([{topic_filter(), subscription()}], opts()) -> t(). +-spec open([{share_topic_filter(), subscription()}], opts()) -> t(). open(Topics, Opts) -> ?shared_subs_agent:open(Topics, Opts). --spec can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. -can_subscribe(Agent, TopicFilter, SubOpts) -> - ?shared_subs_agent:can_subscribe(Agent, TopicFilter, SubOpts). +-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}. +can_subscribe(Agent, ShareTopicFilter, SubOpts) -> + ?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts). --spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t(). -on_subscribe(Agent, TopicFilter, SubOpts) -> - ?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts). +-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). +on_subscribe(Agent, ShareTopicFilter, SubOpts) -> + ?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts). --spec on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t(). -on_unsubscribe(Agent, TopicFilter, StreamProgresses) -> - ?shared_subs_agent:on_unsubscribe(Agent, TopicFilter, StreamProgresses). +-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t(). +on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) -> + ?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses). --spec on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). +-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). on_disconnect(Agent, StreamProgresses) -> ?shared_subs_agent:on_disconnect(Agent, StreamProgresses). @@ -121,7 +121,7 @@ on_disconnect(Agent, StreamProgresses) -> renew_streams(Agent) -> ?shared_subs_agent:renew_streams(Agent). --spec on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t(). +-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t(). on_stream_progress(Agent, StreamProgress) -> ?shared_subs_agent:on_stream_progress(Agent, StreamProgress). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index b896370f3..005307ca2 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -26,18 +26,66 @@ -behaviour(emqx_persistent_session_ds_shared_subs_agent). +-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). +-type group_id() :: share_topic_filter(). + +-type progress() :: emqx_persistent_session_ds_shared_subs:progress(). +-type external_lease_event() :: + #{ + type => lease, + stream => emqx_ds:stream(), + progress => progress(), + share_topic_filter => emqx_persistent_session_ds:share_topic_filter() + } + | #{ + type => revoke, + stream => emqx_ds:stream(), + share_topic_filter => emqx_persistent_session_ds:share_topic_filter() + }. + +-type options() :: #{ + session_id := emqx_persistent_session_ds:id() +}. + +-type t() :: #{ + groups := #{ + group_id() => emqx_ds_shared_sub_group_sm:t() + }, + session_id := emqx_persistent_session_ds:id() +}. + +%% Techinically, group_id and share_topic_filter are the same. +%% However, we speak in the terms of share_topic_filter in the API, +%% which is known to the shared subscription handler of persistent session. +%% +%% And we speak in the terms of group_id internally: +%% * we keep group_sm's in the state by group_id +%% * we use group_id to address group_sm's, e.g. when sending messages to them +%% from leader or from themselves. +-define(group_id(ShareTopicFilter), ShareTopicFilter). +-define(share_topic_filter(GroupId), GroupId). + -record(message_to_group_sm, { - group :: emqx_types:group(), + group_id :: group_id(), message :: term() }). +-export_type([ + t/0, + group_id/0, + options/0, + external_lease_event/0 +]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- +-spec new(options()) -> t(). new(Opts) -> init_state(Opts). +-spec open([{share_topic_filter(), emqx_types:subopts()}], options()) -> t(). open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( @@ -45,32 +93,41 @@ open(TopicSubscriptions, Opts) -> ?tp(warning, ds_agent_open_subscription, #{ topic_filter => ShareTopicFilter }), - add_group_subscription(State, ShareTopicFilter) + add_shared_subscription(State, ShareTopicFilter) end, State0, TopicSubscriptions ), State1. -can_subscribe(_State, _TopicFilter, _SubOpts) -> +-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok. +can_subscribe(_State, _ShareTopicFilter, _SubOpts) -> ok. -on_subscribe(State0, TopicFilter, _SubOpts) -> +-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). +on_subscribe(State0, ShareTopicFilter, _SubOpts) -> ?tp(warning, ds_agent_on_subscribe, #{ - topic_filter => TopicFilter + share_topic_filter => ShareTopicFilter }), - add_group_subscription(State0, TopicFilter). + add_shared_subscription(State0, ShareTopicFilter). -on_unsubscribe(State, TopicFilter, GroupProgress) -> - delete_group_subscription(State, TopicFilter, GroupProgress). +-spec on_unsubscribe(t(), share_topic_filter(), [ + emqx_persistent_session_ds_shared_subs:agent_stream_progress() +]) -> t(). +on_unsubscribe(State, ShareTopicFilter, GroupProgress) -> + delete_shared_subscription(State, ShareTopicFilter, GroupProgress). +-spec renew_streams(t()) -> {[emqx_persistent_session_ds_shared_subs:agent_stream_event()], t()}. renew_streams(#{} = State) -> fetch_stream_events(State). +-spec on_stream_progress(t(), #{ + share_topic_filter() => [emqx_persistent_session_ds_shared_subs:agent_stream_progress()] +}) -> t(). on_stream_progress(State, StreamProgresses) -> maps:fold( - fun(Group, GroupProgresses, StateAcc) -> - with_group_sm(StateAcc, Group, fun(GSM) -> + fun(ShareTopicFilter, GroupProgresses, StateAcc) -> + with_group_sm(StateAcc, ?group_id(ShareTopicFilter), fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses) end) end, @@ -78,72 +135,74 @@ on_stream_progress(State, StreamProgresses) -> StreamProgresses ). +-spec on_disconnect(t(), [emqx_persistent_session_ds_shared_subs:agent_stream_progress()]) -> t(). on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> ok = maps:foreach( - fun(Group, GroupSM0) -> - GroupProgresses = maps:get(Group, StreamProgresses, []), + fun(GroupId, GroupSM0) -> + GroupProgresses = maps:get(?share_topic_filter(GroupId), StreamProgresses, []), emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses) end, Groups0 ), State#{groups => #{}}. -on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) -> +-spec on_info(t(), term()) -> t(). +on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) -> ?SLOG(info, #{ msg => leader_lease_streams, - group => Group, + group_id => GroupId, streams => StreamProgresses, version => Version, leader => Leader }), - with_group_sm(State, Group, fun(GSM) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_lease_streams( GSM, Leader, StreamProgresses, Version ) end); -on_info(State, ?leader_renew_stream_lease_match(Group, Version)) -> +on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) -> ?SLOG(info, #{ msg => leader_renew_stream_lease, - group => Group, + group_id => GroupId, version => Version }), - with_group_sm(State, Group, fun(GSM) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) end); -on_info(State, ?leader_renew_stream_lease_match(Group, VersionOld, VersionNew)) -> +on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) -> ?SLOG(info, #{ msg => leader_renew_stream_lease, - group => Group, + group_id => GroupId, version_old => VersionOld, version_new => VersionNew }), - with_group_sm(State, Group, fun(GSM) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) end); -on_info(State, ?leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew)) -> +on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) -> ?SLOG(info, #{ msg => leader_update_streams, - group => Group, + group_id => GroupId, version_old => VersionOld, version_new => VersionNew, streams_new => StreamsNew }), - with_group_sm(State, Group, fun(GSM) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_update_streams( GSM, VersionOld, VersionNew, StreamsNew ) end); -on_info(State, ?leader_invalidate_match(Group)) -> +on_info(State, ?leader_invalidate_match(GroupId)) -> ?SLOG(info, #{ msg => leader_invalidate, - group => Group + group_id => GroupId }), - with_group_sm(State, Group, fun(GSM) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_leader_invalidate(GSM) end); %% Generic messages sent by group_sm's to themselves (timeouts). -on_info(State, #message_to_group_sm{group = Group, message = Message}) -> - with_group_sm(State, Group, fun(GSM) -> +on_info(State, #message_to_group_sm{group_id = GroupId, message = Message}) -> + with_group_sm(State, GroupId, fun(GSM) -> emqx_ds_shared_sub_group_sm:handle_info(GSM, Message) end). @@ -158,29 +217,30 @@ init_state(Opts) -> groups => #{} }. -delete_group_subscription(State, #share{group = Group}, GroupProgress) -> +delete_shared_subscription(State, ShareTopicFilter, GroupProgress) -> + GroupId = ?group_id(ShareTopicFilter), case State of - #{groups := #{Group := GSM} = Groups} -> + #{groups := #{GroupId := GSM} = Groups} -> _ = emqx_ds_shared_sub_group_sm:handle_disconnect(GSM, GroupProgress), - State#{groups => maps:remove(Group, Groups)}; + State#{groups => maps:remove(GroupId, Groups)}; _ -> State end. -add_group_subscription( +add_shared_subscription( #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter ) -> ?SLOG(info, #{ - msg => agent_add_group_subscription, - topic_filter => ShareTopicFilter + msg => agent_add_shared_subscription, + share_topic_filter => ShareTopicFilter }), - #share{group = Group} = ShareTopicFilter, + GroupId = ?group_id(ShareTopicFilter), Groups1 = Groups0#{ - Group => emqx_ds_shared_sub_group_sm:new(#{ + GroupId => emqx_ds_shared_sub_group_sm:new(#{ session_id => SessionId, - topic_filter => ShareTopicFilter, + share_topic_filter => ShareTopicFilter, agent => this_agent(SessionId), - send_after => send_to_subscription_after(Group) + send_after => send_to_subscription_after(GroupId) }) }, State1 = State0#{groups => Groups1}, @@ -188,9 +248,9 @@ add_group_subscription( fetch_stream_events(#{groups := Groups0} = State0) -> {Groups1, Events} = maps:fold( - fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) -> + fun(GroupId, GroupSM0, {GroupsAcc, EventsAcc}) -> {GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0), - {GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]} + {GroupsAcc#{GroupId => GroupSM1}, [Events | EventsAcc]} end, {#{}, []}, Groups0 @@ -201,20 +261,20 @@ fetch_stream_events(#{groups := Groups0} = State0) -> this_agent(Id) -> emqx_ds_shared_sub_proto:agent(Id, self()). -send_to_subscription_after(Group) -> +send_to_subscription_after(GroupId) -> fun(Time, Msg) -> emqx_persistent_session_ds_shared_subs_agent:send_after( Time, self(), - #message_to_group_sm{group = Group, message = Msg} + #message_to_group_sm{group_id = GroupId, message = Msg} ) end. -with_group_sm(State, Group, Fun) -> +with_group_sm(State, GroupId, Fun) -> case State of - #{groups := #{Group := GSM0} = Groups} -> + #{groups := #{GroupId := GSM0} = Groups} -> #{} = GSM1 = Fun(GSM0), - State#{groups => Groups#{Group => GSM1}}; + State#{groups => Groups#{GroupId => GSM1}}; _ -> %% TODO %% Error? diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 7d260dc0b..2b37328a2 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -41,7 +41,7 @@ -type options() :: #{ session_id := emqx_persistent_session_ds:id(), agent := emqx_ds_shared_sub_proto:agent(), - topic_filter := emqx_persistent_session_ds:share_topic_filter(), + share_topic_filter := emqx_persistent_session_ds:share_topic_filter(), send_after := fun((non_neg_integer(), term()) -> reference()) }. @@ -58,19 +58,6 @@ stream => emqx_ds:stream() }. --type external_lease_event() :: - #{ - type => lease, - stream => emqx_ds:stream(), - progress => progress(), - topic_filter => emqx_persistent_session_ds:share_topic_filter() - } - | #{ - type => revoke, - stream => emqx_ds:stream(), - topic_filter => emqx_persistent_session_ds:share_topic_filter() - }. - %% GroupSM States -define(connecting, connecting). @@ -111,7 +98,7 @@ -type timer() :: #timer{}. -type group_sm() :: #{ - topic_filter => emqx_persistent_session_ds:share_topic_filter(), + share_topic_filter => emqx_persistent_session_ds:share_topic_filter(), agent => emqx_ds_shared_sub_proto:agent(), send_after => fun((non_neg_integer(), term()) -> reference()), stream_lease_events => list(stream_lease_event()), @@ -129,7 +116,7 @@ new(#{ session_id := SessionId, agent := Agent, - topic_filter := ShareTopicFilter, + share_topic_filter := ShareTopicFilter, send_after := SendAfter }) -> ?SLOG( @@ -137,32 +124,33 @@ new(#{ #{ msg => group_sm_new, agent => Agent, - topic_filter => ShareTopicFilter + share_topic_filter => ShareTopicFilter } ), GSM0 = #{ id => SessionId, - topic_filter => ShareTopicFilter, + share_topic_filter => ShareTopicFilter, agent => Agent, send_after => SendAfter }, ?tp(warning, group_sm_new, #{ agent => Agent, - topic_filter => ShareTopicFilter + share_topic_filter => ShareTopicFilter }), transition(GSM0, ?connecting, #{}). --spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}. +-spec fetch_stream_events(group_sm()) -> + {group_sm(), [emqx_ds_shared_sub_agent:external_lease_event()]}. fetch_stream_events( #{ state := _State, - topic_filter := TopicFilter, + share_topic_filter := ShareTopicFilter, stream_lease_events := Events0 } = GSM ) -> Events1 = lists:map( fun(Event) -> - Event#{topic_filter => TopicFilter} + Event#{share_topic_filter => ShareTopicFilter} end, Events0 ), @@ -187,18 +175,21 @@ handle_disconnect( %%----------------------------------------------------------------------- %% Connecting state -handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> +handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) -> ?tp(warning, group_sm_enter_connecting, #{ agent => Agent, - topic_filter => ShareTopicFilter + share_topic_filter => ShareTopicFilter }), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter), ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)). handle_leader_lease_streams( - #{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version + #{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM0, + Leader, + StreamProgresses, + Version ) -> - ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}), + ?tp(debug, leader_lease_streams, #{share_topic_filter => ShareTopicFilter}), Streams = progresses_to_map(StreamProgresses), StreamLeaseEvents = progresses_to_lease_events(StreamProgresses), transition( @@ -215,12 +206,12 @@ handle_leader_lease_streams( handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) -> GSM. -handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> +handle_find_leader_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM0) -> ?tp(warning, group_sm_find_leader_timeout, #{ agent => Agent, - topic_filter => TopicFilter + share_topic_filter => ShareTopicFilter }), - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), ShareTopicFilter), GSM1 = ensure_state_timeout( GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms) ), @@ -238,8 +229,8 @@ handle_replaying(GSM0) -> ), GSM2. -handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) -> - ?tp(warning, renew_lease_timeout, #{agent => Agent, topic_filter => TopicFilter}), +handle_renew_lease_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) -> + ?tp(warning, renew_lease_timeout, #{agent => Agent, share_topic_filter => ShareTopicFilter}), transition(GSM, ?connecting, #{}). %%----------------------------------------------------------------------- @@ -429,10 +420,10 @@ handle_stream_progress( handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) -> GSM. -handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) -> +handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) -> ?tp(warning, shared_sub_group_sm_leader_invalidate, #{ agent => Agent, - topic_filter => TopicFilter + share_topic_filter => ShareTopicFilter }), transition(GSM, ?connecting, #{}). @@ -441,11 +432,11 @@ handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) - %%----------------------------------------------------------------------- handle_state_timeout( - #{state := ?connecting, topic_filter := TopicFilter} = GSM, + #{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM, find_leader_timeout, _Message ) -> - ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}), + ?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}), handle_find_leader_timeout(GSM); handle_state_timeout( #{state := ?replaying} = GSM, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index e98c74b27..3a2081f1b 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -27,8 +27,12 @@ terminate/3 ]). +-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). + +-type group_id() :: share_topic_filter(). + -type options() :: #{ - topic_filter := emqx_persistent_session_ds:share_topic_filter() + share_topic_filter := share_topic_filter() }. %% Agent states @@ -39,7 +43,7 @@ -define(updating, updating). -type agent_state() :: #{ - %% Our view of group sm's status + %% Our view of group_id sm's status %% it lags the actual state state := ?waiting_replaying | ?replaying | ?waiting_updating | ?updating, prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()), @@ -62,7 +66,7 @@ %% %% Persistent data %% - group := emqx_types:group(), + group_id := group_id(), topic := emqx_types:topic(), %% For ds router, not an actual session_id router_id := binary(), @@ -119,9 +123,9 @@ register(Pid, Fun) -> %% Internal API %%-------------------------------------------------------------------- -child_spec(#{topic_filter := TopicFilter} = Options) -> +child_spec(#{share_topic_filter := ShareTopicFilter} = Options) -> #{ - id => id(TopicFilter), + id => id(ShareTopicFilter), start => {?MODULE, start_link, [Options]}, restart => temporary, shutdown => 5000, @@ -131,8 +135,8 @@ child_spec(#{topic_filter := TopicFilter} = Options) -> start_link(Options) -> gen_statem:start_link(?MODULE, [Options], []). -id(#share{group = Group} = _TopicFilter) -> - {?MODULE, Group}. +id(ShareTopicFilter) -> + {?MODULE, ShareTopicFilter}. %%-------------------------------------------------------------------- %% gen_statem callbacks @@ -140,9 +144,9 @@ id(#share{group = Group} = _TopicFilter) -> callback_mode() -> [handle_event_function, state_enter]. -init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) -> +init([#{share_topic_filter := #share{topic = Topic} = ShareTopicFilter} = _Options]) -> Data = #{ - group => Group, + group_id => ShareTopicFilter, topic => Topic, router_id => gen_router_id(), start_time => now_ms() - ?START_TIME_THRESHOLD, @@ -463,14 +467,14 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> %% Handle a newly connected agent connect_agent( - #{group := Group, agents := Agents} = Data, + #{group_id := GroupId, agents := Agents} = Data, Agent, AgentMetadata ) -> ?SLOG(info, #{ msg => leader_agent_connected, agent => Agent, - group => Group + group_id => GroupId }), case Agents of #{Agent := AgentState} -> @@ -583,22 +587,22 @@ renew_leases(#{agents := AgentStates} = Data) -> ), Data. -renew_lease(#{group := Group}, Agent, #{state := ?replaying, version := Version}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version); -renew_lease(#{group := Group}, Agent, #{state := ?waiting_replaying, version := Version}) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version); -renew_lease(#{group := Group} = Data, Agent, #{ +renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); +renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) -> + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version); +renew_lease(#{group_id := GroupId} = Data, Agent, #{ streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion }) -> StreamProgresses = stream_progresses(Data, Streams), ok = emqx_ds_shared_sub_proto:leader_update_streams( - Agent, Group, PrevVersion, Version, StreamProgresses + Agent, GroupId, PrevVersion, Version, StreamProgresses ), - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version); -renew_lease(#{group := Group}, Agent, #{ + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version); +renew_lease(#{group_id := GroupId}, Agent, #{ state := ?updating, version := Version, prev_version := PrevVersion }) -> - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version). + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version). %%-------------------------------------------------------------------- %% Handle stream progress updates from agent in replaying state @@ -802,7 +806,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers %%-------------------------------------------------------------------- agent_transition_to_waiting_updating( - #{group := Group} = Data, + #{group_id := GroupId} = Data, Agent, #{state := OldState, version := Version, prev_version := undefined} = AgentState0, Streams, @@ -825,19 +829,19 @@ agent_transition_to_waiting_updating( AgentState2 = renew_no_replaying_deadline(AgentState1), StreamProgresses = stream_progresses(Data, Streams), ok = emqx_ds_shared_sub_proto:leader_update_streams( - Agent, Group, Version, NewVersion, StreamProgresses + Agent, GroupId, Version, NewVersion, StreamProgresses ), AgentState2. agent_transition_to_waiting_replaying( - #{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 + #{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 ) -> ?tp(warning, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => OldState, new_state => ?waiting_replaying }), - ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version), + ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version), AgentState1 = AgentState0#{ state => ?waiting_replaying, revoked_streams => [] @@ -845,7 +849,7 @@ agent_transition_to_waiting_replaying( renew_no_replaying_deadline(AgentState1). agent_transition_to_initial_waiting_replaying( - #{group := Group} = Data, Agent, AgentMetadata, InitialStreams + #{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams ) -> ?tp(warning, shared_sub_leader_agent_state_transition, #{ agent => Agent, @@ -856,7 +860,7 @@ agent_transition_to_initial_waiting_replaying( StreamProgresses = stream_progresses(Data, InitialStreams), Leader = this_leader(Data), ok = emqx_ds_shared_sub_proto:leader_lease_streams( - Agent, Group, Leader, StreamProgresses, Version + Agent, GroupId, Leader, StreamProgresses, Version ), AgentState = #{ metadata => AgentMetadata, @@ -1015,8 +1019,8 @@ drop_agent(#{agents := Agents} = Data0, Agent) -> ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}), Data1#{agents => maps:remove(Agent, Agents)}. -invalidate_agent(#{group := Group}, Agent) -> - ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, Group). +invalidate_agent(#{group_id := GroupId}, Agent) -> + ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, GroupId). drop_invalidate_agent(Data0, Agent) -> Data1 = drop_agent(Data0, Agent), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index e74fae19c..383f66ff2 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -33,7 +33,7 @@ -type agent() :: ?agent(emqx_persistent_session_ds:id(), pid()). -type leader() :: pid(). --type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). +-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type group() :: emqx_types:group(). -type version() :: non_neg_integer(). -type agent_metadata() :: #{ @@ -63,8 +63,8 @@ %% agent -> leader messages --spec agent_connect_leader(leader(), agent(), agent_metadata(), topic_filter()) -> ok. -agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when +-spec agent_connect_leader(leader(), agent(), agent_metadata(), share_topic_filter()) -> ok. +agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when ?is_local_leader(ToLeader) -> ?tp(warning, shared_sub_proto_msg, #{ @@ -72,13 +72,13 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when to_leader => ToLeader, from_agent => FromAgent, agent_metadata => AgentMetadata, - topic_filter => TopicFilter + share_topic_filter => ShareTopicFilter }), - _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, TopicFilter)), + _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)), ok; -agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) -> +agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> emqx_ds_shared_sub_proto_v1:agent_connect_leader( - ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, TopicFilter + ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter ). -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index f8158c918..bf54b2930 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -21,16 +21,16 @@ %% Agent messages sent to the leader. %% Leader talks to many agents, `agent` field is used to identify the sender. --define(agent_connect_leader(Agent, AgentMetadata, TopicFilter), #{ +-define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{ type => ?agent_connect_leader_msg, - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, agent_metadata => AgentMetadata, agent => Agent }). --define(agent_connect_leader_match(Agent, AgentMetadata, TopicFilter), #{ +-define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{ type := ?agent_connect_leader_msg, - topic_filter := TopicFilter, + share_topic_filter := ShareTopicFilter, agent_metadata := AgentMetadata, agent := Agent }). @@ -81,77 +81,77 @@ %% leader messages, sent from the leader to the agent %% Agent may have several shared subscriptions, so may talk to several leaders -%% `group` field is used to identify the leader. +%% `group_id` field is used to identify the leader. -define(leader_lease_streams_msg, leader_lease_streams). -define(leader_renew_stream_lease_msg, leader_renew_stream_lease). --define(leader_lease_streams(Group, Leader, Streams, Version), #{ +-define(leader_lease_streams(GrouId, Leader, Streams, Version), #{ type => ?leader_lease_streams_msg, streams => Streams, version => Version, leader => Leader, - group => Group + group_id => GrouId }). --define(leader_lease_streams_match(Group, Leader, Streams, Version), #{ +-define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{ type := ?leader_lease_streams_msg, streams := Streams, version := Version, leader := Leader, - group := Group + group_id := GroupId }). --define(leader_renew_stream_lease(Group, Version), #{ +-define(leader_renew_stream_lease(GroupId, Version), #{ type => ?leader_renew_stream_lease_msg, version => Version, - group => Group + group_id => GroupId }). --define(leader_renew_stream_lease_match(Group, Version), #{ +-define(leader_renew_stream_lease_match(GroupId, Version), #{ type := ?leader_renew_stream_lease_msg, version := Version, - group := Group + group_id := GroupId }). --define(leader_renew_stream_lease(Group, VersionOld, VersionNew), #{ +-define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{ type => ?leader_renew_stream_lease_msg, version_old => VersionOld, version_new => VersionNew, - group => Group + group_id => GroupId }). --define(leader_renew_stream_lease_match(Group, VersionOld, VersionNew), #{ +-define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{ type := ?leader_renew_stream_lease_msg, version_old := VersionOld, version_new := VersionNew, - group := Group + group_id := GroupId }). --define(leader_update_streams(Group, VersionOld, VersionNew, StreamsNew), #{ +-define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{ type => leader_update_streams, version_old => VersionOld, version_new => VersionNew, streams_new => StreamsNew, - group => Group + group_id => GroupId }). --define(leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew), #{ +-define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{ type := leader_update_streams, version_old := VersionOld, version_new := VersionNew, streams_new := StreamsNew, - group := Group + group_id := GroupId }). --define(leader_invalidate(Group), #{ +-define(leader_invalidate(GroupId), #{ type => leader_invalidate, - group => Group + group_id => GroupId }). --define(leader_invalidate_match(Group), #{ +-define(leader_invalidate_match(GroupId), #{ type := leader_invalidate, - group := Group + group_id := GroupId }). %% Helpers diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl index bc732249a..eae212458 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -26,7 +26,7 @@ -record(lookup_leader, { agent :: emqx_ds_shared_sub_proto:agent(), agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(), - topic_filter :: emqx_persistent_session_ds:share_topic_filter() + share_topic_filter :: emqx_persistent_session_ds:share_topic_filter() }). -define(gproc_id(ID), {n, l, ID}). @@ -40,9 +40,9 @@ emqx_ds_shared_sub_proto:agent_metadata(), emqx_persistent_session_ds:share_topic_filter() ) -> ok. -lookup_leader(Agent, AgentMetadata, TopicFilter) -> +lookup_leader(Agent, AgentMetadata, ShareTopicFilter) -> gen_server:cast(?MODULE, #lookup_leader{ - agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter + agent = Agent, agent_metadata = AgentMetadata, share_topic_filter = ShareTopicFilter }). %%-------------------------------------------------------------------- @@ -72,9 +72,14 @@ handle_call(_Request, _From, State) -> {reply, {error, unknown_request}, State}. handle_cast( - #lookup_leader{agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter}, State + #lookup_leader{ + agent = Agent, + agent_metadata = AgentMetadata, + share_topic_filter = ShareTopicFilter + }, + State ) -> - State1 = do_lookup_leader(Agent, AgentMetadata, TopicFilter, State), + State1 = do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State), {noreply, State1}. handle_info(_Info, State) -> @@ -87,15 +92,15 @@ terminate(_Reason, _State) -> %% Internal functions %%-------------------------------------------------------------------- -do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) -> +do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) -> %% TODO https://emqx.atlassian.net/browse/EMQX-12309 %% Cluster-wide unique leader election should be implemented - Id = emqx_ds_shared_sub_leader:id(TopicFilter), + Id = emqx_ds_shared_sub_leader:id(ShareTopicFilter), LeaderPid = case gproc:where(?gproc_id(Id)) of undefined -> {ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{ - topic_filter => TopicFilter + share_topic_filter => ShareTopicFilter }), {ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register( Pid, @@ -111,10 +116,10 @@ do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) -> ?SLOG(info, #{ msg => lookup_leader, agent => Agent, - topic_filter => TopicFilter, + share_topic_filter => ShareTopicFilter, leader => LeaderPid }), ok = emqx_ds_shared_sub_proto:agent_connect_leader( - LeaderPid, Agent, AgentMetadata, TopicFilter + LeaderPid, Agent, AgentMetadata, ShareTopicFilter ), State. diff --git a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl index 52f64937d..17ceb4876 100644 --- a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl +++ b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl @@ -33,9 +33,9 @@ introduced_in() -> emqx_ds_shared_sub_proto:agent_metadata(), emqx_persistent_session_ds:share_topic_filter() ) -> ok. -agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, TopicFilter) -> +agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [ - ToLeader, FromAgent, AgentMetadata, TopicFilter + ToLeader, FromAgent, AgentMetadata, ShareTopicFilter ]). -spec agent_update_stream_states(