feat(queue): reorganize group sm callbacks and methods

This commit is contained in:
Ilya Averyanov 2024-06-21 13:01:27 +03:00
parent b9c5911883
commit 2096755ad6
1 changed files with 48 additions and 28 deletions

View File

@ -96,6 +96,35 @@ new(#{
GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM2.
fetch_stream_events(
#{
state := ?replaying,
topic_filter := TopicFilter,
state_data := #{stream_lease_events := Events0} = Data
} = GSM
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
end,
Events0
),
{
GSM#{
state_data => Data#{stream_lease_events => []}
},
Events1
};
fetch_stream_events(GSM) ->
{GSM, []}.
%%-----------------------------------------------------------------------
%% Event Handlers
%%-----------------------------------------------------------------------
%%-----------------------------------------------------------------------
%% Connecting state
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version
) ->
@ -131,6 +160,14 @@ handle_leader_lease_streams(
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
GSM.
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM1.
%%-----------------------------------------------------------------------
%% Replaying state
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version
) ->
@ -140,40 +177,23 @@ handle_leader_renew_stream_lease(
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
%%-----------------------------------------------------------------------
%% Updating state
%%-----------------------------------------------------------------------
%% Internal API
%%-----------------------------------------------------------------------
handle_state_timeout(
#{agent := Agent, state := ?connecting, topic_filter := TopicFilter} = GSM0,
#{state := ?connecting, topic_filter := TopicFilter} = GSM,
find_leader_timeout,
find_leader
) ->
?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM1.
fetch_stream_events(
#{
state := ?replaying,
topic_filter := TopicFilter,
state_data := #{stream_lease_events := Events0} = Data
} = GSM
) ->
Events1 = lists:map(
fun(Event) ->
Event#{topic_filter => TopicFilter}
end,
Events0
),
{
GSM#{
state_data => Data#{stream_lease_events => []}
},
Events1
};
fetch_stream_events(GSM) ->
{GSM, []}.
handle_find_leader_timeout(GSM).
handle_info(
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = Msg
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info
) ->
case Timers of
#{Name := #timer{id = Id}} ->
@ -182,7 +202,7 @@ handle_info(
%% Stale timer
GSM
end;
handle_info(GSM, Msg) ->
handle_info(GSM, _Info) ->
GSM.
%%--------------------------------------------------------------------