From 2096755ad6a1e7189ade2dddecda5c24a7909045 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 21 Jun 2024 13:01:27 +0300 Subject: [PATCH] feat(queue): reorganize group sm callbacks and methods --- .../src/emqx_ds_shared_sub_group_sm.erl | 76 ++++++++++++------- 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 434ea0f14..d1f78bf68 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -96,6 +96,35 @@ new(#{ GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), GSM2. +fetch_stream_events( + #{ + state := ?replaying, + topic_filter := TopicFilter, + state_data := #{stream_lease_events := Events0} = Data + } = GSM +) -> + Events1 = lists:map( + fun(Event) -> + Event#{topic_filter => TopicFilter} + end, + Events0 + ), + { + GSM#{ + state_data => Data#{stream_lease_events => []} + }, + Events1 + }; +fetch_stream_events(GSM) -> + {GSM, []}. + +%%----------------------------------------------------------------------- +%% Event Handlers +%%----------------------------------------------------------------------- + +%%----------------------------------------------------------------------- +%% Connecting state + handle_leader_lease_streams( #{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version ) -> @@ -131,6 +160,14 @@ handle_leader_lease_streams( handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> GSM. +handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), + GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), + GSM1. + +%%----------------------------------------------------------------------- +%% Replaying state + handle_leader_renew_stream_lease( #{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version ) -> @@ -140,40 +177,23 @@ handle_leader_renew_stream_lease( handle_leader_renew_stream_lease(GSM, _Version) -> GSM. +%%----------------------------------------------------------------------- +%% Updating state + +%%----------------------------------------------------------------------- +%% Internal API +%%----------------------------------------------------------------------- + handle_state_timeout( - #{agent := Agent, state := ?connecting, topic_filter := TopicFilter} = GSM0, + #{state := ?connecting, topic_filter := TopicFilter} = GSM, find_leader_timeout, find_leader ) -> ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}), - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), - GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), - GSM1. - -fetch_stream_events( - #{ - state := ?replaying, - topic_filter := TopicFilter, - state_data := #{stream_lease_events := Events0} = Data - } = GSM -) -> - Events1 = lists:map( - fun(Event) -> - Event#{topic_filter => TopicFilter} - end, - Events0 - ), - { - GSM#{ - state_data => Data#{stream_lease_events => []} - }, - Events1 - }; -fetch_stream_events(GSM) -> - {GSM, []}. + handle_find_leader_timeout(GSM). handle_info( - #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = Msg + #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info ) -> case Timers of #{Name := #timer{id = Id}} -> @@ -182,7 +202,7 @@ handle_info( %% Stale timer GSM end; -handle_info(GSM, Msg) -> +handle_info(GSM, _Info) -> GSM. %%--------------------------------------------------------------------