diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 30b72e5a8..971805351 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -77,7 +77,8 @@ stop_db(DB) -> %% @doc Set a DB-global variable. Please don't abuse this API. -spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. set_gvar(DB, Key, Val) -> - ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}), + ok. -spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. get_gvar(DB, Key, Default) -> @@ -123,7 +124,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, - ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), + _ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c79d60d07..90a26c484 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,7 +36,8 @@ update_iterator/3, next/3, delete_next/4, - shard_of_message/3 + shard_of_message/3, + current_timestamp/2 ]). %% internal exports: @@ -65,6 +66,7 @@ -export([ init/1, apply/3, + tick/2, snapshot_module/0 ]). @@ -161,6 +163,8 @@ -type timestamp_us() :: non_neg_integer(). +-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}). + %%================================================================================ %% API functions %%================================================================================ @@ -367,6 +371,12 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). +%% @doc Messages have been replicated up to this timestamp on the +%% local server +-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time(). +current_timestamp(DB, Shard) -> + emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -491,7 +501,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) + emqx_ds_storage_layer:next( + ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) + ) ). -spec do_delete_next_v4( @@ -504,7 +516,11 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> emqx_ds_storage_layer:delete_next( - {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + {DB, Shard}, + Iter, + Selector, + BatchSize, + emqx_ds_replication_layer:current_timestamp(DB, Shard) ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). @@ -675,7 +691,7 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest0} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. @@ -686,6 +702,7 @@ apply( NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest), {NState, Result, Effect}; apply( _RaftMeta, @@ -711,7 +728,20 @@ apply( #{db_shard := DBShard} = State ) -> Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), - {State, Result}. + {State, Result}; +apply( + _RaftMeta, + #{?tag := storage_event, ?payload := CustomEvent}, + #{db_shard := DBShard, latest := Latest0} = State +) -> + {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), + {State#{latest := Latest}, ok, Effects}. + +-spec tick(integer(), ra_state()) -> ra_machine:effects(). +tick(TimeMs, #{db_shard := DBShard, latest := Latest}) -> + {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). @@ -744,3 +774,13 @@ timeus_to_timestamp(TimestampUs) -> snapshot_module() -> emqx_ds_replication_snapshot. + +handle_custom_event(DBShard, Latest, Event) -> + try + Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), + [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] + catch + EC:Err:Stacktrace -> + logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}), + [] + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 70812fa18..960824143 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -41,4 +41,7 @@ %% drop_generation -define(generation, 2). +%% custom events +-define(payload, 2). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index e0e70596a..ac495be1c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,6 +16,7 @@ -module(emqx_ds_replication_layer_shard). +%% API: -export([start_link/3]). %% Static server configuration diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 5947b2300..7342b097d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -36,7 +36,9 @@ update_iterator/4, next/5, delete_next/6, - post_creation_actions/1 + post_creation_actions/1, + + handle_event/4 ]). %% internal exports: @@ -90,7 +92,8 @@ trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), ts_bits :: non_neg_integer(), - ts_offset :: non_neg_integer() + ts_offset :: non_neg_integer(), + gvars :: ets:table() }). -type s() :: #s{}. @@ -142,6 +145,10 @@ -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). +%% GVar used for idle detection: +-define(IDLE_DETECT, idle_detect). +-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -215,7 +222,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> trie = Trie, keymappers = KeymapperCache, ts_offset = TSOffsetBits, - ts_bits = TSBits + ts_bits = TSBits, + gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -240,8 +248,9 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> emqx_ds_lts:destroy(Trie), + catch ets:delete(GVars), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), @@ -255,18 +264,21 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> +store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), - lists:foreach( - fun({Timestamp, Msg}) -> + MaxTs = lists:foldl( + fun({Timestamp, Msg}, Acc) -> {Key, _} = make_key(S, Timestamp, Msg), Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) + ok = rocksdb:put(DB, Data, Key, Val, []), + max(Acc, Timestamp) end, + 0, Messages ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), + ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to %% observe until there's `{no_slowdown, true}` in write options. @@ -469,6 +481,20 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> + %% Cause replication layer to bump timestamp when idle + case ets:lookup(Gvars, ?IDLE_DETECT) of + [{?IDLE_DETECT, false, LastWrittenTs}] when + ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time) + -> + ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), + [emqx_ds_storage_bitfield_lts_dummy_event]; + _ -> + [] + end; +handle_event(_ShardId, _Data, _Time, _Event) -> + []. + %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 36dc813e5..fff3a77f3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -42,7 +42,10 @@ %% Snapshotting take_snapshot/1, - accept_snapshot/1 + accept_snapshot/1, + + %% Custom events + handle_event/3 ]). %% gen_server @@ -79,7 +82,6 @@ %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% records over the wire. - %% tags: -define(STREAM, 1). -define(IT, 2). @@ -201,6 +203,7 @@ -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> _Data. +%% Delete the schema and data -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. @@ -231,9 +234,11 @@ ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. +-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. + -callback post_creation_actions(post_creation_context()) -> _Data. --optional_callbacks([post_creation_actions/1]). +-optional_callbacks([post_creation_actions/1, handle_event/4]). %%================================================================================ %% API for the replication layer @@ -857,6 +862,24 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). +%% FIXME: currently this interface is a hack to handle safe cutoff +%% timestamp in LTS. It has many shortcomings (can lead to infinite +%% loops if the CBM is not careful; events from one generation may be +%% sent to the next one, etc.) and the API is not well thought out in +%% general. +%% +%% The mechanism of storage layer events should be refined later. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +handle_event(Shard, Time, Event) -> + #{module := Mod, data := GenData} = generation_at(Shard, Time), + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end. + %%-------------------------------------------------------------------------------- %% Schema access %%--------------------------------------------------------------------------------