feat(queue): wrap remote calls in a proto

This commit is contained in:
Ilya Averyanov 2024-06-27 18:09:59 +03:00
parent 61eda0ff31
commit 49bff5c08a
3 changed files with 206 additions and 47 deletions

View File

@ -9,6 +9,7 @@
-module(emqx_ds_shared_sub_proto). -module(emqx_ds_shared_sub_proto).
-include("emqx_ds_shared_sub_proto.hrl"). -include("emqx_ds_shared_sub_proto.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ -export([
@ -28,16 +29,7 @@
agent/2 agent/2
]). ]).
-ifdef(TEST). -type agent() :: ?agent(emqx_persistent_session_ds:id(), pid()).
-record(agent, {
pid :: pid(),
id :: term()
}).
-type agent() :: #agent{}.
-else.
-type agent() :: pid().
-endif.
-type leader() :: pid(). -type leader() :: pid().
-type topic_filter() :: emqx_persistent_session_ds:share_topic_filter(). -type topic_filter() :: emqx_persistent_session_ds:share_topic_filter().
-type group() :: emqx_types:group(). -type group() :: emqx_types:group().
@ -69,7 +61,7 @@
%% agent -> leader messages %% agent -> leader messages
-spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok. -spec agent_connect_leader(leader(), agent(), topic_filter()) -> ok.
agent_connect_leader(ToLeader, FromAgent, TopicFilter) -> agent_connect_leader(ToLeader, FromAgent, TopicFilter) when ?is_local_leader(ToLeader) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => agent_connect_leader, type => agent_connect_leader,
to_leader => ToLeader, to_leader => ToLeader,
@ -77,10 +69,16 @@ agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
topic_filter => TopicFilter topic_filter => TopicFilter
}), }),
_ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)), _ = erlang:send(ToLeader, ?agent_connect_leader(FromAgent, TopicFilter)),
ok. ok;
agent_connect_leader(ToLeader, FromAgent, TopicFilter) ->
emqx_ds_shared_sub_proto_v1:agent_connect_leader(
?leader_node(ToLeader), ToLeader, FromAgent, TopicFilter
).
-spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok. -spec agent_update_stream_states(leader(), agent(), list(agent_stream_progress()), version()) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states, type => agent_update_stream_states,
to_leader => ToLeader, to_leader => ToLeader,
@ -89,12 +87,18 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
version => Version version => Version
}), }),
_ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)), _ = erlang:send(ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, Version)),
ok. ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, Version) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, Version
).
-spec agent_update_stream_states( -spec agent_update_stream_states(
leader(), agent(), list(agent_stream_progress()), version(), version() leader(), agent(), list(agent_stream_progress()), version(), version()
) -> ok. ) -> ok.
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) -> agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) when
?is_local_leader(ToLeader)
->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => agent_update_stream_states, type => agent_update_stream_states,
to_leader => ToLeader, to_leader => ToLeader,
@ -106,12 +110,16 @@ agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, Ve
_ = erlang:send( _ = erlang:send(
ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew) ToLeader, ?agent_update_stream_states(FromAgent, StreamProgresses, VersionOld, VersionNew)
), ),
ok. ok;
agent_update_stream_states(ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:agent_update_stream_states(
?leader_node(ToLeader), ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
).
%% leader -> agent messages %% leader -> agent messages
-spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok. -spec leader_lease_streams(agent(), group(), leader(), list(stream_progress()), version()) -> ok.
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) -> leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_lease_streams, type => leader_lease_streams,
to_agent => ToAgent, to_agent => ToAgent,
@ -121,13 +129,17 @@ leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
version => Version version => Version
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
agent_pid(ToAgent), ?agent_pid(ToAgent),
?leader_lease_streams(OfGroup, Leader, Streams, Version) ?leader_lease_streams(OfGroup, Leader, Streams, Version)
), ),
ok. ok;
leader_lease_streams(ToAgent, OfGroup, Leader, Streams, Version) ->
emqx_ds_shared_sub_proto_v1:leader_lease_streams(
?agent_node(ToAgent), ToAgent, OfGroup, Leader, Streams, Version
).
-spec leader_renew_stream_lease(agent(), group(), version()) -> ok. -spec leader_renew_stream_lease(agent(), group(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, Version) -> leader_renew_stream_lease(ToAgent, OfGroup, Version) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease, type => leader_renew_stream_lease,
to_agent => ToAgent, to_agent => ToAgent,
@ -135,13 +147,17 @@ leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
version => Version version => Version
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
agent_pid(ToAgent), ?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, Version) ?leader_renew_stream_lease(OfGroup, Version)
), ),
ok. ok;
leader_renew_stream_lease(ToAgent, OfGroup, Version) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, Version
).
-spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok. -spec leader_renew_stream_lease(agent(), group(), version(), version()) -> ok.
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) -> leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_renew_stream_lease, type => leader_renew_stream_lease,
to_agent => ToAgent, to_agent => ToAgent,
@ -150,13 +166,19 @@ leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
version_new => VersionNew version_new => VersionNew
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
agent_pid(ToAgent), ?agent_pid(ToAgent),
?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew) ?leader_renew_stream_lease(OfGroup, VersionOld, VersionNew)
), ),
ok. ok;
leader_renew_stream_lease(ToAgent, OfGroup, VersionOld, VersionNew) ->
emqx_ds_shared_sub_proto_v1:leader_renew_stream_lease(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew
).
-spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok. -spec leader_update_streams(agent(), group(), version(), version(), list(stream_progress())) -> ok.
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) -> leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) when
?is_local_agent(ToAgent)
->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_update_streams, type => leader_update_streams,
to_agent => ToAgent, to_agent => ToAgent,
@ -166,42 +188,38 @@ leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
streams_new => format_streams(StreamsNew) streams_new => format_streams(StreamsNew)
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
agent_pid(ToAgent), ?agent_pid(ToAgent),
?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew) ?leader_update_streams(OfGroup, VersionOld, VersionNew, StreamsNew)
), ),
ok. ok;
leader_update_streams(ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
emqx_ds_shared_sub_proto_v1:leader_update_streams(
?agent_node(ToAgent), ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
).
-spec leader_invalidate(agent(), group()) -> ok. -spec leader_invalidate(agent(), group()) -> ok.
leader_invalidate(ToAgent, OfGroup) -> leader_invalidate(ToAgent, OfGroup) when ?is_local_agent(ToAgent) ->
?tp(warning, shared_sub_proto_msg, #{ ?tp(warning, shared_sub_proto_msg, #{
type => leader_invalidate, type => leader_invalidate,
to_agent => ToAgent, to_agent => ToAgent,
of_group => OfGroup of_group => OfGroup
}), }),
_ = emqx_persistent_session_ds_shared_subs_agent:send( _ = emqx_persistent_session_ds_shared_subs_agent:send(
agent_pid(ToAgent), ?agent_pid(ToAgent),
?leader_invalidate(OfGroup) ?leader_invalidate(OfGroup)
), ),
ok. ok;
leader_invalidate(ToAgent, OfGroup) ->
emqx_ds_shared_sub_proto_v1:leader_invalidate(
?agent_node(ToAgent), ToAgent, OfGroup
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal API %% Internal API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-ifdef(TEST).
agent(Id, Pid) ->
#agent{id = Id, pid = Pid}.
agent_pid(#agent{pid = Pid}) ->
Pid.
-else.
agent(_Id, Pid) -> agent(_Id, Pid) ->
Pid. ?agent(_Id, Pid).
agent_pid(Pid) ->
Pid.
-endif.
format_streams(Streams) -> format_streams(Streams) ->
lists:map( lists:map(

View File

@ -6,9 +6,6 @@
%% These messages are instantiated on the receiver's side, so they do not %% These messages are instantiated on the receiver's side, so they do not
%% travel over the network. %% travel over the network.
-ifndef(EMQX_DS_SHARED_SUB_PROTO_HRL).
-define(EMQX_DS_SHARED_SUB_PROTO_HRL, true).
%% NOTE %% NOTE
%% We do not need any kind of request/response identification, %% We do not need any kind of request/response identification,
%% because the protocol is fully event-based. %% because the protocol is fully event-based.
@ -140,4 +137,32 @@
group := Group group := Group
}). }).
%% Helpers
%% In test mode we extend agents with (session) Id to have more
%% readable traces.
-ifdef(TEST).
-define(agent(Id, Pid), {Id, Pid}).
-define(agent_pid(Agent), element(2, Agent)).
-define(agent_node(Agent), node(element(2, Agent))).
%% -ifdef(TEST).
-else.
-define(agent(Id, Pid), Pid).
-define(agent_pid(Agent), Agent).
-define(agent_node(Agent), node(Agent)).
%% -ifdef(TEST).
-endif. -endif.
-define(is_local_agent(Agent), (?agent_node(Agent) =:= node())).
-define(leader_node(Leader), node(Leader)).
-define(is_local_leader(Leader), (?leader_node(Leader) =:= node())).

