wip: manage generations / db config through ra machine

This commit is contained in:
Andrew Mayorov 2024-02-01 14:07:54 +01:00
parent 46e8118e1c
commit 7fa3bbf176
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
5 changed files with 137 additions and 139 deletions

View File

@ -33,9 +33,7 @@
make_iterator/4,
update_iterator/3,
next/3,
node_of_shard/2,
shard_of_message/3,
maybe_set_myself_as_leader/2
shard_of_message/3
]).
%% internal exports:
@ -136,13 +134,19 @@ open_db(DB, CreateOpts) ->
-spec add_generation(emqx_ds:db()) -> ok | {error, _}.
add_generation(DB) ->
Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB),
_ = emqx_ds_proto_v4:add_generation(Nodes, DB),
ok.
foreach_shard(
DB,
fun(Shard) -> ok = ra_add_generation(DB, Shard) end
).
-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}.
update_db_config(DB, CreateOpts) ->
emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts).
ok = emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts),
Opts = emqx_ds_replication_layer_meta:get_options(DB),
foreach_shard(
DB,
fun(Shard) -> ok = ra_update_config(DB, Shard, Opts) end
).
-spec list_generations_with_lifetimes(emqx_ds:db()) ->
#{generation_rank() => emqx_ds:generation_info()}.
@ -150,13 +154,12 @@ list_generations_with_lifetimes(DB) ->
Shards = list_shards(DB),
lists:foldl(
fun(Shard, GensAcc) ->
Node = node_of_shard(DB, Shard),
maps:fold(
fun(GenId, Data, AccInner) ->
AccInner#{{Shard, GenId} => Data}
end,
GensAcc,
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)
ra_list_generations_with_lifetimes(DB, Shard)
)
end,
#{},
@ -165,10 +168,7 @@ list_generations_with_lifetimes(DB) ->
-spec drop_generation(emqx_ds:db(), generation_rank()) -> ok | {error, _}.
drop_generation(DB, {Shard, GenId}) ->
%% TODO: drop generation in all nodes in the replica set, not only in the leader,
%% after we have proper replication in place.
Node = node_of_shard(DB, Shard),
emqx_ds_proto_v4:drop_generation(Node, DB, Shard, GenId).
ra_drop_generation(DB, Shard, GenId).
-spec drop_db(emqx_ds:db()) -> ok | {error, _}.
drop_db(DB) ->
@ -247,17 +247,6 @@ next(DB, Iter0, BatchSize) ->
Other
end.
-spec node_of_shard(emqx_ds:db(), shard_id()) -> node().
node_of_shard(DB, Shard) ->
case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
{ok, Leader} ->
Leader;
{error, no_leader_for_shard} ->
%% TODO: use optvar
timer:sleep(500),
node_of_shard(DB, Shard)
end.
-spec shard_of_message(emqx_ds:db(), emqx_types:message(), clientid | topic) ->
emqx_ds_replication_layer:shard_id().
shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
@ -269,18 +258,8 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) ->
end,
integer_to_binary(Hash).
%% TODO: there's no real leader election right now
-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok.
maybe_set_myself_as_leader(DB, Shard) ->
Site = emqx_ds_replication_layer_meta:this_site(),
case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of
[Site | _] ->
%% Currently the first in-sync replica always becomes the
%% leader
ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node());
_Sites ->
ok
end.
foreach_shard(DB, Fun) ->
lists:foreach(Fun, list_shards(DB)).
%%================================================================================
%% behavior callbacks
@ -375,7 +354,8 @@ do_next_v1(DB, Shard, Iter, BatchSize) ->
-spec do_add_generation_v2(emqx_ds:db()) -> ok | {error, _}.
do_add_generation_v2(DB) ->
MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB),
%% FIXME
MyShards = [],
lists:foreach(
fun(ShardId) ->
emqx_ds_storage_layer:add_generation({DB, ShardId})
@ -433,7 +413,7 @@ ra_start_shard(DB, Shard) ->
_ ->
ok
end,
ignore.
emqx_ds_replication_layer_shard:start_link(LocalServer).
ra_store_batch(DB, Shard, Messages) ->
Command = #{
@ -447,25 +427,55 @@ ra_store_batch(DB, Shard, Messages) ->
error(Error, [DB, Shard])
end.
ra_add_generation(DB, Shard) ->
Command = #{?tag => add_generation},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_update_config(DB, Shard, Opts) ->
Command = #{?tag => update_config, ?config => Opts},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_drop_generation(DB, Shard, GenId) ->
Command = #{?tag => drop_generation, ?generation => GenId},
case ra:process_command(ra_leader_servers(DB, Shard), Command) of
{ok, Result, _Leader} ->
Result;
Error ->
error(Error, [DB, Shard])
end.
ra_get_streams(DB, Shard, TopicFilter, Time) ->
{_Name, Node} = ra_random_replica(DB, Shard),
emqx_ds_proto_v3:get_streams(Node, DB, Shard, TopicFilter, Time).
emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
{_Name, Node} = ra_random_replica(DB, Shard),
emqx_ds_proto_v3:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
ra_update_iterator(DB, Shard, Iter, DSKey) ->
{_Name, Node} = ra_random_replica(DB, Shard),
emqx_ds_proto_v3:update_iterator(Node, DB, Shard, Iter, DSKey).
emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
ra_next(DB, Shard, Iter, BatchSize) ->
{_Name, Node} = ra_random_replica(DB, Shard),
emqx_ds_proto_v3:next(Node, DB, Shard, Iter, BatchSize).
emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
ra_list_generations_with_lifetimes(DB, Shard) ->
{_Name, Node} = ra_random_replica(DB, Shard),
emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard).
ra_drop_shard(DB, Shard) ->
%% TODO: clean dsrepl state
ra:stop_server(_System = default, ra_local_server(DB, Shard)).
ra:force_delete_server(_System = default, ra_local_server(DB, Shard)).
ra_shard_servers(DB, Shard) ->
{ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
@ -554,7 +564,28 @@ apply(
NState = State#{latest := NLatest},
%% TODO: Need to measure effects of changing frequency of `release_cursor`.
Effect = {release_cursor, RaftIdx, NState},
{NState, Result, Effect}.
{NState, Result, Effect};
apply(
_RaftMeta,
#{?tag := add_generation},
State
) ->
Result = emqx_ds_storage_layer:add_generation(erlang:get(emqx_ds_db_shard)),
{State, Result};
apply(
_RaftMeta,
#{?tag := update_config, ?config := Opts},
State
) ->
Result = emqx_ds_storage_layer:update_config(erlang:get(emqx_ds_db_shard), Opts),
{State, Result};
apply(
_RaftMeta,
#{?tag := drop_generation, ?generation := GenId},
State
) ->
Result = emqx_ds_storage_layer:drop_generation(erlang:get(emqx_ds_db_shard), GenId),
{State, Result}.
assign_timestamps(Latest, Messages) ->
assign_timestamps(Latest, Messages, []).

View File

@ -27,8 +27,16 @@
%% keys:
-define(tag, 1).
-define(shard, 2).
-define(timestamp, 3).
-define(enc, 3).
%% ?BATCH
-define(batch_messages, 2).
-define(timestamp, 3).
%% update_config
-define(config, 2).
%% drop_generation
-define(generation, 2).
-endif.

View File

@ -29,8 +29,6 @@
-export([
shards/1,
my_shards/1,
my_owned_shards/1,
leader_nodes/1,
replica_set/2,
in_sync_replicas/2,
sites/0,
@ -39,10 +37,7 @@
get_options/1,
update_db_config/2,
drop_db/1,
shard_leader/2,
this_site/0,
set_leader/3,
is_leader/1,
print_status/0
]).
@ -56,8 +51,6 @@
drop_db_trans/1,
claim_site/2,
in_sync_replicas_trans/2,
set_leader_trans/3,
is_leader_trans/1,
n_shards/1
]).
@ -98,7 +91,6 @@
replica_set :: [site()],
%% Sites that contain the actual data:
in_sync_replicas :: [site()],
leader :: node() | undefined,
misc = #{} :: map()
}).
@ -164,27 +156,6 @@ my_shards(DB) ->
lists:member(Site, ReplicaSet)
end).
-spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
my_owned_shards(DB) ->
Self = node(),
filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) ->
Self =:= Leader
end).
-spec leader_nodes(emqx_ds:db()) -> [node()].
leader_nodes(DB) ->
lists:uniq(
filter_shards(
DB,
fun(#?SHARD_TAB{leader = Leader}) ->
Leader =/= undefined
end,
fun(#?SHARD_TAB{leader = Leader}) ->
Leader
end
)
).
-spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
{ok, [site()]} | {error, _}.
replica_set(DB, Shard) ->
@ -219,27 +190,6 @@ node(Site) ->
undefined
end.
-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
{ok, node()} | {error, no_leader_for_shard}.
shard_leader(DB, Shard) ->
case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of
[#?SHARD_TAB{leader = Leader}] when Leader =/= undefined ->
{ok, Leader};
_ ->
{error, no_leader_for_shard}
end.
-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
ok.
set_leader(DB, Shard, Node) ->
{atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]),
ok.
-spec is_leader(node()) -> boolean().
is_leader(Node) ->
{atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]),
Result.
-spec get_options(emqx_ds:db()) -> emqx_ds_replication_layer:builtin_db_opts().
get_options(DB) ->
{atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, undefined]),
@ -286,7 +236,6 @@ init([]) ->
init_ra(),
ensure_tables(),
ensure_site(),
{ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
S = #s{},
{ok, S}.
@ -309,18 +258,6 @@ handle_call(_Call, _From, S) ->
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(
{mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
) ->
MyShards = my_owned_shards(DB),
lists:foreach(
fun(ShardId) ->
emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
end,
MyShards
),
{noreply, S};
handle_info(_Info, S) ->
{noreply, S}.
@ -392,31 +329,6 @@ in_sync_replicas_trans(DB, Shard) ->
{error, no_shard}
end.
-spec set_leader_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) ->
ok.
set_leader_trans(DB, Shard, Node) ->
[Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}),
Record = Record0#?SHARD_TAB{leader = Node},
mnesia:write(Record).
-spec is_leader_trans(node) -> boolean().
is_leader_trans(Node) ->
case
mnesia:select(
?SHARD_TAB,
ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) ->
Leader =:= Node
end),
1,
read
)
of
{[_ | _], _Cont} ->
true;
_ ->
false
end.
%%================================================================================
%% Internal functions
%%================================================================================

View File

@ -0,0 +1,47 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ds_replication_layer_shard).
-export([start_link/1]).
-behaviour(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
terminate/2
]).
%%
start_link(ServerId) ->
gen_server:start_link(?MODULE, ServerId, []).
%%
init(ServerId) ->
process_flag(trap_exit, true),
{ok, ServerId}.
handle_call(_Call, _From, State) ->
{reply, ignored, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
terminate(_Reason, ServerId) ->
ok = ra:stop_server(ServerId).

View File

@ -50,13 +50,13 @@ t_00_smoke_open_drop(_Config) ->
lists:foreach(
fun(Shard) ->
?assertEqual(
{ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
{ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard)
),
?assertEqual(
[Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
),
%% Check that the leader is eleected;
?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard))
%% FIXME
undefined,
emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard)
)
end,
Shards
),