From 7fa3bbf176e59a431fd87a1616cad88a2f33e962 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 1 Feb 2024 14:07:54 +0100 Subject: [PATCH] wip: manage generations / db config through ra machine --- .../src/emqx_ds_replication_layer.erl | 121 +++++++++++------- .../src/emqx_ds_replication_layer.hrl | 10 +- .../src/emqx_ds_replication_layer_meta.erl | 88 ------------- .../src/emqx_ds_replication_layer_shard.erl | 47 +++++++ .../test/emqx_ds_SUITE.erl | 10 +- 5 files changed, 137 insertions(+), 139 deletions(-) create mode 100644 apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 280094576..ba3d52da1 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -33,9 +33,7 @@ make_iterator/4, update_iterator/3, next/3, - node_of_shard/2, - shard_of_message/3, - maybe_set_myself_as_leader/2 + shard_of_message/3 ]). %% internal exports: @@ -136,13 +134,19 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> - Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v4:add_generation(Nodes, DB), - ok. + foreach_shard( + DB, + fun(Shard) -> ok = ra_add_generation(DB, Shard) end + ). -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. update_db_config(DB, CreateOpts) -> - emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). + ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts), + Opts = emqx_ds_replication_layer_meta:get_options(DB), + foreach_shard( + DB, + fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end + ). -spec list_generations_with_lifetimes(emqx_ds:db()) -> #{generation_rank() => emqx_ds:generation_info()}. @@ -150,13 +154,12 @@ list_generations_with_lifetimes(DB) -> Shards = list_shards(DB), lists:foldl( fun(Shard, GensAcc) -> - Node = node_of_shard(DB, Shard), maps:fold( fun(GenId, Data, AccInner) -> AccInner#{{Shard, GenId} => Data} end, GensAcc, - emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard) + ra_list_generations_with_lifetimes(DB, Shard) ) end, #{}, @@ -165,10 +168,7 @@ list_generations_with_lifetimes(DB) -> -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. drop_generation(DB, {Shard, GenId}) -> - %% TODO: drop generation in all nodes in the replica set, not only in the leader, - %% after we have proper replication in place. - Node = node_of_shard(DB, Shard), - emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId). + ra_drop_generation(DB, Shard, GenId). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> @@ -247,17 +247,6 @@ next(DB, Iter0, BatchSize) -> Other end. --spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(DB, Shard) -> - case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of - {ok, Leader} -> - Leader; - {error, no_leader_for_shard} -> - %% TODO: use optvar - timer:sleep(500), - node_of_shard(DB, Shard) - end. - -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> emqx_ds_replication_layer:shard_id(). shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> @@ -269,18 +258,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). -%% TODO: there's no real leader election right now --spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. -maybe_set_myself_as_leader(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of - [Site | _] -> - %% Currently the first in-sync replica always becomes the - %% leader - ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); - _Sites -> - ok - end. +foreach_shard(DB, Fun) -> + lists:foreach(Fun, list_shards(DB)). %%================================================================================ %% behavior callbacks @@ -375,7 +354,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. do_add_generation_v2(DB) -> - MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), + %% FIXME + MyShards = [], lists:foreach( fun(ShardId) -> emqx_ds_storage_layer:add_generation({DB, ShardId}) @@ -433,7 +413,7 @@ ra_start_shard(DB, Shard) -> _ -> ok end, - ignore. + emqx_ds_replication_layer_shard:start_link(LocalServer). ra_store_batch(DB, Shard, Messages) -> Command = #{ @@ -447,25 +427,55 @@ ra_store_batch(DB, Shard, Messages) -> error(Error, [DB, Shard]) end. +ra_add_generation(DB, Shard) -> + Command = #{?tag => add_generation}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + +ra_update_config(DB, Shard, Opts) -> + Command = #{?tag => update_config, ?config => Opts}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + +ra_drop_generation(DB, Shard, GenId) -> + Command = #{?tag => drop_generation, ?generation => GenId}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + ra_get_streams(DB, Shard, TopicFilter, Time) -> {_Name, Node} = ra_random_replica(DB, Shard), - emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, Time). + emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> {_Name, Node} = ra_random_replica(DB, Shard), - emqx_ds_proto_v3:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). + emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). ra_update_iterator(DB, Shard, Iter, DSKey) -> {_Name, Node} = ra_random_replica(DB, Shard), - emqx_ds_proto_v3:update_iterator(Node, DB, Shard, Iter, DSKey). + emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). ra_next(DB, Shard, Iter, BatchSize) -> {_Name, Node} = ra_random_replica(DB, Shard), - emqx_ds_proto_v3:next(Node, DB, Shard, Iter, BatchSize). + emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). + +ra_list_generations_with_lifetimes(DB, Shard) -> + {_Name, Node} = ra_random_replica(DB, Shard), + emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). ra_drop_shard(DB, Shard) -> - %% TODO: clean dsrepl state - ra:stop_server(_System = default, ra_local_server(DB, Shard)). + ra:force_delete_server(_System = default, ra_local_server(DB, Shard)). ra_shard_servers(DB, Shard) -> {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), @@ -554,7 +564,28 @@ apply( NState = State#{latest := NLatest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, - {NState, Result, Effect}. + {NState, Result, Effect}; +apply( + _RaftMeta, + #{?tag := add_generation}, + State +) -> + Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)), + {State, Result}; +apply( + _RaftMeta, + #{?tag := update_config, ?config := Opts}, + State +) -> + Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts), + {State, Result}; +apply( + _RaftMeta, + #{?tag := drop_generation, ?generation := GenId}, + State +) -> + Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId), + {State, Result}. assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 3254bbe49..daf644fa4 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -27,8 +27,16 @@ %% keys: -define(tag, 1). -define(shard, 2). --define(timestamp, 3). -define(enc, 3). + +%% ?BATCH -define(batch_messages, 2). +-define(timestamp, 3). + +%% update_config +-define(config, 2). + +%% drop_generation +-define(generation, 2). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index b0ef4903f..54b293780 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,8 +29,6 @@ -export([ shards/1, my_shards/1, - my_owned_shards/1, - leader_nodes/1, replica_set/2, in_sync_replicas/2, sites/0, @@ -39,10 +37,7 @@ get_options/1, update_db_config/2, drop_db/1, - shard_leader/2, this_site/0, - set_leader/3, - is_leader/1, print_status/0 ]). @@ -56,8 +51,6 @@ drop_db_trans/1, claim_site/2, in_sync_replicas_trans/2, - set_leader_trans/3, - is_leader_trans/1, n_shards/1 ]). @@ -98,7 +91,6 @@ replica_set :: [site()], %% Sites that contain the actual data: in_sync_replicas :: [site()], - leader :: node() | undefined, misc = #{} :: map() }). @@ -164,27 +156,6 @@ my_shards(DB) -> lists:member(Site, ReplicaSet) end). --spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. -my_owned_shards(DB) -> - Self = node(), - filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) -> - Self =:= Leader - end). - --spec leader_nodes(emqx_ds:db()) -> [node()]. -leader_nodes(DB) -> - lists:uniq( - filter_shards( - DB, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader =/= undefined - end, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader - end - ) - ). - -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. replica_set(DB, Shard) -> @@ -219,27 +190,6 @@ node(Site) -> undefined end. --spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, node()} | {error, no_leader_for_shard}. -shard_leader(DB, Shard) -> - case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{leader = Leader}] when Leader =/= undefined -> - {ok, Leader}; - _ -> - {error, no_leader_for_shard} - end. - --spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader(DB, Shard, Node) -> - {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), - ok. - --spec is_leader(node()) -> boolean(). -is_leader(Node) -> - {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]), - Result. - -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). get_options(DB) -> {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), @@ -286,7 +236,6 @@ init([]) -> init_ra(), ensure_tables(), ensure_site(), - {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}), S = #s{}, {ok, S}. @@ -309,18 +258,6 @@ handle_call(_Call, _From, S) -> handle_cast(_Cast, S) -> {noreply, S}. -handle_info( - {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S -) -> - MyShards = my_owned_shards(DB), - - lists:foreach( - fun(ShardId) -> - emqx_ds_storage_layer:update_config({DB, ShardId}, Options) - end, - MyShards - ), - {noreply, S}; handle_info(_Info, S) -> {noreply, S}. @@ -392,31 +329,6 @@ in_sync_replicas_trans(DB, Shard) -> {error, no_shard} end. --spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader_trans(DB, Shard, Node) -> - [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}), - Record = Record0#?SHARD_TAB{leader = Node}, - mnesia:write(Record). - --spec is_leader_trans(node) -> boolean(). -is_leader_trans(Node) -> - case - mnesia:select( - ?SHARD_TAB, - ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) -> - Leader =:= Node - end), - 1, - read - ) - of - {[_ | _], _Cont} -> - true; - _ -> - false - end. - %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl new file mode 100644 index 000000000..0fd8499ad --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_replication_layer_shard). + +-export([start_link/1]). + +-behaviour(gen_server). +-export([ + init/1, + handle_call/3, + handle_cast/2, + terminate/2 +]). + +%% + +start_link(ServerId) -> + gen_server:start_link(?MODULE, ServerId, []). + +%% + +init(ServerId) -> + process_flag(trap_exit, true), + {ok, ServerId}. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, ServerId) -> + ok = ra:stop_server(ServerId). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 9dae8e699..ece29027d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -50,13 +50,13 @@ t_00_smoke_open_drop(_Config) -> lists:foreach( fun(Shard) -> ?assertEqual( - {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) ), ?assertEqual( - [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) - ), - %% Check that the leader is eleected; - ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) + %% FIXME + undefined, + emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) + ) end, Shards ),