feat(dsrepl): cache shard metadata in persistent terms

This commit is contained in:
Andrew Mayorov 2024-02-01 17:25:07 +01:00
parent e6c2c2fb07
commit d19128ed65
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
4 changed files with 194 additions and 112 deletions

View File

@ -153,7 +153,7 @@ shard_spec(DB, Shard) ->
shard_replication_spec(DB, Shard) -> shard_replication_spec(DB, Shard) ->
#{ #{
id => {Shard, replication}, id => {Shard, replication},
start => {emqx_ds_replication_layer, ra_start_shard, [DB, Shard]}, start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard]},
restart => transient, restart => transient,
type => worker type => worker
}. }.

View File

@ -57,7 +57,6 @@
do_delete_next_v4/5, do_delete_next_v4/5,
%% FIXME %% FIXME
ra_start_shard/2,
ra_store_batch/3 ra_store_batch/3
]). ]).
@ -493,45 +492,13 @@ list_nodes() ->
%% %%
ra_start_shard(DB, Shard) ->
System = default,
Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = ra_cluster_name(DB, Shard),
LocalServer = ra_local_server(DB, Shard),
Servers = ra_shard_servers(DB, Shard),
case ra:restart_server(System, LocalServer) of
ok ->
ok;
{error, name_not_registered} ->
ok = ra:start_server(System, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName,
initial_members => Servers,
machine => {module, ?MODULE, #{db => DB, shard => Shard}},
log_init_args => #{}
})
end,
case Servers of
[LocalServer | _] ->
%% TODO
%% Not super robust, but we probably don't expect nodes to be down
%% when we bring up a fresh consensus group. Triggering election
%% is not really required otherwise.
%% TODO
%% Ensure that doing that on node restart does not disrupt consensus.
ok = ra:trigger_election(LocalServer);
_ ->
ok
end,
emqx_ds_replication_layer_shard:start_link(LocalServer).
ra_store_batch(DB, Shard, Messages) -> ra_store_batch(DB, Shard, Messages) ->
Command = #{ Command = #{
?tag => ?BATCH, ?tag => ?BATCH,
?batch_messages => Messages ?batch_messages => Messages
}, },
case ra:process_command(ra_leader_servers(DB, Shard), Command) of Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
Error -> Error ->
@ -540,7 +507,8 @@ ra_store_batch(DB, Shard, Messages) ->
ra_add_generation(DB, Shard) -> ra_add_generation(DB, Shard) ->
Command = #{?tag => add_generation}, Command = #{?tag => add_generation},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
Error -> Error ->
@ -549,7 +517,8 @@ ra_add_generation(DB, Shard) ->
ra_update_config(DB, Shard, Opts) -> ra_update_config(DB, Shard, Opts) ->
Command = #{?tag => update_config, ?config => Opts}, Command = #{?tag => update_config, ?config => Opts},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
Error -> Error ->
@ -558,7 +527,8 @@ ra_update_config(DB, Shard, Opts) ->
ra_drop_generation(DB, Shard, GenId) -> ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId}, Command = #{?tag => drop_generation, ?generation => GenId},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred),
case ra:process_command(Servers, Command) of
{ok, Result, _Leader} -> {ok, Result, _Leader} ->
Result; Result;
Error -> Error ->
@ -566,7 +536,7 @@ ra_drop_generation(DB, Shard, GenId) ->
end. end.
ra_get_streams(DB, Shard, TopicFilter, Time) -> ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = ra_random_replica(DB, Shard), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
@ -574,7 +544,7 @@ ra_get_delete_streams(DB, Shard, TopicFilter, Time) ->
emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = ra_random_replica(DB, Shard), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
@ -582,11 +552,11 @@ ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_update_iterator(DB, Shard, Iter, DSKey) -> ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = ra_random_replica(DB, Shard), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
ra_next(DB, Shard, Iter, BatchSize) -> ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = ra_random_replica(DB, Shard), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
@ -594,68 +564,12 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) ->
emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize).
ra_list_generations_with_lifetimes(DB, Shard) -> ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = ra_random_replica(DB, Shard), {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower),
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
ra_drop_shard(DB, Shard) -> ra_drop_shard(DB, Shard) ->
ra:force_delete_server(_System = default, ra_local_server(DB, Shard)). LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local),
ra:force_delete_server(_System = default, LocalServer).
ra_shard_servers(DB, Shard) ->
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
[
{ra_server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|| Site <- ReplicaSet
].
ra_local_server(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
{ra_server_name(DB, Shard, Site), node()}.
ra_leader_servers(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
ClusterName = ra_cluster_name(DB, Shard),
case ra_leaderboard:lookup_leader(ClusterName) of
Leader when Leader /= undefined ->
Servers = ra_leaderboard:lookup_members(ClusterName),
[Leader | lists:delete(Leader, Servers)];
undefined ->
%% TODO: Dynamic membership.
ra_shard_servers(DB, Shard)
end.
ra_random_replica(DB, Shard) ->
%% NOTE: Contact random replica that is not a known leader.
%% TODO: Replica may be down, so we may need to retry.
ClusterName = ra_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
Leader = ra_leaderboard:lookup_leader(ClusterName),
ra_pick_replica(Servers, Leader);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
ra_pick_server(ra_shard_servers(DB, Shard))
end.
ra_pick_replica(Servers, Leader) ->
case lists:delete(Leader, Servers) of
[] ->
Leader;
Followers ->
ra_pick_server(Followers)
end.
ra_pick_server(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
ra_cluster_name(DB, Shard) ->
iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
ra_server_name(DB, Shard, Site) ->
DBBin = atom_to_binary(DB),
binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
%% %%

View File

@ -29,6 +29,7 @@
-export([ -export([
shards/1, shards/1,
my_shards/1, my_shards/1,
shard_meta/2,
replica_set/2, replica_set/2,
sites/0, sites/0,
node/1, node/1,
@ -96,6 +97,8 @@
%% Peristent term key: %% Peristent term key:
-define(emqx_ds_builtin_site, emqx_ds_builtin_site). -define(emqx_ds_builtin_site, emqx_ds_builtin_site).
-define(DB_META(DB), {?MODULE, DB}).
%%================================================================================ %%================================================================================
%% API funcions %% API funcions
%%================================================================================ %%================================================================================
@ -152,6 +155,12 @@ my_shards(DB) ->
lists:member(Site, ReplicaSet) lists:member(Site, ReplicaSet)
end). end).
shard_meta(DB, Shard) ->
case get_db_meta(DB) of
#{Shard := Meta} -> Meta;
#{} -> undefined
end.
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
{ok, [site()]} | {error, _}. {ok, [site()]} | {error, _}.
replica_set(DB, Shard) -> replica_set(DB, Shard) ->
@ -264,6 +273,7 @@ open_db_trans(DB, CreateOpts) ->
ReplicationFactor = maps:get(replication_factor, CreateOpts), ReplicationFactor = maps:get(replication_factor, CreateOpts),
mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}),
create_shards(DB, NSites, NShards, ReplicationFactor), create_shards(DB, NSites, NShards, ReplicationFactor),
save_db_meta(DB),
CreateOpts; CreateOpts;
[#?META_TAB{db_props = Opts}] -> [#?META_TAB{db_props = Opts}] ->
Opts Opts
@ -379,6 +389,28 @@ create_shards(DB, NSites, NShards, ReplicationFactor) ->
Shards Shards
). ).
save_db_meta(DB) ->
Shards = mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{_ = '_'}, read),
Meta = maps:from_list([mk_shard_meta(Shard) || Shard <- Shards]),
persistent_term:put(?DB_META(DB), Meta).
get_db_meta(DB) ->
persistent_term:get(?DB_META(DB)).
% erase_db_meta(DB) ->
% persistent_term:erase(?DB_META(DB)).
mk_shard_meta(#?SHARD_TAB{shard = {DB, Shard}, replica_set = ReplicaSet}) ->
%% FIXME: Wrong place.
Servers = [
{emqx_ds_replication_layer_shard:server_name(DB, Shard, Site), Node#?NODE_TAB.node}
|| Site <- ReplicaSet,
Node <- mnesia:read(?NODE_TAB, Site)
],
{Shard, #{
servers => Servers
}}.
-spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). -spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any().
hash(Shard, Site) -> hash(Shard, Site) ->
erlang:phash2({Shard, Site}). erlang:phash2({Shard, Site}).

View File

@ -16,7 +16,13 @@
-module(emqx_ds_replication_layer_shard). -module(emqx_ds_replication_layer_shard).
-export([start_link/1]). -export([start_link/2]).
-export([shard_servers/2]).
-export([
servers/3,
server/3
]).
-behaviour(gen_server). -behaviour(gen_server).
-export([ -export([
@ -26,16 +32,106 @@
terminate/2 terminate/2
]). ]).
%% -define(PTERM(DB, SHARD, L), {?MODULE, DB, SHARD, L}).
-define(MEMOIZE(DB, SHARD, EXPR),
start_link(ServerId) -> case persistent_term:get(__X_Key = ?PTERM(DB, SHARD, ?LINE), undefined) of
gen_server:start_link(?MODULE, ServerId, []). undefined ->
ok = persistent_term:put(__X_Key, __X_Value = (EXPR)),
__X_Value;
__X_Value ->
__X_Value
end
).
%% %%
init(ServerId) -> start_link(DB, Shard) ->
process_flag(trap_exit, true), gen_server:start_link(?MODULE, {DB, Shard}, []).
{ok, ServerId}.
shard_servers(DB, Shard) ->
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
[
{server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
|| Site <- ReplicaSet
].
local_server(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
{server_name(DB, Shard, Site), node()}.
cluster_name(DB, Shard) ->
iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
server_name(DB, Shard, Site) ->
DBBin = atom_to_binary(DB),
binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
%%
servers(DB, Shard, _Order = leader_preferred) ->
get_servers_leader_preferred(DB, Shard);
servers(DB, Shard, _Order = undefined) ->
get_shard_servers(DB, Shard).
server(DB, Shard, _Which = random_follower) ->
pick_random_replica(DB, Shard);
server(DB, Shard, _Which = local) ->
get_local_server(DB, Shard).
get_servers_leader_preferred(DB, Shard) ->
%% NOTE: Contact last known leader first, then rest of shard servers.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_leader(ClusterName) of
Leader when Leader /= undefined ->
Servers = ra_leaderboard:lookup_members(ClusterName),
[Leader | lists:delete(Leader, Servers)];
undefined ->
%% TODO: Dynamic membership.
get_shard_servers(DB, Shard)
end.
pick_random_replica(DB, Shard) ->
%% NOTE: Contact random replica that is not a known leader.
%% TODO: Replica may be down, so we may need to retry.
ClusterName = get_cluster_name(DB, Shard),
case ra_leaderboard:lookup_members(ClusterName) of
Servers when is_list(Servers) ->
Leader = ra_leaderboard:lookup_leader(ClusterName),
pick_replica(Servers, Leader);
undefined ->
%% TODO
%% Leader is unkonwn if there are no servers of this group on the
%% local node. We want to pick a replica in that case as well.
%% TODO: Dynamic membership.
pick_server(get_shard_servers(DB, Shard))
end.
pick_replica(Servers, Leader) ->
case lists:delete(Leader, Servers) of
[] ->
Leader;
Followers ->
pick_server(Followers)
end.
pick_server(Servers) ->
lists:nth(rand:uniform(length(Servers)), Servers).
get_cluster_name(DB, Shard) ->
?MEMOIZE(DB, Shard, cluster_name(DB, Shard)).
get_local_server(DB, Shard) ->
?MEMOIZE(DB, Shard, local_server(DB, Shard)).
get_shard_servers(DB, Shard) ->
maps:get(servers, emqx_ds_replication_layer_meta:get_shard_meta(DB, Shard)).
%%
init({DB, Shard}) ->
_ = process_flag(trap_exit, true),
_Meta = start_shard(DB, Shard),
{ok, {DB, Shard}}.
handle_call(_Call, _From, State) -> handle_call(_Call, _From, State) ->
{reply, ignored, State}. {reply, ignored, State}.
@ -43,5 +139,45 @@ handle_call(_Call, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
terminate(_Reason, ServerId) -> terminate(_Reason, {DB, Shard}) ->
ok = ra:stop_server(ServerId). LocalServer = get_local_server(DB, Shard),
ok = ra:stop_server(LocalServer).
%%
start_shard(DB, Shard) ->
System = default,
Site = emqx_ds_replication_layer_meta:this_site(),
ClusterName = cluster_name(DB, Shard),
LocalServer = local_server(DB, Shard),
Servers = shard_servers(DB, Shard),
case ra:restart_server(System, LocalServer) of
ok ->
ok;
{error, name_not_registered} ->
ok = ra:start_server(System, #{
id => LocalServer,
uid => <<ClusterName/binary, "_", Site/binary>>,
cluster_name => ClusterName,
initial_members => Servers,
machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}},
log_init_args => #{}
})
end,
case Servers of
[LocalServer | _] ->
%% TODO
%% Not super robust, but we probably don't expect nodes to be down
%% when we bring up a fresh consensus group. Triggering election
%% is not really required otherwise.
%% TODO
%% Ensure that doing that on node restart does not disrupt consensus.
ok = ra:trigger_election(LocalServer);
_ ->
ok
end,
#{
cluster_name => ClusterName,
servers => Servers,
local_server => LocalServer
}.