diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index b5e8c3f4c..434ea0f14 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -10,6 +10,7 @@ -module(emqx_ds_shared_sub_group_sm). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ new/1, @@ -37,15 +38,40 @@ -define(replaying, replaying). -define(updating, updating). +-type state() :: ?connecting | ?replaying | ?updating. + -type group_sm() :: #{ topic_filter => emqx_persistent_session_ds:share_topic_filter(), agent => emqx_ds_shared_sub_proto:agent(), send_after => fun((non_neg_integer(), term()) -> reference()), - state => ?connecting | ?replaying | ?updating, - state_data => map() + state => state(), + state_data => map(), + state_timers => map() }. +-record(state_timeout, { + id :: reference(), + name :: atom(), + message :: term() +}). +-record(timer, { + ref :: reference(), + id :: reference() +}). + +%%----------------------------------------------------------------------- +%% Constants +%%----------------------------------------------------------------------- + +%% TODO https://emqx.atlassian.net/browse/EMQX-12574 +%% Move to settings +-define(FIND_LEADER_TIMEOUT, 1000). + +%%----------------------------------------------------------------------- +%% API +%%----------------------------------------------------------------------- + -spec new(options()) -> group_sm(). 
new(#{ agent := Agent, @@ -60,17 +86,20 @@ new(#{ topic_filter => ShareTopicFilter } ), - ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), - #{ + GSM0 = #{ topic_filter => ShareTopicFilter, agent => Agent, - send_after => SendAfter, + send_after => SendAfter + }, + GSM1 = transition(GSM0, ?connecting, #{}), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, ShareTopicFilter), + GSM2 = ensure_state_timeout(GSM1, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), + GSM2. - state => ?connecting, - state_data => #{} - }. - -handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Version) -> +handle_leader_lease_streams( + #{state := ?connecting, topic_filter := TopicFilter} = GSM, StreamProgresses, Version +) -> + ?tp(debug, leader_lease_streams, #{topic_filter => TopicFilter}), Streams = lists:foldl( fun(#{stream := Stream, iterator := It}, Acc) -> Acc#{Stream => It} @@ -88,16 +117,17 @@ handle_leader_lease_streams(#{state := ?connecting} = GSM, StreamProgresses, Ver end, StreamProgresses ), - GSM#{ - state => ?replaying, - state_data => #{ + transition( + GSM, + ?replaying, + #{ streams => Streams, stream_lease_events => StreamLeaseEvents, prev_version => undefined, version => Version, last_update_time => erlang:monotonic_time(millisecond) } - }; + ); handle_leader_lease_streams(GSM, _StreamProgresses, _Version) -> GSM. @@ -110,8 +140,15 @@ handle_leader_renew_stream_lease( handle_leader_renew_stream_lease(GSM, _Version) -> GSM. -handle_info(GSM, _Info) -> - GSM. +handle_state_timeout( + #{agent := Agent, state := ?connecting, topic_filter := TopicFilter} = GSM0, + find_leader_timeout, + find_leader +) -> + ?tp(debug, find_leader_timeout, #{topic_filter => TopicFilter}), + ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, TopicFilter), + GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT, find_leader), + GSM1. 
fetch_stream_events(
     #{
@@ -135,9 +172,76 @@ fetch_stream_events(
 fetch_stream_events(GSM) ->
     {GSM, []}.
 
+%% Handle a raw message delivered to the group state machine.
+%% A #state_timeout{} is passed on to handle_state_timeout/3 only when its id
+%% matches the timer currently registered under the same name. A stale timeout
+%% or any other message leaves the state machine unchanged.
+handle_info(
+    #{state_timers := Timers} = GSM, #state_timeout{message = Message, name = Name, id = Id}
+) ->
+    case Timers of
+        #{Name := #timer{id = Id}} ->
+            handle_state_timeout(GSM, Name, Message);
+        _ ->
+            %% Stale timer
+            GSM
+    end;
+handle_info(GSM, _Info) ->
+    GSM.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
 
-% send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
-%     SendAfter(Delay, Message).
+%% Enter NewState with NewStateData. Timers armed so far are cancelled and
+%% the timer table is reset to an empty map.
+transition(GSM0, NewState, NewStateData) ->
+    Timers = maps:get(state_timers, GSM0, #{}),
+    TimerNames = maps:keys(Timers),
+    GSM1 = lists:foldl(
+        fun(Name, Acc) ->
+            cancel_timer(Acc, Name)
+        end,
+        GSM0,
+        TimerNames
+    ),
+    GSM1#{
+        state => NewState,
+        state_data => NewStateData,
+        state_timers => #{}
+    }.
+
+%% (Re)arm the timer Name: a previous timer with that name is cancelled and a
+%% #state_timeout{} carrying a fresh id is scheduled after Delay.
+%% Requires the state_timers key to be present, i.e. transition/3 has run.
+ensure_state_timeout(GSM0, Name, Delay, Message) ->
+    Id = make_ref(),
+    GSM1 = cancel_timer(GSM0, Name),
+    Timers = maps:get(state_timers, GSM1),
+    TimerMessage = #state_timeout{
+        id = Id,
+        name = Name,
+        message = Message
+    },
+    TimerRef = send_after(GSM1, Delay, TimerMessage),
+    GSM2 = GSM1#{
+        state_timers := Timers#{Name => #timer{ref = TimerRef, id = Id}}
+    },
+    GSM2.
+
+%% Schedule Message after Delay through the fun stored under send_after.
+send_after(#{send_after := SendAfter} = _GSM, Delay, Message) ->
+    SendAfter(Delay, Message).
+
+%% Cancel the timer registered under Name, if any, and drop it from the table.
+cancel_timer(GSM, Name) ->
+    Timers = maps:get(state_timers, GSM, #{}),
+    case Timers of
+        #{Name := #timer{ref = TimerRef}} ->
+            _ = erlang:cancel_timer(TimerRef),
+            GSM#{
+                state_timers := maps:remove(Name, Timers)
+            };
+        _ ->
+            GSM
+    end.
diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 2081f7a49..5323595cf 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -260,13 +260,6 @@ connect_agent( streams => length(StreamLease), version => Version }), - % ct:print("connect_agent: ~p~n", [#{ - % msg => leader_lease_streams, - % agent => Agent, - % group => Group, - % streams => length(StreamLease), - % version => Version - % }]), ok = emqx_ds_shared_sub_proto:leader_lease_streams( Agent, Group, StreamLease, Version ), diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 4a43c3243..3500e0726 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -42,24 +42,21 @@ end_per_suite(Config) -> ok. init_per_testcase(_TC, Config) -> + ok = snabbkaffe:start_trace(), Config. end_per_testcase(_TC, _Config) -> + ok = snabbkaffe:stop(), + ok = terminate_leaders(), ok. t_lease_initial(_Config) -> - ConnRegular = emqtt_connect_sub(<<"client_regular">>), ConnPub = emqtt_connect_pub(<<"client_pub">>), %% Need to pre-reate some streams in "topic/#". %% Leader is dummy by far and won't update streams after the first lease to the agent. %% So there should be some streams already when the agent connects. 
-    {ok, _, _} = emqtt:subscribe(ConnRegular, <<"topic/1">>, 1),
-    {ok, _} = emqtt:publish(ConnPub, <<"topic/1">>, <<"hello1">>, 1),
-
-    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5000),
-
-    ok = emqtt:disconnect(ConnRegular),
+    ok = init_streams(ConnPub, <<"topic/1">>),
 
     ConnShared = emqtt_connect_sub(<<"client_shared">>),
     {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1),
@@ -70,10 +67,60 @@ t_lease_initial(_Config) ->
     ok = emqtt:disconnect(ConnShared),
     ok = emqtt:disconnect(ConnPub).
 
+%% Subscribes while the registry is stopped, waits for a find_leader_timeout
+%% event, then restarts the registry and expects a lease and message delivery.
+t_lease_reconnect(_Config) ->
+    ConnPub = emqtt_connect_pub(<<"client_pub">>),
+
+    %% Need to pre-create some streams in "topic/#".
+    %% Leader is dummy so far and won't update streams after the first lease to the agent.
+    %% So there should be some streams already when the agent connects.
+    ok = init_streams(ConnPub, <<"topic/2">>),
+
+    ConnShared = emqtt_connect_sub(<<"client_shared">>),
+
+    %% Stop registry to simulate inability to find leader.
+    ok = supervisor:terminate_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+
+    try
+        ?assertWaitEvent(
+            {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr1/topic/#">>, 1),
+            #{?snk_kind := find_leader_timeout},
+            5000
+        ),
+
+        %% Start registry, agent should retry after some time and find the leader.
+        ?assertWaitEvent(
+            {ok, _} = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry),
+            #{?snk_kind := leader_lease_streams},
+            5000
+        )
+    after
+        %% Do not leave the registry stopped if an assertion above fails.
+        %% When it is already running this is a no-op returning {error, running}.
+        _ = supervisor:restart_child(emqx_ds_shared_sub_sup, emqx_ds_shared_sub_registry)
+    end,
+
+    {ok, _} = emqtt:publish(ConnPub, <<"topic/2">>, <<"hello2">>, 1),
+
+    ?assertReceive({publish, #{payload := <<"hello2">>}}, 10000),
+
+    ok = emqtt:disconnect(ConnShared),
+    ok = emqtt:disconnect(ConnPub).
+
 %%--------------------------------------------------------------------
 %% Helper functions
 %%--------------------------------------------------------------------
 
+%% Publish to Topic and wait for delivery to a regular subscriber (presumably creates streams).
+init_streams(ConnPub, Topic) ->
+    Subscriber = emqtt_connect_sub(<<"client_regular">>),
+    {ok, _, _} = emqtt:subscribe(Subscriber, Topic, 1),
+    {ok, _} = emqtt:publish(ConnPub, Topic, <<"hello1">>, 1),
+    ?assertReceive({publish, #{payload := <<"hello1">>}}, 5000),
+
+    ok = emqtt:disconnect(Subscriber).
+
 emqtt_connect_sub(ClientId) ->
     {ok, C} = emqtt:start_link([
         {client_id, ClientId},
@@ -92,3 +131,11 @@ emqtt_connect_pub(ClientId) ->
     ]),
     {ok, _} = emqtt:connect(C),
     C.
+
+%% Stop and start again the leader supervisor under emqx_ds_shared_sub_sup.
+terminate_leaders() ->
+    Sup = emqx_ds_shared_sub_sup,
+    LeaderSup = emqx_ds_shared_sub_leader_sup,
+    ok = supervisor:terminate_child(Sup, LeaderSup),
+    {ok, _} = supervisor:restart_child(Sup, LeaderSup),
+    ok.