feat(queue): compact protocol structures, organize formatting

This commit is contained in:
Ilya Averyanov 2024-08-02 12:41:30 +03:00
parent 3b52b658cd
commit e17becb84d
5 changed files with 225 additions and 221 deletions

View File

@ -56,6 +56,11 @@
cold_get_subscription/2 cold_get_subscription/2
]). ]).
-export([
format_lease_events/1,
format_stream_progresses/1
]).
-define(schedule_subscribe, schedule_subscribe). -define(schedule_subscribe, schedule_subscribe).
-define(schedule_unsubscribe, schedule_unsubscribe). -define(schedule_unsubscribe, schedule_unsubscribe).

View File

@ -268,7 +268,9 @@ handle_leader_update_streams(
id => Id, id => Id,
version_old => VersionOld, version_old => VersionOld,
version_new => VersionNew, version_new => VersionNew,
stream_progresses => emqx_ds_shared_sub_proto:format_stream_progresses(StreamProgresses) stream_progresses => emqx_persistent_session_ds_shared_subs:format_stream_progresses(
StreamProgresses
)
}), }),
{AddEvents, Streams1} = lists:foldl( {AddEvents, Streams1} = lists:foldl(
fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) -> fun(#{stream := Stream, progress := Progress}, {AddEventAcc, StreamsAcc}) ->
@ -305,7 +307,9 @@ handle_leader_update_streams(
StreamLeaseEvents = AddEvents ++ RevokeEvents, StreamLeaseEvents = AddEvents ++ RevokeEvents,
?tp(warning, shared_sub_group_sm_leader_update_streams, #{ ?tp(warning, shared_sub_group_sm_leader_update_streams, #{
id => Id, id => Id,
stream_lease_events => emqx_ds_shared_sub_proto:format_lease_events(StreamLeaseEvents) stream_lease_events => emqx_persistent_session_ds_shared_subs:format_lease_events(
StreamLeaseEvents
)
}), }),
transition( transition(
GSM, GSM,

View File

@ -22,12 +22,6 @@
]). ]).
-export([ -export([
format_stream_progresses/1,
format_stream_progress/1,
format_stream_key/1,
format_stream_keys/1,
format_lease_event/1,
format_lease_events/1,
agent/2 agent/2
]). ]).
@ -57,6 +51,20 @@
agent_metadata/0 agent_metadata/0
]). ]).
-define(log_agent_msg(ToLeader, Msg),
?tp(debug, shared_sub_proto_msg, #{
to_leader => ToLeader,
msg => emqx_ds_shared_sub_proto_format:format_agent_msg(Msg)
})
).
-define(log_leader_msg(ToAgent, Msg),
?tp(debug, shared_sub_proto_msg, #{
to_agent => ToAgent,
msg => emqx_ds_shared_sub_proto_format:format_leader_msg(Msg)
})
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -67,15 +75,7 @@
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) when
?is_local_leader(ToLeader) ?is_local_leader(ToLeader)
-> ->
?tp(warning, shared_sub_proto_msg, #{ send_agent_msg(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter));
type => agent_connect_leader,
to_leader => ToLeader,
from_agent => FromAgent,
agent_metadata => AgentMetadata,
share_topic_filter => ShareTopicFilter
}),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, AgentMetadata, ShareTopicFilter)),
ok;
agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) -> agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
emqx_ds_shared_sub_proto_v1:agent_connect_leader( emqx_ds_shared_sub_proto_v1:agent_connect_leader(
?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter ?leader_node(ToLeader), ToLeader, FromAgent, AgentMetadata, ShareTopicFilter
@ -85,15 +85,7 @@ agent_connect_leader(ToLeader, FromAgent, AgentMetadata, ShareTopicFilter) ->
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader) ?is_local_leader(ToLeader)
-> ->
?tp(warning, shared_sub_proto_msg, #{ send_agent_msg(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version));
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states( emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@ -105,18 +97,9 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when
?is_local_leader(ToLeader) ?is_local_leader(ToLeader)
-> ->
?tp(warning, shared_sub_proto_msg, #{ send_agent_msg(
type => agent_update_stream_states,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version_old => VersionOld,
version_new => VersionNew
}),
_ = erlang:send(
ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
), );
ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states( emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
@ -125,15 +108,7 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader) ?is_local_leader(ToLeader)
-> ->
?tp(warning, shared_sub_proto_msg, #{ send_agent_msg(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version));
type => agent_disconnect,
to_leader => ToLeader,
from_agent => FromAgent,
stream_progresses => format_stream_progresses(StreamProgresses),
version => Version
}),
_ = erlang:send(ToLeader, ?agent_disconnect(FromAgent, StreamProgresses, Version)),
ok;
agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) -> agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_disconnect( emqx_ds_shared_sub_proto_v1:agent_disconnect(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
@ -144,19 +119,7 @@ agent_disconnect(ToLeader, FromAgent, StreamProgresses, Version) ->
-spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) -> -spec leader_lease_streams(agent(), group(), leader(), list(leader_stream_progress()), version()) ->
ok. ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ send_leader_msg(ToAgent, ?leader_lease_streams(OfGroup, Leader, Streams, Version));
type => leader_lease_streams,
to_agent => ToAgent,
of_group => OfGroup,
leader => Leader,
streams => format_stream_progresses(Streams),
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_lease_streams(OfGroup, Leader, Streams, Version)
),
ok;
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
emqx_ds_shared_sub_proto_v1:leader_lease_streams( emqx_ds_shared_sub_proto_v1:leader_lease_streams(
?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version
@ -164,17 +127,7 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
-spec leader_renew_stream_lease(agent(), group(), version()) -> ok. -spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) -> leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, Version));
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version => Version
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, Version)
),
ok;
leader_renew_stream_lease(ToAgent, OfGroup, Version) -> leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, Version ?agent_node(ToAgent), ToAgent, OfGroup, Version
@ -182,18 +135,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) -> leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ send_leader_msg(ToAgent, ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew));
type => leader_renew_stream_lease,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
),
ok;
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
@ -204,19 +146,7 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
?is_local_agent(ToAgent) ?is_local_agent(ToAgent)
-> ->
?tp(warning, shared_sub_proto_msg, #{ send_leader_msg(ToAgent, ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew));
type => leader_update_streams,
to_agent => ToAgent,
of_group => OfGroup,
version_old => VersionOld,
version_new => VersionNew,
streams_new => format_stream_progresses(StreamsNew)
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
),
ok;
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
emqx_ds_shared_sub_proto_v1:leader_update_streams( emqx_ds_shared_sub_proto_v1:leader_update_streams(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
@ -224,16 +154,7 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
-spec leader_invalidate(agent(), group()) -> ok. -spec leader_invalidate(agent(), group()) -> ok.
leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) -> leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ send_leader_msg(ToAgent, ?leader_invalidate(OfGroup));
type => leader_invalidate,
to_agent => ToAgent,
of_group => OfGroup
}),
_ = emqx_persistent_session_ds_shared_subs_agent:send(
?agent_pid(ToAgent),
?leader_invalidate(OfGroup)
),
ok;
leader_invalidate(ToAgent, OfGroup) -> leader_invalidate(ToAgent, OfGroup) ->
emqx_ds_shared_sub_proto_v1:leader_invalidate( emqx_ds_shared_sub_proto_v1:leader_invalidate(
?agent_node(ToAgent), ToAgent, OfGroup ?agent_node(ToAgent), ToAgent, OfGroup
@ -247,41 +168,12 @@ agent(Id, Pid) ->
_ = Id, _ = Id,
?agent(Id, Pid). ?agent(Id, Pid).
format_stream_progresses(Streams) -> send_agent_msg(ToLeader, Msg) ->
lists:map( ?log_agent_msg(ToLeader, Msg),
fun format_stream_progress/1, _ = erlang:send(ToLeader, Msg),
Streams ok.
).
format_stream_progress(#{stream := Stream, progress := Progress} = Value) -> send_leader_msg(ToAgent, Msg) ->
Value#{stream => format_opaque(Stream), progress => format_progress(Progress)}. ?log_leader_msg(ToAgent, Msg),
_ = emqx_persistent_session_ds_shared_subs_agent:send(?agent_pid(ToAgent), Msg),
format_progress(#{iterator := Iterator} = Progress) -> ok.
Progress#{iterator => format_opaque(Iterator)}.
format_stream_key({SubId, Stream}) ->
{SubId, format_opaque(Stream)}.
format_stream_keys(StreamKeys) ->
lists:map(
fun format_stream_key/1,
StreamKeys
).
format_lease_events(Events) ->
lists:map(
fun format_lease_event/1,
Events
).
format_lease_event(#{stream := Stream, progress := Progress} = Event) ->
Event#{stream => format_opaque(Stream), progress => format_progress(Progress)};
format_lease_event(#{stream := Stream} = Event) ->
Event#{stream => format_opaque(Stream)}.
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_opaque(Opaque) ->
erlang:phash2(Opaque).

View File

@ -12,146 +12,167 @@
%% agent messages, sent from agent side to the leader %% agent messages, sent from agent side to the leader
-define(agent_connect_leader_msg, agent_connect_leader). -define(agent_connect_leader_msg, 1).
-define(agent_update_stream_states_msg, agent_update_stream_states). -define(agent_update_stream_states_msg, 2).
-define(agent_connect_leader_timeout_msg, agent_connect_leader_timeout). -define(agent_connect_leader_timeout_msg, 3).
-define(agent_renew_stream_lease_timeout_msg, agent_renew_stream_lease_timeout). -define(agent_renew_stream_lease_timeout_msg, 4).
-define(agent_disconnect_msg, agent_disconnect). -define(agent_disconnect_msg, 5).
%% message keys (used used not to send atoms over the network)
-define(agent_msg_type, 1).
-define(agent_msg_agent, 2).
-define(agent_msg_share_topic_filter, 3).
-define(agent_msg_agent_metadata, 4).
-define(agent_msg_stream_states, 5).
-define(agent_msg_version, 6).
-define(agent_msg_version_old, 7).
-define(agent_msg_version_new, 8).
%% Agent messages sent to the leader. %% Agent messages sent to the leader.
%% Leader talks to many agents, `agent` field is used to identify the sender. %% Leader talks to many agents, `agent` field is used to identify the sender.
-define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{ -define(agent_connect_leader(Agent, AgentMetadata, ShareTopicFilter), #{
type => ?agent_connect_leader_msg, ?agent_msg_type => ?agent_connect_leader_msg,
share_topic_filter => ShareTopicFilter, ?agent_msg_share_topic_filter => ShareTopicFilter,
agent_metadata => AgentMetadata, ?agent_msg_agent_metadata => AgentMetadata,
agent => Agent ?agent_msg_agent => Agent
}). }).
-define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{ -define(agent_connect_leader_match(Agent, AgentMetadata, ShareTopicFilter), #{
type := ?agent_connect_leader_msg, ?agent_msg_type := ?agent_connect_leader_msg,
share_topic_filter := ShareTopicFilter, ?agent_msg_share_topic_filter := ShareTopicFilter,
agent_metadata := AgentMetadata, ?agent_msg_agent_metadata := AgentMetadata,
agent := Agent ?agent_msg_agent := Agent
}). }).
-define(agent_update_stream_states(Agent, StreamStates, Version), #{ -define(agent_update_stream_states(Agent, StreamStates, Version), #{
type => ?agent_update_stream_states_msg, ?agent_msg_type => ?agent_update_stream_states_msg,
stream_states => StreamStates, ?agent_msg_stream_states => StreamStates,
version => Version, ?agent_msg_version => Version,
agent => Agent ?agent_msg_agent => Agent
}). }).
-define(agent_update_stream_states_match(Agent, StreamStates, Version), #{ -define(agent_update_stream_states_match(Agent, StreamStates, Version), #{
type := ?agent_update_stream_states_msg, ?agent_msg_type := ?agent_update_stream_states_msg,
stream_states := StreamStates, ?agent_msg_stream_states := StreamStates,
version := Version, ?agent_msg_version := Version,
agent := Agent ?agent_msg_agent := Agent
}). }).
-define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{ -define(agent_update_stream_states(Agent, StreamStates, VersionOld, VersionNew), #{
type => ?agent_update_stream_states_msg, ?agent_msg_type => ?agent_update_stream_states_msg,
stream_states => StreamStates, ?agent_msg_stream_states => StreamStates,
version_old => VersionOld, ?agent_msg_version_old => VersionOld,
version_new => VersionNew, ?agent_msg_version_new => VersionNew,
agent => Agent ?agent_msg_agent => Agent
}). }).
-define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{ -define(agent_update_stream_states_match(Agent, StreamStates, VersionOld, VersionNew), #{
type := ?agent_update_stream_states_msg, ?agent_msg_type := ?agent_update_stream_states_msg,
stream_states := StreamStates, ?agent_msg_stream_states := StreamStates,
version_old := VersionOld, ?agent_msg_version_old := VersionOld,
version_new := VersionNew, ?agent_msg_version_new := VersionNew,
agent := Agent ?agent_msg_agent := Agent
}). }).
-define(agent_disconnect(Agent, StreamStates, Version), #{ -define(agent_disconnect(Agent, StreamStates, Version), #{
type => ?agent_disconnect_msg, ?agent_msg_type => ?agent_disconnect_msg,
stream_states => StreamStates, ?agent_msg_stream_states => StreamStates,
version => Version, ?agent_msg_version => Version,
agent => Agent ?agent_msg_agent => Agent
}). }).
-define(agent_disconnect_match(Agent, StreamStates, Version), #{ -define(agent_disconnect_match(Agent, StreamStates, Version), #{
type := ?agent_disconnect_msg, ?agent_msg_type := ?agent_disconnect_msg,
stream_states := StreamStates, ?agent_msg_stream_states := StreamStates,
version := Version, ?agent_msg_version := Version,
agent := Agent ?agent_msg_agent := Agent
}). }).
%% leader messages, sent from the leader to the agent %% leader messages, sent from the leader to the agent
%% Agent may have several shared subscriptions, so may talk to several leaders %% Agent may have several shared subscriptions, so may talk to several leaders
%% `group_id` field is used to identify the leader. %% `group_id` field is used to identify the leader.
-define(leader_lease_streams_msg, leader_lease_streams). -define(leader_lease_streams_msg, 101).
-define(leader_renew_stream_lease_msg, leader_renew_stream_lease). -define(leader_renew_stream_lease_msg, 102).
-define(leader_update_streams, 103).
-define(leader_invalidate, 104).
-define(leader_msg_type, 101).
-define(leader_msg_streams, 102).
-define(leader_msg_version, 103).
-define(leader_msg_version_old, 104).
-define(leader_msg_version_new, 105).
-define(leader_msg_streams_new, 106).
-define(leader_msg_leader, 107).
-define(leader_msg_group_id, 108).
-define(leader_lease_streams(GrouId, Leader, Streams, Version), #{ -define(leader_lease_streams(GrouId, Leader, Streams, Version), #{
type => ?leader_lease_streams_msg, ?leader_msg_type => ?leader_lease_streams_msg,
streams => Streams, ?leader_msg_streams => Streams,
version => Version, ?leader_msg_version => Version,
leader => Leader, ?leader_msg_leader => Leader,
group_id => GrouId ?leader_msg_group_id => GrouId
}). }).
-define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{ -define(leader_lease_streams_match(GroupId, Leader, Streams, Version), #{
type := ?leader_lease_streams_msg, ?leader_msg_type := ?leader_lease_streams_msg,
streams := Streams, ?leader_msg_streams := Streams,
version := Version, ?leader_msg_version := Version,
leader := Leader, ?leader_msg_leader := Leader,
group_id := GroupId ?leader_msg_group_id := GroupId
}). }).
-define(leader_renew_stream_lease(GroupId, Version), #{ -define(leader_renew_stream_lease(GroupId, Version), #{
type => ?leader_renew_stream_lease_msg, ?leader_msg_type => ?leader_renew_stream_lease_msg,
version => Version, ?leader_msg_version => Version,
group_id => GroupId ?leader_msg_group_id => GroupId
}). }).
-define(leader_renew_stream_lease_match(GroupId, Version), #{ -define(leader_renew_stream_lease_match(GroupId, Version), #{
type := ?leader_renew_stream_lease_msg, ?leader_msg_type := ?leader_renew_stream_lease_msg,
version := Version, ?leader_msg_version := Version,
group_id := GroupId ?leader_msg_group_id := GroupId
}). }).
-define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{ -define(leader_renew_stream_lease(GroupId, VersionOld, VersionNew), #{
type => ?leader_renew_stream_lease_msg, ?leader_msg_type => ?leader_renew_stream_lease_msg,
version_old => VersionOld, ?leader_msg_version_old => VersionOld,
version_new => VersionNew, ?leader_msg_version_new => VersionNew,
group_id => GroupId ?leader_msg_group_id => GroupId
}). }).
-define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{ -define(leader_renew_stream_lease_match(GroupId, VersionOld, VersionNew), #{
type := ?leader_renew_stream_lease_msg, ?leader_msg_type := ?leader_renew_stream_lease_msg,
version_old := VersionOld, ?leader_msg_version_old := VersionOld,
version_new := VersionNew, ?leader_msg_version_new := VersionNew,
group_id := GroupId ?leader_msg_group_id := GroupId
}). }).
-define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{ -define(leader_update_streams(GroupId, VersionOld, VersionNew, StreamsNew), #{
type => leader_update_streams, ?leader_msg_type => ?leader_update_streams,
version_old => VersionOld, ?leader_msg_version_old => VersionOld,
version_new => VersionNew, ?leader_msg_version_new => VersionNew,
streams_new => StreamsNew, ?leader_msg_streams_new => StreamsNew,
group_id => GroupId ?leader_msg_group_id => GroupId
}). }).
-define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{ -define(leader_update_streams_match(GroupId, VersionOld, VersionNew, StreamsNew), #{
type := leader_update_streams, ?leader_msg_type := ?leader_update_streams,
version_old := VersionOld, ?leader_msg_version_old := VersionOld,
version_new := VersionNew, ?leader_msg_version_new := VersionNew,
streams_new := StreamsNew, ?leader_msg_streams_new := StreamsNew,
group_id := GroupId ?leader_msg_group_id := GroupId
}). }).
-define(leader_invalidate(GroupId), #{ -define(leader_invalidate(GroupId), #{
type => leader_invalidate, ?leader_msg_type => ?leader_invalidate,
group_id => GroupId ?leader_msg_group_id => GroupId
}). }).
-define(leader_invalidate_match(GroupId), #{ -define(leader_invalidate_match(GroupId), #{
type := leader_invalidate, ?leader_msg_type := ?leader_invalidate,
group_id := GroupId ?leader_msg_group_id := GroupId
}). }).
%% Helpers %% Helpers

