feat(queue): handle renew_lease_timeout

This commit is contained in:
Ilya Averyanov 2024-06-21 14:38:41 +03:00
parent 2096755ad6
commit db28a042d5
2 changed files with 84 additions and 31 deletions

View File

@ -67,6 +67,7 @@
%% TODO https://emqx.atlassian.net/browse/EMQX-12574 %% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings %% Move to settings
-define(FIND_LEADER_TIMEOUT, 1000). -define(FIND_LEADER_TIMEOUT, 1000).
-define(RENEW_LEASE_TIMEOUT, 2000).
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% API %% API
@ -91,10 +92,7 @@ new(#{
agent => Agent, agent => Agent,
send_after => SendAfter send_after => SendAfter
}, },
GSM1 = transition(GSM0, ?connecting, #{}), transition(GSM0, ?connecting, #{}).
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter),
GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader),
GSM2.
fetch_stream_events( fetch_stream_events(
#{ #{
@ -125,8 +123,12 @@ fetch_stream_events(GSM) ->
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% Connecting state %% Connecting state
%% Enter callback for the ?connecting state (dispatched by run_enter_callback/1).
%% Requests a leader lookup from the registry for this agent and share topic
%% filter, then schedules the find_leader_timeout state timer; when that timer
%% fires in ?connecting, handle_find_leader_timeout/1 issues the lookup again.
%% Returns the state machine map produced by ensure_state_timeout/3.
handle_connecting(#{agent := AgentRef, topic_filter := TopicFilter} = GSM0) ->
    ok = emqx_ds_shared_sub_registry:lookup_leader(AgentRef, TopicFilter),
    GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
    GSM1.
handle_leader_lease_streams( handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version #{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version
) -> ) ->
?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}), ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}),
Streams = lists:foldl( Streams = lists:foldl(
@ -147,14 +149,13 @@ handle_leader_lease_streams(
StreamProgresses StreamProgresses
), ),
transition( transition(
GSM, GSM0,
?replaying, ?replaying,
#{ #{
streams => Streams, streams => Streams,
stream_lease_events => StreamLeaseEvents, stream_lease_events => StreamLeaseEvents,
prev_version => undefined, prev_version => undefined,
version => Version, version => Version
last_update_time => erlang:monotonic_time(millisecond)
} }
); );
handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
@ -162,24 +163,32 @@ handle_leader_lease_streams(GSM, _StreamProgresses, _Version) ->
handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) ->
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
GSM1. GSM1.
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% Replaying state %% Replaying state
%% Enter callback for the ?replaying state (dispatched by run_enter_callback/1).
%% Schedules the renew_lease_timeout state timer with ?RENEW_LEASE_TIMEOUT.
%% handle_leader_renew_stream_lease/2 calls ensure_state_timeout/3 again for the
%% same timer name on every renewal with a matching version; if the timer fires
%% instead, handle_renew_lease_timeout/1 moves the machine back to ?connecting.
handle_replaying(GSM0) ->
    LeaseTimeout = ?RENEW_LEASE_TIMEOUT,
    ensure_state_timeout(GSM0, renew_lease_timeout, LeaseTimeout).
handle_leader_renew_stream_lease( handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version #{state := ?replaying, state_data := #{version := Version}} = GSM, Version
) -> ) ->
GSM#{ ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
state_data => Data#{last_update_time => erlang:monotonic_time(millisecond)}
};
handle_leader_renew_stream_lease(GSM, _Version) -> handle_leader_renew_stream_lease(GSM, _Version) ->
GSM. GSM.
%% Handles expiry of the renew_lease_timeout timer while in ?replaying.
%% Emits a `renew_lease_timeout` trace point (tests wait on it) and then
%% transitions back to ?connecting with empty state data, which in turn runs
%% the ?connecting enter callback via transition/3.
handle_renew_lease_timeout(GSM0) ->
    ?tp(debug, renew_lease_timeout, #{}),
    FreshStateData = #{},
    transition(GSM0, ?connecting, FreshStateData).
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% Updating state %% Updating state
% handle_updating(GSM) ->
% GSM.
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
%% Internal API %% Internal API
%%----------------------------------------------------------------------- %%-----------------------------------------------------------------------
@ -187,10 +196,16 @@ handle_leader_renew_stream_lease(GSM, _Version) ->
handle_state_timeout( handle_state_timeout(
#{state := ?connecting, topic_filter := TopicFilter} = GSM, #{state := ?connecting, topic_filter := TopicFilter} = GSM,
find_leader_timeout, find_leader_timeout,
find_leader _Message
) -> ) ->
?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}), ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}),
handle_find_leader_timeout(GSM). handle_find_leader_timeout(GSM);
handle_state_timeout(
#{state := ?replaying} = GSM,
renew_lease_timeout,
_Message
) ->
handle_renew_lease_timeout(GSM).
handle_info( handle_info(
#{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info
@ -219,11 +234,15 @@ transition(GSM0, NewState, NewStateData) ->
GSM0, GSM0,
TimerNames TimerNames
), ),
GSM1#{ GSM2 = GSM1#{
state => NewState, state => NewState,
state_data => NewStateData, state_data => NewStateData,
state_timers => #{} state_timers => #{}
}. },
run_enter_callback(GSM2).
%% Convenience arity of ensure_state_timeout/4: the timer name doubles as the
%% timeout message, so callers that do not need a distinct message can omit it.
ensure_state_timeout(GSM, TimerName, Delay) ->
    Message = TimerName,
    ensure_state_timeout(GSM, TimerName, Delay, Message).
