From db28a042d5454decbea1a1d4b13f82ab4ad16209 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 21 Jun 2024 14:38:41 +0300 Subject: [PATCH] feat(queue): handle renew_lease_timeout --- .../src/emqx_ds_shared_sub_group_sm.erl | 60 +++++++++++++------ .../test/emqx_ds_shared_sub_SUITE.erl | 55 ++++++++++++----- 2 files changed, 84 insertions(+), 31 deletions(-) diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index d1f78bf68..c6bdf9d93 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -67,6 +67,7 @@ %% TODO https://emqx.atlassian.net/browse/EMQX-12574 %% Move to settings -define(FIND_LEADER_TIMEOUT, 1000). +-define(RENEW_LEASE_TIMEOUT, 2000). %%----------------------------------------------------------------------- %% API @@ -91,10 +92,7 @@ new(#{ agent => Agent, send_after => SendAfter }, - GSM1 = transition(GSM0, ?connecting, #{}), - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), - GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), - GSM2. + transition(GSM0, ?connecting, #{}). fetch_stream_events( #{ @@ -125,8 +123,12 @@ fetch_stream_events(GSM) -> %%----------------------------------------------------------------------- %% Connecting state +handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), + ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). + handle_leader_lease_streams( - #{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version + #{state := ?connecting, topic_filter := TopicFilter} = GSM0, StreamProgresses, Version ) -> ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}), Streams = lists:foldl( @@ -147,14 +149,13 @@ handle_leader_lease_streams( StreamProgresses ), transition( - GSM, + GSM0, ?replaying, #{ streams => Streams, stream_lease_events => StreamLeaseEvents, prev_version => undefined, - version => Version, - last_update_time => erlang:monotonic_time(millisecond) + version => Version } ); handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> @@ -162,24 +163,32 @@ handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0) -> ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), - GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), + GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT), GSM1. %%----------------------------------------------------------------------- %% Replaying state +handle_replaying(GSM) -> + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT). + handle_leader_renew_stream_lease( - #{state := ?replaying, state_data := #{version := Version} = Data} = GSM, Version + #{state := ?replaying, state_data := #{version := Version}} = GSM, Version ) -> - GSM#{ - state_data => Data#{last_update_time => erlang:monotonic_time(millisecond)} - }; + ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); handle_leader_renew_stream_lease(GSM, _Version) -> GSM. +handle_renew_lease_timeout(GSM) -> + ?tp(debug, renew_lease_timeout, #{}), + transition(GSM, ?connecting, #{}). + %%----------------------------------------------------------------------- %% Updating state +% handle_updating(GSM) -> +% GSM. + %%----------------------------------------------------------------------- %% Internal API %%----------------------------------------------------------------------- @@ -187,10 +196,16 @@ handle_leader_renew_stream_lease(GSM, _Version) -> handle_state_timeout( #{state := ?connecting, topic_filter := TopicFilter} = GSM, find_leader_timeout, - find_leader + _Message ) -> ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}), - handle_find_leader_timeout(GSM). + handle_find_leader_timeout(GSM); +handle_state_timeout( + #{state := ?replaying} = GSM, + renew_lease_timeout, + _Message +) -> + handle_renew_lease_timeout(GSM). handle_info( #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id} = _Info @@ -219,11 +234,15 @@ transition(GSM0, NewState, NewStateData) -> GSM0, TimerNames ), - GSM1#{ + GSM2 = GSM1#{ state => NewState, state_data => NewStateData, state_timers => #{} - }. + }, + run_enter_callback(GSM2). + +ensure_state_timeout(GSM0, Name, Delay) -> + ensure_state_timeout(GSM0, Name, Delay, Name). ensure_state_timeout(GSM0, Name, Delay, Message) -> Id = make_ref(), @@ -254,3 +273,10 @@ cancel_timer(GSM, Name) -> _ -> GSM end. + +run_enter_callback(#{state := ?connecting} = GSM) -> + handle_connecting(GSM); +run_enter_callback(#{state := ?replaying} = GSM) -> + handle_replaying(GSM). +% run_enter_callback(#{state := ?updating} = GSM) -> +% handle_updating(GSM). diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 3500e0726..bca8eb0eb 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -53,16 +53,16 @@ end_per_testcase(_TC, _Config) -> t_lease_initial(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), - %% Need to pre-reate some streams in "topic/#". + %% Need to pre-create some streams in "topic/#". %% Leader is dummy by far and won't update streams after the first lease to the agent. %% So there should be some streams already when the agent connects. - ok = init_streams(ConnPub, <<"topic/1">>), + ok = init_streams(ConnPub, <<"topic1/1">>), ConnShared = emqtt_connect_sub(<<"client_shared">>), - {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1), + {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic1/#">>, 1), - {ok, _} = emqtt:publish(ConnPub, <<"topic/1">>, <<"hello2">>, 1), - ?assertReceive({publish, #{payload := <<"hello2">>}}, 10000), + {ok, _} = emqtt:publish(ConnPub, <<"topic1/1">>, <<"hello2">>, 1), + ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), ok = emqtt:disconnect(ConnShared), ok = emqtt:disconnect(ConnPub). @@ -70,10 +70,10 @@ t_lease_initial(_Config) -> t_lease_reconnect(_Config) -> ConnPub = emqtt_connect_pub(<<"client_pub">>), - %% Need to pre-reate some streams in "topic/#". + %% Need to pre-create some streams in "topic/#". %% Leader is dummy by far and won't update streams after the first lease to the agent. %% So there should be some streams already when the agent connects. - ok = init_streams(ConnPub, <<"topic/2">>), + ok = init_streams(ConnPub, <<"topic2/2">>), ConnShared = emqtt_connect_sub(<<"client_shared">>), @@ -81,25 +81,52 @@ t_lease_reconnect(_Config) -> ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry), ?assertWaitEvent( - {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1), + {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1), #{?snk_kind := find_leader_timeout}, - 5000 + 5_000 ), %% Start registry, agent should retry after some time and find the leader. ?assertWaitEvent( {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry), #{?snk_kind := leader_lease_streams}, - 5000 + 5_000 ), - {ok, _} = emqtt:publish(ConnPub, <<"topic/2">>, <<"hello2">>, 1), + ct:sleep(1_000), + {ok, _} = emqtt:publish(ConnPub, <<"topic2/2">>, <<"hello2">>, 1), - ?assertReceive({publish, #{payload := <<"hello2">>}}, 10000), + ?assertReceive({publish, #{payload := <<"hello2">>}}, 10_000), ok = emqtt:disconnect(ConnShared), ok = emqtt:disconnect(ConnPub). +t_renew_lease_timeout(_Config) -> + ConnShared = emqtt_connect_sub(<<"client_shared">>), + + ?assertWaitEvent( + {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr3/topic3/#">>, 1), + #{?snk_kind := leader_lease_streams}, + 5_000 + ), + + ?check_trace( + ?wait_async_action( + ok = terminate_leaders(), + #{?snk_kind := leader_lease_streams}, + 5_000 + ), + fun(Trace) -> + ?strict_causality( + #{?snk_kind := renew_lease_timeout}, + #{?snk_kind := leader_lease_streams}, + Trace + ) + end + ), + + ok = emqtt:disconnect(ConnShared). + %%-------------------------------------------------------------------- %% Helper functions %%-------------------------------------------------------------------- @@ -109,7 +136,7 @@ init_streams(ConnPub, Topic) -> {ok, _, _} = emqtt:subscribe(ConnRegular, Topic, 1), {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1), - ?assertReceive({publish, #{payload := <<"hello1">>}}, 5000), + ?assertReceive({publish, #{payload := <<"hello1">>}}, 5_000), ok = emqtt:disconnect(ConnRegular). @@ -118,7 +145,7 @@ emqtt_connect_sub(ClientId) -> {client_id, ClientId}, {clean_start, true}, {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 7200}} + {properties, #{'Session-Expiry-Interval' => 7_200}} ]), {ok, _} = emqtt:connect(C), C.