diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index fcd3e81a0..36e798259 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,9 +36,7 @@ update_iterator/3, next/3, delete_next/4, - node_of_shard/2, - shard_of_message/3, - maybe_set_myself_as_leader/2 + shard_of_message/3 ]). %% internal exports: @@ -160,13 +158,19 @@ open_db(DB, CreateOpts) -> -spec add_generation(emqx_ds:db()) -> ok | {error, _}. add_generation(DB) -> - Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), - _ = emqx_ds_proto_v4:add_generation(Nodes, DB), - ok. + foreach_shard( + DB, + fun(Shard) -> ok = ra_add_generation(DB, Shard) end + ). -spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. update_db_config(DB, CreateOpts) -> - emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). + ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts), + Opts = emqx_ds_replication_layer_meta:get_options(DB), + foreach_shard( + DB, + fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end + ). -spec list_generations_with_lifetimes(emqx_ds:db()) -> #{generation_rank() => emqx_ds:generation_info()}. @@ -174,13 +178,12 @@ list_generations_with_lifetimes(DB) -> Shards = list_shards(DB), lists:foldl( fun(Shard, GensAcc) -> - Node = node_of_shard(DB, Shard), maps:fold( fun(GenId, Data, AccInner) -> AccInner#{{Shard, GenId} => Data} end, GensAcc, - emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard) + ra_list_generations_with_lifetimes(DB, Shard) ) end, #{}, @@ -189,10 +192,7 @@ list_generations_with_lifetimes(DB) -> -spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}. drop_generation(DB, {Shard, GenId}) -> - %% TODO: drop generation in all nodes in the replica set, not only in the leader, - %% after we have proper replication in place. - Node = node_of_shard(DB, Shard), - emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId). + ra_drop_generation(DB, Shard, GenId). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> @@ -244,8 +244,7 @@ get_delete_streams(DB, TopicFilter, StartTime) -> Shards = list_shards(DB), lists:flatmap( fun(Shard) -> - Node = node_of_shard(DB, Shard), - Streams = emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, StartTime), + Streams = ra_get_delete_streams(DB, Shard, TopicFilter, StartTime), lists:map( fun(StorageLayerStream) -> ?delete_stream(Shard, StorageLayerStream) @@ -274,12 +273,7 @@ make_iterator(DB, Stream, TopicFilter, StartTime) -> emqx_ds:make_delete_iterator_result(delete_iterator()). make_delete_iterator(DB, Stream, TopicFilter, StartTime) -> ?delete_stream(Shard, StorageStream) = Stream, - Node = node_of_shard(DB, Shard), - case - emqx_ds_proto_v4:make_delete_iterator( - Node, DB, Shard, StorageStream, TopicFilter, StartTime - ) - of + case ra_make_delete_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of {ok, Iter} -> {ok, #{?tag => ?DELETE_IT, ?shard => Shard, ?enc => Iter}}; Err = {error, _} -> @@ -327,8 +321,7 @@ next(DB, Iter0, BatchSize) -> emqx_ds:delete_next_result(delete_iterator()). delete_next(DB, Iter0, Selector, BatchSize) -> #{?tag := ?DELETE_IT, ?shard := Shard, ?enc := StorageIter0} = Iter0, - Node = node_of_shard(DB, Shard), - case emqx_ds_proto_v4:delete_next(Node, DB, Shard, StorageIter0, Selector, BatchSize) of + case ra_delete_next(DB, Shard, StorageIter0, Selector, BatchSize) of {ok, StorageIter, NumDeleted} -> Iter = Iter0#{?enc := StorageIter}, {ok, Iter, NumDeleted}; @@ -336,17 +329,6 @@ delete_next(DB, Iter0, Selector, BatchSize) -> Other end. --spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(DB, Shard) -> - case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of - {ok, Leader} -> - Leader; - {error, no_leader_for_shard} -> - %% TODO: use optvar - timer:sleep(500), - node_of_shard(DB, Shard) - end. - -spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) -> emqx_ds_replication_layer:shard_id(). shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> @@ -358,18 +340,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). -%% TODO: there's no real leader election right now --spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. -maybe_set_myself_as_leader(DB, Shard) -> - Site = emqx_ds_replication_layer_meta:this_site(), - case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of - [Site | _] -> - %% Currently the first in-sync replica always becomes the - %% leader - ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); - _Sites -> - ok - end. +foreach_shard(DB, Fun) -> + lists:foreach(Fun, list_shards(DB)). %%================================================================================ %% behavior callbacks @@ -486,7 +458,8 @@ do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> -spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}. do_add_generation_v2(DB) -> - MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), + %% FIXME + MyShards = [], lists:foreach( fun(ShardId) -> emqx_ds_storage_layer:add_generation({DB, ShardId}) @@ -551,7 +524,7 @@ ra_start_shard(DB, Shard) -> _ -> ok end, - ignore. + emqx_ds_replication_layer_shard:start_link(LocalServer). ra_store_batch(DB, Shard, Messages) -> Command = #{ @@ -565,14 +538,49 @@ ra_store_batch(DB, Shard, Messages) -> error(Error, [DB, Shard]) end. +ra_add_generation(DB, Shard) -> + Command = #{?tag => add_generation}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + +ra_update_config(DB, Shard, Opts) -> + Command = #{?tag => update_config, ?config => Opts}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + +ra_drop_generation(DB, Shard, GenId) -> + Command = #{?tag => drop_generation, ?generation => GenId}, + case ra:process_command(ra_leader_servers(DB, Shard), Command) of + {ok, Result, _Leader} -> + Result; + Error -> + error(Error, [DB, Shard]) + end. + ra_get_streams(DB, Shard, TopicFilter, Time) -> {_Name, Node} = ra_random_replica(DB, Shard), emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time). +ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> + {_Name, Node} = ra_random_replica(DB, Shard), + emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time). + ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> {_Name, Node} = ra_random_replica(DB, Shard), emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). +ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> + {_Name, Node} = ra_random_replica(DB, Shard), + emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime). + ra_update_iterator(DB, Shard, Iter, DSKey) -> {_Name, Node} = ra_random_replica(DB, Shard), emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey). @@ -581,9 +589,16 @@ ra_next(DB, Shard, Iter, BatchSize) -> {_Name, Node} = ra_random_replica(DB, Shard), emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize). +ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> + {_Name, Node} = ra_random_replica(DB, Shard), + emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). + +ra_list_generations_with_lifetimes(DB, Shard) -> + {_Name, Node} = ra_random_replica(DB, Shard), + emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard). + ra_drop_shard(DB, Shard) -> - %% TODO: clean dsrepl state - ra:stop_server(_System = default, ra_local_server(DB, Shard)). + ra:force_delete_server(_System = default, ra_local_server(DB, Shard)). ra_shard_servers(DB, Shard) -> {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard), @@ -672,7 +687,28 @@ apply( NState = State#{latest := NLatest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, - {NState, Result, Effect}. + {NState, Result, Effect}; +apply( + _RaftMeta, + #{?tag := add_generation}, + State +) -> + Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)), + {State, Result}; +apply( + _RaftMeta, + #{?tag := update_config, ?config := Opts}, + State +) -> + Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts), + {State, Result}; +apply( + _RaftMeta, + #{?tag := drop_generation, ?generation := GenId}, + State +) -> + Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId), + {State, Result}. assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 004e7e683..89d615cbb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -28,8 +28,16 @@ %% keys: -define(tag, 1). -define(shard, 2). --define(timestamp, 3). -define(enc, 3). + +%% ?BATCH -define(batch_messages, 2). +-define(timestamp, 3). + +%% update_config +-define(config, 2). + +%% drop_generation +-define(generation, 2). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index b0ef4903f..53de5b41e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -29,20 +29,14 @@ -export([ shards/1, my_shards/1, - my_owned_shards/1, - leader_nodes/1, replica_set/2, - in_sync_replicas/2, sites/0, node/1, open_db/2, get_options/1, update_db_config/2, drop_db/1, - shard_leader/2, this_site/0, - set_leader/3, - is_leader/1, print_status/0 ]). @@ -55,9 +49,6 @@ update_db_config_trans/2, drop_db_trans/1, claim_site/2, - in_sync_replicas_trans/2, - set_leader_trans/3, - is_leader_trans/1, n_shards/1 ]). @@ -96,9 +87,6 @@ %% Sites that should contain the data when the cluster is in the %% stable state (no nodes are being added or removed from it): replica_set :: [site()], - %% Sites that contain the actual data: - in_sync_replicas :: [site()], - leader :: node() | undefined, misc = #{} :: map() }). @@ -164,27 +152,6 @@ my_shards(DB) -> lists:member(Site, ReplicaSet) end). --spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. -my_owned_shards(DB) -> - Self = node(), - filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) -> - Self =:= Leader - end). - --spec leader_nodes(emqx_ds:db()) -> [node()]. -leader_nodes(DB) -> - lists:uniq( - filter_shards( - DB, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader =/= undefined - end, - fun(#?SHARD_TAB{leader = Leader}) -> - Leader - end - ) - ). - -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. replica_set(DB, Shard) -> @@ -195,17 +162,6 @@ replica_set(DB, Shard) -> {error, no_shard} end. --spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - [site()]. -in_sync_replicas(DB, ShardId) -> - {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]), - case Result of - {ok, InSync} -> - InSync; - {error, _} -> - [] - end. - -spec sites() -> [site()]. sites() -> eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). @@ -219,27 +175,6 @@ node(Site) -> undefined end. --spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, node()} | {error, no_leader_for_shard}. -shard_leader(DB, Shard) -> - case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{leader = Leader}] when Leader =/= undefined -> - {ok, Leader}; - _ -> - {error, no_leader_for_shard} - end. - --spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader(DB, Shard, Node) -> - {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), - ok. - --spec is_leader(node()) -> boolean(). -is_leader(Node) -> - {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]), - Result. - -spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts(). get_options(DB) -> {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]), @@ -286,7 +221,6 @@ init([]) -> init_ra(), ensure_tables(), ensure_site(), - {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}), S = #s{}, {ok, S}. @@ -309,18 +243,6 @@ handle_call(_Call, _From, S) -> handle_cast(_Cast, S) -> {noreply, S}. -handle_info( - {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S -) -> - MyShards = my_owned_shards(DB), - - lists:foreach( - fun(ShardId) -> - emqx_ds_storage_layer:update_config({DB, ShardId}, Options) - end, - MyShards - ), - {noreply, S}; handle_info(_Info, S) -> {noreply, S}. @@ -382,41 +304,6 @@ drop_db_trans(DB) -> claim_site(Site, Node) -> mnesia:write(#?NODE_TAB{site = Site, node = Node}). --spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - {ok, [site()]} | {error, no_shard}. -in_sync_replicas_trans(DB, Shard) -> - case mnesia:read(?SHARD_TAB, {DB, Shard}) of - [#?SHARD_TAB{in_sync_replicas = InSync}] -> - {ok, InSync}; - [] -> - {error, no_shard} - end. - --spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> - ok. -set_leader_trans(DB, Shard, Node) -> - [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}), - Record = Record0#?SHARD_TAB{leader = Node}, - mnesia:write(Record). - --spec is_leader_trans(node) -> boolean(). -is_leader_trans(Node) -> - case - mnesia:select( - ?SHARD_TAB, - ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) -> - Leader =:= Node - end), - 1, - read - ) - of - {[_ | _], _Cont} -> - true; - _ -> - false - end. - %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl new file mode 100644 index 000000000..0fd8499ad --- /dev/null +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -0,0 +1,47 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_replication_layer_shard). + +-export([start_link/1]). + +-behaviour(gen_server). +-export([ + init/1, + handle_call/3, + handle_cast/2, + terminate/2 +]). + +%% + +start_link(ServerId) -> + gen_server:start_link(?MODULE, ServerId, []). + +%% + +init(ServerId) -> + process_flag(trap_exit, true), + {ok, ServerId}. + +handle_call(_Call, _From, State) -> + {reply, ignored, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +terminate(_Reason, ServerId) -> + ok = ra:stop_server(ServerId). diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 082ed2dff..0c48019e3 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -51,13 +51,8 @@ t_00_smoke_open_drop(_Config) -> lists:foreach( fun(Shard) -> ?assertEqual( - {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) - ), - ?assertEqual( - [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) - ), - %% Check that the leader is eleected; - ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + ) end, Shards ),