diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 662db0c27..2f6e93714 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -22,7 +22,7 @@ -module(emqx_ds). %% Management API: --export([open_db/2, drop_db/1]). +-export([open_db/2, update_db_config/2, add_generation/1, drop_db/1]). %% Message storage API: -export([store_batch/2, store_batch/3]). @@ -95,7 +95,7 @@ %% Timestamp %% Earliest possible timestamp is 0. -%% TODO granularity? Currently, we should always use micro second, as that's the unit we +%% TODO granularity? Currently, we should always use milliseconds, as that's the unit we %% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). @@ -124,6 +124,10 @@ -callback open_db(db(), create_db_opts()) -> ok | {error, _}. +-callback update_db_config(db(), create_db_opts()) -> ok | {error, _}. + +-callback add_generation(db()) -> ok | {error, _}. + -callback drop_db(db()) -> ok | {error, _}. -callback store_batch(db(), [emqx_types:message()], message_store_opts()) -> store_batch_result(). @@ -154,6 +158,24 @@ open_db(DB, Opts = #{backend := Backend}) when Backend =:= builtin orelse Backen persistent_term:put(?persistent_term(DB), Module), ?module(DB):open_db(DB, Opts). +-spec update_db_config(db(), create_db_opts()) -> ok. +update_db_config(DB, Opts) -> + case persistent_term:get(?persistent_term(DB), undefined) of + undefined -> + ok; + Module -> + Module:update_db_config(DB, Opts) + end. + +-spec add_generation(db()) -> ok. +add_generation(DB) -> + case persistent_term:get(?persistent_term(DB), undefined) of + undefined -> + ok; + Module -> + Module:add_generation(DB) + end. + %% @doc TODO: currently if one or a few shards are down, they won't be %% deleted. 
diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 5eac27471..6bc0f9f32 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -23,6 +23,8 @@ -export([ list_shards/1, open_db/2, + update_db_config/2, + add_generation/1, drop_db/1, store_batch/3, get_streams/3, @@ -38,7 +40,8 @@ do_get_streams_v1/4, do_make_iterator_v1/5, do_update_iterator_v2/4, - do_next_v1/4 + do_next_v1/4, + do_add_generation_v1/1 ]). -export_type([shard_id/0, builtin_db_opts/0, stream/0, iterator/0, message_id/0, batch/0]). @@ -121,6 +124,16 @@ open_db(DB, CreateOpts) -> MyShards ). +-spec update_db_config(emqx_ds:db(), builtin_db_opts()) -> ok | {error, _}. +update_db_config(DB, CreateOpts) -> + emqx_ds_replication_layer_meta:update_db_config(DB, CreateOpts). + +-spec add_generation(emqx_ds:db()) -> ok | {error, _}. +add_generation(DB) -> + Nodes = emqx_ds_replication_layer_meta:leader_nodes(DB), + _ = emqx_ds_proto_v1:add_generation(Nodes, DB), + ok. + -spec drop_db(emqx_ds:db()) -> ok | {error, _}. drop_db(DB) -> Nodes = list_nodes(), @@ -284,6 +297,17 @@ do_update_iterator_v2(DB, Shard, OldIter, DSKey) -> do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds_storage_layer:next({DB, Shard}, Iter, BatchSize). +-spec do_add_generation_v1(emqx_ds:db()) -> ok | {error, _}. +do_add_generation_v1(DB) -> + MyShards = emqx_ds_replication_layer_meta:my_owned_shards(DB), + + lists:foreach( + fun(ShardId) -> + emqx_ds_storage_layer:add_generation({DB, ShardId}) + end, + MyShards + ). 
+ %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl index a2fc9dbbf..7a30139f8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_meta.erl @@ -21,20 +21,26 @@ %% implementation details from this module. -module(emqx_ds_replication_layer_meta). +-compile(inline). + -behaviour(gen_server). %% API: -export([ shards/1, my_shards/1, + my_owned_shards/1, + leader_nodes/1, replica_set/2, in_sync_replicas/2, sites/0, open_db/2, + update_db_config/2, drop_db/1, shard_leader/2, this_site/0, set_leader/3, + is_leader/1, print_status/0 ]). @@ -44,16 +50,19 @@ %% internal exports: -export([ open_db_trans/2, + update_db_config_trans/2, drop_db_trans/1, claim_site/2, in_sync_replicas_trans/2, set_leader_trans/3, + is_leader_trans/1, n_shards/1 ]). -export_type([site/0]). -include_lib("stdlib/include/qlc.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). %%================================================================================ %% Type declarations @@ -145,22 +154,34 @@ start_link() -> -spec shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. shards(DB) -> - eval_qlc( - qlc:q([Shard || #?SHARD_TAB{shard = {D, Shard}} <- mnesia:table(?SHARD_TAB), D =:= DB]) - ). + filter_shards(DB). -spec my_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. 
my_shards(DB) -> Site = this_site(), - eval_qlc( - qlc:q([ - Shard - || #?SHARD_TAB{shard = {D, Shard}, replica_set = ReplicaSet, in_sync_replicas = InSync} <- mnesia:table( - ?SHARD_TAB - ), - D =:= DB, - lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) - ]) + filter_shards(DB, fun(#?SHARD_TAB{replica_set = ReplicaSet, in_sync_replicas = InSync}) -> + lists:member(Site, ReplicaSet) orelse lists:member(Site, InSync) + end). + +-spec my_owned_shards(emqx_ds:db()) -> [emqx_ds_replication_layer:shard_id()]. +my_owned_shards(DB) -> + Self = node(), + filter_shards(DB, fun(#?SHARD_TAB{leader = Leader}) -> + Self =:= Leader + end). + +-spec leader_nodes(emqx_ds:db()) -> [node()]. +leader_nodes(DB) -> + lists:uniq( + filter_shards( + DB, + fun(#?SHARD_TAB{leader = Leader}) -> + Leader =/= undefined + end, + fun(#?SHARD_TAB{leader = Leader}) -> + Leader + end + ) ). -spec replica_set(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> @@ -204,12 +225,25 @@ set_leader(DB, Shard, Node) -> {atomic, _} = mria:transaction(?SHARD, fun ?MODULE:set_leader_trans/3, [DB, Shard, Node]), ok. +-spec is_leader(node()) -> boolean(). +is_leader(Node) -> + {atomic, Result} = mria:transaction(?SHARD, fun ?MODULE:is_leader_trans/1, [Node]), + Result. + -spec open_db(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> emqx_ds_replication_layer:builtin_db_opts(). open_db(DB, DefaultOpts) -> {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:open_db_trans/2, [DB, DefaultOpts]), Opts. +-spec update_db_config(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) -> + ok | {error, _}. +update_db_config(DB, DefaultOpts) -> + {atomic, Opts} = mria:transaction(?SHARD, fun ?MODULE:update_db_config_trans/2, [ + DB, DefaultOpts + ]), + Opts. + -spec drop_db(emqx_ds:db()) -> ok. 
 drop_db(DB) ->
     _ = mria:transaction(?SHARD, fun ?MODULE:drop_db_trans/1, [DB]),
@@ -226,6 +260,7 @@ init([]) ->
     logger:set_process_metadata(#{domain => [ds, meta]}),
     ensure_tables(),
     ensure_site(),
+    {ok, _} = mnesia:subscribe({table, ?META_TAB, detailed}),
     S = #s{},
     {ok, S}.
 
@@ -235,6 +270,18 @@ handle_call(_Call, _From, S) ->
     {noreply, S}.
 handle_cast(_Cast, S) ->
     {noreply, S}.
+handle_info(
+    {mnesia_table_event, {write, ?META_TAB, #?META_TAB{db = DB, db_props = Options}, [_], _}}, S
+) ->
+    MyShards = my_owned_shards(DB),
+
+    lists:foreach(
+        fun(ShardId) ->
+            emqx_ds_storage_layer:update_config({DB, ShardId}, Options)
+        end,
+        MyShards
+    ),
+    {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
 
@@ -260,6 +307,31 @@ open_db_trans(DB, CreateOpts) ->
             Opts
     end.
 
+-spec update_db_config_trans(emqx_ds:db(), emqx_ds_replication_layer:builtin_db_opts()) ->
+    ok | {error, no_database}.
+update_db_config_trans(DB, CreateOpts) ->
+    case mnesia:wread({?META_TAB, DB}) of
+        [#?META_TAB{db_props = Opts}] ->
+            %% Since this is an update and not a reopen,
+            %% we should keep the shard number and replication factor
+            %% and not create a new shard server
+            #{
+                n_shards := NShards,
+                replication_factor := ReplicationFactor
+            } = Opts,
+
+            mnesia:write(#?META_TAB{
+                db = DB,
+                db_props = CreateOpts#{
+                    n_shards := NShards,
+                    replication_factor := ReplicationFactor
+                }
+            }),
+            ok;
+        [] ->
+            {error, no_database}
+    end.
+
 -spec drop_db_trans(emqx_ds:db()) -> ok.
 drop_db_trans(DB) ->
     mnesia:delete({?META_TAB, DB}),
@@ -287,6 +359,24 @@ set_leader_trans(DB, Shard, Node) ->
     Record = Record0#?SHARD_TAB{leader = Node},
     mnesia:write(Record).
 
+-spec is_leader_trans(node()) -> boolean().
+is_leader_trans(Node) ->
+    case
+        mnesia:select(
+            ?SHARD_TAB,
+            ets:fun2ms(fun(#?SHARD_TAB{leader = Leader}) when Leader =:= Node ->
+                true
+            end),
+            1,
+            read
+        )
+    of
+        {[_ | _], _Cont} ->
+            true;
+        _ ->
+            false
+    end.
+ %%================================================================================ %% Internal functions %%================================================================================ @@ -346,7 +436,7 @@ create_shards(DB, NShards, ReplicationFactor) -> Hashes0 = [{hash(Shard, Site), Site} || Site <- AllSites], Hashes = lists:sort(Hashes0), {_, Sites} = lists:unzip(Hashes), - [First | _] = ReplicaSet = lists:sublist(Sites, 1, ReplicationFactor), + [First | ReplicaSet] = lists:sublist(Sites, 1, ReplicationFactor), Record = #?SHARD_TAB{ shard = {DB, Shard}, replica_set = ReplicaSet, @@ -369,3 +459,30 @@ eval_qlc(Q) -> {atomic, Result} = mria:ro_transaction(?SHARD, fun() -> qlc:eval(Q) end), Result end. + +filter_shards(DB) -> + filter_shards(DB, const(true)). + +-spec filter_shards(emqx_ds:db(), fun((_) -> boolean())) -> + [emqx_ds_replication_layer:shard_id()]. +filter_shards(DB, Predicte) -> + filter_shards(DB, Predicte, fun(#?SHARD_TAB{shard = {_, ShardId}}) -> + ShardId + end). + +filter_shards(DB, Predicte, Maper) -> + eval_qlc( + qlc:q([ + Maper(Shard) + || #?SHARD_TAB{shard = {D, _}} = Shard <- mnesia:table( + ?SHARD_TAB + ), + D =:= DB, + Predicte(Shard) + ]) + ). + +const(Result) -> + fun(_) -> + Result + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index a88d02248..2fb2d1c1f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -25,7 +25,9 @@ get_streams/3, make_iterator/4, update_iterator/3, - next/3 + next/3, + update_config/2, + add_generation/1 ]). %% gen_server @@ -47,6 +49,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}). 
+
 %%================================================================================
 %% Type declarations
 %%================================================================================
@@ -249,13 +253,19 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch
         Error
     end.
 
+-spec update_config(shard_id(), emqx_ds:builtin_db_opts()) -> ok.
+update_config(ShardId, Options) ->
+    gen_server:call(?REF(ShardId), {?FUNCTION_NAME, Options}, infinity).
+
+-spec add_generation(shard_id()) -> ok.
+add_generation(ShardId) ->
+    gen_server:call(?REF(ShardId), add_generation, infinity).
+
 %%================================================================================
 %% gen_server for the shard
 %%================================================================================
 
--define(REF(ShardId), {via, gproc, {n, l, {?MODULE, ShardId}}}).
-
--spec start_link(shard_id(), options()) ->
+-spec start_link(shard_id(), emqx_ds:builtin_db_opts()) ->
     {ok, pid()}.
 start_link(Shard = {_, _}, Options) ->
     gen_server:start_link(?REF(Shard), ?MODULE, {Shard, Options}, []).
@@ -300,6 +312,18 @@ init({ShardId, Options}) ->
     commit_metadata(S),
     {ok, S}.
 
+handle_call({update_config, Options}, _From, #s{schema = Schema} = S0) ->
+    Prototype = maps:get(storage, Options),
+    S1 = S0#s{schema = Schema#{prototype := Prototype}},
+    Since = emqx_message:timestamp_now(),
+    S = add_generation(S1, Since),
+    commit_metadata(S),
+    {reply, ok, S};
+handle_call(add_generation, _From, S0) ->
+    Since = emqx_message:timestamp_now(),
+    S = add_generation(S0, Since),
+    commit_metadata(S),
+    {reply, ok, S};
 handle_call(#call_create_generation{since = Since}, _From, S0) ->
     S = add_generation(S0, Since),
     commit_metadata(S),
@@ -342,11 +366,13 @@ open_shard(ShardId, DB, CFRefs, ShardSchema) ->
 
 -spec add_generation(server_state(), emqx_ds:time()) -> server_state().
add_generation(S0, Since) -> #s{shard_id = ShardId, db = DB, schema = Schema0, shard = Shard0, cf_refs = CFRefs0} = S0, - {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema0, Since), + Schema1 = update_last_until(Schema0, Since), + Shard1 = update_last_until(Shard0, Since), + {GenId, Schema, NewCFRefs} = new_generation(ShardId, DB, Schema1, Since), CFRefs = NewCFRefs ++ CFRefs0, Key = {generation, GenId}, Generation = open_generation(ShardId, DB, CFRefs, GenId, maps:get(Key, Schema)), - Shard = Shard0#{Key => Generation}, + Shard = Shard1#{current_generation := GenId, Key => Generation}, S0#s{ cf_refs = CFRefs, schema = Schema, @@ -426,6 +452,13 @@ rocksdb_open(Shard, Options) -> db_dir({DB, ShardId}) -> filename:join([emqx:data_dir(), atom_to_list(DB), binary_to_list(ShardId)]). +-spec update_last_until(Schema, emqx_ds:time()) -> Schema when Schema :: shard_schema() | shard(). +update_last_until(Schema, Until) -> + #{current_generation := GenId} = Schema, + GenData0 = maps:get({generation, GenId}, Schema), + GenData = GenData0#{until := Until}, + Schema#{{generation, GenId} := GenData}. + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl index 67ed1a3ca..f4b792632 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v1.erl @@ -24,7 +24,8 @@ store_batch/5, get_streams/5, make_iterator/6, - next/5 + next/5, + add_generation/2 ]). %% behavior callbacks: @@ -89,6 +90,11 @@ store_batch(Node, DB, Shard, Batch, Options) -> DB, Shard, Batch, Options ]). +-spec add_generation([node()], emqx_ds:db()) -> + [{ok, ok} | erpc:caught_call_exception()]. 
+add_generation(Node, DB) -> + erpc:multicall(Node, emqx_ds_replication_layer, do_add_generation_v1, [DB]). + %%================================================================================ %% behavior callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index f44000eeb..b99fd6a8d 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -50,7 +50,7 @@ t_00_smoke_open_drop(_Config) -> lists:foreach( fun(Shard) -> ?assertEqual( - {ok, [Site]}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) + {ok, []}, emqx_ds_replication_layer_meta:replica_set(DB, Shard) ), ?assertEqual( [Site], emqx_ds_replication_layer_meta:in_sync_replicas(DB, Shard) @@ -155,6 +155,127 @@ t_05_update_iterator(_Config) -> ?assertEqual(Msgs, AllMsgs, #{from_key => Iter1, final_iter => FinalIter}), ok. 
+t_05_update(_Config) -> + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + TopicFilter = ['#'], + + DataSet = update_data_set(), + + ToMsgs = fun(Datas) -> + lists:map( + fun({Topic, Payload}) -> + message(Topic, Payload, emqx_message:timestamp_now()) + end, + Datas + ) + end, + + {_, StartTimes, MsgsList} = + lists:foldl( + fun + (Datas, {true, TimeAcc, MsgAcc}) -> + Msgs = ToMsgs(Datas), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + {false, TimeAcc, [Msgs | MsgAcc]}; + (Datas, {Any, TimeAcc, MsgAcc}) -> + timer:sleep(500), + ?assertMatch(ok, emqx_ds:update_db_config(DB, opts())), + timer:sleep(500), + StartTime = emqx_message:timestamp_now(), + Msgs = ToMsgs(Datas), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]} + end, + {true, [emqx_message:timestamp_now()], []}, + DataSet + ), + + Checker = fun({StartTime, Msgs0}, Acc) -> + Msgs = Msgs0 ++ Acc, + Batch = fetch_all(DB, TopicFilter, StartTime), + ?assertEqual(Msgs, Batch, {StartTime}), + Msgs + end, + lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). 
+ +t_06_add_generation(_Config) -> + DB = ?FUNCTION_NAME, + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + TopicFilter = ['#'], + + DataSet = update_data_set(), + + ToMsgs = fun(Datas) -> + lists:map( + fun({Topic, Payload}) -> + message(Topic, Payload, emqx_message:timestamp_now()) + end, + Datas + ) + end, + + {_, StartTimes, MsgsList} = + lists:foldl( + fun + (Datas, {true, TimeAcc, MsgAcc}) -> + Msgs = ToMsgs(Datas), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + {false, TimeAcc, [Msgs | MsgAcc]}; + (Datas, {Any, TimeAcc, MsgAcc}) -> + timer:sleep(500), + ?assertMatch(ok, emqx_ds:add_generation(DB)), + timer:sleep(500), + StartTime = emqx_message:timestamp_now(), + Msgs = ToMsgs(Datas), + ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs)), + {Any, [StartTime | TimeAcc], [Msgs | MsgAcc]} + end, + {true, [emqx_message:timestamp_now()], []}, + DataSet + ), + + Checker = fun({StartTime, Msgs0}, Acc) -> + Msgs = Msgs0 ++ Acc, + Batch = fetch_all(DB, TopicFilter, StartTime), + ?assertEqual(Msgs, Batch, {StartTime}), + Msgs + end, + lists:foldl(Checker, [], lists:zip(StartTimes, MsgsList)). + +update_data_set() -> + [ + [ + {<<"foo/bar">>, <<"1">>} + ], + + [ + {<<"foo">>, <<"2">>} + ], + + [ + {<<"bar/bar">>, <<"3">>} + ] + ]. + +fetch_all(DB, TopicFilter, StartTime) -> + Streams0 = emqx_ds:get_streams(DB, TopicFilter, StartTime), + Streams = lists:sort( + fun({{_, A}, _}, {{_, B}, _}) -> + A < B + end, + Streams0 + ), + lists:foldl( + fun({_, Stream}, Acc) -> + {ok, Iter0} = emqx_ds:make_iterator(DB, Stream, TopicFilter, StartTime), + {ok, _, Msgs} = iterate(DB, Iter0, StartTime), + Acc ++ Msgs + end, + [], + Streams + ). + message(Topic, Payload, PublishedAt) -> #message{ topic = Topic, @@ -172,6 +293,8 @@ iterate(DB, It0, BatchSize, Acc) -> {ok, It, Acc}; {ok, It, Msgs} -> iterate(DB, It, BatchSize, Acc ++ Msgs); + {ok, end_of_stream} -> + {ok, It0, Acc}; Ret -> Ret end.