feat(queue): fix quick resubscription
This commit is contained in:
parent
53d4cd3174
commit
65ab81ff74
|
@ -198,8 +198,16 @@ schedule_subscribe(
|
|||
ScheduledActions1 = ScheduledActions0#{
|
||||
TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}}
|
||||
},
|
||||
?tp(warning, shared_subs_schedule_subscribe_override, #{
|
||||
topic_filter => TopicFilter,
|
||||
new_type => {?schedule_subscribe, SubOpts},
|
||||
old_action => format_schedule_action(ScheduledAction)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
||||
_ ->
|
||||
?tp(warning, shared_subs_schedule_subscribe_new, #{
|
||||
topic_filter => TopicFilter, subopts => SubOpts
|
||||
}),
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||
Agent0, TopicFilter, SubOpts
|
||||
),
|
||||
|
@ -237,20 +245,30 @@ schedule_unsubscribe(
|
|||
S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter
|
||||
) ->
|
||||
case ScheduledActions0 of
|
||||
#{TopicFilter := ScheduledAction} ->
|
||||
#{TopicFilter := ScheduledAction0} ->
|
||||
ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe},
|
||||
ScheduledActions1 = ScheduledActions0#{
|
||||
TopicFilter => ScheduledAction#{type => ?schedule_unsubscribe}
|
||||
TopicFilter => ScheduledAction1
|
||||
},
|
||||
?tp(warning, shared_subs_schedule_unsubscribe_override, #{
|
||||
topic_filter => TopicFilter,
|
||||
new_type => ?schedule_unsubscribe,
|
||||
old_action => format_schedule_action(ScheduledAction0)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1};
|
||||
_ ->
|
||||
StreamIdsToFinalize = stream_ids_by_sub_id(S, UnsubscridedSubId),
|
||||
StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId),
|
||||
ScheduledActions1 = ScheduledActions0#{
|
||||
TopicFilter => #{
|
||||
type => ?schedule_unsubscribe,
|
||||
stream_keys_to_wait => StreamIdsToFinalize,
|
||||
stream_keys_to_wait => StreamKeys,
|
||||
progresses => []
|
||||
}
|
||||
},
|
||||
?tp(warning, shared_subs_schedule_unsubscribe_new, #{
|
||||
topic_filter => TopicFilter,
|
||||
stream_keys => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeys)
|
||||
}),
|
||||
SharedSubS0#{scheduled_actions := ScheduledActions1}
|
||||
end.
|
||||
|
||||
|
@ -400,28 +418,43 @@ run_scheduled_actions(S, Agent, ScheduledActions) ->
|
|||
|
||||
run_scheduled_action(
|
||||
S,
|
||||
Agent,
|
||||
TopicFilter,
|
||||
Agent0,
|
||||
#share{group = Group} = TopicFilter,
|
||||
#{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action
|
||||
) ->
|
||||
StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0),
|
||||
Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0,
|
||||
case StreamKeysToWait1 of
|
||||
[] ->
|
||||
?tp(warning, shared_subs_schedule_action_complete, #{
|
||||
topic_filter => TopicFilter,
|
||||
progresses => emqx_ds_shared_sub_proto:format_streams(Progresses1),
|
||||
type => Type
|
||||
}),
|
||||
%% Regular progress won't se unsubscribed streams, so we need to
|
||||
%% send the progress explicitly.
|
||||
Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress(
|
||||
Agent0, #{Group => Progresses1}
|
||||
),
|
||||
case Type of
|
||||
{?schedule_subscribe, SubOpts} ->
|
||||
{ok,
|
||||
emqx_persistent_session_ds_shared_subs_agent:on_subscribe(
|
||||
Agent, TopicFilter, SubOpts
|
||||
Agent1, TopicFilter, SubOpts
|
||||
)};
|
||||
?schedule_unsubscribe ->
|
||||
{ok,
|
||||
emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe(
|
||||
Agent, TopicFilter, Progresses1
|
||||
Agent1, TopicFilter, Progresses1
|
||||
)}
|
||||
end;
|
||||
_ ->
|
||||
{continue, Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}}
|
||||
Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1},
|
||||
?tp(warning, shared_subs_schedule_action_continue, #{
|
||||
topic_filter => TopicFilter,
|
||||
new_action => format_schedule_action(Action1)
|
||||
}),
|
||||
{continue, Action1}
|
||||
end.
|
||||
|
||||
filter_unfinished_streams(S, StreamKeysToWait) ->
|
||||
|
@ -509,14 +542,14 @@ lookup(TopicFilter, S) ->
|
|||
undefined
|
||||
end.
|
||||
|
||||
stream_ids_by_sub_id(S, MatchSubId) ->
|
||||
stream_keys_by_sub_id(S, MatchSubId) ->
|
||||
emqx_persistent_session_ds_state:fold_streams(
|
||||
fun({SubId, _Stream} = StreamStateId, _SRS, StreamStateIds) ->
|
||||
fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) ->
|
||||
case SubId of
|
||||
MatchSubId ->
|
||||
[StreamStateId | StreamStateIds];
|
||||
[StreamKey | StreamKeys];
|
||||
_ ->
|
||||
StreamStateIds
|
||||
StreamKeys
|
||||
end
|
||||
end,
|
||||
[],
|
||||
|
@ -580,3 +613,12 @@ is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) ->
|
|||
|
||||
is_stream_fully_acked(S, SRS) ->
|
||||
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S).
|
||||
|
||||
format_schedule_action(#{
|
||||
type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait
|
||||
}) ->
|
||||
#{
|
||||
type => Type,
|
||||
progresses => emqx_ds_shared_sub_proto:format_streams(Progresses),
|
||||
stream_keys_to_wait => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeysToWait)
|
||||
}.
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-include("emqx_ds_shared_sub_proto.hrl").
|
||||
|
||||
|
@ -41,6 +42,9 @@ open(TopicSubscriptions, Opts) ->
|
|||
State0 = init_state(Opts),
|
||||
State1 = lists:foldl(
|
||||
fun({ShareTopicFilter, #{}}, State) ->
|
||||
?tp(warning, ds_agent_open_subscription, #{
|
||||
topic_filter => ShareTopicFilter
|
||||
}),
|
||||
add_group_subscription(State, ShareTopicFilter)
|
||||
end,
|
||||
State0,
|
||||
|
@ -52,6 +56,9 @@ can_subscribe(_State, _TopicFilter, _SubOpts) ->
|
|||
ok.
|
||||
|
||||
on_subscribe(State0, TopicFilter, _SubOpts) ->
|
||||
?tp(warning, ds_agent_on_subscribe, #{
|
||||
topic_filter => TopicFilter
|
||||
}),
|
||||
add_group_subscription(State0, TopicFilter).
|
||||
|
||||
on_unsubscribe(State, TopicFilter, GroupProgress) ->
|
||||
|
|
|
@ -153,6 +153,10 @@ new(#{
|
|||
agent => Agent,
|
||||
send_after => SendAfter
|
||||
},
|
||||
?tp(warning, group_sm_new, #{
|
||||
agent => Agent,
|
||||
topic_filter => ShareTopicFilter
|
||||
}),
|
||||
transition(GSM0, ?connecting, #{}).
|
||||
|
||||
-spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}.
|
||||
|
@ -191,6 +195,10 @@ handle_disconnect(
|
|||
%% Connecting state
|
||||
|
||||
handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
|
||||
?tp(warning, group_sm_enter_connecting, #{
|
||||
agent => Agent,
|
||||
topic_filter => ShareTopicFilter
|
||||
}),
|
||||
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
|
||||
ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
|
||||
|
||||
|
@ -215,6 +223,10 @@ handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) ->
|
|||
GSM.
|
||||
|
||||
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
|
||||
?tp(warning, group_sm_find_leader_timeout, #{
|
||||
agent => Agent,
|
||||
topic_filter => TopicFilter
|
||||
}),
|
||||
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter),
|
||||
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
|
||||
GSM1.
|
||||
|
@ -229,8 +241,8 @@ handle_replaying(GSM0) ->
|
|||
),
|
||||
GSM2.
|
||||
|
||||
handle_renew_lease_timeout(GSM) ->
|
||||
?tp(debug, renew_lease_timeout, #{}),
|
||||
handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) ->
|
||||
?tp(warning, renew_lease_timeout, #{agent => Agent, topic_filter => TopicFilter}),
|
||||
transition(GSM, ?connecting, #{}).
|
||||
|
||||
%%-----------------------------------------------------------------------
|
||||
|
@ -326,12 +338,12 @@ handle_leader_update_streams(
|
|||
) ->
|
||||
GSM;
|
||||
handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) ->
|
||||
%% Unexpected versions or state
|
||||
?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{
|
||||
gsm => GSM,
|
||||
version_old => VersionOld,
|
||||
version_new => VersionNew
|
||||
}),
|
||||
%% Unexpected versions or state
|
||||
transition(GSM, ?connecting, #{}).
|
||||
|
||||
handle_leader_renew_stream_lease(
|
||||
|
@ -364,12 +376,12 @@ handle_leader_renew_stream_lease(
|
|||
) ->
|
||||
GSM;
|
||||
handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) ->
|
||||
%% Unexpected versions or state
|
||||
?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{
|
||||
gsm => GSM,
|
||||
version_old => VersionOld,
|
||||
version_new => VersionNew
|
||||
}),
|
||||
%% Unexpected versions or state
|
||||
transition(GSM, ?connecting, #{}).
|
||||
|
||||
-spec handle_stream_progress(group_sm(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) ->
|
||||
|
@ -410,7 +422,11 @@ handle_stream_progress(
|
|||
handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) ->
|
||||
GSM.
|
||||
|
||||
handle_leader_invalidate(GSM) ->
|
||||
handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) ->
|
||||
?tp(warning, shared_sub_group_sm_leader_invalidate, #{
|
||||
agent => Agent,
|
||||
topic_filter => TopicFilter
|
||||
}),
|
||||
transition(GSM, ?connecting, #{}).
|
||||
|
||||
%%-----------------------------------------------------------------------
|
||||
|
|
|
@ -2,10 +2,6 @@
|
|||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% TODO https://emqx.atlassian.net/browse/EMQX-12573
|
||||
%% This should be wrapped with a proto_v1 module.
|
||||
%% For simplicity, send as simple OTP messages for now.
|
||||
|
||||
-module(emqx_ds_shared_sub_proto).
|
||||
|
||||
-include("emqx_ds_shared_sub_proto.hrl").
|
||||
|
@ -27,6 +23,9 @@
|
|||
|
||||
-export([
|
||||
format_streams/1,
|
||||
format_stream/1,
|
||||
format_stream_key/1,
|
||||
format_stream_keys/1,
|
||||
agent/2
|
||||
]).
|
||||
|
||||
|
@ -254,12 +253,21 @@ format_streams(Streams) ->
|
|||
Streams
|
||||
).
|
||||
|
||||
format_stream(#{stream := Stream, iterator := Iterator} = Value) ->
|
||||
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.
|
||||
|
||||
format_stream_key({SubId, Stream}) ->
|
||||
{SubId, format_opaque(Stream)}.
|
||||
|
||||
format_stream_keys(StreamKeys) ->
|
||||
lists:map(
|
||||
fun format_stream_key/1,
|
||||
StreamKeys
|
||||
).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
format_opaque(Opaque) ->
|
||||
erlang:phash2(Opaque).
|
||||
|
||||
format_stream(#{stream := Stream, iterator := Iterator} = Value) ->
|
||||
Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}.
|
||||
|
|
|
@ -297,8 +297,14 @@ t_quick_resubscribe(_Config) ->
|
|||
|
||||
ConnShared2 = emqtt_connect_sub(<<"client_shared2">>),
|
||||
{ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr10/topic10/#">>, 1),
|
||||
ok = lists:foreach(
|
||||
fun(_) ->
|
||||
{ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>),
|
||||
{ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1),
|
||||
ct:sleep(5)
|
||||
end,
|
||||
lists:seq(1, 10)
|
||||
),
|
||||
|
||||
receive
|
||||
publish_done -> ok
|
||||
|
|
Loading…
Reference in New Issue