From 65ab81ff7487f8a1df61e58dd74c34f0b10fdcfa Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Thu, 4 Jul 2024 17:22:26 +0300 Subject: [PATCH] feat(queue): fix quick resubscription --- ...emqx_persistent_session_ds_shared_subs.erl | 68 +++++++++++++++---- .../src/emqx_ds_shared_sub_agent.erl | 7 ++ .../src/emqx_ds_shared_sub_group_sm.erl | 26 +++++-- .../src/emqx_ds_shared_sub_proto.erl | 22 ++++-- .../test/emqx_ds_shared_sub_SUITE.erl | 10 ++- 5 files changed, 106 insertions(+), 27 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 6709eb37a..7db86dfe0 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -198,8 +198,16 @@ schedule_subscribe( ScheduledActions1 = ScheduledActions0#{ TopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} }, + ?tp(warning, shared_subs_schedule_subscribe_override, #{ + topic_filter => TopicFilter, + new_type => {?schedule_subscribe, SubOpts}, + old_action => format_schedule_action(ScheduledAction) + }), SharedSubS0#{scheduled_actions := ScheduledActions1}; _ -> + ?tp(warning, shared_subs_schedule_subscribe_new, #{ + topic_filter => TopicFilter, subopts => SubOpts + }), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe( Agent0, TopicFilter, SubOpts ), @@ -237,20 +245,30 @@ schedule_unsubscribe( S, #{scheduled_actions := ScheduledActions0} = SharedSubS0, UnsubscridedSubId, TopicFilter ) -> case ScheduledActions0 of - #{TopicFilter := ScheduledAction} -> + #{TopicFilter := ScheduledAction0} -> + ScheduledAction1 = ScheduledAction0#{type => ?schedule_unsubscribe}, ScheduledActions1 = ScheduledActions0#{ - TopicFilter => ScheduledAction#{type => ?schedule_unsubscribe} + TopicFilter => ScheduledAction1 }, + ?tp(warning, 
shared_subs_schedule_unsubscribe_override, #{ + topic_filter => TopicFilter, + new_type => ?schedule_unsubscribe, + old_action => format_schedule_action(ScheduledAction0) + }), SharedSubS0#{scheduled_actions := ScheduledActions1}; _ -> - StreamIdsToFinalize = stream_ids_by_sub_id(S, UnsubscridedSubId), + StreamKeys = stream_keys_by_sub_id(S, UnsubscridedSubId), ScheduledActions1 = ScheduledActions0#{ TopicFilter => #{ type => ?schedule_unsubscribe, - stream_keys_to_wait => StreamIdsToFinalize, + stream_keys_to_wait => StreamKeys, progresses => [] } }, + ?tp(warning, shared_subs_schedule_unsubscribe_new, #{ + topic_filter => TopicFilter, + stream_keys => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeys) + }), SharedSubS0#{scheduled_actions := ScheduledActions1} end. @@ -400,28 +418,43 @@ run_scheduled_actions(S, Agent, ScheduledActions) -> run_scheduled_action( S, - Agent, - TopicFilter, + Agent0, + #share{group = Group} = TopicFilter, #{type := Type, stream_keys_to_wait := StreamKeysToWait0, progresses := Progresses0} = Action ) -> StreamKeysToWait1 = filter_unfinished_streams(S, StreamKeysToWait0), Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0, case StreamKeysToWait1 of [] -> + ?tp(warning, shared_subs_schedule_action_complete, #{ + topic_filter => TopicFilter, + progresses => emqx_ds_shared_sub_proto:format_streams(Progresses1), + type => Type + }), + %% Regular progress won't see unsubscribed streams, so we need to + %% send the progress explicitly. 
+ Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_stream_progress( + Agent0, #{Group => Progresses1} + ), case Type of {?schedule_subscribe, SubOpts} -> {ok, emqx_persistent_session_ds_shared_subs_agent:on_subscribe( - Agent, TopicFilter, SubOpts + Agent1, TopicFilter, SubOpts )}; ?schedule_unsubscribe -> {ok, emqx_persistent_session_ds_shared_subs_agent:on_unsubscribe( - Agent, TopicFilter, Progresses1 + Agent1, TopicFilter, Progresses1 )} end; _ -> - {continue, Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}} + Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}, + ?tp(warning, shared_subs_schedule_action_continue, #{ + topic_filter => TopicFilter, + new_action => format_schedule_action(Action1) + }), + {continue, Action1} end. filter_unfinished_streams(S, StreamKeysToWait) -> @@ -509,14 +542,14 @@ lookup(TopicFilter, S) -> undefined end. -stream_ids_by_sub_id(S, MatchSubId) -> +stream_keys_by_sub_id(S, MatchSubId) -> emqx_persistent_session_ds_state:fold_streams( - fun({SubId, _Stream} = StreamStateId, _SRS, StreamStateIds) -> + fun({SubId, _Stream} = StreamKey, _SRS, StreamKeys) -> case SubId of MatchSubId -> - [StreamStateId | StreamStateIds]; + [StreamKey | StreamKeys]; _ -> - StreamStateIds + StreamKeys end end, [], @@ -580,3 +613,12 @@ is_use_finished(_S, #srs{unsubscribed = Unsubscribed}) -> is_stream_fully_acked(S, SRS) -> emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S). + +format_schedule_action(#{ + type := Type, progresses := Progresses, stream_keys_to_wait := StreamKeysToWait +}) -> + #{ + type => Type, + progresses => emqx_ds_shared_sub_proto:format_streams(Progresses), + stream_keys_to_wait => emqx_ds_shared_sub_proto:format_stream_keys(StreamKeysToWait) + }. 
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index 70b203661..b896370f3 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -6,6 +6,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_ds_shared_sub_proto.hrl"). @@ -41,6 +42,9 @@ open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( fun({ShareTopicFilter, #{}}, State) -> + ?tp(warning, ds_agent_open_subscription, #{ + topic_filter => ShareTopicFilter + }), add_group_subscription(State, ShareTopicFilter) end, State0, @@ -52,6 +56,9 @@ can_subscribe(_State, _TopicFilter, _SubOpts) -> ok. on_subscribe(State0, TopicFilter, _SubOpts) -> + ?tp(warning, ds_agent_on_subscribe, #{ + topic_filter => TopicFilter + }), add_group_subscription(State0, TopicFilter). on_unsubscribe(State, TopicFilter, GroupProgress) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index aab47802b..f9a81bbb8 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -153,6 +153,10 @@ new(#{ agent => Agent, send_after => SendAfter }, + ?tp(warning, group_sm_new, #{ + agent => Agent, + topic_filter => ShareTopicFilter + }), transition(GSM0, ?connecting, #{}). -spec fetch_stream_events(group_sm()) -> {group_sm(), list(external_lease_event())}. 
@@ -191,6 +195,10 @@ handle_disconnect( %% Connecting state handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> + ?tp(warning, group_sm_enter_connecting, #{ + agent => Agent, + topic_filter => ShareTopicFilter + }), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter), ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). @@ -215,6 +223,10 @@ handle_leader_lease_streams(GSM, _Leader, _StreamProgresses, _Version) -> GSM. handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> + ?tp(warning, group_sm_find_leader_timeout, #{ + agent => Agent, + topic_filter => TopicFilter + }), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter), GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT), GSM1. @@ -229,8 +241,8 @@ handle_replaying(GSM0) -> ), GSM2. -handle_renew_lease_timeout(GSM) -> - ?tp(debug, renew_lease_timeout, #{}), +handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) -> + ?tp(warning, renew_lease_timeout, #{agent => Agent, topic_filter => TopicFilter}), transition(GSM, ?connecting, #{}). %%----------------------------------------------------------------------- @@ -326,12 +338,12 @@ handle_leader_update_streams( ) -> GSM; handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) -> + %% Unexpected versions or state ?tp(warning, shared_sub_group_sm_unexpected_leader_update_streams, #{ gsm => GSM, version_old => VersionOld, version_new => VersionNew }), - %% Unexpected versions or state transition(GSM, ?connecting, #{}). 
handle_leader_renew_stream_lease( @@ -364,12 +376,12 @@ handle_leader_renew_stream_lease( ) -> GSM; handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) -> + %% Unexpected versions or state ?tp(warning, shared_sub_group_sm_unexpected_leader_renew_stream_lease, #{ gsm => GSM, version_old => VersionOld, version_new => VersionNew }), - %% Unexpected versions or state transition(GSM, ?connecting, #{}). -spec handle_stream_progress(group_sm(), list(emqx_ds_shared_sub_proto:agent_stream_progress())) -> @@ -410,7 +422,11 @@ handle_stream_progress( handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) -> GSM. -handle_leader_invalidate(GSM) -> +handle_leader_invalidate(#{agent := Agent, topic_filter := TopicFilter} = GSM) -> + ?tp(warning, shared_sub_group_sm_leader_invalidate, #{ + agent => Agent, + topic_filter => TopicFilter + }), transition(GSM, ?connecting, #{}). %%----------------------------------------------------------------------- diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 0b1770f3c..184e8d147 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -2,10 +2,6 @@ %% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -%% TODO https://emqx.atlassian.net/browse/EMQX-12573 -%% This should be wrapped with a proto_v1 module. -%% For simplicity, send as simple OTP messages for now. - -module(emqx_ds_shared_sub_proto). -include("emqx_ds_shared_sub_proto.hrl"). @@ -27,6 +23,9 @@ -export([ format_streams/1, + format_stream/1, + format_stream_key/1, + format_stream_keys/1, agent/2 ]). @@ -254,12 +253,21 @@ format_streams(Streams) -> Streams ). +format_stream(#{stream := Stream, iterator := Iterator} = Value) -> + Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. 
+ +format_stream_key({SubId, Stream}) -> + {SubId, format_opaque(Stream)}. + +format_stream_keys(StreamKeys) -> + lists:map( + fun format_stream_key/1, + StreamKeys + ). + %%-------------------------------------------------------------------- %% Helpers %%-------------------------------------------------------------------- format_opaque(Opaque) -> erlang:phash2(Opaque). - -format_stream(#{stream := Stream, iterator := Iterator} = Value) -> - Value#{stream => format_opaque(Stream), iterator => format_opaque(Iterator)}. diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 4c2e9a239..4733dc650 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -297,8 +297,14 @@ t_quick_resubscribe(_Config) -> ConnShared2 = emqtt_connect_sub(<<"client_shared2">>), {ok, _, _} = emqtt:subscribe(ConnShared2, <<"$share/gr10/topic10/#">>, 1), - {ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>), - {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1), + ok = lists:foreach( + fun(_) -> + {ok, _, _} = emqtt:unsubscribe(ConnShared1, <<"$share/gr10/topic10/#">>), + {ok, _, _} = emqtt:subscribe(ConnShared1, <<"$share/gr10/topic10/#">>, 1), + ct:sleep(5) + end, + lists:seq(1, 10) + ), receive publish_done -> ok