From d19128ed650d7ca50e03210ed651d3c8748c5972 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 1 Feb 2024 17:25:07 +0100 Subject: [PATCH] feat(dsrepl): cache shard metadata in persistent terms --- .../src/emqx_ds_builtin_db_sup.erl | 2 +- .../src/emqx_ds_replication_layer.erl | 116 ++----------- .../src/emqx_ds_replication_layer_meta.erl | 32 ++++ .../src/emqx_ds_replication_layer_shard.erl | 156 ++++++++++++++++-- 4 files changed, 194 insertions(+), 112 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index ce13d0ea5..93502df8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -153,7 +153,7 @@ shard_spec(DB, Shard) -> shard_replication_spec(DB, Shard) -> #{ id => {Shard, replication}, - start => {emqx_ds_replication_layer, ra_start_shard, [DB, Shard]}, + start => {emqx_ds_replication_layer_shard, start_link, [DB, Shard]}, restart => transient, type => worker }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 36e798259..53a491d1f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -57,7 +57,6 @@ do_delete_next_v4/5, %% FIXME - ra_start_shard/2, ra_store_batch/3 ]). @@ -493,45 +492,13 @@ list_nodes() -> %% -ra_start_shard(DB, Shard) -> - System = default, - Site = emqx_ds_replication_layer_meta:this_site(), - ClusterName = ra_cluster_name(DB, Shard), - LocalServer = ra_local_server(DB, Shard), - Servers = ra_shard_servers(DB, Shard), - case ra:restart_server(System, LocalServer) of - ok -> - ok; - {error, name_not_registered} -> - ok = ra:start_server(System, #{ - id => LocalServer, - uid => <>, - cluster_name => ClusterName, - initial_members => Servers, - machine => {module, ?MODULE, #{db => DB, shard => Shard}}, - log_init_args => #{} - }) - end, - case Servers of - [LocalServer | _] -> - %% TODO - %% Not super robust, but we probably don't expect nodes to be down - %% when we bring up a fresh consensus group. Triggering election - %% is not really required otherwise. - %% TODO - %% Ensure that doing that on node restart does not disrupt consensus. - ok = ra:trigger_election(LocalServer); - _ -> - ok - end, - emqx_ds_replication_layer_shard:start_link(LocalServer). - ra_store_batch(DB, Shard, Messages) -> Command = #{ ?tag => ?BATCH, ?batch_messages => Messages }, - case ra:process_command(ra_leader_servers(DB, Shard), Command) of + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command) of {ok, Result, _Leader} -> Result; Error -> @@ -540,7 +507,8 @@ ra_store_batch(DB, Shard, Messages) -> ra_add_generation(DB, Shard) -> Command = #{?tag => add_generation}, - case ra:process_command(ra_leader_servers(DB, Shard), Command) of + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command) of {ok, Result, _Leader} -> Result; Error -> @@ -549,7 +517,8 @@ ra_add_generation(DB, Shard) -> ra_update_config(DB, Shard, Opts) -> Command = #{?tag => update_config, ?config => Opts}, - case ra:process_command(ra_leader_servers(DB, Shard), Command) of + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command) of {ok, Result, _Leader} -> Result; Error -> @@ -558,7 +527,8 @@ ra_update_config(DB, Shard, Opts) -> ra_drop_generation(DB, Shard, GenId) -> Command = #{?tag => drop_generation, ?generation => GenId}, - case ra:process_command(ra_leader_servers(DB, Shard), Command) of + Servers = emqx_ds_replication_layer_shard:servers(DB, Shard, leader_preferred), + case ra:process_command(Servers, Command) of {ok, Result, _Leader} -> Result; Error -> @@ -566,7 +536,7 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> @@ -574,7 +544,7 @@ ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> @@ -582,11 +552,11 @@ ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). ra_update_iterator(DB, Shard, Iter, DSKey) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). ra_next(DB, Shard, Iter, BatchSize) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> @@ -594,68 +564,12 @@ ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). ra_list_generations_with_lifetimes(DB, Shard) -> - {_Name, Node} = ra_random_replica(DB, Shard), + {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, random_follower), emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). ra_drop_shard(DB, Shard) -> - ra:force_delete_server(_System = default, ra_local_server(DB, Shard)). - -ra_shard_servers(DB, Shard) -> - {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), - [ - {ra_server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} - || Site <- ReplicaSet - ]. - -ra_local_server(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - {ra_server_name(DB, Shard, Site), node()}. - -ra_leader_servers(DB, Shard) -> - %% NOTE: Contact last known leader first, then rest of shard servers. - ClusterName = ra_cluster_name(DB, Shard), - case ra_leaderboard:lookup_leader(ClusterName) of - Leader when Leader /= undefined -> - Servers = ra_leaderboard:lookup_members(ClusterName), - [Leader | lists:delete(Leader, Servers)]; - undefined -> - %% TODO: Dynamic membership. - ra_shard_servers(DB, Shard) - end. - -ra_random_replica(DB, Shard) -> - %% NOTE: Contact random replica that is not a known leader. - %% TODO: Replica may be down, so we may need to retry. - ClusterName = ra_cluster_name(DB, Shard), - case ra_leaderboard:lookup_members(ClusterName) of - Servers when is_list(Servers) -> - Leader = ra_leaderboard:lookup_leader(ClusterName), - ra_pick_replica(Servers, Leader); - undefined -> - %% TODO - %% Leader is unkonwn if there are no servers of this group on the - %% local node. We want to pick a replica in that case as well. - %% TODO: Dynamic membership. - ra_pick_server(ra_shard_servers(DB, Shard)) - end. - -ra_pick_replica(Servers, Leader) -> - case lists:delete(Leader, Servers) of - [] -> - Leader; - Followers -> - ra_pick_server(Followers) - end. - -ra_pick_server(Servers) -> - lists:nth(rand:uniform(length(Servers)), Servers). - -ra_cluster_name(DB, Shard) -> - iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])). - -ra_server_name(DB, Shard, Site) -> - DBBin = atom_to_binary(DB), - binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>). + LocalServer = emqx_ds_replication_layer_shard:server(DB, Shard, local), + ra:force_delete_server(_System = default, LocalServer). %% diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 53de5b41e..070e0ac93 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,6 +29,7 @@ -export([ shards/1, my_shards/1, + shard_meta/2, replica_set/2, sites/0, node/1, @@ -96,6 +97,8 @@ %% Peristent term key: -define(emqx_ds_builtin_site, emqx_ds_builtin_site). +-define(DB_META(DB), {?MODULE, DB}). + %%================================================================================ %% API funcions %%================================================================================ @@ -152,6 +155,12 @@ my_shards(DB) -> lists:member(Site, ReplicaSet) end). +shard_meta(DB, Shard) -> + case get_db_meta(DB) of + #{Shard := Meta} -> Meta; + #{} -> undefined + end. + -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. replica_set(DB, Shard) -> @@ -264,6 +273,7 @@ open_db_trans(DB, CreateOpts) -> ReplicationFactor = maps:get(replication_factor, CreateOpts), mnesia:write(#?META_TAB{db = DB, db_props = CreateOpts}), create_shards(DB, NSites, NShards, ReplicationFactor), + save_db_meta(DB), CreateOpts; [#?META_TAB{db_props = Opts}] -> Opts @@ -379,6 +389,28 @@ create_shards(DB, NSites, NShards, ReplicationFactor) -> Shards ). +save_db_meta(DB) -> + Shards = mnesia:match_object(?SHARD_TAB, #?SHARD_TAB{_ = '_'}, read), + Meta = maps:from_list([mk_shard_meta(Shard) || Shard <- Shards]), + persistent_term:put(?DB_META(DB), Meta). + +get_db_meta(DB) -> + persistent_term:get(?DB_META(DB)). + +% erase_db_meta(DB) -> +% persistent_term:erase(?DB_META(DB)). + +mk_shard_meta(#?SHARD_TAB{shard = {DB, Shard}, replica_set = ReplicaSet}) -> + %% FIXME: Wrong place. + Servers = [ + {emqx_ds_replication_layer_shard:server_name(DB, Shard, Site), Node#?NODE_TAB.node} + || Site <- ReplicaSet, + Node <- mnesia:read(?NODE_TAB, Site) + ], + {Shard, #{ + servers => Servers + }}. + -spec hash(emqx_ds_replication_layer:shard_id(), site()) -> any(). hash(Shard, Site) -> erlang:phash2({Shard, Site}). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 0fd8499ad..9c66650d3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,7 +16,13 @@ -module(emqx_ds_replication_layer_shard). --export([start_link/1]). +-export([start_link/2]). +-export([shard_servers/2]). + +-export([ + servers/3, + server/3 +]). -behaviour(gen_server). -export([ @@ -26,16 +32,106 @@ terminate/2 ]). -%% - -start_link(ServerId) -> - gen_server:start_link(?MODULE, ServerId, []). +-define(PTERM(DB, SHARD, L), {?MODULE, DB, SHARD, L}). +-define(MEMOIZE(DB, SHARD, EXPR), + case persistent_term:get(__X_Key = ?PTERM(DB, SHARD, ?LINE), undefined) of + undefined -> + ok = persistent_term:put(__X_Key, __X_Value = (EXPR)), + __X_Value; + __X_Value -> + __X_Value + end +). %% -init(ServerId) -> - process_flag(trap_exit, true), - {ok, ServerId}. +start_link(DB, Shard) -> + gen_server:start_link(?MODULE, {DB, Shard}, []). + +shard_servers(DB, Shard) -> + {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), + [ + {server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)} + || Site <- ReplicaSet + ]. + +local_server(DB, Shard) -> + Site = emqx_ds_replication_layer_meta:this_site(), + {server_name(DB, Shard, Site), node()}. + +cluster_name(DB, Shard) -> + iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])). + +server_name(DB, Shard, Site) -> + DBBin = atom_to_binary(DB), + binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>). + +%% + +servers(DB, Shard, _Order = leader_preferred) -> + get_servers_leader_preferred(DB, Shard); +servers(DB, Shard, _Order = undefined) -> + get_shard_servers(DB, Shard). + +server(DB, Shard, _Which = random_follower) -> + pick_random_replica(DB, Shard); +server(DB, Shard, _Which = local) -> + get_local_server(DB, Shard). + +get_servers_leader_preferred(DB, Shard) -> + %% NOTE: Contact last known leader first, then rest of shard servers. + ClusterName = get_cluster_name(DB, Shard), + case ra_leaderboard:lookup_leader(ClusterName) of + Leader when Leader /= undefined -> + Servers = ra_leaderboard:lookup_members(ClusterName), + [Leader | lists:delete(Leader, Servers)]; + undefined -> + %% TODO: Dynamic membership. + get_shard_servers(DB, Shard) + end. + +pick_random_replica(DB, Shard) -> + %% NOTE: Contact random replica that is not a known leader. + %% TODO: Replica may be down, so we may need to retry. + ClusterName = get_cluster_name(DB, Shard), + case ra_leaderboard:lookup_members(ClusterName) of + Servers when is_list(Servers) -> + Leader = ra_leaderboard:lookup_leader(ClusterName), + pick_replica(Servers, Leader); + undefined -> + %% TODO + %% Leader is unkonwn if there are no servers of this group on the + %% local node. We want to pick a replica in that case as well. + %% TODO: Dynamic membership. + pick_server(get_shard_servers(DB, Shard)) + end. + +pick_replica(Servers, Leader) -> + case lists:delete(Leader, Servers) of + [] -> + Leader; + Followers -> + pick_server(Followers) + end. + +pick_server(Servers) -> + lists:nth(rand:uniform(length(Servers)), Servers). + +get_cluster_name(DB, Shard) -> + ?MEMOIZE(DB, Shard, cluster_name(DB, Shard)). + +get_local_server(DB, Shard) -> + ?MEMOIZE(DB, Shard, local_server(DB, Shard)). + +get_shard_servers(DB, Shard) -> + maps:get(servers, emqx_ds_replication_layer_meta:get_shard_meta(DB, Shard)). + +%% + +init({DB, Shard}) -> + _ = process_flag(trap_exit, true), + _Meta = start_shard(DB, Shard), + {ok, {DB, Shard}}. handle_call(_Call, _From, State) -> {reply, ignored, State}. @@ -43,5 +139,45 @@ handle_call(_Call, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -terminate(_Reason, ServerId) -> - ok = ra:stop_server(ServerId). +terminate(_Reason, {DB, Shard}) -> + LocalServer = get_local_server(DB, Shard), + ok = ra:stop_server(LocalServer). + +%% + +start_shard(DB, Shard) -> + System = default, + Site = emqx_ds_replication_layer_meta:this_site(), + ClusterName = cluster_name(DB, Shard), + LocalServer = local_server(DB, Shard), + Servers = shard_servers(DB, Shard), + case ra:restart_server(System, LocalServer) of + ok -> + ok; + {error, name_not_registered} -> + ok = ra:start_server(System, #{ + id => LocalServer, + uid => <>, + cluster_name => ClusterName, + initial_members => Servers, + machine => {module, emqx_ds_replication_layer, #{db => DB, shard => Shard}}, + log_init_args => #{} + }) + end, + case Servers of + [LocalServer | _] -> + %% TODO + %% Not super robust, but we probably don't expect nodes to be down + %% when we bring up a fresh consensus group. Triggering election + %% is not really required otherwise. + %% TODO + %% Ensure that doing that on node restart does not disrupt consensus. + ok = ra:trigger_election(LocalServer); + _ -> + ok + end, + #{ + cluster_name => ClusterName, + servers => Servers, + local_server => LocalServer + }.