From e17becb84d8d440171bfe07092b4ebf60d9a4294 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 2 Aug 2024 12:41:30 +0300 Subject: [PATCH] feat(queue): compact protocol structures, organize formatting --- ...emqx_persistent_session_ds_shared_subs.erl | 5 + .../src/emqx_ds_shared_sub_group_sm.erl | 8 +- .../src/emqx_ds_shared_sub_proto.erl | 172 ++++------------- .../src/emqx_ds_shared_sub_proto.hrl | 179 ++++++++++-------- .../src/emqx_ds_shared_sub_proto_format.erl | 82 ++++++++ 5 files changed, 225 insertions(+), 221 deletions(-) create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl index 5b54c6f73..10d41e91e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_shared_subs.erl @@ -56,6 +56,11 @@ cold_get_subscription/2 ]). +-export([ + format_lease_events/1, + format_stream_progresses/1 +]). + -define(schedule_subscribe, schedule_subscribe). -define(schedule_unsubscribe, schedule_unsubscribe). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index a648bbaef..4cae98f65 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -268,7 +268,9 @@ handle_leader_update_streams( id => Id, version_old => VersionOld, version_new => VersionNew, - stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses) + stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses( + StreamProgresses + ) }), {AddEvents, Streams1} = lists:foldl( fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) -> @@ -305,7 +307,9 @@ handle_leader_update_streams( StreamLeaseEvents = AddEvents ++ RevokeEvents, ?tp(warning, shared_sub_group_sm_leader_update_streams, #{ id => Id, - stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents) + stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events( + StreamLeaseEvents + ) }), transition( GSM, diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index 383f66ff2..93278a5d5 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -22,12 +22,6 @@ ]). -export([ - format_stream_progresses/1, - format_stream_progress/1, - format_stream_key/1, - format_stream_keys/1, - format_lease_event/1, - format_lease_events/1, agent/2 ]). @@ -57,6 +51,20 @@ agent_metadata/0 ]). +-define(log_agent_msg(ToLeader, Msg), + ?tp(debug, shared_sub_proto_msg, #{ + to_leader => ToLeader, + msg => emqx_ds_shared_sub_proto_format:format_agent_msg(Msg) + }) +). + +-define(log_leader_msg(ToAgent, Msg), + ?tp(debug, shared_sub_proto_msg, #{ + to_agent => ToAgent, + msg => emqx_ds_shared_sub_proto_format:format_leader_msg(Msg) + }) +). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -67,15 +75,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_connect_leader, - to_leader => ToLeader, - from_agent => FromAgent, - agent_metadata => AgentMetadata, - share_topic_filter => ShareTopicFilter - }), - _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)), - ok; + send_agent_msg(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)); agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> emqx_ds_shared_sub_proto_v1:agent_connect_leader( ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter @@ -85,15 +85,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_update_stream_states, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version => Version - }), - _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), - ok; + send_agent_msg(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)); agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> emqx_ds_shared_sub_proto_v1:agent_update_stream_states( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version @@ -105,18 +97,9 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_update_stream_states, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version_old => VersionOld, - version_new => VersionNew - }), - _ = erlang:send( + send_agent_msg( ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) - ), - ok; + ); agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> emqx_ds_shared_sub_proto_v1:agent_update_stream_states( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew @@ -125,15 +108,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when ?is_local_leader(ToLeader) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => agent_disconnect, - to_leader => ToLeader, - from_agent => FromAgent, - stream_progresses => format_stream_progresses(StreamProgresses), - version => Version - }), - _ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)), - ok; + send_agent_msg(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)); agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> emqx_ds_shared_sub_proto_v1:agent_disconnect( ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version @@ -144,19 +119,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) -> ok. leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_lease_streams, - to_agent => ToAgent, - of_group => OfGroup, - leader => Leader, - streams => format_stream_progresses(Streams), - version => Version - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_lease_streams(OfGroup, Leader, Streams, Version) - ), - ok; + send_leader_msg(ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version)); leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> emqx_ds_shared_sub_proto_v1:leader_lease_streams( ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version @@ -164,17 +127,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> -spec leader_renew_stream_lease(agent(), group(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_renew_stream_lease, - to_agent => ToAgent, - of_group => OfGroup, - version => Version - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_renew_stream_lease(OfGroup, Version) - ), - ok; + send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, Version)); leader_renew_stream_lease(ToAgent, OfGroup, Version) -> emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( ?agent_node(ToAgent), ToAgent, OfGroup, Version @@ -182,18 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) -> -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_renew_stream_lease, - to_agent => ToAgent, - of_group => OfGroup, - version_old => VersionOld, - version_new => VersionNew - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) - ), - ok; + send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)); leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew @@ -204,19 +146,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_update_streams, - to_agent => ToAgent, - of_group => OfGroup, - version_old => VersionOld, - version_new => VersionNew, - streams_new => format_stream_progresses(StreamsNew) - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) - ), - ok; + send_leader_msg(ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)); leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> emqx_ds_shared_sub_proto_v1:leader_update_streams( ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew @@ -224,16 +154,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> -spec leader_invalidate(agent(), group()) -> ok. leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) -> - ?tp(warning, shared_sub_proto_msg, #{ - type => leader_invalidate, - to_agent => ToAgent, - of_group => OfGroup - }), - _ = emqx_persistent_session_ds_shared_subs_agent:send( - ?agent_pid(ToAgent), - ?leader_invalidate(OfGroup) - ), - ok; + send_leader_msg(ToAgent, ?leader_invalidate(OfGroup)); leader_invalidate(ToAgent, OfGroup) -> emqx_ds_shared_sub_proto_v1:leader_invalidate( ?agent_node(ToAgent), ToAgent, OfGroup @@ -247,41 +168,12 @@ agent(Id, Pid) -> _ = Id, ?agent(Id, Pid). -format_stream_progresses(Streams) -> - lists:map( - fun format_stream_progress/1, - Streams - ). +send_agent_msg(ToLeader, Msg) -> + ?log_agent_msg(ToLeader, Msg), + _ = erlang:send(ToLeader, Msg), + ok. -format_stream_progress(#{stream := Stream, progress := Progress} = Value) -> - Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}. - -format_progress(#{iterator := Iterator} = Progress) -> - Progress#{iterator => format_opaque(Iterator)}. - -format_stream_key({SubId, Stream}) -> - {SubId, format_opaque(Stream)}. - -format_stream_keys(StreamKeys) -> - lists:map( - fun format_stream_key/1, - StreamKeys - ). - -format_lease_events(Events) -> - lists:map( - fun format_lease_event/1, - Events - ). - -format_lease_event(#{stream := Stream, progress := Progress} = Event) -> - Event#{stream => format_opaque(Stream), progress => format_progress(Progress)}; -format_lease_event(#{stream := Stream} = Event) -> - Event#{stream => format_opaque(Stream)}. - -%%-------------------------------------------------------------------- -%% Helpers -%%-------------------------------------------------------------------- - -format_opaque(Opaque) -> - erlang:phash2(Opaque). +send_leader_msg(ToAgent, Msg) -> + ?log_leader_msg(ToAgent, Msg), + _ = emqx_persistent_session_ds_shared_subs_agent:send(?agent_pid(ToAgent), Msg), + ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index bf54b2930..89e0d2c84 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -12,146 +12,167 @@ %% agent messages, sent from agent side to the leader --define(agent_connect_leader_msg, agent_connect_leader). --define(agent_update_stream_states_msg, agent_update_stream_states). --define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout). --define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout). --define(agent_disconnect_msg, agent_disconnect). +-define(agent_connect_leader_msg, 1). +-define(agent_update_stream_states_msg, 2). +-define(agent_connect_leader_timeout_msg, 3). +-define(agent_renew_stream_lease_timeout_msg, 4). +-define(agent_disconnect_msg, 5). + +%% message keys (used used not to send atoms over the network) +-define(agent_msg_type, 1). +-define(agent_msg_agent, 2). +-define(agent_msg_share_topic_filter, 3). +-define(agent_msg_agent_metadata, 4). +-define(agent_msg_stream_states, 5). +-define(agent_msg_version, 6). +-define(agent_msg_version_old, 7). +-define(agent_msg_version_new, 8). %% Agent messages sent to the leader. %% Leader talks to many agents, `agent` field is used to identify the sender. -define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{ - type => ?agent_connect_leader_msg, - share_topic_filter => ShareTopicFilter, - agent_metadata => AgentMetadata, - agent => Agent + ?agent_msg_type => ?agent_connect_leader_msg, + ?agent_msg_share_topic_filter => ShareTopicFilter, + ?agent_msg_agent_metadata => AgentMetadata, + ?agent_msg_agent => Agent }). -define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{ - type := ?agent_connect_leader_msg, - share_topic_filter := ShareTopicFilter, - agent_metadata := AgentMetadata, - agent := Agent + ?agent_msg_type := ?agent_connect_leader_msg, + ?agent_msg_share_topic_filter := ShareTopicFilter, + ?agent_msg_agent_metadata := AgentMetadata, + ?agent_msg_agent := Agent }). -define(agent_update_stream_states(Agent, StreamStates, Version), #{ - type => ?agent_update_stream_states_msg, - stream_states => StreamStates, - version => Version, - agent => Agent + ?agent_msg_type => ?agent_update_stream_states_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version => Version, + ?agent_msg_agent => Agent }). -define(agent_update_stream_states_match(Agent, StreamStates, Version), #{ - type := ?agent_update_stream_states_msg, - stream_states := StreamStates, - version := Version, - agent := Agent + ?agent_msg_type := ?agent_update_stream_states_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version := Version, + ?agent_msg_agent := Agent }). -define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{ - type => ?agent_update_stream_states_msg, - stream_states => StreamStates, - version_old => VersionOld, - version_new => VersionNew, - agent => Agent + ?agent_msg_type => ?agent_update_stream_states_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version_old => VersionOld, + ?agent_msg_version_new => VersionNew, + ?agent_msg_agent => Agent }). -define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{ - type := ?agent_update_stream_states_msg, - stream_states := StreamStates, - version_old := VersionOld, - version_new := VersionNew, - agent := Agent + ?agent_msg_type := ?agent_update_stream_states_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version_old := VersionOld, + ?agent_msg_version_new := VersionNew, + ?agent_msg_agent := Agent }). -define(agent_disconnect(Agent, StreamStates, Version), #{ - type => ?agent_disconnect_msg, - stream_states => StreamStates, - version => Version, - agent => Agent + ?agent_msg_type => ?agent_disconnect_msg, + ?agent_msg_stream_states => StreamStates, + ?agent_msg_version => Version, + ?agent_msg_agent => Agent }). -define(agent_disconnect_match(Agent, StreamStates, Version), #{ - type := ?agent_disconnect_msg, - stream_states := StreamStates, - version := Version, - agent := Agent + ?agent_msg_type := ?agent_disconnect_msg, + ?agent_msg_stream_states := StreamStates, + ?agent_msg_version := Version, + ?agent_msg_agent := Agent }). %% leader messages, sent from the leader to the agent %% Agent may have several shared subscriptions, so may talk to several leaders %% `group_id` field is used to identify the leader. --define(leader_lease_streams_msg, leader_lease_streams). --define(leader_renew_stream_lease_msg, leader_renew_stream_lease). +-define(leader_lease_streams_msg, 101). +-define(leader_renew_stream_lease_msg, 102). +-define(leader_update_streams, 103). +-define(leader_invalidate, 104). + +-define(leader_msg_type, 101). +-define(leader_msg_streams, 102). +-define(leader_msg_version, 103). +-define(leader_msg_version_old, 104). +-define(leader_msg_version_new, 105). +-define(leader_msg_streams_new, 106). +-define(leader_msg_leader, 107). +-define(leader_msg_group_id, 108). -define(leader_lease_streams(GrouId, Leader, Streams, Version), #{ - type => ?leader_lease_streams_msg, - streams => Streams, - version => Version, - leader => Leader, - group_id => GrouId + ?leader_msg_type => ?leader_lease_streams_msg, + ?leader_msg_streams => Streams, + ?leader_msg_version => Version, + ?leader_msg_leader => Leader, + ?leader_msg_group_id => GrouId }). -define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{ - type := ?leader_lease_streams_msg, - streams := Streams, - version := Version, - leader := Leader, - group_id := GroupId + ?leader_msg_type := ?leader_lease_streams_msg, + ?leader_msg_streams := Streams, + ?leader_msg_version := Version, + ?leader_msg_leader := Leader, + ?leader_msg_group_id := GroupId }). -define(leader_renew_stream_lease(GroupId, Version), #{ - type => ?leader_renew_stream_lease_msg, - version => Version, - group_id => GroupId + ?leader_msg_type => ?leader_renew_stream_lease_msg, + ?leader_msg_version => Version, + ?leader_msg_group_id => GroupId }). -define(leader_renew_stream_lease_match(GroupId, Version), #{ - type := ?leader_renew_stream_lease_msg, - version := Version, - group_id := GroupId + ?leader_msg_type := ?leader_renew_stream_lease_msg, + ?leader_msg_version := Version, + ?leader_msg_group_id := GroupId }). -define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{ - type => ?leader_renew_stream_lease_msg, - version_old => VersionOld, - version_new => VersionNew, - group_id => GroupId + ?leader_msg_type => ?leader_renew_stream_lease_msg, + ?leader_msg_version_old => VersionOld, + ?leader_msg_version_new => VersionNew, + ?leader_msg_group_id => GroupId }). -define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{ - type := ?leader_renew_stream_lease_msg, - version_old := VersionOld, - version_new := VersionNew, - group_id := GroupId + ?leader_msg_type := ?leader_renew_stream_lease_msg, + ?leader_msg_version_old := VersionOld, + ?leader_msg_version_new := VersionNew, + ?leader_msg_group_id := GroupId }). -define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{ - type => leader_update_streams, - version_old => VersionOld, - version_new => VersionNew, - streams_new => StreamsNew, - group_id => GroupId + ?leader_msg_type => ?leader_update_streams, + ?leader_msg_version_old => VersionOld, + ?leader_msg_version_new => VersionNew, + ?leader_msg_streams_new => StreamsNew, + ?leader_msg_group_id => GroupId }). -define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{ - type := leader_update_streams, - version_old := VersionOld, - version_new := VersionNew, - streams_new := StreamsNew, - group_id := GroupId + ?leader_msg_type := ?leader_update_streams, + ?leader_msg_version_old := VersionOld, + ?leader_msg_version_new := VersionNew, + ?leader_msg_streams_new := StreamsNew, + ?leader_msg_group_id := GroupId }). -define(leader_invalidate(GroupId), #{ - type => leader_invalidate, - group_id => GroupId + ?leader_msg_type => ?leader_invalidate, + ?leader_msg_group_id => GroupId }). -define(leader_invalidate_match(GroupId), #{ - type := leader_invalidate, - group_id := GroupId + ?leader_msg_type := ?leader_invalidate, + ?leader_msg_group_id := GroupId }). %% Helpers diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl new file mode 100644 index 000000000..4fa01e7a8 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto_format.erl @@ -0,0 +1,82 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_proto_format). + +-include("emqx_ds_shared_sub_proto.hrl"). + +-export([format_agent_msg/1, format_leader_msg/1]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +format_agent_msg(Msg) -> + maps:from_list( + lists:map( + fun({K, V}) -> + FormattedKey = agent_msg_key(K), + {FormattedKey, format_agent_msg_value(FormattedKey, V)} + end, + maps:to_list(Msg) + ) + ). + +format_leader_msg(Msg) -> + maps:from_list( + lists:map( + fun({K, V}) -> + FormattedKey = leader_msg_key(K), + {FormattedKey, format_leader_msg_value(FormattedKey, V)} + end, + maps:to_list(Msg) + ) + ). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +format_agent_msg_value(agent_msg_type, Type) -> + agent_msg_type(Type); +format_agent_msg_value(agent_msg_stream_states, StreamStates) -> + emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates); +format_agent_msg_value(_, Value) -> + Value. + +format_leader_msg_value(leader_msg_type, Type) -> + leader_msg_type(Type); +format_leader_msg_value(leader_msg_streams, Streams) -> + emqx_persistent_session_ds_shared_subs:format_lease_events(Streams); +format_leader_msg_value(_, Value) -> + Value. + +agent_msg_type(?agent_connect_leader_msg) -> agent_connect_leader_msg; +agent_msg_type(?agent_update_stream_states_msg) -> agent_update_stream_states_msg; +agent_msg_type(?agent_connect_leader_timeout_msg) -> agent_connect_leader_timeout_msg; +agent_msg_type(?agent_renew_stream_lease_timeout_msg) -> agent_renew_stream_lease_timeout_msg; +agent_msg_type(?agent_disconnect_msg) -> agent_disconnect_msg. + +agent_msg_key(?agent_msg_type) -> agent_msg_type; +agent_msg_key(?agent_msg_agent) -> agent_msg_agent; +agent_msg_key(?agent_msg_share_topic_filter) -> agent_msg_share_topic_filter; +agent_msg_key(?agent_msg_agent_metadata) -> agent_msg_agent_metadata; +agent_msg_key(?agent_msg_stream_states) -> agent_msg_stream_states; +agent_msg_key(?agent_msg_version) -> agent_msg_version; +agent_msg_key(?agent_msg_version_old) -> agent_msg_version_old; +agent_msg_key(?agent_msg_version_new) -> agent_msg_version_new. + +leader_msg_type(?leader_lease_streams_msg) -> leader_lease_streams_msg; +leader_msg_type(?leader_renew_stream_lease_msg) -> leader_renew_stream_lease_msg; +leader_msg_type(?leader_update_streams) -> leader_update_streams; +leader_msg_type(?leader_invalidate) -> leader_invalidate. + +leader_msg_key(?leader_msg_type) -> leader_msg_type; +leader_msg_key(?leader_msg_streams) -> leader_msg_streams; +leader_msg_key(?leader_msg_version) -> leader_msg_version; +leader_msg_key(?leader_msg_version_old) -> leader_msg_version_old; +leader_msg_key(?leader_msg_version_new) -> leader_msg_version_new; +leader_msg_key(?leader_msg_streams_new) -> leader_msg_streams_new; +leader_msg_key(?leader_msg_leader) -> leader_msg_leader; +leader_msg_key(?leader_msg_group_id) -> leader_msg_group_id.