ensure_state_timeout(GSM0, Name, Delay, Message) -> ensure_state_timeout(GSM0, Name, Delay, Message) ->
Id = make_ref(), Id = make_ref(),
@ -254,3 +273,10 @@ cancel_timer(GSM, Name) ->
_ -> _ ->
GSM GSM
end. end.
%% Runs the enter callback of the state the machine has just switched to
%% (called at the end of transition/3) and returns the updated machine map.
%% Only ?connecting and ?replaying have callbacks; any other state value has
%% no matching clause and fails with function_clause.
run_enter_callback(#{state := ?connecting} = Machine) ->
    handle_connecting(Machine);
run_enter_callback(#{state := ?replaying} = Machine) ->
    handle_replaying(Machine).
% run_enter_callback(#{state := ?updating} = GSM) ->
% handle_updating(GSM).

View File

@ -53,16 +53,16 @@ end_per_testcase(_TC, _Config) ->
t_lease_initial(_Config) -> t_lease_initial(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>), ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-reate some streams in "topic/#". %% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent. %% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects. %% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic/1">>), ok = init_streams(ConnPub, <<"topic1/1">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>), ConnShared = emqtt_connect_sub(<<"client_shared">>),
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1), {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1),
{ok, _} = emqtt:publish(ConnPub, <<"topic/1">>, <<"hello2">>, 1), {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10000), ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ok = emqtt:disconnect(ConnShared), ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub). ok = emqtt:disconnect(ConnPub).
@ -70,10 +70,10 @@ t_lease_initial(_Config) ->
t_lease_reconnect(_Config) -> t_lease_reconnect(_Config) ->
ConnPub = emqtt_connect_pub(<<"client_pub">>), ConnPub = emqtt_connect_pub(<<"client_pub">>),
%% Need to pre-reate some streams in "topic/#". %% Need to pre-create some streams in "topic/#".
%% Leader is dummy by far and won't update streams after the first lease to the agent. %% Leader is dummy by far and won't update streams after the first lease to the agent.
%% So there should be some streams already when the agent connects. %% So there should be some streams already when the agent connects.
ok = init_streams(ConnPub, <<"topic/2">>), ok = init_streams(ConnPub, <<"topic2/2">>),
ConnShared = emqtt_connect_sub(<<"client_shared">>), ConnShared = emqtt_connect_sub(<<"client_shared">>),
@ -81,25 +81,52 @@ t_lease_reconnect(_Config) ->
ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry), ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
?assertWaitEvent( ?assertWaitEvent(
{ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1), {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1),
#{?snk_kind := find_leader_timeout}, #{?snk_kind := find_leader_timeout},
5000 5_000
), ),
%% Start registry, agent should retry after some time and find the leader. %% Start registry, agent should retry after some time and find the leader.
?assertWaitEvent( ?assertWaitEvent(
{ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry), {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
#{?snk_kind := leader_lease_streams}, #{?snk_kind := leader_lease_streams},
5000 5_000
), ),
{ok, _} = emqtt:publish(ConnPub, <<"topic/2">>, <<"hello2">>, 1), ct:sleep(1_000),
{ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1),
?assertReceive({publish, #{payload := <<"hello2">>}}, 10000), ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000),
ok = emqtt:disconnect(ConnShared), ok = emqtt:disconnect(ConnShared),
ok = emqtt:disconnect(ConnPub). ok = emqtt:disconnect(ConnPub).
%% Verifies recovery after the leader goes away: once the subscriber has been
%% leased streams, the leaders are terminated and the agent is expected to hit
%% `renew_lease_timeout` and subsequently receive `leader_lease_streams` again.
t_renew_lease_timeout(_Config) ->
    Subscriber = emqtt_connect_sub(<<"client_shared">>),
    SharedTopic = <<"$share/gr3/topic3/#">>,

    %% Subscribe and wait until the agent has been leased streams by a leader.
    ?assertWaitEvent(
        {ok, _, _} = emqtt:subscribe(Subscriber, SharedTopic, 1),
        #{?snk_kind := leader_lease_streams},
        5_000
    ),

    ?check_trace(
        %% Run stage: terminate_leaders/0 is defined elsewhere in the suite
        %% (presumably it stops the running leader processes -- confirm there);
        %% then wait for a fresh leader_lease_streams event.
        ?wait_async_action(
            ok = terminate_leaders(),
            #{?snk_kind := leader_lease_streams},
            5_000
        ),
        %% Check stage: relate renew_lease_timeout (cause) to
        %% leader_lease_streams (effect) in the collected trace.
        fun(Events) ->
            ?strict_causality(
                #{?snk_kind := renew_lease_timeout},
                #{?snk_kind := leader_lease_streams},
                Events
            )
        end
    ),

    ok = emqtt:disconnect(Subscriber).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -109,7 +136,7 @@ init_streams(ConnPub, Topic) ->
{ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1), {ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1),
{ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1), {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
?assertReceive({publish, #{payload := <<"hello1">>}}, 5000), ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000),
ok = emqtt:disconnect(ConnRegular). ok = emqtt:disconnect(ConnRegular).
@ -118,7 +145,7 @@ emqtt_connect_sub(ClientId) ->
{client_id, ClientId}, {client_id, ClientId},
{clean_start, true}, {clean_start, true},
{proto_ver, v5}, {proto_ver, v5},
{properties, #{'Session-Expiry-Interval' => 7200}} {properties, #{'Session-Expiry-Interval' => 7_200}}
]), ]),
{ok, _} = emqtt:connect(C), {ok, _} = emqtt:connect(C),
C. C.