feat(queue): implement `find_leader_timeout` event and so the `connecting` group_sm state

This commit is contained in:
Ilya Averyanov 2024-06-21 12:14:57 +03:00
parent 979fb58e50
commit b9c5911883
3 changed files with 162 additions and 32 deletions

View File

@ -10,6 +10,7 @@
-module(emqx_ds_shared_sub_group_sm).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
new/1,
@ -37,15 +38,40 @@
-define(replaying, replaying).
-define(updating, updating).
-type state() :: ?connecting | ?replaying | ?updating.
-type group_sm() :: #{
topic_filter => emqx_persistent_session_ds:share_topic_filter(),
agent => emqx_ds_shared_sub_proto:agent(),
send_after => fun((non_neg_integer(), term()) -> reference()),
state => ?connecting | ?replaying | ?updating,
state_data => map()
state => state(),
state_data => map(),
state_timers => map()
}.
-record(state_timeout, {
id :: reference(),
name :: atom(),
message :: term()
}).
-record(timer, {
ref :: reference(),
id :: reference()
}).
%%-----------------------------------------------------------------------
%% Constants
%%-----------------------------------------------------------------------
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(FIND_LEADER_TIMEOUT, 1000).
%%-----------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------
-spec new(options()) -> group_sm().
new(#{
agent := Agent,
@ -60,17 +86,20 @@ new(#{
topic_filter => ShareTopicFilter
}
),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
#{
GSM0 = #{
topic_filter => ShareTopicFilter,
agent => Agent,
send_after => SendAfter,
send_after => SendAfter
},
GSM1 = transition(GSM0, ?connecting, #{}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM2.
state => ?connecting,
state_data => #{}
}.
handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Version) ->
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version
) ->
?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
Streams = lists:foldl(
fun(#{stream := Stream, iterator := It}, Acc) ->
Acc#{Stream => It}
@ -88,16 +117,17 @@ handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Ver
end,
StreamProgresses
),
GSM#{
state => ?replaying,
state_data => #{
transition(
GSM,
?replaying,
#{
streams => Streams,
stream_lease_events => StreamLeaseEvents,
prev_version => undefined,
version => Version,
last_update_time => erlang:monotonic_time(millisecond)
}
};
);
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
GSM.
@ -110,8 +140,15 @@ handle_leader_renew_stream_lease(
handle_leader_renew_stream_lease(GSM, _Version) ->
GSM.
handle_info(GSM, _Info) ->
GSM.
handle_state_timeout(
#{agent := Agent, state := ?connecting, topic_filter := TopicFilter} = GSM0,
find_leader_timeout,
find_leader
) ->
?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM1.
fetch_stream_events(
#{
@ -135,9 +172,65 @@ fetch_stream_events(
fetch_stream_events(GSM) ->
{GSM, []}.
handle_info(
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = Msg
) ->
case Timers of
#{Name := #timer{id = Id}} ->
handle_state_timeout(GSM, Name, Message);
_ ->
%% Stale timer
GSM
end;
handle_info(GSM, Msg) ->
GSM.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
% send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
% SendAfter(Delay, Message).
transition(GSM0, NewState, NewStateData) ->
Timers = maps:get(state_timers, GSM0, #{}),
TimerNames = maps:keys(Timers),
GSM1 = lists:foldl(
fun(Name, Acc) ->
cancel_timer(Acc, Name)
end,
GSM0,
TimerNames
),
GSM1#{
state => NewState,
state_data => NewStateData,
state_timers => #{}
}.
ensure_state_timeout(GSM0, Name, Delay, Message) ->
Id = make_ref(),
GSM1 = cancel_timer(GSM0, Name),
Timers = maps:get(state_timers, GSM1),
TimerMessage = #state_timeout{
id = Id,
name = Name,
message = Message
},
TimerRef = send_after(GSM1, Delay, TimerMessage),
GSM2 = GSM1#{
state_timers := Timers#{Name => #timer{ref = TimerRef, id = Id}}
},
GSM2.
send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
SendAfter(Delay, Message).
cancel_timer(GSM, Name) ->
Timers = maps:get(state_timers, GSM, #{}),
case Timers of
#{Name := #timer{ref = TimerRef}} ->
_ = erlang:cancel_timer(TimerRef),
GSM#{
state_timers := maps:remove(Name, Timers)
};
_ ->
GSM
end.

View File

@ -260,13 +260,6 @@ connect_agent(
streams => length(StreamLease),
version => Version
}),
% ct:print("connect_agent: ~p~n", [#{
% msg => leader_lease_streams,
% agent => Agent,
% group => Group,
% streams => length(StreamLease),
% version => Version
% }]),
ok = emqx_ds_shared_sub_proto:leader_lease_streams(
Agent, Group, StreamLease, Version
),

View File

@ -42,24 +42,21 @@ end_per_suite(Config) ->
ok.
init_per_testcase(_TC, Config) ->
ok = snabbkaffe:start_trace(),
Config.
end_per_testcase(_TC, _Config) ->
ok = snabbkaffe:stop(),
ok = terminate_leaders(),
ok.
t_lease_initial(_Config) ->
ConnRegular = emqtt_connect_sub(<<"client_regular">>),
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-reate some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
{ok, _, _} = emqtt:subscribe(ConnRegular, <<"topic/1">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic/1">>, <<"hello1">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 5000),
ok = emqtt:disconnect(ConnRegular),
ok = init_streams(ConnPub, <<"topic/1">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1),
@ -70,10 +67,52 @@ t_lease_initial(_Config) ->
ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub).
t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-reate some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic/2">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>),
%% Stop registry to simulate unability to find leader.
ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
?assertWaitEvent(
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1),
#{?snk_kind := find_leader_timeout},
5000
),
%% Start registry, agent should retry after some time and find the leader.
?assertWaitEvent(
{ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
#{?snk_kind := leader_lease_streams},
5000
),
{ok, _} = emqtt:publish(ConnPub, <<"topic/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10000),
ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub).
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------
init_streams(ConnPub, Topic) ->
ConnRegular = emqtt_connect_sub(<<"client_regular">>),
{ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 5000),
ok = emqtt:disconnect(ConnRegular).
emqtt_connect_sub(ClientId) ->
{ok, C} = emqtt:start_link([
{client_id, ClientId},
@ -92,3 +131,8 @@ emqtt_connect_pub(ClientId) ->
]),
{ok, _} = emqtt:connect(C),
C.
terminate_leaders() ->
ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
{ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_leader_sup),
ok.