diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl index d7d85b8f2..363a16e46 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.erl @@ -9,6 +9,7 @@ -module(emqx_ds_shared_sub_proto). -include("emqx_ds_shared_sub_proto.hrl"). + -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ @@ -28,16 +29,7 @@ agent/2 ]). --ifdef(TEST). --record(agent, { - pid :: pid(), - id :: term() -}). --type agent() :: #agent{}. --else. --type agent() :: pid(). --endif. - +-type agent() :: ?agent(emqx_persistent_session_ds:id(), pid()). -type leader() :: pid(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type group() :: emqx_types:group(). @@ -69,7 +61,7 @@ %% agent -> leader messages -spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. -agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> +agent_connect_leader(ToLeader, FromAgent, TopicFilter) when ?is_local_leader(ToLeader) -> ?tp(warning, shared_sub_proto_msg, #{ type => agent_connect_leader, to_leader => ToLeader, @@ -77,10 +69,16 @@ agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> topic_filter => TopicFilter }), _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), - ok. + ok; +agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> + emqx_ds_shared_sub_proto_v1:agent_connect_leader( + ?leader_node(ToLeader), ToLeader, FromAgent, TopicFilter + ). -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. -agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> +agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when + ?is_local_leader(ToLeader) +-> ?tp(warning, shared_sub_proto_msg, #{ type => agent_update_stream_states, to_leader => ToLeader, @@ -89,12 +87,18 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> version => Version }), _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), - ok. + ok; +agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> + emqx_ds_shared_sub_proto_v1:agent_update_stream_states( + ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version + ). -spec agent_update_stream_states( leader(), agent(), list(agent_stream_progress()), version(), version() ) -> ok. -agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> +agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when + ?is_local_leader(ToLeader) +-> ?tp(warning, shared_sub_proto_msg, #{ type => agent_update_stream_states, to_leader => ToLeader, @@ -106,12 +110,16 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve _ = erlang:send( ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) ), - ok. + ok; +agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> + emqx_ds_shared_sub_proto_v1:agent_update_stream_states( + ?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew + ). %% leader -> agent messages -spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. -leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> +leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) -> ?tp(warning, shared_sub_proto_msg, #{ type => leader_lease_streams, to_agent => ToAgent, @@ -121,13 +129,17 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> version => Version }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - agent_pid(ToAgent), + ?agent_pid(ToAgent), ?leader_lease_streams(OfGroup, Leader, Streams, Version) ), - ok. + ok; +leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> + emqx_ds_shared_sub_proto_v1:leader_lease_streams( + ?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version + ). -spec leader_renew_stream_lease(agent(), group(), version()) -> ok. -leader_renew_stream_lease(ToAgent, OfGroup, Version) -> +leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) -> ?tp(warning, shared_sub_proto_msg, #{ type => leader_renew_stream_lease, to_agent => ToAgent, @@ -135,13 +147,17 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) -> version => Version }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - agent_pid(ToAgent), + ?agent_pid(ToAgent), ?leader_renew_stream_lease(OfGroup, Version) ), - ok. + ok; +leader_renew_stream_lease(ToAgent, OfGroup, Version) -> + emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( + ?agent_node(ToAgent), ToAgent, OfGroup, Version + ). -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. -leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> +leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) -> ?tp(warning, shared_sub_proto_msg, #{ type => leader_renew_stream_lease, to_agent => ToAgent, @@ -150,13 +166,19 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> version_new => VersionNew }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - agent_pid(ToAgent), + ?agent_pid(ToAgent), ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) ), - ok. + ok; +leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> + emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease( + ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew + ). -spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok. -leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> +leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when + ?is_local_agent(ToAgent) +-> ?tp(warning, shared_sub_proto_msg, #{ type => leader_update_streams, to_agent => ToAgent, @@ -166,42 +188,38 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> streams_new => format_streams(StreamsNew) }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - agent_pid(ToAgent), + ?agent_pid(ToAgent), ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) ), - ok. + ok; +leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> + emqx_ds_shared_sub_proto_v1:leader_update_streams( + ?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew + ). -spec leader_invalidate(agent(), group()) -> ok. -leader_invalidate(ToAgent, OfGroup) -> +leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) -> ?tp(warning, shared_sub_proto_msg, #{ type => leader_invalidate, to_agent => ToAgent, of_group => OfGroup }), _ = emqx_persistent_session_ds_shared_subs_agent:send( - agent_pid(ToAgent), + ?agent_pid(ToAgent), ?leader_invalidate(OfGroup) ), - ok. + ok; +leader_invalidate(ToAgent, OfGroup) -> + emqx_ds_shared_sub_proto_v1:leader_invalidate( + ?agent_node(ToAgent), ToAgent, OfGroup + ). %%-------------------------------------------------------------------- %% Internal API %%-------------------------------------------------------------------- --ifdef(TEST). -agent(Id, Pid) -> - #agent{id = Id, pid = Pid}. - -agent_pid(#agent{pid = Pid}) -> - Pid. - --else. agent(_Id, Pid) -> - Pid. - -agent_pid(Pid) -> - Pid. --endif. + ?agent(_Id, Pid). format_streams(Streams) -> lists:map( diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl index 6689a0d3b..c9227ea2d 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_proto.hrl @@ -6,9 +6,6 @@ %% These messages are instantiated on the receiver's side, so they do not %% travel over the network. --ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL). --define(EMQX_DS_SHARED_SUB_PROTO_HRL, true). - %% NOTE %% We do not need any kind of request/response identification, %% because the protocol is fully event-based. @@ -140,4 +137,32 @@ group := Group }). +%% Helpers +%% In test mode we extend agents with (session) Id to have more +%% readable traces. + +-ifdef(TEST). + +-define(agent(Id, Pid), {Id, Pid}). + +-define(agent_pid(Agent), element(2, Agent)). + +-define(agent_node(Agent), node(element(2, Agent))). + +%% -ifdef(TEST). +-else. + +-define(agent(Id, Pid), Pid). + +-define(agent_pid(Agent), Agent). + +-define(agent_node(Agent), node(Agent)). + +%% -ifdef(TEST). -endif. + +-define(is_local_agent(Agent), (?agent_node(Agent) =:= node())). + +-define(leader_node(Leader), node(Leader)). + +-define(is_local_leader(Leader), (?leader_node(Leader) =:= node())). diff --git a/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl new file mode 100644 index 000000000..b0a132ea5 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/proto/emqx_ds_shared_sub_proto_v1.erl @@ -0,0 +1,116 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_proto_v1). + +-behaviour(emqx_bpapi). + +-include_lib("emqx/include/bpapi.hrl"). + +-export([ + introduced_in/0, + + agent_connect_leader/4, + agent_update_stream_states/5, + agent_update_stream_states/6, + + leader_lease_streams/6, + leader_renew_stream_lease/4, + leader_renew_stream_lease/5, + leader_update_streams/6, + leader_invalidate/3 +]). + +introduced_in() -> + "5.8.0". + +-spec agent_connect_leader( + node(), + emqx_ds_shared_sub_proto:leader(), + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:topic_filter() +) -> ok. +agent_connect_leader(Node, ToLeader, FromAgent, TopicFilter) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [ + ToLeader, FromAgent, TopicFilter + ]). + +-spec agent_update_stream_states( + node(), + emqx_ds_shared_sub_proto:leader(), + emqx_ds_shared_sub_proto:agent(), + list(emqx_ds_shared_sub_proto:agent_stream_progress()), + emqx_ds_shared_sub_proto:version() +) -> ok. +agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, Version) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [ + ToLeader, FromAgent, StreamProgresses, Version + ]). + +-spec agent_update_stream_states( + node(), + emqx_ds_shared_sub_proto:leader(), + emqx_ds_shared_sub_proto:agent(), + list(emqx_ds_shared_sub_proto:agent_stream_progress()), + emqx_ds_shared_sub_proto:version(), + emqx_ds_shared_sub_proto:version() +) -> ok. +agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [ + ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew + ]). + +%% leader -> agent messages + +-spec leader_lease_streams( + node(), + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:group(), + emqx_ds_shared_sub_proto:leader(), + list(emqx_ds_shared_sub_proto:stream_progress()), + emqx_ds_shared_sub_proto:version() +) -> ok. +leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, leader_lease_streams, [ + ToAgent, OfGroup, Leader, Streams, Version + ]). + +-spec leader_renew_stream_lease( + node(), + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:group(), + emqx_ds_shared_sub_proto:version() +) -> ok. +leader_renew_stream_lease(Node, ToAgent, OfGroup, Version) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [ToAgent, OfGroup, Version]). + +-spec leader_renew_stream_lease( + node(), + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:group(), + emqx_ds_shared_sub_proto:version(), + emqx_ds_shared_sub_proto:version() +) -> ok. +leader_renew_stream_lease(Node, ToAgent, OfGroup, VersionOld, VersionNew) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [ + ToAgent, OfGroup, VersionOld, VersionNew + ]). + +-spec leader_update_streams( + node(), + emqx_ds_shared_sub_proto:agent(), + emqx_ds_shared_sub_proto:group(), + emqx_ds_shared_sub_proto:version(), + emqx_ds_shared_sub_proto:version(), + list(emqx_ds_shared_sub_proto:stream_progress()) +) -> ok. +leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [ + ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew + ]). + +-spec leader_invalidate(node(), emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:group()) -> + ok. +leader_invalidate(Node, ToAgent, OfGroup) -> + erpc:cast(Node, emqx_ds_shared_sub_proto, leader_invalidate, [ToAgent, OfGroup]).