View File

@ -0,0 +1,116 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_proto_v1).
-behaviour(emqx_bpapi).
-include_lib("emqx/include/bpapi.hrl").
-export([
introduced_in/0,
agent_connect_leader/4,
agent_update_stream_states/5,
agent_update_stream_states/6,
leader_lease_streams/6,
leader_renew_stream_lease/4,
leader_renew_stream_lease/5,
leader_update_streams/6,
leader_invalidate/3
]).
introduced_in() ->
"5.8.0".
-spec agent_connect_leader(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:topic_filter()
) -> ok.
agent_connect_leader(Node, ToLeader, FromAgent, TopicFilter) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_connect_leader, [
ToLeader, FromAgent, TopicFilter
]).
-spec agent_update_stream_states(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [
ToLeader, FromAgent, StreamProgresses, Version
]).
-spec agent_update_stream_states(
node(),
emqx_ds_shared_sub_proto:leader(),
emqx_ds_shared_sub_proto:agent(),
list(emqx_ds_shared_sub_proto:agent_stream_progress()),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version()
) -> ok.
agent_update_stream_states(Node, ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, agent_update_stream_states, [
ToLeader, FromAgent, StreamProgresses, VersionOld, VersionNew
]).
%% leader -> agent messages
-spec leader_lease_streams(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:leader(),
list(emqx_ds_shared_sub_proto:stream_progress()),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_lease_streams(Node, ToAgent, OfGroup, Leader, Streams, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_lease_streams, [
ToAgent, OfGroup, Leader, Streams, Version
]).
-spec leader_renew_stream_lease(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_renew_stream_lease(Node, ToAgent, OfGroup, Version) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [ToAgent, OfGroup, Version]).
-spec leader_renew_stream_lease(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version()
) -> ok.
leader_renew_stream_lease(Node, ToAgent, OfGroup, VersionOld, VersionNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_renew_stream_lease, [
ToAgent, OfGroup, VersionOld, VersionNew
]).
-spec leader_update_streams(
node(),
emqx_ds_shared_sub_proto:agent(),
emqx_ds_shared_sub_proto:group(),
emqx_ds_shared_sub_proto:version(),
emqx_ds_shared_sub_proto:version(),
list(emqx_ds_shared_sub_proto:stream_progress())
) -> ok.
leader_update_streams(Node, ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_update_streams, [
ToAgent, OfGroup, VersionOld, VersionNew, StreamsNew
]).
-spec leader_invalidate(node(), emqx_ds_shared_sub_proto:agent(), emqx_ds_shared_sub_proto:group()) ->
ok.
leader_invalidate(Node, ToAgent, OfGroup) ->
erpc:cast(Node, emqx_ds_shared_sub_proto, leader_invalidate, [ToAgent, OfGroup]).