From 146f082fdc130f0e65312e73194622ec2ef7e551 Mon Sep 17 00:00:00 2001
From: Andrew Mayorov
Date: Fri, 19 Jan 2024 23:18:19 +0100
Subject: [PATCH] feat(dsrepl): implement raft-based replication

Still very rough but mostly working.
---
 apps/emqx/src/emqx_ds_schema.erl              |   2 +-
 apps/emqx/src/emqx_message.erl                |   5 +
 apps/emqx/src/emqx_persistent_session_ds.erl  |   2 +-
 .../test/emqx_persistent_messages_SUITE.erl   |   4 -
 apps/emqx_durable_storage/src/emqx_ds.erl     |  11 +-
 .../src/emqx_ds_builtin_db_sup.erl            |  22 ++-
 .../src/emqx_ds_replication_layer.erl         | 186 +++++++++++++++++-
 .../src/emqx_ds_replication_layer.hrl         |   1 +
 .../src/emqx_ds_replication_layer_egress.erl  |  21 +-
 .../src/emqx_ds_replication_layer_meta.erl    |  83 +++++---
 .../src/emqx_ds_storage_bitfield_lts.erl      |  18 +-
 .../src/emqx_durable_storage.app.src          |   2 +-
 rebar.config                                  |   3 +-
 13 files changed, 275 insertions(+), 85 deletions(-)

diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl
index 5c552404e..64551be13 100644
--- a/apps/emqx/src/emqx_ds_schema.erl
+++ b/apps/emqx/src/emqx_ds_schema.erl
@@ -201,7 +201,7 @@ fields(layout_builtin_wildcard_optimized) ->
         sc(
             range(0, 64),
             #{
-                default => 10,
+                default => 20,
                 importance => ?IMPORTANCE_HIDDEN,
                 desc => ?DESC(wildcard_optimized_epoch_bits)
             }
diff --git a/apps/emqx/src/emqx_message.erl b/apps/emqx/src/emqx_message.erl
index 8ec31f479..7e899fe2d 100644
--- a/apps/emqx/src/emqx_message.erl
+++ b/apps/emqx/src/emqx_message.erl
@@ -66,6 +66,7 @@
 -export([
     is_expired/2,
+    set_timestamp/2,
     update_expiry/1,
     timestamp_now/0
 ]).
@@ -288,6 +289,10 @@ is_expired(#message{timestamp = CreatedAt}, Zone) ->
         Interval -> elapsed(CreatedAt) > Interval
     end.

+-spec set_timestamp(integer(), emqx_types:message()) -> emqx_types:message().
+set_timestamp(Timestamp, Msg) ->
+    Msg#message{timestamp = Timestamp}.
+
 -spec update_expiry(emqx_types:message()) -> emqx_types:message().
 update_expiry(
     Msg = #message{
diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl
index 8cf3cb284..ac374b8a9 100644
--- a/apps/emqx/src/emqx_persistent_session_ds.erl
+++ b/apps/emqx/src/emqx_persistent_session_ds.erl
@@ -323,7 +323,7 @@ subscribe(
             ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
             {SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
             Subscription = #{
-                start_time => now_ms(),
+                start_time => emqx_ds:timestamp_us(),
                 props => SubOpts,
                 id => SubId,
                 deleted => false
diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl
index 14989bdd8..5c9637e2e 100644
--- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl
+++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl
@@ -33,10 +33,6 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).

 init_per_suite(Config) ->
-    %% avoid inter-suite flakiness...
-    %% TODO: remove after other suites start to use `emx_cth_suite'
-    application:stop(emqx),
-    application:stop(emqx_durable_storage),
     Config.

 end_per_suite(_Config) ->
diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl
index 4143c9ffd..a470d7281 100644
--- a/apps/emqx_durable_storage/src/emqx_ds.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds.erl
@@ -43,6 +43,7 @@

 %% Misc. API:
 -export([count/1]).
+-export([timestamp_us/0]).

 -export_type([
     create_db_opts/0,
@@ -147,9 +148,11 @@
 -type error(Reason) :: {error, recoverable | unrecoverable, Reason}.

 %% Timestamp
+%% Each message must have a unique timestamp.
 %% Earliest possible timestamp is 0.
-%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we
-%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps.
+%% Granularity: microsecond.
+%% TODO: `emqx_guid' still uses milliseconds; iterators won't match message
+%% timestamps until that unit mismatch is reconciled.
 -type time() :: non_neg_integer().

 -type message_store_opts() ::
@@ -394,6 +397,10 @@ count(DB) ->
 %% Internal exports
 %%================================================================================

+-spec timestamp_us() -> time().
+timestamp_us() ->
+    erlang:system_time(microsecond).
+
 %%================================================================================
 %% Internal functions
 %%================================================================================
diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
index 68aa0ee90..ce13d0ea5 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl
@@ -87,14 +87,6 @@ init({#?db_sup{db = DB}, DefaultOpts}) ->
     %% Spec for the top-level supervisor for the database:
     logger:notice("Starting DS DB ~p", [DB]),
     _ = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts),
-    %% TODO: before the leader election is implemented, we set ourselves as the leader for all shards:
-    MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    lists:foreach(
-        fun(Shard) ->
-            emqx_ds_replication_layer:maybe_set_myself_as_leader(DB, Shard)
-        end,
-        MyShards
-    ),
     Children = [sup_spec(#?shard_sup{db = DB}, []), sup_spec(#?egress_sup{db = DB}, [])],
     SupFlags = #{
         strategy => one_for_all,
@@ -106,7 +98,11 @@ init({#?shard_sup{db = DB}, _}) ->
     %% Spec for the supervisor that manages the worker processes for
     %% each local shard of the DB:
     MyShards = emqx_ds_replication_layer_meta:my_shards(DB),
-    Children = [shard_spec(DB, Shard) || Shard <- MyShards],
+    Children = [
+        Child
+     || Shard <- MyShards,
+        Child <- [shard_spec(DB, Shard), shard_replication_spec(DB, Shard)]
+    ],
     SupFlags = #{
         strategy => one_for_one,
         intensity => 10,
@@ -154,6 +150,14 @@ shard_spec(DB, Shard) ->
         type => worker
     }.

+shard_replication_spec(DB, Shard) ->
+    #{
+        id => {Shard, replication},
+        start => {emqx_ds_replication_layer, ra_start_shard, [DB, Shard]},
+        restart => transient,
+        type => worker
+    }.
+
 egress_spec(DB, Shard) ->
     #{
         id => Shard,
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
index 9135819f9..badad7fc8 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl
@@ -56,7 +56,16 @@
     do_drop_generation_v3/3,
     do_get_delete_streams_v4/4,
     do_make_delete_iterator_v4/5,
-    do_delete_next_v4/5
+    do_delete_next_v4/5,
+
+    %% FIXME
+    ra_start_shard/2,
+    ra_store_batch/3
+]).
+
+-export([
+    init/1,
+    apply/3
 ]).

 -export_type([
@@ -208,10 +217,9 @@ get_streams(DB, TopicFilter, StartTime) ->
     Shards = list_shards(DB),
     lists:flatmap(
         fun(Shard) ->
-            Node = node_of_shard(DB, Shard),
             Streams =
                 try
-                    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, StartTime)
+                    ra_get_streams(DB, Shard, TopicFilter, StartTime)
                 catch
                     error:{erpc, _} ->
                         %% TODO: log?
@@ -251,8 +259,7 @@ get_delete_streams(DB, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 make_iterator(DB, Stream, TopicFilter, StartTime) ->
     ?stream_v2(Shard, StorageStream) = Stream,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:make_iterator(Node, DB, Shard, StorageStream, TopicFilter, StartTime) of
+    try ra_make_iterator(DB, Shard, StorageStream, TopicFilter, StartTime) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -282,8 +289,7 @@ make_delete_iterator(DB, Stream, TopicFilter, StartTime) ->
     emqx_ds:make_iterator_result(iterator()).
 update_iterator(DB, OldIter, DSKey) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter} = OldIter,
-    Node = node_of_shard(DB, Shard),
-    try emqx_ds_proto_v4:update_iterator(Node, DB, Shard, StorageIter, DSKey) of
+    try ra_update_iterator(DB, Shard, StorageIter, DSKey) of
         {ok, Iter} ->
             {ok, #{?tag => ?IT, ?shard => Shard, ?enc => Iter}};
         Error = {error, _, _} ->
@@ -296,7 +302,6 @@ update_iterator(DB, OldIter, DSKey) ->
 -spec next(emqx_ds:db(), iterator(), pos_integer()) -> emqx_ds:next_result(iterator()).
 next(DB, Iter0, BatchSize) ->
     #{?tag := ?IT, ?shard := Shard, ?enc := StorageIter0} = Iter0,
-    Node = node_of_shard(DB, Shard),
     %% TODO: iterator can contain information that is useful for
     %% reconstructing messages sent over the network. For example,
     %% when we send messages with the learned topic index, we could
@@ -305,7 +310,7 @@ next(DB, Iter0, BatchSize) ->
     %%
     %% This kind of trickery should be probably done here in the
     %% replication layer. Or, perhaps, in the logic layer.
-    case emqx_ds_proto_v4:next(Node, DB, Shard, StorageIter0, BatchSize) of
+    case ra_next(DB, Shard, StorageIter0, BatchSize) of
         {ok, StorageIter, Batch} ->
             Iter = Iter0#{?enc := StorageIter},
             {ok, Iter, Batch};
@@ -379,7 +384,8 @@ do_drop_db_v1(DB) ->
     emqx_ds_builtin_sup:stop_db(DB),
     lists:foreach(
         fun(Shard) ->
-            emqx_ds_storage_layer:drop_shard({DB, Shard})
+            emqx_ds_storage_layer:drop_shard({DB, Shard}),
+            ra_drop_shard(DB, Shard)
         end,
         MyShards
    ).
@@ -510,3 +516,163 @@ do_get_delete_streams_v4(DB, Shard, TopicFilter, StartTime) ->

 list_nodes() ->
     mria:running_nodes().
+
+%%
+
+ra_start_shard(DB, Shard) ->
+    System = default,
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    ClusterName = ra_cluster_name(DB, Shard),
+    LocalServer = ra_local_server(DB, Shard),
+    Servers = ra_shard_servers(DB, Shard),
+    case ra:restart_server(System, LocalServer) of
+        ok ->
+            ok;
+        {error, name_not_registered} ->
+            ok = ra:start_server(System, #{
+                id => LocalServer,
+                uid => <<ClusterName/binary, "_", Site/binary>>,
+                cluster_name => ClusterName,
+                initial_members => Servers,
+                machine => {module, ?MODULE, #{db => DB, shard => Shard}},
+                log_init_args => #{}
+            })
+    end,
+    case Servers of
+        [LocalServer | _] ->
+            %% TODO
+            %% Not super robust, but we probably don't expect nodes to be down
+            %% when we bring up a fresh consensus group. Triggering election
+            %% is not really required otherwise.
+            %% TODO
+            %% Ensure that doing that on node restart does not disrupt consensus.
+            ok = ra:trigger_election(LocalServer);
+        _ ->
+            ok
+    end,
+    ignore.
+
+ra_store_batch(DB, Shard, Messages) ->
+    Command = #{
+        ?tag => ?BATCH,
+        ?batch_messages => Messages,
+        ?timestamp => emqx_ds:timestamp_us()
+    },
+    case ra:process_command(ra_leader_servers(DB, Shard), Command) of
+        {ok, Result, _Leader} ->
+            Result;
+        Error ->
+            error(Error, [DB, Shard])
+    end.
+
+ra_get_streams(DB, Shard, TopicFilter, Time) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, Time).
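+
+%% Usage sketch (illustrative only; `my_db' and the payload are made-up
+%% values): a synchronous write goes through the shard's Raft log, so the
+%% call returns only once the batch command has been committed and applied:
+%%
+%%   Msgs = [emqx_message:make(<<"clientid">>, <<"t/1">>, <<"payload">>)],
+%%   ok = emqx_ds_replication_layer:ra_store_batch(my_db, <<"0">>, Msgs).
+%%
+%% Reads, by contrast, deliberately bypass consensus: the `ra_*' read helpers
+%% around here pick a random replica and call into it over the v4 protocol.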
+
+ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime).
+
+ra_update_iterator(DB, Shard, Iter, DSKey) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey).
+
+ra_next(DB, Shard, Iter, BatchSize) ->
+    {_Name, Node} = ra_random_replica(DB, Shard),
+    emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize).
+
+ra_drop_shard(DB, Shard) ->
+    %% TODO: clean dsrepl state
+    ra:stop_server(_System = default, ra_local_server(DB, Shard)).
+
+ra_shard_servers(DB, Shard) ->
+    {ok, ReplicaSet} = emqx_ds_replication_layer_meta:replica_set(DB, Shard),
+    [
+        {ra_server_name(DB, Shard, Site), emqx_ds_replication_layer_meta:node(Site)}
+     || Site <- ReplicaSet
+    ].
+
+ra_local_server(DB, Shard) ->
+    Site = emqx_ds_replication_layer_meta:this_site(),
+    {ra_server_name(DB, Shard, Site), node()}.
+
+ra_leader_servers(DB, Shard) ->
+    %% NOTE: Contact last known leader first, then rest of shard servers.
+    ClusterName = ra_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_leader(ClusterName) of
+        Leader when Leader /= undefined ->
+            Servers = ra_leaderboard:lookup_members(ClusterName),
+            [Leader | lists:delete(Leader, Servers)];
+        undefined ->
+            %% TODO: Dynamic membership.
+            ra_shard_servers(DB, Shard)
+    end.
+
+ra_random_replica(DB, Shard) ->
+    %% NOTE: Contact random replica that is not a known leader.
+    %% TODO: Replica may be down, so we may need to retry.
+    ClusterName = ra_cluster_name(DB, Shard),
+    case ra_leaderboard:lookup_members(ClusterName) of
+        Servers when is_list(Servers) ->
+            Leader = ra_leaderboard:lookup_leader(ClusterName),
+            ra_pick_replica(Servers, Leader);
+        undefined ->
+            %% TODO
+            %% Leader is unknown if there are no servers of this group on the
+            %% local node. We want to pick a replica in that case as well.
+            %% TODO: Dynamic membership.
+            ra_pick_server(ra_shard_servers(DB, Shard))
+    end.
+
+ra_pick_replica(Servers, Leader) ->
+    case lists:delete(Leader, Servers) of
+        [] ->
+            Leader;
+        Followers ->
+            ra_pick_server(Followers)
+    end.
+
+ra_pick_server(Servers) ->
+    lists:nth(rand:uniform(length(Servers)), Servers).
+
+ra_cluster_name(DB, Shard) ->
+    iolist_to_binary(io_lib:format("~s_~s", [DB, Shard])).
+
+ra_server_name(DB, Shard, Site) ->
+    DBBin = atom_to_binary(DB),
+    binary_to_atom(<<"ds_", DBBin/binary, Shard/binary, "_", Site/binary>>).
+
+%%
+
+init(#{db := DB, shard := Shard}) ->
+    _ = erlang:put(emqx_ds_db_shard, {DB, Shard}),
+    #{latest => 0}.
+
+apply(
+    #{index := RaftIdx},
+    #{
+        ?tag := ?BATCH,
+        ?batch_messages := MessagesIn,
+        ?timestamp := TimestampLocal
+    },
+    #{latest := Latest} = State
+) ->
+    %% NOTE
+    %% Unique timestamps, tracking real time closely. With microsecond
+    %% granularity it should be nearly impossible for them to run too far
+    %% ahead of the real-time clock.
+    Timestamp = max(Latest + 1, TimestampLocal),
+    Messages = assign_timestamps(Timestamp, MessagesIn),
+    Result = emqx_ds_storage_layer:store_batch(erlang:get(emqx_ds_db_shard), Messages, #{}),
+    %% NOTE: Last assigned timestamp.
+    NLatest = Timestamp + length(Messages) - 1,
+    NState = State#{latest := NLatest},
+    %% TODO: Need to measure effects of changing frequency of `release_cursor`.
+    Effect = {release_cursor, RaftIdx, NState},
+    {NState, Result, Effect}.
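+
+%% Worked example (hypothetical numbers): suppose `latest' is 1000 and a
+%% batch of three messages arrives carrying a lagging sender clock, say
+%% ?timestamp = 990. Then
+%%   Timestamp = max(1000 + 1, 990) = 1001,
+%% and `assign_timestamps/2' below stamps the messages 1001, 1002, 1003, so
+%% `latest' becomes 1003. Because ?timestamp is part of the replicated
+%% command and `apply/3' is deterministic, every replica applying the same
+%% Raft index computes identical, strictly increasing timestamps, regardless
+%% of clock skew between nodes.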
+
+assign_timestamps(Timestamp, [MessageIn | Rest]) ->
+    Message = emqx_message:set_timestamp(Timestamp, MessageIn),
+    [Message | assign_timestamps(Timestamp + 1, Rest)];
+assign_timestamps(_Timestamp, []) ->
+    [].
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl
index b3a57d442..004e7e683 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl
@@ -28,6 +28,7 @@
 %% keys:
 -define(tag, 1).
 -define(shard, 2).
+-define(timestamp, 3).
 -define(enc, 3).
 -define(batch_messages, 2).
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
index 8b1a9a835..515bae1b6 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl
@@ -40,7 +40,6 @@

 -export_type([]).

--include("emqx_ds_replication_layer.hrl").
 -include_lib("snabbkaffe/include/trace.hrl").

 %%================================================================================
@@ -109,7 +108,6 @@ store_batch(DB, Messages, Opts) ->
 -record(s, {
     db :: emqx_ds:db(),
     shard :: emqx_ds_replication_layer:shard_id(),
-    leader :: node(),
     n = 0 :: non_neg_integer(),
     tref :: reference(),
     batch = [] :: [emqx_types:message()],
@@ -119,12 +117,9 @@ store_batch(DB, Messages, Opts) ->
 init([DB, Shard]) ->
     process_flag(trap_exit, true),
     process_flag(message_queue_data, off_heap),
-    %% TODO: adjust leader dynamically
-    Leader = shard_leader(DB, Shard),
     S = #s{
         db = DB,
         shard = Shard,
-        leader = Leader,
         tref = start_timer()
     },
     {ok, S}.
@@ -159,10 +154,10 @@ terminate(_Reason, _S) ->
 do_flush(S = #s{batch = []}) ->
     S#s{tref = start_timer()};
 do_flush(
-    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard, leader = Leader}
+    S = #s{batch = Messages, pending_replies = Replies, db = DB, shard = Shard}
 ) ->
-    Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
-    ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
+    %% FIXME
+    ok = emqx_ds_replication_layer:ra_store_batch(DB, Shard, lists:reverse(Messages)),
     [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
     ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
     erlang:garbage_collect(),
@@ -212,13 +207,3 @@ do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies
 start_timer() ->
     Interval = application:get_env(emqx_durable_storage, egress_flush_interval, 100),
     erlang:send_after(Interval, self(), ?flush).
-
-shard_leader(DB, Shard) ->
-    %% TODO: use optvar
-    case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of
-        {ok, Leader} ->
-            Leader;
-        {error, no_leader_for_shard} ->
-            timer:sleep(500),
-            shard_leader(DB, Shard)
-    end.
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
index 597c8bc0d..6b157ad6b 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl
@@ -34,6 +34,7 @@
     replica_set/2,
     in_sync_replicas/2,
     sites/0,
+    node/1,
     open_db/2,
     get_options/1,
     update_db_config/2,
@@ -113,7 +114,7 @@

 -spec print_status() -> ok.
 print_status() ->
-    io:format("THIS SITE:~n~s~n", [base64:encode(this_site())]),
+    io:format("THIS SITE:~n~s~n", [this_site()]),
     io:format("~nSITES:~n", []),
     Nodes = [node() | nodes()],
     lists:foreach(
         fun(#?NODE_TAB{site = Site, node = Node}) ->
             Status =
                 case lists:member(Node, Nodes) of
                     true -> up;
                     false -> down
                 end,
-            io:format("~s ~p ~p~n", [base64:encode(Site), Node, Status])
+            io:format("~s ~p ~p~n", [Site, Node, Status])
         end,
         eval_qlc(mnesia:table(?NODE_TAB))
     ),
     io:format(
-        "~nSHARDS:~nId Leader Status~n", []
+        "~nSHARDS:~nId Replicas~n", []
     ),
     lists:foreach(
-        fun(#?SHARD_TAB{shard = {DB, Shard}, leader = Leader}) ->
+        fun(#?SHARD_TAB{shard = {DB, Shard}, replica_set = RS}) ->
             ShardStr = string:pad(io_lib:format("~p/~s", [DB, Shard]), 30),
-            LeaderStr = string:pad(atom_to_list(Leader), 33),
-            Status =
-                case lists:member(Leader, Nodes) of
-                    true ->
-                        case node() of
-                            Leader -> "up *";
-                            _ -> "up"
-                        end;
-                    false ->
-                        "down"
-                end,
-            io:format("~s ~s ~s~n", [ShardStr, LeaderStr, Status])
+            ReplicasStr = string:pad(io_lib:format("~p", [RS]), 40),
+            io:format("~s ~s~n", [ShardStr, ReplicasStr])
         end,
         eval_qlc(mnesia:table(?SHARD_TAB))
     ).
@@ -169,8 +160,8 @@ shards(DB) ->
 -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
 my_shards(DB) ->
     Site = this_site(),
-    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) ->
-        lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync)
+    filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet}) ->
+        lists:member(Site, ReplicaSet)
     end).

 -spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()].
@@ -219,6 +210,15 @@ in_sync_replicas(DB, ShardId) ->
 sites() ->
     eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])).

+-spec node(site()) -> node() | undefined.
+node(Site) ->
+    case mnesia:dirty_read(?NODE_TAB, Site) of
+        [#?NODE_TAB{node = Node}] ->
+            Node;
+        [] ->
+            undefined
+    end.
+
 -spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) ->
     {ok, node()} | {error, no_leader_for_shard}.
 shard_leader(DB, Shard) ->
@@ -248,8 +248,17 @@ get_options(DB) ->
 -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     emqx_ds_replication_layer:builtin_db_opts().
 open_db(DB, DefaultOpts) ->
-    {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]),
-    Opts.
+    case mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]) of
+        {atomic, Opts} ->
+            Opts;
+        {aborted, {siteless_nodes, Nodes}} ->
+            %% TODO
+            %% This is ugly. We need a good story of how to fairly allocate shards in a
+            %% fresh cluster.
+            logger:notice("Aborting shard allocation, siteless nodes found: ~p", [Nodes]),
+            ok = timer:sleep(1000),
+            open_db(DB, DefaultOpts)
+    end.

 -spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
     ok | {error, _}.
@@ -273,12 +282,26 @@ drop_db(DB) ->
 init([]) ->
     process_flag(trap_exit, true),
     logger:set_process_metadata(#{domain => [ds, meta]}),
+    init_ra(),
     ensure_tables(),
     ensure_site(),
     {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.

+init_ra() ->
+    DataDir = filename:join([emqx:data_dir(), "dsrepl"]),
+    Config = maps:merge(ra_system:default_config(), #{
+        data_dir => DataDir,
+        wal_data_dir => DataDir
+    }),
+    case ra_system:start(Config) of
+        {ok, _System} ->
+            ok;
+        {error, {already_started, _System}} ->
+            ok
+    end.
+
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.
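+
+%% Note (sketch, assuming stock `ra_system' behaviour): Ra servers started by
+%% `emqx_ds_replication_layer:ra_start_shard/2' live in the `default' system,
+%% so `init_ra/0' has to run before the first shard server comes up -- hence
+%% the call at the top of `init/1' above. With `data_dir' and `wal_data_dir'
+%% both pointing at "<data_dir>/dsrepl", the shared write-ahead log lands
+%% directly in that directory, while each server keeps its log segments and
+%% snapshots under a subdirectory named after its uid, e.g.
+%% "data/dsrepl/<DB>_<Shard>_<Site>/".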
@@ -431,8 +454,8 @@ ensure_site() ->
         {ok, [Site]} ->
             ok;
         _ ->
-            Site = crypto:strong_rand_bytes(8),
-            logger:notice("Creating a new site with ID=~s", [base64:encode(Site)]),
+            Site = binary:encode_hex(crypto:strong_rand_bytes(4)),
+            logger:notice("Creating a new site with ID=~s", [Site]),
             ok = filelib:ensure_dir(Filename),
             {ok, FD} = file:open(Filename, [write]),
             io:format(FD, "~p.", [Site]),
@@ -445,17 +468,23 @@ ensure_site() ->
 -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok.
 create_shards(DB, NShards, ReplicationFactor) ->
     Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)],
-    AllSites = sites(),
+    AllSites = mnesia:match_object(?NODE_TAB, #?NODE_TAB{_ = '_'}, read),
+    Nodes = mria_mnesia:running_nodes(),
+    case Nodes -- [N || #?NODE_TAB{node = N} <- AllSites] of
+        [] ->
+            ok;
+        NodesSiteless ->
+            mnesia:abort({siteless_nodes, NodesSiteless})
+    end,
     lists:foreach(
         fun(Shard) ->
-            Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites],
+            Hashes0 = [{hash(Shard, Site), Site} || #?NODE_TAB{site = Site} <- AllSites],
             Hashes = lists:sort(Hashes0),
             {_, Sites} = lists:unzip(Hashes),
-            [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor),
+            ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor),
             Record = #?SHARD_TAB{
                 shard = {DB, Shard},
-                replica_set = ReplicaSet,
-                in_sync_replicas = [First]
+                replica_set = ReplicaSet
             },
             mnesia:write(Record)
         end,
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
index 64984e1d8..59df2bbdc 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
@@ -160,8 +160,8 @@ create(_ShardId, DBHandle, GenId, Options) ->
     %% Get options:
     BitsPerTopicLevel = maps:get(bits_per_wildcard_level, Options, 64),
     TopicIndexBytes = maps:get(topic_index_bytes, Options, 4),
-    %% 10 bits -> 1024 ms -> ~1 sec
-    TSOffsetBits = maps:get(epoch_bits, Options, 10),
+    %% 20 bits -> 1048576 us -> ~1 sec
+    TSOffsetBits = maps:get(epoch_bits, Options, 20),
     %% Create column families:
     DataCFName = data_cf(GenId),
     TrieCFName = trie_cf(GenId),
@@ -345,7 +345,7 @@ next(_Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) ->
     %% Compute safe cutoff time.
     %% It's the point in time where the last complete epoch ends, so we need to know
     %% the current time to compute it.
-    Now = emqx_message:timestamp_now(),
+    Now = emqx_ds:timestamp_us(),
     SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset,
     next_until(Schema, It, SafeCutoffTime, BatchSize).
@@ -436,9 +436,7 @@ prepare_loop_context(DB, CF, TopicIndex, StartTime, SafeCutoffTime, Varying, Key
     %% Make filter:
     Inequations = [
         {'=', TopicIndex},
-        {StartTime, '..', SafeCutoffTime - 1},
-        %% Unique integer:
-        any
+        {StartTime, '..', SafeCutoffTime - 1}
         %% Varying topic levels:
         | lists:map(
             fun
@@ -666,11 +664,10 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam
     ]) -> binary().
 make_key(KeyMapper, TopicIndex, Timestamp, Varying) ->
-    UniqueInteger = erlang:unique_integer([monotonic, positive]),
     emqx_ds_bitmask_keymapper:key_to_bitstring(
         KeyMapper,
         emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [
-            TopicIndex, Timestamp, UniqueInteger | Varying
+            TopicIndex, Timestamp | Varying
         ])
     ).
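+
+%% Worked example for the new default: with microsecond timestamps and
+%% TSOffsetBits = 20, one epoch spans 2^20 us = 1048576 us ~ 1.05 s, and
+%%   SafeCutoffTime = (Now bsr 20) bsl 20
+%% rounds `Now' down to the start of the current epoch. `next/4' iterates
+%% only up to that cutoff, i.e. serves only completed epochs -- roughly the
+%% same ~1 s window as the old default of 10 bits at millisecond granularity
+%% (2^10 ms = 1024 ms).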
@@ -726,10 +723,9 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) ->
         %% Dimension  Offset        Bitsize
         [{1,          0,            TopicIndexBytes * ?BYTE_SIZE}, %% Topic index
          {2,          TSOffsetBits, TSBits - TSOffsetBits       }] ++ %% Timestamp epoch
-        [{3 + I,      0,            BitsPerTopicLevel           } %% Varying topic levels
+        [{2 + I,      0,            BitsPerTopicLevel           } %% Varying topic levels
          || I <- lists:seq(1, N)] ++
-        [{2,          0,            TSOffsetBits                }, %% Timestamp offset
-         {3,          0,            64                          }], %% Unique integer
+        [{2,          0,            TSOffsetBits                }], %% Timestamp offset
     Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)),
     %% Assert:
     case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of
diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
index 2d9d932c0..191ed9a2d 100644
--- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
+++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src
@@ -5,7 +5,7 @@
     {vsn, "0.1.12"},
     {modules, []},
     {registered, []},
-    {applications, [kernel, stdlib, rocksdb, gproc, mria, emqx_utils]},
+    {applications, [kernel, stdlib, rocksdb, gproc, mria, ra, emqx_utils]},
     {mod, {emqx_ds_app, []}},
     {env, []}
 ]}.
diff --git a/rebar.config b/rebar.config
index 126ce9532..a38aa85b6 100644
--- a/rebar.config
+++ b/rebar.config
@@ -110,7 +110,8 @@
     {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}},
     {ssl_verify_fun, "1.1.7"},
     {rfc3339, {git, "https://github.com/emqx/rfc3339.git", {tag, "0.2.3"}}},
-    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}}
+    {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.2"}}},
+    {ra, "2.7.3"}
 ]}.

 {xref_ignores,