From 2236af84bac27e71772ae85a978ec7ab0e398e37 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 8 May 2024 10:01:16 +0200 Subject: [PATCH] feat(ds): two-stage storage commit on the storage level --- .../src/emqx_ds_replication_layer.erl | 3 +- .../src/emqx_ds_storage_bitfield_lts.erl | 107 +++++++++++++++--- .../src/emqx_ds_storage_layer.erl | 80 ++++++++++--- .../src/emqx_ds_storage_reference.erl | 24 ++-- .../test/emqx_ds_replication_SUITE.erl | 1 + 5 files changed, 168 insertions(+), 47 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 315a276ad..afc8db40d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -733,7 +733,7 @@ apply( Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}; apply( - #{index := RaftIdx}, + _RaftMeta, #{?tag := storage_event, ?payload := CustomEvent}, #{db_shard := DBShard, latest := Latest0} = State ) -> @@ -754,7 +754,6 @@ tick(TimeMs, #{db_shard := DBShard = {DB, Shard}, latest := Latest}) -> %% Leader = emqx_ds_replication_layer_shard:lookup_leader(DB, Shard), {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), ?tp(emqx_ds_replication_layer_tick, #{db => DB, shard => Shard, ts => Timestamp}), - set_ts(DBShard, Latest), handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index db50e49dd..d05296a29 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -28,7 +28,8 @@ create/4, open/5, drop/5, - store_batch/4, + prepare_batch/4, + commit_batch/3, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -68,6 +69,9 @@ -define(start_time, 3). -define(storage_key, 4). -define(last_seen_key, 5). +-define(cooked_payloads, 6). +-define(cooked_lts_ops, 7). +-define(cooked_ts, 8). -type options() :: #{ @@ -90,18 +94,28 @@ db :: rocksdb:db_handle(), data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), + trie_cf :: rocksdb:cf_handle(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), ts_bits :: non_neg_integer(), ts_offset :: non_neg_integer(), gvars :: ets:table() }). +-define(lts_persist_ops, emqx_ds_storage_bitfield_lts_ops). + -type s() :: #s{}. -type stream() :: emqx_ds_lts:msg_storage_key(). -type delete_stream() :: emqx_ds_lts:msg_storage_key(). +-type cooked_batch() :: + #{ + ?cooked_payloads := [{binary(), binary()}], + ?cooked_lts_ops := [{binary(), binary()}], + ?cooked_ts := integer() + }. + -type iterator() :: #{ ?tag := ?IT, @@ -220,6 +234,7 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> db = DBHandle, data = DataCF, trie = Trie, + trie_cf = TrieCF, keymappers = KeymapperCache, ts_offset = TSOffsetBits, ts_bits = TSBits, @@ -257,24 +272,65 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> ok = rocksdb:drop_column_family(DBHandle, TrieCF), ok. --spec store_batch( +-spec prepare_batch( emqx_ds_storage_layer:shard_id(), s(), [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) -> + {ok, cooked_batch()}. +prepare_batch(_ShardId, S, Messages, _Options) -> + _ = erase(?lts_persist_ops), + {Payloads, MaxTs} = + lists:mapfoldl( + fun({Timestamp, Msg}, Acc) -> + {Key, _} = make_key(S, Timestamp, Msg), + Payload = {Key, message_to_value_v1(Msg)}, + {Payload, max(Acc, Timestamp)} + end, + 0, + Messages + ), + {ok, #{ + ?cooked_payloads => Payloads, + ?cooked_lts_ops => pop_lts_persist_ops(), + ?cooked_ts => MaxTs + }}. + +-spec commit_batch( + emqx_ds_storage_layer:shard_id(), + s(), + cooked_batch() +) -> ok. +commit_batch( + _ShardId, + _Data, + #{?cooked_payloads := [], ?cooked_lts_ops := LTS} +) -> + %% Assert: + [] = LTS, + ok; +commit_batch( + _ShardId, + #s{db = DB, data = DataCF, trie = Trie, trie_cf = TrieCF, gvars = Gvars}, + #{?cooked_lts_ops := LtsOps, ?cooked_payloads := Payloads, ?cooked_ts := MaxTs} +) -> {ok, Batch} = rocksdb:batch(), - MaxTs = lists:foldl( - fun({Timestamp, Msg}, Acc) -> - {Key, _} = make_key(S, Timestamp, Msg), - Val = serialize(Msg), - ok = rocksdb:put(DB, Data, Key, Val, []), - max(Acc, Timestamp) + %% Commit LTS trie to the storage: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, TrieCF, term_to_binary(Key), term_to_binary(Val)) end, - 0, - Messages + LtsOps + ), + %% Apply LTS ops to the memory cache: + _ = emqx_ds_lts:trie_update(Trie, LtsOps), + %% Commit payloads: + lists:foreach( + fun({Key, Val}) -> + ok = rocksdb:batch_put(Batch, DataCF, Key, term_to_binary(Val)) + end, + Payloads ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), @@ -780,9 +836,6 @@ value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, E extra = Extra }. -serialize(Msg) -> - term_to_binary(message_to_value_v1(Msg)). - deserialize(Blob) -> value_v1_to_message(binary_to_term(Blob)). @@ -810,7 +863,8 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> -spec restore_trie(pos_integer(), rocksdb:db_handle(), rocksdb:cf_handle()) -> emqx_ds_lts:trie(). restore_trie(TopicIndexBytes, DB, CF) -> PersistCallback = fun(Key, Val) -> - rocksdb:put(DB, CF, term_to_binary(Key), term_to_binary(Val), []) + push_lts_persist_op(Key, Val), + ok end, {ok, IT} = rocksdb:iterator(DB, CF, []), try @@ -858,8 +912,29 @@ data_cf(GenId) -> trie_cf(GenId) -> "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). +-spec push_lts_persist_op(_Key, _Val) -> ok. +push_lts_persist_op(Key, Val) -> + case erlang:get(?lts_persist_ops) of + undefined -> + erlang:put(?lts_persist_ops, [{Key, Val}]); + L when is_list(L) -> + erlang:put(?lts_persist_ops, [{Key, Val} | L]) + end. + +-spec pop_lts_persist_ops() -> [{_Key, _Val}]. +pop_lts_persist_ops() -> + case erlang:erase(?lts_persist_ops) of + undefined -> + []; + L when is_list(L) -> + L + end. + -ifdef(TEST). +serialize(Msg) -> + term_to_binary(message_to_value_v1(Msg)). + serialize_deserialize_test() -> Msg = #message{ id = <<"message_id_val">>, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 175a0d515..df1253e1c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -26,6 +26,9 @@ %% Data store_batch/3, + prepare_batch/3, + commit_batch/2, + get_streams/3, get_delete_streams/3, make_iterator/4, @@ -66,7 +69,8 @@ shard_id/0, options/0, prototype/0, - post_creation_context/0 + post_creation_context/0, + cooked_batch/0 ]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -86,6 +90,7 @@ -define(STREAM, 1). -define(IT, 2). -define(DELETE_IT, 3). +-define(COOKED_BATCH, 4). %% keys: -define(tag, 1). @@ -132,6 +137,13 @@ ?enc := term() }. +-opaque cooked_batch() :: + #{ + ?tag := ?COOKED_BATCH, + ?generation := gen_id(), + ?enc := term() + }. + %%%% Generation: -define(GEN_KEY(GEN_ID), {generation, GEN_ID}). @@ -207,13 +219,19 @@ -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. --callback store_batch( +-callback prepare_batch( shard_id(), _Data, - [{emqx_ds:time(), emqx_types:message()}], + [{emqx_ds:time(), emqx_types:message()}, ...], emqx_ds:message_store_opts() ) -> - emqx_ds:store_batch_result(). + {ok, term()} | {error, _}. + +-callback commit_batch( + shard_id(), + _Data, + _CookedBatch +) -> ok. -callback get_streams(shard_id(), _Data, emqx_ds:topic_filter(), emqx_ds:time()) -> [_Stream]. @@ -261,20 +279,54 @@ drop_shard(Shard) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> - %% NOTE - %% We assume that batches do not span generations. Callers should enforce this. +store_batch(Shard, Messages, Options) -> ?tp(emqx_ds_storage_layer_store_batch, #{ shard => Shard, messages => Messages, options => Options }), - #{module := Mod, data := GenData} = generation_at(Shard, Time), + case prepare_batch(Shard, Messages, Options) of + {ok, CookedBatch} -> + commit_batch(Shard, CookedBatch); + ignore -> + ok; + Error = {error, _} -> + Error + end. + +-spec prepare_batch( + shard_id(), + [{emqx_ds:time(), emqx_types:message()}], + emqx_ds:message_store_opts() +) -> {ok, cooked_batch()} | ignore | {error, _}. +prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> + %% NOTE + %% We assume that batches do not span generations. Callers should enforce this. + ?tp(emqx_ds_storage_layer_prepare_batch, #{ + shard => Shard, messages => Messages, options => Options + }), + {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), T0 = erlang:monotonic_time(microsecond), - Result = Mod:store_batch(Shard, GenData, Messages, Options), + Result = + case Mod:prepare_batch(Shard, GenData, Messages, Options) of + {ok, CookedBatch} -> + {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; + Error = {error, _} -> + Error + end, T1 = erlang:monotonic_time(microsecond), + %% TODO store->prepare emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), Result; -store_batch(_Shard, [], _Options) -> - ok. +prepare_batch(_Shard, [], _Options) -> + ignore. + +-spec commit_batch(shard_id(), cooked_batch()) -> ok. +commit_batch(Shard, #{?tag := ?COOKED_BATCH, ?generation := GenId, ?enc := CookedBatch}) -> + #{?GEN_KEY(GenId) := #{module := Mod, data := GenData}} = get_schema_runtime(Shard), + T0 = erlang:monotonic_time(microsecond), + Result = Mod:commit_batch(Shard, GenData, CookedBatch), + T1 = erlang:monotonic_time(microsecond), + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result. -spec get_streams(shard_id(), emqx_ds:topic_filter(), emqx_ds:time()) -> [{integer(), stream()}]. @@ -878,7 +930,7 @@ handle_accept_snapshot(ShardId) -> %% The mechanism of storage layer events should be refined later. -spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. handle_event(Shard, Time, Event) -> - #{module := Mod, data := GenData} = generation_at(Shard, Time), + {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), case erlang:function_exported(Mod, handle_event, 4) of true -> @@ -919,7 +971,7 @@ generations_since(Shard, Since) -> Schema ). --spec generation_at(shard_id(), emqx_ds:time()) -> generation(). +-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}. generation_at(Shard, Time) -> Schema = #{current_generation := Current} = get_schema_runtime(Shard), generation_at(Time, Current, Schema). @@ -930,7 +982,7 @@ generation_at(Time, GenId, Schema) -> #{since := Since} when Time < Since andalso GenId > 0 -> generation_at(Time, prev_generation_id(GenId), Schema); _ -> - Gen + {GenId, Gen} end. -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 3caf2c732..10007488c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -31,7 +31,8 @@ create/4, open/5, drop/5, - store_batch/4, + prepare_batch/4, + commit_batch/3, get_streams/4, get_delete_streams/4, make_iterator/5, @@ -101,12 +102,14 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) -> ok = rocksdb:drop_column_family(DBHandle, CFHandle), ok. -store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) -> +prepare_batch(_ShardId, _Data, Messages, _Options) -> + {ok, Messages}. + +commit_batch(_ShardId, #s{db = DB, cf = CF}, Messages) -> {ok, Batch} = rocksdb:batch(), lists:foreach( - fun(Msg) -> - Id = erlang:unique_integer([monotonic]), - Key = <>, + fun({TS, Msg}) -> + Key = <>, Val = term_to_binary(Msg), rocksdb:batch_put(Batch, CF, Key, Val) end, @@ -114,16 +117,7 @@ store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := tru ), Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []), rocksdb:release_batch(Batch), - Res; -store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) -> - lists:foreach( - fun({Timestamp, Msg}) -> - Key = <>, - Val = term_to_binary(Msg), - rocksdb:put(DB, CF, Key, Val, []) - end, - Messages - ). + Res. get_streams(_Shard, _Data, _TopicFilter, _StartTime) -> [#stream{}]. diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index b31b9b0c2..4670dfeb0 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -159,6 +159,7 @@ t_rebalance('end', Config) -> t_rebalance(Config) -> NMsgs = 50, NClients = 5, + NEvents = NMsgs * NClients, %% List of fake client IDs: Clients = [integer_to_binary(I) || I <- lists:seq(1, NClients)], %% List of streams that generate messages for each "client" in its own topic: