From e17becb84d8d440171bfe07092b4ebf60d9a4294 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 2 Aug 2024 12:41:30 +0300 Subject: [PATCH 1/2] feat(queue): compact protocol structures, organize formatting --- ...emqx_persistent_session_ds_shared_subs.erl | 5 + .../src/emqx_ds_shared_sub_group_sm.erl | 8 +- .../src/emqx_ds_shared_sub_proto.erl | 172 ++++------------- .../src/emqx_ds_shared_sub_proto.hrl | 179 ++++++++++-------- .../src/emqx_ds_shared_sub_proto_format.erl | 82 ++++++++ 5 files changed, 225 insertions(+), 221 deletions(-) create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 5b54c6f73..10d41e91e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -56,6 +56,11 @@ cold_get_subscription/2 ]). +-export([ + format_lease_events/1, + format_stream_progresses/1 +]). + -define(schedule_subscribe, schedule_subscribe). -define(schedule_unsubscribe, schedule_unsubscribe). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index a648bbaef..4cae98f65 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -268,7 +268,9 @@ handle_leader_update_streams( id => Id, version_old => VersionOld, version_new => VersionNew, - stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses) + stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses( + StreamProgresses + ) }), {AddEvents, Streams1} = lists:foldl( fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) -> @@ -305,7 +307,9 @@ handle_leader_update_streams( StreamLeaseEvents = AddEvents ++ RevokeEvents, ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ id => Id, - stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents) + stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events( + StreamLeaseEvents + ) }), transition( GSM, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 383f66ff2..93278a5d5 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -22,12 +22,6 @@ ]). -export([ - format_stream_progresses/1, - format_stream_progress/1, - format_stream_key/1, - format_stream_keys/1, - format_lease_event/1, - format_lease_events/1, agent/2 ]). @@ -57,6 +51,20 @@ agent_metadata/0 ]). +-define(log_agent_msg(ToLeader, Msg), + ?tp(debug, shared_sub_proto_msg, #{ + to_leader => ToLeader, + msg => emqx_ds_shared_sub_proto_format:format_agent_msg(Msg) + }) +). + +-define(log_leader_msg(ToAgent, Msg), + ?tp(debug, shared_sub_proto_msg, #{ + to_agent => ToAgent, + msg => emqx_ds_shared_sub_proto_format:format_leader_msg(Msg) + }) +). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -67,15 +75,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_connect_leader, - to_leader => ToLeader, - from_agent => FromAgent, - agent_metadata => AgentMetadata, - share_topic_filter => ShareTopicFilter - }), - _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)), - ok; + send_agent_msg(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)); agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> emqx_ds_shared_sub_proto_v1:agent_connect_leader( ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter @@ -85,15 +85,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_update_stream_states, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version => Version - }), - _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), - ok; + send_agent_msg(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)); agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> emqx_ds_shared_sub_proto_v1:agent_update_stream_states( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version @@ -105,18 +97,9 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_update_stream_states, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version_old => VersionOld, - version_new => VersionNew - }), - _ = erlang:send( + send_agent_msg( ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) - ), - ok; + ); agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> emqx_ds_shared_sub_proto_v1:agent_update_stream_states( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew @@ -125,15 +108,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_disconnect, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version => Version - }), - _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)), - ok; + send_agent_msg(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)); agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> emqx_ds_shared_sub_proto_v1:agent_disconnect( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version @@ -144,19 +119,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) -> ok. leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_lease_streams, - to_agent => ToAgent, - of_group => OfGroup, - leader => Leader, - streams => format_stream_progresses(Streams), - version => Version - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_lease_streams(OfGroup, Leader, Streams, Version) - ), - ok; + send_leader_msg(ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version)); leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> emqx_ds_shared_sub_proto_v1:leader_lease_streams( ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version @@ -164,17 +127,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> -spec leader_renew_stream_lease(agent(), group(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_renew_stream_lease, - to_agent => ToAgent, - of_group => OfGroup, - version => Version - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_renew_stream_lease(OfGroup, Version) - ), - ok; + send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, Version)); leader_renew_stream_lease(ToAgent, OfGroup, Version) -> emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( ?agent_node(ToAgent), ToAgent, OfGroup, Version @@ -182,18 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) -> -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_renew_stream_lease, - to_agent => ToAgent, - of_group => OfGroup, - version_old => VersionOld, - version_new => VersionNew - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) - ), - ok; + send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)); leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew @@ -204,19 +146,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_update_streams, - to_agent => ToAgent, - of_group => OfGroup, - version_old => VersionOld, - version_new => VersionNew, - streams_new => format_stream_progresses(StreamsNew) - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) - ), - ok; + send_leader_msg(ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)); leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> emqx_ds_shared_sub_proto_v1:leader_update_streams( ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew @@ -224,16 +154,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> -spec leader_invalidate(agent(), group()) -> ok. leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_invalidate, - to_agent => ToAgent, - of_group => OfGroup - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_invalidate(OfGroup) - ), - ok; + send_leader_msg(ToAgent, ?leader_invalidate(OfGroup)); leader_invalidate(ToAgent, OfGroup) -> emqx_ds_shared_sub_proto_v1:leader_invalidate( ?agent_node(ToAgent), ToAgent, OfGroup @@ -247,41 +168,12 @@ agent(Id, Pid) -> _ = Id, ?agent(Id, Pid). -format_stream_progresses(Streams) -> - lists:map( - fun format_stream_progress/1, - Streams - ). +send_agent_msg(ToLeader, Msg) -> + ?log_agent_msg(ToLeader, Msg), + _ = erlang:send(ToLeader, Msg), + ok. -format_stream_progress(#{stream := Stream, progress := Progress} = Value) -> - Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}. - -format_progress(#{iterator := Iterator} = Progress) -> - Progress#{iterator => format_opaque(Iterator)}. - -format_stream_key({SubId, Stream}) -> - {SubId, format_opaque(Stream)}. - -format_stream_keys(StreamKeys) -> - lists:map( - fun format_stream_key/1, - StreamKeys - ). - -format_lease_events(Events) -> - lists:map( - fun format_lease_event/1, - Events - ). - -format_lease_event(#{stream := Stream, progress := Progress} = Event) -> - Event#{stream => format_opaque(Stream), progress => format_progress(Progress)}; -format_lease_event(#{stream := Stream} = Event) -> - Event#{stream => format_opaque(Stream)}. - -%%-------------------------------------------------------------------- -%% Helpers -%%-------------------------------------------------------------------- - -format_opaque(Opaque) -> - erlang:phash2(Opaque). +send_leader_msg(ToAgent, Msg) -> + ?log_leader_msg(ToAgent, Msg), + _ = emqx_persistent_session_ds_shared_subs_agent:send(?agent_pid(ToAgent), Msg), + ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index bf54b2930..89e0d2c84 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -12,146 +12,167 @@ %% agent messages, sent from agent side to the leader --define(agent_connect_leader_msg, agent_connect_leader). --define(agent_update_stream_states_msg, agent_update_stream_states). --define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout). --define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout). --define(agent_disconnect_msg, agent_disconnect). +-define(agent_connect_leader_msg, 1). +-define(agent_update_stream_states_msg, 2). +-define(agent_connect_leader_timeout_msg, 3). +-define(agent_renew_stream_lease_timeout_msg, 4). +-define(agent_disconnect_msg, 5). + +%% message keys (used used not to send atoms over the network) +-define(agent_msg_type, 1). +-define(agent_msg_agent, 2). +-define(agent_msg_share_topic_filter, 3). +-define(agent_msg_agent_metadata, 4). +-define(agent_msg_stream_states, 5). +-define(agent_msg_version, 6). +-define(agent_msg_version_old, 7). +-define(agent_msg_version_new, 8). %% Agent messages sent to the leader. %% Leader talks to many agents, `agent` field is used to identify the sender. -define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{ - type => ?agent_connect_leader_msg, - share_topic_filter => ShareTopicFilter, - agent_metadata => AgentMetadata, - agent => Agent + ?agent_msg_type => ?agent_connect_leader_msg, + ?agent_msg_share_topic_filter => ShareTopicFilter, + ?agent_msg_agent_metadata => AgentMetadata, + ?agent_msg_agent => Agent }). -define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{ - type := ?agent_connect_leader_msg, - share_topic_filter := ShareTopicFilter, - agent_metadata := AgentMetadata, - agent := Agent + ?agent_msg_type := ?agent_connect_leader_msg, + ?agent_msg_share_topic_filter := ShareTopicFilter, + ?agent_msg_agent_metadata := AgentMetadata, + ?agent_msg_agent := Agent }). -define(agent_update_stream_states(Agent, StreamStates, Version), #{ - type => ?agent_update_stream_states_msg, - stream_states => StreamStates, - version => Version, - agent => Agent + ?agent_msg_type => ?agent_update_stream_states_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version => Version, + ?agent_msg_agent => Agent }). -define(agent_update_stream_states_match(Agent, StreamStates, Version), #{ - type := ?agent_update_stream_states_msg, - stream_states := StreamStates, - version := Version, - agent := Agent + ?agent_msg_type := ?agent_update_stream_states_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version := Version, + ?agent_msg_agent := Agent }). -define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{ - type => ?agent_update_stream_states_msg, - stream_states => StreamStates, - version_old => VersionOld, - version_new => VersionNew, - agent => Agent + ?agent_msg_type => ?agent_update_stream_states_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version_old => VersionOld, + ?agent_msg_version_new => VersionNew, + ?agent_msg_agent => Agent }). -define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{ - type := ?agent_update_stream_states_msg, - stream_states := StreamStates, - version_old := VersionOld, - version_new := VersionNew, - agent := Agent + ?agent_msg_type := ?agent_update_stream_states_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version_old := VersionOld, + ?agent_msg_version_new := VersionNew, + ?agent_msg_agent := Agent }). -define(agent_disconnect(Agent, StreamStates, Version), #{ - type => ?agent_disconnect_msg, - stream_states => StreamStates, - version => Version, - agent => Agent + ?agent_msg_type => ?agent_disconnect_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version => Version, + ?agent_msg_agent => Agent }). -define(agent_disconnect_match(Agent, StreamStates, Version), #{ - type := ?agent_disconnect_msg, - stream_states := StreamStates, - version := Version, - agent := Agent + ?agent_msg_type := ?agent_disconnect_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version := Version, + ?agent_msg_agent := Agent }). %% leader messages, sent from the leader to the agent %% Agent may have several shared subscriptions, so may talk to several leaders %% `group_id` field is used to identify the leader. --define(leader_lease_streams_msg, leader_lease_streams). --define(leader_renew_stream_lease_msg, leader_renew_stream_lease). +-define(leader_lease_streams_msg, 101). +-define(leader_renew_stream_lease_msg, 102). +-define(leader_update_streams, 103). +-define(leader_invalidate, 104). + +-define(leader_msg_type, 101). +-define(leader_msg_streams, 102). +-define(leader_msg_version, 103). +-define(leader_msg_version_old, 104). +-define(leader_msg_version_new, 105). +-define(leader_msg_streams_new, 106). +-define(leader_msg_leader, 107). +-define(leader_msg_group_id, 108). -define(leader_lease_streams(GrouId, Leader, Streams, Version), #{ - type => ?leader_lease_streams_msg, - streams => Streams, - version => Version, - leader => Leader, - group_id => GrouId + ?leader_msg_type => ?leader_lease_streams_msg, + ?leader_msg_streams => Streams, + ?leader_msg_version => Version, + ?leader_msg_leader => Leader, + ?leader_msg_group_id => GrouId }). -define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{ - type := ?leader_lease_streams_msg, - streams := Streams, - version := Version, - leader := Leader, - group_id := GroupId + ?leader_msg_type := ?leader_lease_streams_msg, + ?leader_msg_streams := Streams, + ?leader_msg_version := Version, + ?leader_msg_leader := Leader, + ?leader_msg_group_id := GroupId }). -define(leader_renew_stream_lease(GroupId, Version), #{ - type => ?leader_renew_stream_lease_msg, - version => Version, - group_id => GroupId + ?leader_msg_type => ?leader_renew_stream_lease_msg, + ?leader_msg_version => Version, + ?leader_msg_group_id => GroupId }). -define(leader_renew_stream_lease_match(GroupId, Version), #{ - type := ?leader_renew_stream_lease_msg, - version := Version, - group_id := GroupId + ?leader_msg_type := ?leader_renew_stream_lease_msg, + ?leader_msg_version := Version, + ?leader_msg_group_id := GroupId }). -define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{ - type => ?leader_renew_stream_lease_msg, - version_old => VersionOld, - version_new => VersionNew, - group_id => GroupId + ?leader_msg_type => ?leader_renew_stream_lease_msg, + ?leader_msg_version_old => VersionOld, + ?leader_msg_version_new => VersionNew, + ?leader_msg_group_id => GroupId }). -define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{ - type := ?leader_renew_stream_lease_msg, - version_old := VersionOld, - version_new := VersionNew, - group_id := GroupId + ?leader_msg_type := ?leader_renew_stream_lease_msg, + ?leader_msg_version_old := VersionOld, + ?leader_msg_version_new := VersionNew, + ?leader_msg_group_id := GroupId }). -define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{ - type => leader_update_streams, - version_old => VersionOld, - version_new => VersionNew, - streams_new => StreamsNew, - group_id => GroupId + ?leader_msg_type => ?leader_update_streams, + ?leader_msg_version_old => VersionOld, + ?leader_msg_version_new => VersionNew, + ?leader_msg_streams_new => StreamsNew, + ?leader_msg_group_id => GroupId }). -define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{ - type := leader_update_streams, - version_old := VersionOld, - version_new := VersionNew, - streams_new := StreamsNew, - group_id := GroupId + ?leader_msg_type := ?leader_update_streams, + ?leader_msg_version_old := VersionOld, + ?leader_msg_version_new := VersionNew, + ?leader_msg_streams_new := StreamsNew, + ?leader_msg_group_id := GroupId }). -define(leader_invalidate(GroupId), #{ - type => leader_invalidate, - group_id => GroupId + ?leader_msg_type => ?leader_invalidate, + ?leader_msg_group_id => GroupId }). -define(leader_invalidate_match(GroupId), #{ - type := leader_invalidate, - group_id := GroupId + ?leader_msg_type := ?leader_invalidate, + ?leader_msg_group_id := GroupId }). %% Helpers diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl new file mode 100644 index 000000000..4fa01e7a8 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl @@ -0,0 +1,82 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_proto_format). + +-include("emqx_ds_shared_sub_proto.hrl"). + +-export([format_agent_msg/1, format_leader_msg/1]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +format_agent_msg(Msg) -> + maps:from_list( + lists:map( + fun({K, V}) -> + FormattedKey = agent_msg_key(K), + {FormattedKey, format_agent_msg_value(FormattedKey, V)} + end, + maps:to_list(Msg) + ) + ). + +format_leader_msg(Msg) -> + maps:from_list( + lists:map( + fun({K, V}) -> + FormattedKey = leader_msg_key(K), + {FormattedKey, format_leader_msg_value(FormattedKey, V)} + end, + maps:to_list(Msg) + ) + ). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +format_agent_msg_value(agent_msg_type, Type) -> + agent_msg_type(Type); +format_agent_msg_value(agent_msg_stream_states, StreamStates) -> + emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates); +format_agent_msg_value(_, Value) -> + Value. + +format_leader_msg_value(leader_msg_type, Type) -> + leader_msg_type(Type); +format_leader_msg_value(leader_msg_streams, Streams) -> + emqx_persistent_session_ds_shared_subs:format_lease_events(Streams); +format_leader_msg_value(_, Value) -> + Value. + +agent_msg_type(?agent_connect_leader_msg) -> agent_connect_leader_msg; +agent_msg_type(?agent_update_stream_states_msg) -> agent_update_stream_states_msg; +agent_msg_type(?agent_connect_leader_timeout_msg) -> agent_connect_leader_timeout_msg; +agent_msg_type(?agent_renew_stream_lease_timeout_msg) -> agent_renew_stream_lease_timeout_msg; +agent_msg_type(?agent_disconnect_msg) -> agent_disconnect_msg. + +agent_msg_key(?agent_msg_type) -> agent_msg_type; +agent_msg_key(?agent_msg_agent) -> agent_msg_agent; +agent_msg_key(?agent_msg_share_topic_filter) -> agent_msg_share_topic_filter; +agent_msg_key(?agent_msg_agent_metadata) -> agent_msg_agent_metadata; +agent_msg_key(?agent_msg_stream_states) -> agent_msg_stream_states; +agent_msg_key(?agent_msg_version) -> agent_msg_version; +agent_msg_key(?agent_msg_version_old) -> agent_msg_version_old; +agent_msg_key(?agent_msg_version_new) -> agent_msg_version_new. + +leader_msg_type(?leader_lease_streams_msg) -> leader_lease_streams_msg; +leader_msg_type(?leader_renew_stream_lease_msg) -> leader_renew_stream_lease_msg; +leader_msg_type(?leader_update_streams) -> leader_update_streams; +leader_msg_type(?leader_invalidate) -> leader_invalidate. + +leader_msg_key(?leader_msg_type) -> leader_msg_type; +leader_msg_key(?leader_msg_streams) -> leader_msg_streams; +leader_msg_key(?leader_msg_version) -> leader_msg_version; +leader_msg_key(?leader_msg_version_old) -> leader_msg_version_old; +leader_msg_key(?leader_msg_version_new) -> leader_msg_version_new; +leader_msg_key(?leader_msg_streams_new) -> leader_msg_streams_new; +leader_msg_key(?leader_msg_leader) -> leader_msg_leader; +leader_msg_key(?leader_msg_group_id) -> leader_msg_group_id. From 9ad65c6ac18bbc14bed845c9c6234e838c5285cd Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 2 Aug 2024 13:53:31 +0300 Subject: [PATCH 2/2] feat(queue): reduce logging levels --- ...emqx_persistent_session_ds_shared_subs.erl | 14 ++++---- .../src/emqx_ds_shared_sub_agent.erl | 16 ++++----- .../src/emqx_ds_shared_sub_group_sm.erl | 29 +++++---------- .../src/emqx_ds_shared_sub_leader.erl | 36 +++++++++---------- ...mqx_ds_shared_sub_leader_rank_progress.erl | 2 +- .../src/emqx_ds_shared_sub_registry.erl | 2 +- .../test/emqx_ds_shared_sub_SUITE.erl | 2 +- 7 files changed, 44 insertions(+), 57 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 10d41e91e..76f9e0a54 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -241,14 +241,14 @@ schedule_subscribe( ScheduledActions1 = ScheduledActions0#{ ShareTopicFilter => ScheduledAction#{type => {?schedule_subscribe, SubOpts}} }, - ?tp(warning, shared_subs_schedule_subscribe_override, #{ + ?tp(debug, shared_subs_schedule_subscribe_override, #{ share_topic_filter => ShareTopicFilter, new_type => {?schedule_subscribe, SubOpts}, old_action => format_schedule_action(ScheduledAction) }), SharedSubS0#{scheduled_actions := ScheduledActions1}; _ -> - ?tp(warning, shared_subs_schedule_subscribe_new, #{ + ?tp(debug, shared_subs_schedule_subscribe_new, #{ share_topic_filter => ShareTopicFilter, subopts => SubOpts }), Agent1 = emqx_persistent_session_ds_shared_subs_agent:on_subscribe( @@ -299,7 +299,7 @@ schedule_unsubscribe( ScheduledActions1 = ScheduledActions0#{ ShareTopicFilter => ScheduledAction1 }, - ?tp(warning, shared_subs_schedule_unsubscribe_override, #{ + ?tp(debug, shared_subs_schedule_unsubscribe_override, #{ share_topic_filter => ShareTopicFilter, new_type => ?schedule_unsubscribe, old_action => format_schedule_action(ScheduledAction0) @@ -314,7 +314,7 @@ schedule_unsubscribe( progresses => [] } }, - ?tp(warning, shared_subs_schedule_unsubscribe_new, #{ + ?tp(debug, shared_subs_schedule_unsubscribe_new, #{ share_topic_filter => ShareTopicFilter, stream_keys => format_stream_keys(StreamKeys) }), @@ -339,7 +339,7 @@ renew_streams(S0, #{agent := Agent0, scheduled_actions := ScheduledActions} = Sh Agent0 ), StreamLeaseEvents =/= [] andalso - ?tp(warning, shared_subs_new_stream_lease_events, #{ + ?tp(debug, shared_subs_new_stream_lease_events, #{ stream_lease_events => format_lease_events(StreamLeaseEvents) }), S1 = lists:foldl( @@ -506,7 +506,7 @@ run_scheduled_action( Progresses1 = stream_progresses(S, StreamKeysToWait0 -- StreamKeysToWait1) ++ Progresses0, case StreamKeysToWait1 of [] -> - ?tp(warning, shared_subs_schedule_action_complete, #{ + ?tp(debug, shared_subs_schedule_action_complete, #{ share_topic_filter => ShareTopicFilter, progresses => format_stream_progresses(Progresses1), type => Type @@ -530,7 +530,7 @@ run_scheduled_action( end; _ -> Action1 = Action#{stream_keys_to_wait => StreamKeysToWait1, progresses => Progresses1}, - ?tp(warning, shared_subs_schedule_action_continue, #{ + ?tp(debug, shared_subs_schedule_action_continue, #{ share_topic_filter => ShareTopicFilter, new_action => format_schedule_action(Action1) }), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl index a90f1286d..1504ea697 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_agent.erl @@ -100,7 +100,7 @@ open(TopicSubscriptions, Opts) -> State0 = init_state(Opts), State1 = lists:foldl( fun({ShareTopicFilter, #{}}, State) -> - ?tp(warning, ds_agent_open_subscription, #{ + ?tp(debug, ds_agent_open_subscription, #{ topic_filter => ShareTopicFilter }), add_shared_subscription(State, ShareTopicFilter) @@ -120,7 +120,7 @@ can_subscribe(_State, _ShareTopicFilter, _SubOpts) -> -spec on_subscribe(t(), share_topic_filter(), emqx_types:subopts()) -> t(). on_subscribe(State0, ShareTopicFilter, _SubOpts) -> - ?tp(warning, ds_agent_on_subscribe, #{ + ?tp(debug, ds_agent_on_subscribe, #{ share_topic_filter => ShareTopicFilter }), add_shared_subscription(State0, ShareTopicFilter). @@ -163,7 +163,7 @@ on_disconnect(#{groups := Groups0} = State, StreamProgresses) -> -spec on_info(t(), term()) -> t(). on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Version)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_lease_streams, group_id => GroupId, streams => StreamProgresses, @@ -176,7 +176,7 @@ on_info(State, ?leader_lease_streams_match(GroupId, Leader, StreamProgresses, Ve ) end); on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_stream_lease, group_id => GroupId, version => Version @@ -185,7 +185,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, Version)) -> emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, Version) end); on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_stream_lease, group_id => GroupId, version_old => VersionOld, @@ -195,7 +195,7 @@ on_info(State, ?leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew) emqx_ds_shared_sub_group_sm:handle_leader_renew_stream_lease(GSM, VersionOld, VersionNew) end); on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_update_streams, group_id => GroupId, version_old => VersionOld, @@ -208,7 +208,7 @@ on_info(State, ?leader_update_streams_match(GroupId, VersionOld, VersionNew, Str ) end); on_info(State, ?leader_invalidate_match(GroupId)) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_invalidate, group_id => GroupId }), @@ -245,7 +245,7 @@ delete_shared_subscription(State, ShareTopicFilter, GroupProgress) -> add_shared_subscription( #{session_id := SessionId, groups := Groups0} = State0, ShareTopicFilter ) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => agent_add_shared_subscription, share_topic_filter => ShareTopicFilter }), diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index 4cae98f65..ec55cf359 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -120,7 +120,7 @@ new(#{ send_after := SendAfter }) -> ?SLOG( - info, + debug, #{ msg => group_sm_new, agent => Agent, @@ -133,7 +133,7 @@ new(#{ agent => Agent, send_after => SendAfter }, - ?tp(warning, group_sm_new, #{ + ?tp(debug, group_sm_new, #{ agent => Agent, share_topic_filter => ShareTopicFilter }), @@ -176,7 +176,7 @@ handle_disconnect( %% Connecting state handle_connecting(#{agent := Agent, share_topic_filter := ShareTopicFilter} = GSM) -> - ?tp(warning, group_sm_enter_connecting, #{ + ?tp(debug, group_sm_enter_connecting, #{ agent => Agent, share_topic_filter => ShareTopicFilter }), @@ -264,7 +264,7 @@ handle_leader_update_streams( VersionNew, StreamProgresses ) -> - ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + ?tp(debug, shared_sub_group_sm_leader_update_streams, #{ id => Id, version_old => VersionOld, version_new => VersionNew, @@ -305,7 +305,7 @@ handle_leader_update_streams( maps:keys(Streams1) ), StreamLeaseEvents = AddEvents ++ RevokeEvents, - ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ + ?tp(debug, shared_sub_group_sm_leader_update_streams, #{ id => Id, stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events( StreamLeaseEvents @@ -435,24 +435,11 @@ handle_leader_invalidate(#{agent := Agent, share_topic_filter := ShareTopicFilte %% Internal API %%----------------------------------------------------------------------- -handle_state_timeout( - #{state := ?connecting, share_topic_filter := ShareTopicFilter} = GSM, - find_leader_timeout, - _Message -) -> - ?tp(debug, find_leader_timeout, #{share_topic_filter => ShareTopicFilter}), +handle_state_timeout(#{state := ?connecting} = GSM, find_leader_timeout, _Message) -> handle_find_leader_timeout(GSM); -handle_state_timeout( - #{state := ?replaying} = GSM, - renew_lease_timeout, - _Message -) -> +handle_state_timeout(#{state := ?replaying} = GSM, renew_lease_timeout, _Message) -> handle_renew_lease_timeout(GSM); -handle_state_timeout( - GSM, - update_stream_state_timeout, - _Message -) -> +handle_state_timeout(GSM, update_stream_state_timeout, _Message) -> ?tp(debug, update_stream_state_timeout, #{}), handle_stream_progress(GSM, []). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index 912253205..877e59c2a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -164,7 +164,7 @@ handle_event({call, From}, #register{register_fun = Fun}, ?leader_waiting_regist %%-------------------------------------------------------------------- %% repalying state handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) -> - ?tp(warning, shared_sub_leader_enter_actve, #{topic => Topic}), + ?tp(debug, shared_sub_leader_enter_actve, #{topic => Topic}), {keep_state_and_data, [ {{timeout, #renew_streams{}}, 0, #renew_streams{}}, {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}, @@ -174,7 +174,7 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic} = _Data) -> %% timers %% renew_streams timer handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) -> - % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}), + ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_streams}), Data1 = renew_streams(Data0), {keep_state, Data1, { @@ -184,7 +184,7 @@ handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data }}; %% renew_leases timer handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) -> - % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}), + ?tp(debug, shared_sub_leader_timeout, #{timeout => renew_leases}), Data1 = renew_leases(Data0), {keep_state, Data1, {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}}; @@ -279,7 +279,7 @@ renew_streams( Data2 = Data1#{stream_states => NewStreamStates, rank_progress => RankProgress1}, Data3 = revoke_streams(Data2), Data4 = assign_streams(Data3), - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_renew_streams, topic_filter => TopicFilter, new_streams => length(NewStreamsWRanks) @@ -368,7 +368,7 @@ revoke_excess_streams_from_agent(Data0, Agent, DesiredCount) -> false -> AgentState0; true -> - ?tp(warning, shared_sub_leader_revoke_streams, #{ + ?tp(debug, shared_sub_leader_revoke_streams, #{ agent => Agent, agent_stream_count => length(Streams0), revoke_count => RevokeCount, @@ -421,7 +421,7 @@ assign_lacking_streams(Data0, Agent, DesiredCount) -> false -> Data0; true -> - ?tp(warning, shared_sub_leader_assign_streams, #{ + ?tp(debug, shared_sub_leader_assign_streams, #{ agent => Agent, agent_stream_count => length(Streams0), assign_count => AssignCount, @@ -449,7 +449,7 @@ select_streams_for_assign(Data0, _Agent, AssignCount) -> %% renew_leases - send lease confirmations to agents renew_leases(#{agents := AgentStates} = Data) -> - ?tp(warning, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), + ?tp(debug, shared_sub_leader_renew_leases, #{agents => maps:keys(AgentStates)}), ok = lists:foreach( fun({Agent, AgentState}) -> renew_lease(Data, Agent, AgentState) @@ -492,7 +492,7 @@ drop_timeout_agents(#{agents := Agents} = Data) -> (is_integer(NoReplayingDeadline) andalso NoReplayingDeadline < Now) of true -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_agent_timeout, now => Now, update_deadline => UpdateDeadline, @@ -516,14 +516,14 @@ connect_agent( Agent, AgentMetadata ) -> - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => leader_agent_connected, agent => Agent, group_id => GroupId }), case Agents of #{Agent := AgentState} -> - ?tp(warning, shared_sub_leader_agent_already_connected, #{ + ?tp(debug, shared_sub_leader_agent_already_connected, #{ agent => Agent }), reconnect_agent(Data, Agent, AgentMetadata, AgentState); @@ -546,7 +546,7 @@ reconnect_agent( AgentMetadata, #{streams := OldStreams, revoked_streams := OldRevokedStreams} = _OldAgentState ) -> - ?tp(warning, shared_sub_leader_agent_reconnect, #{ + ?tp(debug, shared_sub_leader_agent_reconnect, #{ agent => Agent, agent_metadata => AgentMetadata, inherited_streams => OldStreams @@ -767,7 +767,7 @@ update_agent_stream_states(Data0, Agent, AgentStreamProgresses, VersionOld, Vers disconnect_agent(Data0, Agent, AgentStreamProgresses, Version) -> case get_agent_state(Data0, Agent) of #{version := Version} -> - ?tp(warning, shared_sub_leader_disconnect_agent, #{ + ?tp(debug, shared_sub_leader_disconnect_agent, #{ agent => Agent, version => Version }), @@ -794,7 +794,7 @@ agent_transition_to_waiting_updating( Streams, RevokedStreams ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => OldState, new_state => ?waiting_updating @@ -818,7 +818,7 @@ agent_transition_to_waiting_updating( agent_transition_to_waiting_replaying( #{group_id := GroupId} = _Data, Agent, #{state := OldState, version := Version} = AgentState0 ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => OldState, new_state => ?waiting_replaying @@ -833,7 +833,7 @@ agent_transition_to_waiting_replaying( agent_transition_to_initial_waiting_replaying( #{group_id := GroupId} = Data, Agent, AgentMetadata, InitialStreams ) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => none, new_state => ?waiting_replaying @@ -856,7 +856,7 @@ agent_transition_to_initial_waiting_replaying( renew_no_replaying_deadline(AgentState). agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => ?waiting_replaying, new_state => ?replaying @@ -868,7 +868,7 @@ agent_transition_to_replaying(Agent, #{state := ?waiting_replaying} = AgentState }. agent_transition_to_updating(Agent, #{state := ?waiting_updating} = AgentState0) -> - ?tp(warning, shared_sub_leader_agent_state_transition, #{ + ?tp(debug, shared_sub_leader_agent_state_transition, #{ agent => Agent, old_state => ?waiting_updating, new_state => ?updating @@ -995,7 +995,7 @@ drop_agent(#{agents := Agents} = Data0, Agent) -> #{streams := Streams, revoked_streams := RevokedStreams} = AgentState, AllStreams = Streams ++ RevokedStreams, Data1 = unassign_streams(Data0, AllStreams), - ?tp(warning, shared_sub_leader_drop_agent, #{agent => Agent}), + ?tp(debug, shared_sub_leader_drop_agent, #{agent => Agent}), Data1#{agents => maps:remove(Agent, Agents)}. invalidate_agent(#{group_id := GroupId}, Agent) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl index fa611463d..a07c15e4d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader_rank_progress.erl @@ -55,7 +55,7 @@ set_replayed({{RankX, RankY}, Stream}, State) -> State#{RankX => #{min_y => MinY, ys => Ys2}}; _ -> ?SLOG( - warning, + debug, #{ msg => leader_rank_progress_double_or_invalid_update, rank_x => RankX, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl index eae212458..16b672e19 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_registry.erl @@ -113,7 +113,7 @@ do_lookup_leader(Agent, AgentMetadata, ShareTopicFilter, State) -> Pid -> Pid end, - ?SLOG(info, #{ + ?SLOG(debug, #{ msg => lookup_leader, agent => Agent, share_topic_filter => ShareTopicFilter, diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl index 4f99a8455..a38045023 100644 --- a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_SUITE.erl @@ -417,7 +417,7 @@ t_lease_reconnect(_Config) -> ?assertWaitEvent( {ok, _, _} = emqtt:subscribe(ConnShared, <<"$share/gr2/topic2/#">>, 1), - #{?snk_kind := find_leader_timeout}, + #{?snk_kind := group_sm_find_leader_timeout}, 5_000 ),