diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 34abba2cc..1e3ac69c8 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -443,7 +443,7 @@ create_tables() -> ok = emqx_ds:open_db(?PERSISTENT_MESSAGE_DB, #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }), ok = mria:create_table( diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index ef34d3e8f..1e6db805b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -341,6 +341,22 @@ fields("persistent_session_store") -> importance => ?IMPORTANCE_HIDDEN } )}, + {"n_shards", + sc( + pos_integer(), + #{ + default => 16, + importance => ?IMPORTANCE_HIDDEN + } + )}, + {"replication_factor", + sc( + pos_integer(), + #{ + default => 3, + importance => ?IMPORTANCE_HIDDEN + } + )}, {"on_disc", sc( boolean(), diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 0528a0a2c..858855b6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -7,5 +7,4 @@ -export([start/2]). start(_Type, _Args) -> - emqx_ds_replication_layer_meta:init(), emqx_ds_sup:start_link(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index f359846eb..50331a378 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -32,8 +32,7 @@ %% internal exports: -export([ - do_open_shard_v1/3, - do_drop_shard_v1/2, + do_drop_db_v1/1, do_store_batch_v1/4, do_get_streams_v1/4, do_make_iterator_v1/5, @@ -42,6 +41,8 @@ -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0]). +-include_lib("emqx_utils/include/emqx_message.hrl"). + %%================================================================================ %% Type declarations %%================================================================================ @@ -95,40 +96,34 @@ %%================================================================================ -spec list_shards(emqx_ds:db()) -> [shard_id()]. -list_shards(_DB) -> - %% TODO: milestone 5 - lists:map(fun atom_to_binary/1, list_nodes()). +list_shards(DB) -> + emqx_ds_replication_layer_meta:shards(DB). -spec open_db(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. open_db(DB, CreateOpts) -> - %% TODO: improve error reporting, don't just crash Opts = emqx_ds_replication_layer_meta:open_db(DB, CreateOpts), + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), lists:foreach( fun(Shard) -> - Node = node_of_shard(DB, Shard), - ok = emqx_ds_proto_v1:open_shard(Node, DB, Shard, Opts) + emqx_ds_storage_layer:open_shard({DB, Shard}, Opts), + maybe_set_myself_as_leader(DB, Shard) end, - list_shards(DB) + MyShards ). -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> + Nodes = list_nodes(), + _ = emqx_ds_proto_v1:drop_db(Nodes, DB), _ = emqx_ds_replication_layer_meta:drop_db(DB), - lists:foreach( - fun(Shard) -> - Node = node_of_shard(DB, Shard), - ok = emqx_ds_proto_v1:drop_shard(Node, DB, Shard) - end, - list_shards(DB) - ). + ok. --spec store_batch(emqx_ds:db(), [emqx_types:message()], emqx_ds:message_store_opts()) -> +-spec store_batch(emqx_ds:db(), [emqx_types:message(), ...], emqx_ds:message_store_opts()) -> emqx_ds:store_batch_result(). -store_batch(DB, Batch, Opts) -> - %% TODO: Currently we store messages locally. - Shard = atom_to_binary(node()), +store_batch(DB, Messages, Opts) -> + Shard = shard_of_messages(DB, Messages), Node = node_of_shard(DB, Shard), - emqx_ds_proto_v1:store_batch(Node, DB, Shard, Batch, Opts). + emqx_ds_proto_v1:store_batch(Node, DB, Shard, Messages, Opts). -spec get_streams(emqx_ds:db(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{emqx_ds:stream_rank(), stream()}]. @@ -194,16 +189,15 @@ next(DB, Iter0, BatchSize) -> %% Internal exports (RPC targets) %%================================================================================ --spec do_open_shard_v1( - emqx_ds:db(), emqx_ds_replication_layer:shard_id(), emqx_ds:create_db_opts() -) -> - ok | {error, _}. -do_open_shard_v1(DB, Shard, Opts) -> - emqx_ds_storage_layer:open_shard({DB, Shard}, Opts). - --spec do_drop_shard_v1(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> ok | {error, _}. -do_drop_shard_v1(DB, Shard) -> - emqx_ds_storage_layer:drop_shard({DB, Shard}). +-spec do_drop_db_v1(emqx_ds:db()) -> ok | {error, _}. +do_drop_db_v1(DB) -> + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), + lists:foreach( + fun(Shard) -> + emqx_ds_storage_layer:drop_shard({DB, Shard}) + end, + MyShards + ). -spec do_store_batch_v1( emqx_ds:db(), @@ -247,9 +241,34 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> %% Internal functions %%================================================================================ +%% TODO: there's no real leader election right now +-spec maybe_set_myself_as_leader(emqx_ds:db(), shard_id()) -> ok. +maybe_set_myself_as_leader(DB, Shard) -> + Site = emqx_ds_replication_layer_meta:this_site(), + case emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) of + [Site | _] -> + %% Currently the first in-sync replica always becomes the + %% leader + ok = emqx_ds_replication_layer_meta:set_leader(DB, Shard, node()); + _Sites -> + ok + end. + -spec node_of_shard(emqx_ds:db(), shard_id()) -> node(). -node_of_shard(_DB, Shard) -> - binary_to_atom(Shard). +node_of_shard(DB, Shard) -> + case emqx_ds_replication_layer_meta:shard_leader(DB, Shard) of + {ok, Leader} -> + Leader; + {error, no_leader_for_shard} -> + %% TODO: use optvar + timer:sleep(500), + node_of_shard(DB, Shard) + end. + +%% Here we assume that all messages in the batch come from the same client +shard_of_messages(DB, [#message{from = From} | _]) -> + N = emqx_ds_replication_layer_meta:n_shards(DB), + integer_to_binary(erlang:phash2(From, N)). list_nodes() -> mria:running_nodes(). diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index 0bfa2a3ee..0f250022d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -21,11 +21,34 @@ %% implementation details from this module. -module(emqx_ds_replication_layer_meta). +-behaviour(gen_server). + %% API: --export([init/0, shards/1, replica_set/2, sites/0, open_db/2, drop_db/1]). +-export([ + shards/1, + my_shards/1, + replica_set/2, + in_sync_replicas/2, + sites/0, + open_db/2, + drop_db/1, + shard_leader/2, + this_site/0, + set_leader/3 +]). + +%% gen_server +-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: --export([open_db_trans/2, drop_db_trans/1, claim_site/2]). +-export([ + open_db_trans/2, + drop_db_trans/1, + claim_site/2, + in_sync_replicas_trans/2, + set_leader_trans/3, + n_shards/1 +]). -export_type([site/0]). @@ -35,6 +58,8 @@ %% Type declarations %%================================================================================ +-define(SERVER, ?MODULE). + -define(SHARD, emqx_ds_builtin_metadata_shard). %% DS database metadata: -define(META_TAB, emqx_ds_builtin_metadata_tab). @@ -56,7 +81,10 @@ -record(?SHARD_TAB, { shard :: {emqx_ds:db(), emqx_ds_replication_layer:shard_id()}, + %% Sites that the replica_set :: [site()], + %% Sites that contain the actual data: + in_sync_replicas :: [site()], leader :: node() | undefined, misc = #{} :: map() }). @@ -64,14 +92,21 @@ %% Persistent ID of the node (independent from the IP/FQDN): -type site() :: binary(). +%% Peristent term key: +-define(emqx_ds_builtin_site, emqx_ds_builtin_site). + %%================================================================================ %% API funcions %%================================================================================ --spec init() -> ok. -init() -> - ensure_tables(), - ensure_site(). +-spec n_shards(emqx_ds:db()) -> pos_integer(). +n_shards(DB) -> + [#?META_TAB{db_props = #{n_shards := NShards}}] = mnesia:dirty_read(?META_TAB, DB), + NShards. + +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. shards(DB) -> @@ -79,6 +114,20 @@ shards(DB) -> qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB]) ). +-spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. +my_shards(DB) -> + Site = this_site(), + eval_qlc( + qlc:q([ + Shard + || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table( + ?SHARD_TAB + ), + D =:= DB, + lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) + ]) + ). + -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> {ok, [site()]} | {error, _}. replica_set(DB, Shard) -> @@ -89,10 +138,37 @@ replica_set(DB, Shard) -> {error, no_shard} end. +-spec in_sync_replicas(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + [site()]. +in_sync_replicas(DB, ShardId) -> + {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:in_sync_replicas_trans/2, [DB, ShardId]), + case Result of + {ok, InSync} -> + InSync; + {error, _} -> + [] + end. + -spec sites() -> [site()]. sites() -> eval_qlc(qlc:q([Site || #?NODE_TAB{site = Site} <- mnesia:table(?NODE_TAB)])). +-spec shard_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, node()} | {error, no_leader_for_shard}. +shard_leader(DB, Shard) -> + case mnesia:dirty_read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{leader = Leader}] -> + {ok, Leader}; + [] -> + {error, no_leader_for_shard} + end. + +-spec set_leader(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), node()) -> + ok. +set_leader(DB, Shard, Node) -> + {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), + ok. + -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> @@ -108,6 +184,29 @@ drop_db(DB) -> %% behavior callbacks %%================================================================================ +-record(s, {}). + +init([]) -> + process_flag(trap_exit, true), + logger:set_process_metadata(#{domain => [ds, meta]}), + ensure_tables(), + ensure_site(), + S = #s{}, + {ok, S}. + +handle_call(_Call, _From, S) -> + {reply, {error, unknown_call}, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #s{}) -> + persistent_term:erase(?emqx_ds_builtin_site), + ok. + %%================================================================================ %% Internal exports %%================================================================================ @@ -136,6 +235,23 @@ drop_db_trans(DB) -> claim_site(Site, Node) -> mnesia:write(#?NODE_TAB{site = Site, node = Node}). +-spec in_sync_replicas_trans(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> + {ok, [site()]} | {error, no_shard}. +in_sync_replicas_trans(DB, Shard) -> + case mnesia:read(?SHARD_TAB, {DB, Shard}) of + [#?SHARD_TAB{in_sync_replicas = InSync}] -> + {ok, InSync}; + [] -> + {error, no_shard} + end. + +-spec set_leader_trans(emqx_ds:ds(), emqx_ds_replication_layer:shard_id(), node()) -> + ok. +set_leader_trans(DB, Shard, Node) -> + [Record0] = mnesia:wread({?SHARD_TAB, {DB, Shard}}), + Record = Record0#?SHARD_TAB{leader = Node}, + mnesia:write(Record). + %%================================================================================ %% Internal functions %%================================================================================ @@ -182,8 +298,13 @@ ensure_site() -> file:close(FD) end, {atomic, ok} = mria:transaction(?SHARD, fun ?MODULE:claim_site/2, [Site, node()]), + persistent_term:put(?emqx_ds_builtin_site, Site), ok. +-spec this_site() -> site(). +this_site() -> + persistent_term:get(?emqx_ds_builtin_site). + -spec create_shards(emqx_ds:db(), pos_integer(), pos_integer()) -> ok. create_shards(DB, NShards, ReplicationFactor) -> Shards = [integer_to_binary(I) || I <- lists:seq(0, NShards - 1)], @@ -193,10 +314,11 @@ create_shards(DB, NShards, ReplicationFactor) -> Hashes0 = [{hash(Shard, Site), Site} || Site <- Sites], Hashes = lists:sort(Hashes0), {_, Sites} = lists:unzip(Hashes), - ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), Record = #?SHARD_TAB{ shard = {DB, Shard}, - replica_set = ReplicaSet + replica_set = ReplicaSet, + in_sync_replicas = [First] }, mnesia:write(Record) end, diff --git a/apps/emqx_durable_storage/src/emqx_ds_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_sup.erl index d371a2346..82e2711be 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_sup.erl @@ -30,7 +30,7 @@ start_link() -> %%================================================================================ init([]) -> - Children = [storage_layer_sup()], + Children = [meta(), storage_layer_sup()], SupFlags = #{ strategy => one_for_all, intensity => 0, @@ -42,6 +42,15 @@ init([]) -> %% Internal functions %%================================================================================ +meta() -> + #{ + id => emqx_ds_replication_layer_meta, + start => {emqx_ds_replication_layer_meta, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5000 + }. + storage_layer_sup() -> #{ id => local_store_shard_sup, diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 10d1ed7a5..758b5148b 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -19,7 +19,7 @@ -include_lib("emqx_utils/include/bpapi.hrl"). %% API: --export([open_shard/4, drop_shard/3, store_batch/5, get_streams/5, make_iterator/6, next/5]). +-export([drop_db/2, store_batch/5, get_streams/5, make_iterator/6, next/5]). %% behavior callbacks: -export([introduced_in/0]). @@ -28,20 +28,10 @@ %% API funcions %%================================================================================ --spec open_shard( - node(), - emqx_ds:db(), - emqx_ds_replication_layer:shard_id(), - emqx_ds:create_db_opts() -) -> - ok. -open_shard(Node, DB, Shard, Opts) -> - erpc:call(Node, emqx_ds_replication_layer, do_open_shard_v1, [DB, Shard, Opts]). - --spec drop_shard(node(), emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> - ok. -drop_shard(Node, DB, Shard) -> - erpc:call(Node, emqx_ds_replication_layer, do_drop_shard_v1, [DB, Shard]). +-spec drop_db([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. +drop_db(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_drop_db_v1, [DB]). -spec get_streams( node(), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index e42f576b2..8a46804b0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -23,7 +23,7 @@ -include_lib("stdlib/include/assert.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --define(N_SHARDS, 8). +-define(N_SHARDS, 1). opts() -> #{ @@ -38,18 +38,32 @@ opts() -> t_00_smoke_open_drop(_Config) -> DB = 'DB', ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + %% Check metadata: + %% We have only one site: [Site] = emqx_ds_replication_layer_meta:sites(), + %% Check all shards: Shards = emqx_ds_replication_layer_meta:shards(DB), + %% Since there is only one site all shards should be allocated + %% to this site: + MyShards = emqx_ds_replication_layer_meta:my_shards(DB), ?assertEqual(?N_SHARDS, length(Shards)), lists:foreach( fun(Shard) -> ?assertEqual( - {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard), {DB, Shard} - ) + {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + ), + ?assertEqual( + [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) + ), + %% Check that the leader is eleected; + ?assertEqual({ok, node()}, emqx_ds_replication_layer_meta:shard_leader(DB, Shard)) end, Shards ), + ?assertEqual(lists:sort(Shards), lists:sort(MyShards)), + %% Reopen the DB and make sure the operation is idempotent: ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + %% Close the DB: ?assertMatch(ok, emqx_ds:drop_db(DB)). %% A simple smoke test that verifies that storing the messages doesn't @@ -153,11 +167,11 @@ end_per_suite(Config) -> ok. init_per_testcase(_TC, Config) -> - %% snabbkaffe:fix_ct_logging(), application:ensure_all_started(emqx_durable_storage), Config. end_per_testcase(_TC, _Config) -> ok = application:stop(emqx_durable_storage), - mnesia:delete_schema([node()]), - mria:stop(). + mria:stop(), + _ = mnesia:delete_schema([node()]), + ok. diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index b6b84bce2..7b733406d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -16,7 +16,7 @@ -define(DEFAULT_CONFIG, #{ backend => builtin, storage => {emqx_ds_storage_bitfield_lts, #{}}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }). @@ -26,7 +26,7 @@ {emqx_ds_storage_bitfield_lts, #{ bits_per_wildcard_level => 8 }}, - n_shards => 255, + n_shards => 16, replication_factor => 3 }). @@ -391,7 +391,7 @@ end_per_testcase(TC, _Config) -> ok = emqx_ds_storage_layer_sup:stop_shard(shard(TC)). shard(TC) -> - {?MODULE, TC}. + {?MODULE, atom_to_binary(TC)}. keyspace(TC) -> TC.