View File

@ -0,0 +1,82 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_proto_format).
-include("emqx_ds_shared_sub_proto.hrl").
-export([format_agent_msg/1, format_leader_msg/1]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
format_agent_msg(Msg) ->
maps:from_list(
lists:map(
fun({K, V}) ->
FormattedKey = agent_msg_key(K),
{FormattedKey, format_agent_msg_value(FormattedKey, V)}
end,
maps:to_list(Msg)
)
).
format_leader_msg(Msg) ->
maps:from_list(
lists:map(
fun({K, V}) ->
FormattedKey = leader_msg_key(K),
{FormattedKey, format_leader_msg_value(FormattedKey, V)}
end,
maps:to_list(Msg)
)
).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
format_agent_msg_value(agent_msg_type, Type) ->
agent_msg_type(Type);
format_agent_msg_value(agent_msg_stream_states, StreamStates) ->
emqx_persistent_session_ds_shared_subs:format_stream_progresses(StreamStates);
format_agent_msg_value(_, Value) ->
Value.
format_leader_msg_value(leader_msg_type, Type) ->
leader_msg_type(Type);
format_leader_msg_value(leader_msg_streams, Streams) ->
emqx_persistent_session_ds_shared_subs:format_lease_events(Streams);
format_leader_msg_value(_, Value) ->
Value.
agent_msg_type(?agent_connect_leader_msg) -> agent_connect_leader_msg;
agent_msg_type(?agent_update_stream_states_msg) -> agent_update_stream_states_msg;
agent_msg_type(?agent_connect_leader_timeout_msg) -> agent_connect_leader_timeout_msg;
agent_msg_type(?agent_renew_stream_lease_timeout_msg) -> agent_renew_stream_lease_timeout_msg;
agent_msg_type(?agent_disconnect_msg) -> agent_disconnect_msg.
agent_msg_key(?agent_msg_type) -> agent_msg_type;
agent_msg_key(?agent_msg_agent) -> agent_msg_agent;
agent_msg_key(?agent_msg_share_topic_filter) -> agent_msg_share_topic_filter;
agent_msg_key(?agent_msg_agent_metadata) -> agent_msg_agent_metadata;
agent_msg_key(?agent_msg_stream_states) -> agent_msg_stream_states;
agent_msg_key(?agent_msg_version) -> agent_msg_version;
agent_msg_key(?agent_msg_version_old) -> agent_msg_version_old;
agent_msg_key(?agent_msg_version_new) -> agent_msg_version_new.
leader_msg_type(?leader_lease_streams_msg) -> leader_lease_streams_msg;
leader_msg_type(?leader_renew_stream_lease_msg) -> leader_renew_stream_lease_msg;
leader_msg_type(?leader_update_streams) -> leader_update_streams;
leader_msg_type(?leader_invalidate) -> leader_invalidate.
leader_msg_key(?leader_msg_type) -> leader_msg_type;
leader_msg_key(?leader_msg_streams) -> leader_msg_streams;
leader_msg_key(?leader_msg_version) -> leader_msg_version;
leader_msg_key(?leader_msg_version_old) -> leader_msg_version_old;
leader_msg_key(?leader_msg_version_new) -> leader_msg_version_new;
leader_msg_key(?leader_msg_streams_new) -> leader_msg_streams_new;
leader_msg_key(?leader_msg_leader) -> leader_msg_leader;
leader_msg_key(?leader_msg_group_id) -> leader_msg_group_id.