feat(queue): clarify naming; identify shared subs by full topic filter

This commit is contained in:
Ilya Averyanov 2024-07-10 19:02:15 +03:00
parent 7e23f8d19f
commit f213569460
9 changed files with 303 additions and 244 deletions

View File

@ -119,8 +119,8 @@ new(Opts) ->
{ok, emqx_persistent_session_ds_state:t(), t()}.
open(S, Opts) ->
SharedSubscriptions = fold_shared_subs(
fun(#share{} = TopicFilter, Sub, Acc) ->
[{TopicFilter, to_agent_subscription(S, Sub)} | Acc]
fun(#share{} = ShareTopicFilter, Sub, Acc) ->
[{ShareTopicFilter, to_agent_subscription(S, Sub)} | Acc]
end,
[],
S
@ -139,33 +139,33 @@ open(S, Opts) ->
emqx_types:subopts(),
emqx_persistent_session_ds:session()
) -> {ok, emqx_persistent_session_ds_state:t(), t()} | {error, emqx_types:reason_code()}.
on_subscribe(TopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(TopicFilter, S),
on_subscribe(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(#share{} = ShareTopicFilter, SubOpts, #{s := S} = Session) ->
Subscription = emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S),
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session).
%%--------------------------------------------------------------------
%% on_subscribe internal functions
on_subscribe(undefined, TopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
on_subscribe(undefined, ShareTopicFilter, SubOpts, #{props := Props, s := S} = Session) ->
#{max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:n_subscriptions(S) < MaxSubscriptions of
true ->
create_new_subscription(TopicFilter, SubOpts, Session);
create_new_subscription(ShareTopicFilter, SubOpts, Session);
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
on_subscribe(Subscription, TopicFilter, SubOpts, Session) ->
update_subscription(Subscription, TopicFilter, SubOpts, Session).
on_subscribe(Subscription, ShareTopicFilter, SubOpts, Session) ->
update_subscription(Subscription, ShareTopicFilter, SubOpts, Session).
-dialyzer({nowarn_function, create_new_subscription/3}).
create_new_subscription(TopicFilter, SubOpts, #{
create_new_subscription(ShareTopicFilter, SubOpts, #{
s := S0,
shared_sub_s := #{agent := Agent} = SharedSubS0,
props := Props
}) ->
case
emqx_persistent_session_ds_shared_subs_agent:can_subscribe(
Agent, TopicFilter, SubOpts
Agent, ShareTopicFilter, SubOpts
)
of
ok ->
@ -184,17 +184,19 @@ create_new_subscription(TopicFilter, SubOpts, #{
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
ShareTopicFilter, Subscription, S3
),
SharedSubS = schedule_subscribe(SharedSubS0, TopicFilter, SubOpts),
SharedSubS = schedule_subscribe(SharedSubS0, ShareTopicFilter, SubOpts),
{ok, S, SharedSubS};
{error, _} = Error ->
Error
end.
update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilter, SubOpts, #{
update_subscription(
#{current_state := SStateId0, id := SubId} = Sub0, ShareTopicFilter, SubOpts, #{
s := S0, shared_sub_s := SharedSubS, props := Props
}) ->
}
) ->
#{upgrade_qos := UpgradeQoS} = Props,
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
@ -208,31 +210,33 @@ update_subscription(#{current_state := SStateId0, id := SubId} = Sub0, TopicFilt
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
S = emqx_persistent_session_ds_state:put_subscription(ShareTopicFilter, Sub, S2),
{ok, S, SharedSubS}
end.
-dialyzer({nowarn_function, schedule_subscribe/3}).
schedule_subscribe(
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0, TopicFilter, SubOpts
#{agent := Agent0, scheduled_actions := ScheduledActions0} = SharedSubS0,
ShareTopicFilter,
SubOpts
) ->
case ScheduledActions0 of
#{TopicFilter := ScheduledAction} ->
#{ShareTopicFilter := ScheduledAction} ->
ScheduledActions1 = ScheduledActions0#{
TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
},
?tp(warning, shared_subs_schedule_subscribe_override, #{
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
new_type => {?schedule_subscribe, SubOpts},
old_action => format_schedule_action(ScheduledAction)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1};
_ ->
?tp(warning, shared_subs_schedule_subscribe_new, #{
topic_filter => TopicFilter, subopts => SubOpts
share_topic_filter => ShareTopicFilter, subopts => SubOpts
}),
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent0, TopicFilter, SubOpts
Agent0, ShareTopicFilter, SubOpts
),
SharedSubS0#{agent => Agent1}
end.
@ -242,22 +246,22 @@ schedule_subscribe(
-spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(),
share_topic_filter(),
emqx_persistent_session_ds_state:t(),
t()
) ->
{ok, emqx_persistent_session_ds_state:t(), t(), emqx_persistent_session_ds:subscription()}
| {error, emqx_types:reason_code()}.
on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) ->
case lookup(TopicFilter, S0) of
on_unsubscribe(SessionId, ShareTopicFilter, S0, SharedSubS0) ->
case lookup(ShareTopicFilter, S0) of
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED};
#{id := SubId} = Subscription ->
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
session_id => SessionId, share_topic_filter => ShareTopicFilter
}),
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, TopicFilter),
S = emqx_persistent_session_ds_state:del_subscription(ShareTopicFilter, S0),
SharedSubS = schedule_unsubscribe(S, SharedSubS0, SubId, ShareTopicFilter),
{ok, S, SharedSubS, Subscription}
end.
@ -265,16 +269,16 @@ on_unsubscribe(SessionId, TopicFilter, S0, SharedSubS0) ->
%% on_unsubscribe internal functions
schedule_unsubscribe(
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, ShareTopicFilter
) ->
case ScheduledActions0 of
#{TopicFilter := ScheduledAction0} ->
#{ShareTopicFilter := ScheduledAction0} ->
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
ScheduledActions1 = ScheduledActions0#{
TopicFilter => ScheduledAction1
ShareTopicFilter => ScheduledAction1
},
?tp(warning, shared_subs_schedule_unsubscribe_override, #{
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
new_type => ?schedule_unsubscribe,
old_action => format_schedule_action(ScheduledAction0)
}),
@ -282,14 +286,14 @@ schedule_unsubscribe(
_ ->
StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId),
ScheduledActions1 = ScheduledActions0#{
TopicFilter => #{
ShareTopicFilter => #{
type => ?schedule_unsubscribe,
stream_keys_to_wait => StreamKeys,
progresses => []
}
},
?tp(warning, shared_subs_schedule_unsubscribe_new, #{
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
stream_keys => format_stream_keys(StreamKeys)
}),
SharedSubS0#{scheduled_actions := ScheduledActions1}
@ -322,13 +326,13 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh
%%--------------------------------------------------------------------
%% renew_streams internal functions
accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
accept_stream(#{share_topic_filter := ShareTopicFilter} = Event, S, ScheduledActions) ->
%% If we have a pending action (subscribe or unsubscribe) for this topic filter,
%% we should not accept a stream and start replaying it. We won't use it anyway:
%% * if subscribe is pending, we will reset agent obtain a new lease
%% * if unsubscribe is pending, we will drop connection
case ScheduledActions of
#{TopicFilter := _Action} ->
#{ShareTopicFilter := _Action} ->
S;
_ ->
accept_stream(Event, S)
@ -336,13 +340,13 @@ accept_stream(#{topic_filter := TopicFilter} = Event, S, ScheduledActions) ->
accept_stream(
#{
topic_filter := TopicFilter,
share_topic_filter := ShareTopicFilter,
stream := Stream,
progress := #{iterator := Iterator} = _Progress
} = _Event,
S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% We unsubscribed
S0;
@ -375,9 +379,9 @@ accept_stream(
end.
revoke_stream(
#{topic_filter := TopicFilter, stream := Stream}, S0
#{share_topic_filter := ShareTopicFilter, stream := Stream}, S0
) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S0) of
undefined ->
%% This should not happen.
%% Agent should have received unsubscribe callback
@ -427,12 +431,7 @@ all_stream_progresses(S, _Agent, NeedUnacked) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
fold_shared_stream_states(
fun(
#share{group = Group},
Stream,
SRS,
ProgressesAcc0
) ->
fun(ShareTopicFilter, Stream, SRS, ProgressesAcc0) ->
case
is_stream_started(CommQos1, CommQos2, SRS) and
(NeedUnacked or is_stream_fully_acked(CommQos1, CommQos2, SRS))
@ -440,7 +439,7 @@ all_stream_progresses(S, _Agent, NeedUnacked) ->
true ->
StreamProgress = stream_progress(CommQos1, CommQos2, Stream, SRS),
maps:update_with(
Group,
ShareTopicFilter,
fun(Progresses) -> [StreamProgress | Progresses] end,
[StreamProgress],
ProgressesAcc0
@ -455,12 +454,12 @@ all_stream_progresses(S, _Agent, NeedUnacked) ->
run_scheduled_actions(S, Agent, ScheduledActions) ->
maps:fold(
fun(TopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, TopicFilter, Action0) of
fun(ShareTopicFilter, Action0, {AgentAcc0, ScheduledActionsAcc}) ->
case run_scheduled_action(S, AgentAcc0, ShareTopicFilter, Action0) of
{ok, AgentAcc1} ->
{AgentAcc1, maps:remove(TopicFilter, ScheduledActionsAcc)};
{AgentAcc1, maps:remove(ShareTopicFilter, ScheduledActionsAcc)};
{continue, Action1} ->
{AgentAcc0, ScheduledActionsAcc#{TopicFilter => Action1}}
{AgentAcc0, ScheduledActionsAcc#{ShareTopicFilter => Action1}}
end
end,
{Agent, ScheduledActions},
@ -470,7 +469,7 @@ run_scheduled_actions(S, Agent, ScheduledActions) ->
run_scheduled_action(
S,
Agent0,
#share{group = Group} = TopicFilter,
ShareTopicFilter,
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
) ->
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
@ -478,31 +477,31 @@ run_scheduled_action(
case StreamKeysToWait1 of
[] ->
?tp(warning, shared_subs_schedule_action_complete, #{
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
progresses => format_stream_progresses(Progresses1),
type => Type
}),
%% Regular progress won't se unsubscribed streams, so we need to
%% send the progress explicitly.
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
Agent0, #{Group => Progresses1}
Agent0, #{ShareTopicFilter => Progresses1}
),
case Type of
{?schedule_subscribe, SubOpts} ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
Agent1, TopicFilter, SubOpts
Agent1, ShareTopicFilter, SubOpts
)};
?schedule_unsubscribe ->
{ok,
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
Agent1, TopicFilter, Progresses1
Agent1, ShareTopicFilter, Progresses1
)}
end;
_ ->
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
?tp(warning, shared_subs_schedule_action_continue, #{
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
new_action => format_schedule_action(Action1)
}),
{continue, Action1}
@ -551,8 +550,8 @@ on_disconnect(S0, #{agent := Agent0} = SharedSubS0) ->
revoke_all_streams(S0) ->
fold_shared_stream_states(
fun(TopicFilter, Stream, _SRS, S) ->
revoke_stream(#{topic_filter => TopicFilter, stream => Stream}, S)
fun(ShareTopicFilter, Stream, _SRS, S) ->
revoke_stream(#{share_topic_filter => ShareTopicFilter, stream => Stream}, S)
end,
S0,
S0
@ -580,8 +579,8 @@ to_map(_S, _SharedSubS) ->
%% Generic helpers
%%--------------------------------------------------------------------
lookup(TopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
lookup(ShareTopicFilter, S) ->
case emqx_persistent_session_ds_state:get_subscription(ShareTopicFilter, S) of
Sub = #{current_state := SStateId} ->
case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
#{subopts := SubOpts} ->
@ -640,7 +639,7 @@ stream_progress(
fold_shared_subs(Fun, Acc, S) ->
emqx_persistent_session_ds_state:fold_subscriptions(
fun
(#share{} = TopicFilter, Sub, Acc0) -> Fun(TopicFilter, Sub, Acc0);
(#share{} = ShareTopicFilter, Sub, Acc0) -> Fun(ShareTopicFilter, Sub, Acc0);
(_, _Sub, Acc0) -> Acc0
end,
Acc,
@ -650,10 +649,10 @@ fold_shared_subs(Fun, Acc, S) ->
fold_shared_stream_states(Fun, Acc, S) ->
%% TODO
%% Optimize or cache
TopicFilters = fold_shared_subs(
ShareTopicFilters = fold_shared_subs(
fun
(#share{} = TopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => TopicFilter};
(#share{} = ShareTopicFilter, #{id := Id} = _Sub, Acc0) ->
Acc0#{Id => ShareTopicFilter};
(_, _, Acc0) ->
Acc0
end,
@ -662,9 +661,9 @@ fold_shared_stream_states(Fun, Acc, S) ->
),
emqx_persistent_session_ds_state:fold_streams(
fun({SubId, Stream}, SRS, Acc0) ->
case TopicFilters of
#{SubId := TopicFilter} ->
Fun(TopicFilter, Stream, SRS, Acc0);
case ShareTopicFilters of
#{SubId := ShareTopicFilter} ->
Fun(ShareTopicFilter, Stream, SRS, Acc0);
_ ->
Acc0
end

View File

@ -15,7 +15,7 @@
}.
-type t() :: term().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type opts() :: #{
session_id := session_id()
@ -28,21 +28,21 @@
-type stream_lease() :: #{
type => lease,
%% Used as "external" subscription_id
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator()
}.
-type stream_revoke() :: #{
type => revoke,
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream()
}.
-type stream_lease_event() :: stream_lease() | stream_revoke().
-type stream_progress() :: #{
topic_filter := topic_filter(),
share_topic_filter := share_topic_filter(),
stream := emqx_ds:stream(),
iterator := emqx_ds:iterator(),
use_finished := boolean()
@ -80,13 +80,13 @@
%%--------------------------------------------------------------------
-callback new(opts()) -> t().
-callback open([{topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-callback open([{share_topic_filter(), subscription()}], opts()) -> t().
-callback can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
-callback on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
-callback on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
-callback on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback renew_streams(t()) -> {[stream_lease_event()], t()}.
-callback on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-callback on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
-callback on_info(t(), term()) -> t().
%%--------------------------------------------------------------------
@ -97,23 +97,23 @@
new(Opts) ->
?shared_subs_agent:new(Opts).
-spec open([{topic_filter(), subscription()}], opts()) -> t().
-spec open([{share_topic_filter(), subscription()}], opts()) -> t().
open(Topics, Opts) ->
?shared_subs_agent:open(Topics, Opts).
-spec can_subscribe(t(), topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, TopicFilter, SubOpts).
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok | {error, term()}.
can_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:can_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_subscribe(t(), topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, TopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, TopicFilter, SubOpts).
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(Agent, ShareTopicFilter, SubOpts) ->
?shared_subs_agent:on_subscribe(Agent, ShareTopicFilter, SubOpts).
-spec on_unsubscribe(t(), topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, TopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, TopicFilter, StreamProgresses).
-spec on_unsubscribe(t(), share_topic_filter(), [stream_progress()]) -> t().
on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses) ->
?shared_subs_agent:on_unsubscribe(Agent, ShareTopicFilter, StreamProgresses).
-spec on_disconnect(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-spec on_disconnect(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_disconnect(Agent, StreamProgresses) ->
?shared_subs_agent:on_disconnect(Agent, StreamProgresses).
@ -121,7 +121,7 @@ on_disconnect(Agent, StreamProgresses) ->
renew_streams(Agent) ->
?shared_subs_agent:renew_streams(Agent).
-spec on_stream_progress(t(), #{emqx_types:group() => [stream_progress()]}) -> t().
-spec on_stream_progress(t(), #{share_topic_filter() => [stream_progress()]}) -> t().
on_stream_progress(Agent, StreamProgress) ->
?shared_subs_agent:on_stream_progress(Agent, StreamProgress).

View File

@ -26,18 +26,66 @@
-behaviour(emqx_persistent_session_ds_shared_subs_agent).
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group_id() :: share_topic_filter().
-type progress() :: emqx_persistent_session_ds_shared_subs:progress().
-type external_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
progress => progress(),
share_topic_filter => emqx_persistent_session_ds:share_topic_filter()
}
| #{
type => revoke,
stream => emqx_ds:stream(),
share_topic_filter => emqx_persistent_session_ds:share_topic_filter()
}.
-type options() :: #{
session_id := emqx_persistent_session_ds:id()
}.
-type t() :: #{
groups := #{
group_id() => emqx_ds_shared_sub_group_sm:t()
},
session_id := emqx_persistent_session_ds:id()
}.
%% Techinically, group_id and share_topic_filter are the same.
%% However, we speak in the terms of share_topic_filter in the API,
%% which is known to the shared subscription handler of persistent session.
%%
%% And we speak in the terms of group_id internally:
%% * we keep group_sm's in the state by group_id
%% * we use group_id to address group_sm's, e.g. when sending messages to them
%% from leader or from themselves.
-define(group_id(ShareTopicFilter), ShareTopicFilter).
-define(share_topic_filter(GroupId), GroupId).
-record(message_to_group_sm, {
group :: emqx_types:group(),
group_id :: group_id(),
message :: term()
}).
-export_type([
t/0,
group_id/0,
options/0,
external_lease_event/0
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec new(options()) -> t().
new(Opts) ->
init_state(Opts).
-spec open([{share_topic_filter(), emqx_types:subopts()}], options()) -> t().
open(TopicSubscriptions, Opts) ->
State0 = init_state(Opts),
State1 = lists:foldl(
@ -45,32 +93,41 @@ open(TopicSubscriptions, Opts) ->
?tp(warning, ds_agent_open_subscription, #{
topic_filter => ShareTopicFilter
}),
add_group_subscription(State, ShareTopicFilter)
add_shared_subscription(State, ShareTopicFilter)
end,
State0,
TopicSubscriptions
),
State1.
can_subscribe(_State, _TopicFilter, _SubOpts) ->
-spec can_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> ok.
can_subscribe(_State, _ShareTopicFilter, _SubOpts) ->
ok.
on_subscribe(State0, TopicFilter, _SubOpts) ->
-spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t().
on_subscribe(State0, ShareTopicFilter, _SubOpts) ->
?tp(warning, ds_agent_on_subscribe, #{
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
add_group_subscription(State0, TopicFilter).
add_shared_subscription(State0, ShareTopicFilter).
on_unsubscribe(State, TopicFilter, GroupProgress) ->
delete_group_subscription(State, TopicFilter, GroupProgress).
-spec on_unsubscribe(t(), share_topic_filter(), [
emqx_persistent_session_ds_shared_subs:agent_stream_progress()
]) -> t().
on_unsubscribe(State, ShareTopicFilter, GroupProgress) ->
delete_shared_subscription(State, ShareTopicFilter, GroupProgress).
-spec renew_streams(t()) -> {[emqx_persistent_session_ds_shared_subs:agent_stream_event()], t()}.
renew_streams(#{} = State) ->
fetch_stream_events(State).
-spec on_stream_progress(t(), #{
share_topic_filter() => [emqx_persistent_session_ds_shared_subs:agent_stream_progress()]
}) -> t().
on_stream_progress(State, StreamProgresses) ->
maps:fold(
fun(Group, GroupProgresses, StateAcc) ->
with_group_sm(StateAcc, Group, fun(GSM) ->
fun(ShareTopicFilter, GroupProgresses, StateAcc) ->
with_group_sm(StateAcc, ?group_id(ShareTopicFilter), fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_stream_progress(GSM, GroupProgresses)
end)
end,
@ -78,72 +135,74 @@ on_stream_progress(State, StreamProgresses) ->
StreamProgresses
).
-spec on_disconnect(t(), [emqx_persistent_session_ds_shared_subs:agent_stream_progress()]) -> t().
on_disconnect(#{groups := Groups0} = State, StreamProgresses) ->
ok = maps:foreach(
fun(Group, GroupSM0) ->
GroupProgresses = maps:get(Group, StreamProgresses, []),
fun(GroupId, GroupSM0) ->
GroupProgresses = maps:get(?share_topic_filter(GroupId), StreamProgresses, []),
emqx_ds_shared_sub_group_sm:handle_disconnect(GroupSM0, GroupProgresses)
end,
Groups0
),
State#{groups => #{}}.
on_info(State, ?leader_lease_streams_match(Group, Leader, StreamProgresses, Version)) ->
-spec on_info(t(), term()) -> t().
on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) ->
?SLOG(info, #{
msg => leader_lease_streams,
group => Group,
group_id => GroupId,
streams => StreamProgresses,
version => Version,
leader => Leader
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_lease_streams(
GSM, Leader, StreamProgresses, Version
)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, Version)) ->
on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) ->
?SLOG(info, #{
msg => leader_renew_stream_lease,
group => Group,
group_id => GroupId,
version => Version
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version)
end);
on_info(State, ?leader_renew_stream_lease_match(Group, VersionOld, VersionNew)) ->
on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) ->
?SLOG(info, #{
msg => leader_renew_stream_lease,
group => Group,
group_id => GroupId,
version_old => VersionOld,
version_new => VersionNew
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew)
end);
on_info(State, ?leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew)) ->
on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) ->
?SLOG(info, #{
msg => leader_update_streams,
group => Group,
group_id => GroupId,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_update_streams(
GSM, VersionOld, VersionNew, StreamsNew
)
end);
on_info(State, ?leader_invalidate_match(Group)) ->
on_info(State, ?leader_invalidate_match(GroupId)) ->
?SLOG(info, #{
msg => leader_invalidate,
group => Group
group_id => GroupId
}),
with_group_sm(State, Group, fun(GSM) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_leader_invalidate(GSM)
end);
%% Generic messages sent by group_sm's to themselves (timeouts).
on_info(State, #message_to_group_sm{group = Group, message = Message}) ->
with_group_sm(State, Group, fun(GSM) ->
on_info(State, #message_to_group_sm{group_id = GroupId, message = Message}) ->
with_group_sm(State, GroupId, fun(GSM) ->
emqx_ds_shared_sub_group_sm:handle_info(GSM, Message)
end).
@ -158,29 +217,30 @@ init_state(Opts) ->
groups => #{}
}.
delete_group_subscription(State, #share{group = Group}, GroupProgress) ->
delete_shared_subscription(State, ShareTopicFilter, GroupProgress) ->
GroupId = ?group_id(ShareTopicFilter),
case State of
#{groups := #{Group := GSM} = Groups} ->
#{groups := #{GroupId := GSM} = Groups} ->
_ = emqx_ds_shared_sub_group_sm:handle_disconnect(GSM, GroupProgress),
State#{groups => maps:remove(Group, Groups)};
State#{groups => maps:remove(GroupId, Groups)};
_ ->
State
end.
add_group_subscription(
add_shared_subscription(
#{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter
) ->
?SLOG(info, #{
msg => agent_add_group_subscription,
topic_filter => ShareTopicFilter
msg => agent_add_shared_subscription,
share_topic_filter => ShareTopicFilter
}),
#share{group = Group} = ShareTopicFilter,
GroupId = ?group_id(ShareTopicFilter),
Groups1 = Groups0#{
Group => emqx_ds_shared_sub_group_sm:new(#{
GroupId => emqx_ds_shared_sub_group_sm:new(#{
session_id => SessionId,
topic_filter => ShareTopicFilter,
share_topic_filter => ShareTopicFilter,
agent => this_agent(SessionId),
send_after => send_to_subscription_after(Group)
send_after => send_to_subscription_after(GroupId)
})
},
State1 = State0#{groups => Groups1},
@ -188,9 +248,9 @@ add_group_subscription(
fetch_stream_events(#{groups := Groups0} = State0) ->
{Groups1, Events} = maps:fold(
fun(Group, GroupSM0, {GroupsAcc, EventsAcc}) ->
fun(GroupId, GroupSM0, {GroupsAcc, EventsAcc}) ->
{GroupSM1, Events} = emqx_ds_shared_sub_group_sm:fetch_stream_events(GroupSM0),
{GroupsAcc#{Group => GroupSM1}, [Events | EventsAcc]}
{GroupsAcc#{GroupId => GroupSM1}, [Events | EventsAcc]}
end,
{#{}, []},
Groups0
@ -201,20 +261,20 @@ fetch_stream_events(#{groups := Groups0} = State0) ->
this_agent(Id) ->
emqx_ds_shared_sub_proto:agent(Id, self()).
send_to_subscription_after(Group) ->
send_to_subscription_after(GroupId) ->
fun(Time, Msg) ->
emqx_persistent_session_ds_shared_subs_agent:send_after(
Time,
self(),
#message_to_group_sm{group = Group, message = Msg}
#message_to_group_sm{group_id = GroupId, message = Msg}
)
end.
with_group_sm(State, Group, Fun) ->
with_group_sm(State, GroupId, Fun) ->
case State of
#{groups := #{Group := GSM0} = Groups} ->
#{groups := #{GroupId := GSM0} = Groups} ->
#{} = GSM1 = Fun(GSM0),
State#{groups => Groups#{Group => GSM1}};
State#{groups => Groups#{GroupId => GSM1}};
_ ->
%% TODO
%% Error?

View File

@ -41,7 +41,7 @@
-type options() :: #{
session_id := emqx_persistent_session_ds:id(),
agent := emqx_ds_shared_sub_proto:agent(),
topic_filter := emqx_persistent_session_ds:share_topic_filter(),
share_topic_filter := emqx_persistent_session_ds:share_topic_filter(),
send_after := fun((non_neg_integer(), term()) -> reference())
}.
@ -58,19 +58,6 @@
stream => emqx_ds:stream()
}.
-type external_lease_event() ::
#{
type => lease,
stream => emqx_ds:stream(),
progress => progress(),
topic_filter => emqx_persistent_session_ds:share_topic_filter()
}
| #{
type => revoke,
stream => emqx_ds:stream(),
topic_filter => emqx_persistent_session_ds:share_topic_filter()
}.
%% GroupSM States
-define(connecting, connecting).
@ -111,7 +98,7 @@
-type timer() :: #timer{}.
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
share_topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
stream_lease_events => list(stream_lease_event()),
@ -129,7 +116,7 @@
new(#{
session_id := SessionId,
agent := Agent,
topic_filter := ShareTopicFilter,
share_topic_filter := ShareTopicFilter,
send_after := SendAfter
}) ->
?SLOG(
@ -137,32 +124,33 @@ new(#{
#{
msg => group_sm_new,
agent => Agent,
topic_filter => ShareTopicFilter
share_topic_filter => ShareTopicFilter
}
),
GSM0 = #{
id => SessionId,
topic_filter => ShareTopicFilter,
share_topic_filter => ShareTopicFilter,
agent => Agent,
send_after => SendAfter
},
?tp(warning, group_sm_new, #{
agent => Agent,
topic_filter => ShareTopicFilter
share_topic_filter => ShareTopicFilter
}),
transition(GSM0, ?connecting, #{}).
-spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}.
-spec fetch_stream_events(group_sm()) ->
{group_sm(), [emqx_ds_shared_sub_agent:external_lease_event()]}.
fetch_stream_events(
#{
state := _State,
topic_filter := TopicFilter,
share_topic_filter := ShareTopicFilter,
stream_lease_events := Events0
} = GSM
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
Event#{share_topic_filter => ShareTopicFilter}
end,
Events0
),
@ -187,18 +175,21 @@ handle_disconnect(
%%-----------------------------------------------------------------------
%% Connecting state
handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, group_sm_enter_connecting, #{
agent => Agent,
topic_filter => ShareTopicFilter
share_topic_filter => ShareTopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)).
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version
#{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM0,
Leader,
StreamProgresses,
Version
) ->
?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
?tp(debug, leader_lease_streams, #{share_topic_filter => ShareTopicFilter}),
Streams = progresses_to_map(StreamProgresses),
StreamLeaseEvents = progresses_to_lease_events(StreamProgresses),
transition(
@ -215,12 +206,12 @@ handle_leader_lease_streams(
handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) ->
GSM.
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
handle_find_leader_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM0) ->
?tp(warning, group_sm_find_leader_timeout, #{
agent => Agent,
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), ShareTopicFilter),
GSM1 = ensure_state_timeout(
GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)
),
@ -238,8 +229,8 @@ handle_replaying(GSM0) ->
),
GSM2.
handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) ->
?tp(warning, renew_lease_timeout, #{agent => Agent, topic_filter => TopicFilter}),
handle_renew_lease_timeout(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, renew_lease_timeout, #{agent => Agent, share_topic_filter => ShareTopicFilter}),
transition(GSM, ?connecting, #{}).
%%-----------------------------------------------------------------------
@ -429,10 +420,10 @@ handle_stream_progress(
handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) ->
GSM.
handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) ->
handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) ->
?tp(warning, shared_sub_group_sm_leader_invalidate, #{
agent => Agent,
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
transition(GSM, ?connecting, #{}).
@ -441,11 +432,11 @@ handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) -
%%-----------------------------------------------------------------------
handle_state_timeout(
#{state := ?connecting, topic_filter := TopicFilter} = GSM,
#{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM,
find_leader_timeout,
_Message
) ->
?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}),
handle_find_leader_timeout(GSM);
handle_state_timeout(
#{state := ?replaying} = GSM,

View File

@ -27,8 +27,12 @@
terminate/3
]).
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group_id() :: share_topic_filter().
-type options() :: #{
topic_filter := emqx_persistent_session_ds:share_topic_filter()
share_topic_filter := share_topic_filter()
}.
%% Agent states
@ -39,7 +43,7 @@
-define(updating, updating).
-type agent_state() :: #{
%% Our view of group sm's status
%% Our view of group_id sm's status
%% it lags the actual state
state := ?waiting_replaying | ?replaying | ?waiting_updating | ?updating,
prev_version := emqx_maybe:t(emqx_ds_shared_sub_proto:version()),
@ -62,7 +66,7 @@
%%
%% Persistent data
%%
group := emqx_types:group(),
group_id := group_id(),
topic := emqx_types:topic(),
%% For ds router, not an actual session_id
router_id := binary(),
@ -119,9 +123,9 @@ register(Pid, Fun) ->
%% Internal API
%%--------------------------------------------------------------------
child_spec(#{topic_filter := TopicFilter} = Options) ->
child_spec(#{share_topic_filter := ShareTopicFilter} = Options) ->
#{
id => id(TopicFilter),
id => id(ShareTopicFilter),
start => {?MODULE, start_link, [Options]},
restart => temporary,
shutdown => 5000,
@ -131,8 +135,8 @@ child_spec(#{topic_filter := TopicFilter} = Options) ->
start_link(Options) ->
gen_statem:start_link(?MODULE, [Options], []).
id(#share{group = Group} = _TopicFilter) ->
{?MODULE, Group}.
id(ShareTopicFilter) ->
{?MODULE, ShareTopicFilter}.
%%--------------------------------------------------------------------
%% gen_statem callbacks
@ -140,9 +144,9 @@ id(#share{group = Group} = _TopicFilter) ->
callback_mode() -> [handle_event_function, state_enter].
init([#{topic_filter := #share{group = Group, topic = Topic}} = _Options]) ->
init([#{share_topic_filter := #share{topic = Topic} = ShareTopicFilter} = _Options]) ->
Data = #{
group => Group,
group_id => ShareTopicFilter,
topic => Topic,
router_id => gen_router_id(),
start_time => now_ms() - ?START_TIME_THRESHOLD,
@ -463,14 +467,14 @@ select_streams_for_assign(Data0, _Agent, AssignCount) ->
%% Handle a newly connected agent
connect_agent(
#{group := Group, agents := Agents} = Data,
#{group_id := GroupId, agents := Agents} = Data,
Agent,
AgentMetadata
) ->
?SLOG(info, #{
msg => leader_agent_connected,
agent => Agent,
group => Group
group_id => GroupId
}),
case Agents of
#{Agent := AgentState} ->
@ -583,22 +587,22 @@ renew_leases(#{agents := AgentStates} = Data) ->
),
Data.
renew_lease(#{group := Group}, Agent, #{state := ?replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version);
renew_lease(#{group := Group}, Agent, #{state := ?waiting_replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version);
renew_lease(#{group := Group} = Data, Agent, #{
renew_lease(#{group_id := GroupId}, Agent, #{state := ?replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId}, Agent, #{state := ?waiting_replaying, version := Version}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version);
renew_lease(#{group_id := GroupId} = Data, Agent, #{
streams := Streams, state := ?waiting_updating, version := Version, prev_version := PrevVersion
}) ->
StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, Group, PrevVersion, Version, StreamProgresses
Agent, GroupId, PrevVersion, Version, StreamProgresses
),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version);
renew_lease(#{group := Group}, Agent, #{
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version);
renew_lease(#{group_id := GroupId}, Agent, #{
state := ?updating, version := Version, prev_version := PrevVersion
}) ->
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, PrevVersion, Version).
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, PrevVersion, Version).
%%--------------------------------------------------------------------
%% Handle stream progress updates from agent in replaying state
@ -802,7 +806,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers
%%--------------------------------------------------------------------
agent_transition_to_waiting_updating(
#{group := Group} = Data,
#{group_id := GroupId} = Data,
Agent,
#{state := OldState, version := Version, prev_version := undefined} = AgentState0,
Streams,
@ -825,19 +829,19 @@ agent_transition_to_waiting_updating(
AgentState2 = renew_no_replaying_deadline(AgentState1),
StreamProgresses = stream_progresses(Data, Streams),
ok = emqx_ds_shared_sub_proto:leader_update_streams(
Agent, Group, Version, NewVersion, StreamProgresses
Agent, GroupId, Version, NewVersion, StreamProgresses
),
AgentState2.
agent_transition_to_waiting_replaying(
#{group := Group} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
#{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0
) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
old_state => OldState,
new_state => ?waiting_replaying
}),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, Group, Version),
ok = emqx_ds_shared_sub_proto:leader_renew_stream_lease(Agent, GroupId, Version),
AgentState1 = AgentState0#{
state => ?waiting_replaying,
revoked_streams => []
@ -845,7 +849,7 @@ agent_transition_to_waiting_replaying(
renew_no_replaying_deadline(AgentState1).
agent_transition_to_initial_waiting_replaying(
#{group := Group} = Data, Agent, AgentMetadata, InitialStreams
#{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams
) ->
?tp(warning, shared_sub_leader_agent_state_transition, #{
agent => Agent,
@ -856,7 +860,7 @@ agent_transition_to_initial_waiting_replaying(
StreamProgresses = stream_progresses(Data, InitialStreams),
Leader = this_leader(Data),
ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, Leader, StreamProgresses, Version
Agent, GroupId, Leader, StreamProgresses, Version
),
AgentState = #{
metadata => AgentMetadata,
@ -1015,8 +1019,8 @@ drop_agent(#{agents := Agents} = Data0, Agent) ->
?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}),
Data1#{agents => maps:remove(Agent, Agents)}.
invalidate_agent(#{group := Group}, Agent) ->
ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, Group).
invalidate_agent(#{group_id := GroupId}, Agent) ->
ok = emqx_ds_shared_sub_proto:leader_invalidate(Agent, GroupId).
drop_invalidate_agent(Data0, Agent) ->
Data1 = drop_agent(Data0, Agent),

View File

@ -33,7 +33,7 @@
-type agent() :: ?agent(emqx_persistent_session_ds:id(), pid()).
-type leader() :: pid().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type share_topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group().
-type version() :: non_neg_integer().
-type agent_metadata() :: #{
@ -63,8 +63,8 @@
%% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), agent_metadata(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when
-spec agent_connect_leader(leader(), agent(), agent_metadata(), share_topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{
@ -72,13 +72,13 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) when
to_leader => ToLeader,
from_agent => FromAgent,
agent_metadata => AgentMetadata,
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, TopicFilter)),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)),
ok;
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, TopicFilter) ->
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
emqx_ds_shared_sub_proto_v1:agent_connect_leader(
?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, TopicFilter
?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
).
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.

View File

@ -21,16 +21,16 @@
%% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender.
-define(agent_connect_leader(Agent, AgentMetadata, TopicFilter), #{
-define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{
type => ?agent_connect_leader_msg,
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
agent_metadata => AgentMetadata,
agent => Agent
}).
-define(agent_connect_leader_match(Agent, AgentMetadata, TopicFilter), #{
-define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{
type := ?agent_connect_leader_msg,
topic_filter := TopicFilter,
share_topic_filter := ShareTopicFilter,
agent_metadata := AgentMetadata,
agent := Agent
}).
@ -81,77 +81,77 @@
%% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders
%% `group` field is used to identify the leader.
%% `group_id` field is used to identify the leader.
-define(leader_lease_streams_msg, leader_lease_streams).
-define(leader_renew_stream_lease_msg, leader_renew_stream_lease).
-define(leader_lease_streams(Group, Leader, Streams, Version), #{
-define(leader_lease_streams(GrouId, Leader, Streams, Version), #{
type => ?leader_lease_streams_msg,
streams => Streams,
version => Version,
leader => Leader,
group => Group
group_id => GrouId
}).
-define(leader_lease_streams_match(Group, Leader, Streams, Version), #{
-define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{
type := ?leader_lease_streams_msg,
streams := Streams,
version := Version,
leader := Leader,
group := Group
group_id := GroupId
}).
-define(leader_renew_stream_lease(Group, Version), #{
-define(leader_renew_stream_lease(GroupId, Version), #{
type => ?leader_renew_stream_lease_msg,
version => Version,
group => Group
group_id => GroupId
}).
-define(leader_renew_stream_lease_match(Group, Version), #{
-define(leader_renew_stream_lease_match(GroupId, Version), #{
type := ?leader_renew_stream_lease_msg,
version := Version,
group := Group
group_id := GroupId
}).
-define(leader_renew_stream_lease(Group, VersionOld, VersionNew), #{
-define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{
type => ?leader_renew_stream_lease_msg,
version_old => VersionOld,
version_new => VersionNew,
group => Group
group_id => GroupId
}).
-define(leader_renew_stream_lease_match(Group, VersionOld, VersionNew), #{
-define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{
type := ?leader_renew_stream_lease_msg,
version_old := VersionOld,
version_new := VersionNew,
group := Group
group_id := GroupId
}).
-define(leader_update_streams(Group, VersionOld, VersionNew, StreamsNew), #{
-define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{
type => leader_update_streams,
version_old => VersionOld,
version_new => VersionNew,
streams_new => StreamsNew,
group => Group
group_id => GroupId
}).
-define(leader_update_streams_match(Group, VersionOld, VersionNew, StreamsNew), #{
-define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{
type := leader_update_streams,
version_old := VersionOld,
version_new := VersionNew,
streams_new := StreamsNew,
group := Group
group_id := GroupId
}).
-define(leader_invalidate(Group), #{
-define(leader_invalidate(GroupId), #{
type => leader_invalidate,
group => Group
group_id => GroupId
}).
-define(leader_invalidate_match(Group), #{
-define(leader_invalidate_match(GroupId), #{
type := leader_invalidate,
group := Group
group_id := GroupId
}).
%% Helpers

View File

@ -26,7 +26,7 @@
-record(lookup_leader, {
agent :: emqx_ds_shared_sub_proto:agent(),
agent_metadata :: emqx_ds_shared_sub_proto:agent_metadata(),
topic_filter :: emqx_persistent_session_ds:share_topic_filter()
share_topic_filter :: emqx_persistent_session_ds:share_topic_filter()
}).
-define(gproc_id(ID), {n, l, ID}).
@ -40,9 +40,9 @@
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_persistent_session_ds:share_topic_filter()
) -> ok.
lookup_leader(Agent, AgentMetadata, TopicFilter) ->
lookup_leader(Agent, AgentMetadata, ShareTopicFilter) ->
gen_server:cast(?MODULE, #lookup_leader{
agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter
agent = Agent, agent_metadata = AgentMetadata, share_topic_filter = ShareTopicFilter
}).
%%--------------------------------------------------------------------
@ -72,9 +72,14 @@ handle_call(_Request, _From, State) ->
{reply, {error, unknown_request}, State}.
handle_cast(
#lookup_leader{agent = Agent, agent_metadata = AgentMetadata, topic_filter = TopicFilter}, State
#lookup_leader{
agent = Agent,
agent_metadata = AgentMetadata,
share_topic_filter = ShareTopicFilter
},
State
) ->
State1 = do_lookup_leader(Agent, AgentMetadata, TopicFilter, State),
State1 = do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State),
{noreply, State1}.
handle_info(_Info, State) ->
@ -87,15 +92,15 @@ terminate(_Reason, _State) ->
%% Internal functions
%%--------------------------------------------------------------------
do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) ->
do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) ->
%% TODO https://emqx.atlassian.net/browse/EMQX-12309
%% Cluster-wide unique leader election should be implemented
Id = emqx_ds_shared_sub_leader:id(TopicFilter),
Id = emqx_ds_shared_sub_leader:id(ShareTopicFilter),
LeaderPid =
case gproc:where(?gproc_id(Id)) of
undefined ->
{ok, Pid} = emqx_ds_shared_sub_leader_sup:start_leader(#{
topic_filter => TopicFilter
share_topic_filter => ShareTopicFilter
}),
{ok, NewLeaderPid} = emqx_ds_shared_sub_leader:register(
Pid,
@ -111,10 +116,10 @@ do_lookup_leader(Agent, AgentMetadata, TopicFilter, State) ->
?SLOG(info, #{
msg => lookup_leader,
agent => Agent,
topic_filter => TopicFilter,
share_topic_filter => ShareTopicFilter,
leader => LeaderPid
}),
ok = emqx_ds_shared_sub_proto:agent_connect_leader(
LeaderPid, Agent, AgentMetadata, TopicFilter
LeaderPid, Agent, AgentMetadata, ShareTopicFilter
),
State.

View File

@ -33,9 +33,9 @@ introduced_in() ->
emqx_ds_shared_sub_proto:agent_metadata(),
emqx_persistent_session_ds:share_topic_filter()
) -> ok.
agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, TopicFilter) ->
agent_connect_leader(Node, ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [
ToLeader, FromAgent, AgentMetadata, TopicFilter
ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
]).
-spec agent_update_stream_states(