From 9999ccd36c5d7f1a6dc6c9c26e89db5cfe7129f6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Apr 2024 17:39:31 +0200 Subject: [PATCH 01/39] feat(ds): Ignore safe cutoff time for streams without varying levels --- .../src/emqx_ds_storage_bitfield_lts.erl | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2ec6674b6..1cbdb92ee 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -89,6 +89,7 @@ data :: rocksdb:cf_handle(), trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), + ts_bits :: non_neg_integer(), ts_offset :: non_neg_integer() }). @@ -213,7 +214,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> data = DataCF, trie = Trie, keymappers = KeymapperCache, - ts_offset = TSOffsetBits + ts_offset = TSOffsetBits, + ts_bits = TSBits }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -348,13 +350,39 @@ update_iterator( ) -> {ok, OldIter#{?last_seen_key => DSKey}}. -next(Shard, Schema = #s{ts_offset = TSOffset}, It, BatchSize) -> - %% Compute safe cutoff time. - %% It's the point in time where the last complete epoch ends, so we need to know - %% the current time to compute it. +next( + Shard, + Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, + It = #{?topic_filter := TF, ?storage_key := Stream}, + BatchSize +) -> init_counters(), - Now = emqx_ds:timestamp_us(), - SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, + %% Compute safe cutoff time. It's the point in time where the last + %% complete epoch ends, so we need to know the current time to + %% compute it. This is needed because new keys can be added before + %% the iterator. + IsWildcard = + case Stream of + {_StaticKey, []} -> false; + _ -> true + end, + SafeCutoffTime = + case IsWildcard of + true -> + Now = emqx_ds:timestamp_us(), + (Now bsr TSOffset) bsl TSOffset; + false -> + %% Iterators scanning streams without varying topic + %% levels can operate on incomplete epochs, since new + %% matching keys for the single topic are added in + %% lexicographic order. + %% + %% Note: this DOES NOT apply to non-wildcard topic + %% filters operating on streams with varying parts: + %% iterator can jump to the next topic and then it + %% won't backtrack. + 1 bsl TSBits - 1 + end, try next_until(Schema, It, SafeCutoffTime, BatchSize) after From bcfa7b2209170e32993e208732276220af3f0873 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 00:20:59 +0200 Subject: [PATCH 02/39] fix(ds): Destroy LTS tries when the generation is dropped --- apps/emqx_durable_storage/src/emqx_ds_lts.erl | 7 +++++++ .../src/emqx_ds_storage_bitfield_lts.erl | 3 ++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_lts.erl index bd7cb3826..e087504de 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_lts.erl @@ -19,6 +19,7 @@ %% API: -export([ trie_create/1, trie_create/0, + destroy/1, trie_restore/2, trie_copy_learned_paths/2, topic_key/3, @@ -116,6 +117,12 @@ trie_create(UserOpts) -> trie_create() -> trie_create(#{}). +-spec destroy(trie()) -> ok. +destroy(#trie{trie = Trie, stats = Stats}) -> + catch ets:delete(Trie), + catch ets:delete(Stats), + ok. + %% @doc Restore trie from a dump -spec trie_restore(options(), [{_Key, _Val}]) -> trie(). trie_restore(Options, Dump) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 1cbdb92ee..80264da79 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -240,7 +240,8 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> + emqx_ds_lts:destroy(Trie), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), From 86d45522e328672787f0d9dddc02c75045a582b4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 01:01:11 +0200 Subject: [PATCH 03/39] fix(dsrepl): Don't reverse elements of batches --- .../src/emqx_ds_replication_layer.erl | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 61126c164..3ff87ab44 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -672,22 +672,15 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest} = State + #{db_shard := DBShard, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. %% With microsecond granularity it should be nearly impossible for it to run %% too far ahead than the real time clock. - {NLatest, Messages} = assign_timestamps(Latest, MessagesIn), - %% TODO - %% Batch is now reversed, but it should not make a lot of difference. - %% Even if it would be in order, it's still possible to write messages far away - %% in the past, i.e. when replica catches up with the leader. Storage layer - %% currently relies on wall clock time to decide if it's safe to iterate over - %% next epoch, this is likely wrong. Ideally it should rely on consensus clock - %% time instead. + {Latest, Messages} = assign_timestamps(Latest0, MessagesIn), Result = emqx_ds_storage_layer:store_batch(DBShard, Messages, #{}), - NState = State#{latest := NLatest}, + NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, {NState, Result, Effect}; @@ -730,7 +723,7 @@ assign_timestamps(Latest, [MessageIn | Rest], Acc) -> assign_timestamps(Latest + 1, Rest, [Message | Acc]) end; assign_timestamps(Latest, [], Acc) -> - {Latest, Acc}. + {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> {TimestampUs, Message}. From 8ac9700aabf2db79ac043e8091645c312b7167ea Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 16:44:43 +0200 Subject: [PATCH 04/39] feat(ds): Add an API for DB-global variables --- .../src/emqx_ds_builtin_db_sup.erl | 1 + .../src/emqx_ds_builtin_sup.erl | 30 ++++++++++++++++++- .../src/emqx_ds_replication_layer.erl | 1 + 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index ef1600500..06e925c1b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -118,6 +118,7 @@ which_dbs() -> init({#?db_sup{db = DB}, DefaultOpts}) -> %% Spec for the top-level supervisor for the database: logger:notice("Starting DS DB ~p", [DB]), + emqx_ds_builtin_sup:clean_gvars(DB), emqx_ds_builtin_metrics:init_for_db(DB), Opts = emqx_ds_replication_layer_meta:open_db(DB, DefaultOpts), ok = start_ra_system(DB, Opts), diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 45e81bdc9..30b72e5a8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -23,6 +23,7 @@ %% API: -export([start_db/2, stop_db/1]). +-export([set_gvar/3, get_gvar/3, clean_gvars/1]). %% behavior callbacks: -export([init/1]). @@ -39,6 +40,13 @@ -define(top, ?MODULE). -define(databases, emqx_ds_builtin_databases_sup). +-define(gvar_tab, emqx_ds_builtin_gvar). + +-record(gvar, { + k :: {emqx_ds:db(), _Key}, + v :: _Value +}). + %%================================================================================ %% API functions %%================================================================================ @@ -61,11 +69,30 @@ stop_db(DB) -> Pid when is_pid(Pid) -> _ = supervisor:terminate_child(?databases, DB), _ = supervisor:delete_child(?databases, DB), - ok; + clean_gvars(DB); undefined -> ok end. +%% @doc Set a DB-global variable. Please don't abuse this API. +-spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. +set_gvar(DB, Key, Val) -> + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + +-spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. +get_gvar(DB, Key, Default) -> + case ets:lookup(?gvar_tab, {DB, Key}) of + [#gvar{v = Val}] -> + Val; + [] -> + Default + end. + +-spec clean_gvars(emqx_ds:db()) -> ok. +clean_gvars(DB) -> + ets:match_delete(?gvar_tab, #gvar{k = {DB, '_'}, _ = '_'}), + ok. + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -96,6 +123,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, + ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 3ff87ab44..9f9f28676 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -363,6 +363,7 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> end, integer_to_binary(Hash). +-spec foreach_shard(emqx_ds:db(), fun((shard_id()) -> _)) -> ok. foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). From 1ff2e02fd9755cad044b8b59b42abafc7916c26e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 02:36:33 +0200 Subject: [PATCH 05/39] feat(ds): Pass current time to the storage layer via argument --- .../src/emqx_ds_replication_layer.erl | 6 +++-- .../src/emqx_ds_storage_bitfield_lts.erl | 13 +++++----- .../src/emqx_ds_storage_layer.erl | 26 ++++++++++++------- .../src/emqx_ds_storage_reference.erl | 8 +++--- .../emqx_ds_storage_bitfield_lts_SUITE.erl | 16 ++++++++---- .../test/emqx_ds_test_helpers.erl | 26 ++++++++++++++----- 6 files changed, 61 insertions(+), 34 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9f9f28676..c79d60d07 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -491,7 +491,7 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize) + emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) ). -spec do_delete_next_v4( @@ -503,7 +503,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> - emqx_ds_storage_layer:delete_next({DB, Shard}, Iter, Selector, BatchSize). + emqx_ds_storage_layer:delete_next( + {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). do_add_generation_v2(_DB) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 80264da79..5947b2300 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -34,8 +34,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5, + next/5, + delete_next/6, post_creation_actions/1 ]). @@ -354,8 +354,9 @@ update_iterator( next( Shard, Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, - It = #{?topic_filter := TF, ?storage_key := Stream}, - BatchSize + It = #{?storage_key := Stream}, + BatchSize, + Now ) -> init_counters(), %% Compute safe cutoff time. It's the point in time where the last @@ -370,7 +371,6 @@ next( SafeCutoffTime = case IsWildcard of true -> - Now = emqx_ds:timestamp_us(), (Now bsr TSOffset) bsl TSOffset; false -> %% Iterators scanning streams without varying topic @@ -415,12 +415,11 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, rocksdb:iterator_close(ITHandle) end. -delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize) -> +delete_next(Shard, Schema = #s{ts_offset = TSOffset}, It, Selector, BatchSize, Now) -> %% Compute safe cutoff time. %% It's the point in time where the last complete epoch ends, so we need to know %% the current time to compute it. init_counters(), - Now = emqx_message:timestamp_now(), SafeCutoffTime = (Now bsr TSOffset) bsl TSOffset, try delete_next_until(Schema, It, SafeCutoffTime, Selector, BatchSize) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 4981c3fc1..36dc813e5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -31,8 +31,8 @@ make_iterator/4, make_delete_iterator/4, update_iterator/3, - next/3, - delete_next/4, + next/4, + delete_next/5, %% Generations update_config/3, @@ -223,9 +223,14 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer()) -> +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback delete_next( + shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> + {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. + -callback post_creation_actions(post_creation_context()) -> _Data. -optional_callbacks([post_creation_actions/1]). @@ -377,13 +382,13 @@ update_iterator( {error, unrecoverable, generation_not_found} end. --spec next(shard_id(), iterator(), pos_integer()) -> +-spec next(shard_id(), iterator(), pos_integer(), emqx_ds:time()) -> emqx_ds:next_result(iterator()). -next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize) -> +next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize) of + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of {ok, _GenIter, []} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: @@ -399,18 +404,21 @@ next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, Batch {error, unrecoverable, generation_not_found} end. --spec delete_next(shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer()) -> +-spec delete_next( + shard_id(), delete_iterator(), emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() +) -> emqx_ds:delete_next_result(delete_iterator()). delete_next( Shard, Iter = #{?tag := ?DELETE_IT, ?generation := GenId, ?enc := GenIter0}, Selector, - BatchSize + BatchSize, + Now ) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> Current = generation_current(Shard), - case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize) of + case Mod:delete_next(Shard, GenData, GenIter0, Selector, BatchSize, Now) of {ok, _GenIter, _Deleted = 0, _IteratedOver = 0} when GenId < Current -> %% This is a past generation. Storage layer won't write %% any more messages here. The iterator reached the end: diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 7aa54b9f3..3caf2c732 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -37,8 +37,8 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/4, - delete_next/5 + next/5, + delete_next/6 ]). %% internal exports: @@ -154,7 +154,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -170,7 +170,7 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize) -> It = It0#it{last_seen_message_key = Key}, {ok, It, lists:reverse(Messages)}. -delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize) -> +delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{ topic_filter = TopicFilter, start_time = StartTime, diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl index 78838e675..bb6d0f917 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_bitfield_lts_SUITE.erl @@ -73,13 +73,15 @@ t_iterate(_Config) -> begin [{_Rank, Stream}] = emqx_ds_storage_layer:get_streams(?SHARD, parse_topic(Topic), 0), {ok, It} = emqx_ds_storage_layer:make_iterator(?SHARD, Stream, parse_topic(Topic), 0), - {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next(?SHARD, It, 100), + {ok, NextIt, MessagesAndKeys} = emqx_ds_storage_layer:next( + ?SHARD, It, 100, emqx_ds:timestamp_us() + ), Messages = [Msg || {_DSKey, Msg} <- MessagesAndKeys], ?assertEqual( lists:map(fun integer_to_binary/1, Timestamps), payloads(Messages) ), - {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100) + {ok, _, []} = emqx_ds_storage_layer:next(?SHARD, NextIt, 100, emqx_ds:timestamp_us()) end || Topic <- Topics ], @@ -370,7 +372,7 @@ dump_stream(Shard, Stream, TopicFilter, StartTime) -> F(It, 0) -> error({too_many_iterations, It}); F(It, N) -> - case emqx_ds_storage_layer:next(Shard, It, BatchSize) of + case emqx_ds_storage_layer:next(Shard, It, BatchSize, emqx_ds:timestamp_us()) of end_of_stream -> []; {ok, _NextIt, []} -> @@ -542,7 +544,11 @@ delete(_Shard, [], _Selector) -> delete(Shard, Iterators, Selector) -> {NewIterators0, N} = lists:foldl( fun(Iterator0, {AccIterators, NAcc}) -> - case emqx_ds_storage_layer:delete_next(Shard, Iterator0, Selector, 10) of + case + emqx_ds_storage_layer:delete_next( + Shard, Iterator0, Selector, 10, emqx_ds:timestamp_us() + ) + of {ok, end_of_stream} -> {AccIterators, NAcc}; {ok, _Iterator1, 0} -> @@ -573,7 +579,7 @@ replay(_Shard, []) -> replay(Shard, Iterators) -> {NewIterators0, Messages0} = lists:foldl( fun(Iterator0, {AccIterators, AccMessages}) -> - case emqx_ds_storage_layer:next(Shard, Iterator0, 10) of + case emqx_ds_storage_layer:next(Shard, Iterator0, 10, emqx_ds:timestamp_us()) of {ok, end_of_stream} -> {AccIterators, AccMessages}; {ok, _Iterator1, []} -> diff --git a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl index be4f7bcdf..f54752230 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_test_helpers.erl @@ -85,8 +85,14 @@ consume_stream(DB, Stream, TopicFilter, StartTime) -> consume_iter(DB, It) -> consume_iter(DB, It, #{}). -consume_iter(DB, It, Opts) -> - consume_iter_with(fun emqx_ds:next/3, [DB], It, Opts). +consume_iter(DB, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds:next(DB, It, BatchSize) + end, + It0, + Opts + ). storage_consume(ShardId, TopicFilter) -> storage_consume(ShardId, TopicFilter, 0). @@ -108,16 +114,22 @@ storage_consume_stream(ShardId, Stream, TopicFilter, StartTime) -> storage_consume_iter(ShardId, It) -> storage_consume_iter(ShardId, It, #{}). -storage_consume_iter(ShardId, It, Opts) -> - consume_iter_with(fun emqx_ds_storage_layer:next/3, [ShardId], It, Opts). +storage_consume_iter(ShardId, It0, Opts) -> + consume_iter_with( + fun(It, BatchSize) -> + emqx_ds_storage_layer:next(ShardId, It, BatchSize, emqx_ds:timestamp_us()) + end, + It0, + Opts + ). -consume_iter_with(NextFun, Args, It0, Opts) -> +consume_iter_with(NextFun, It0, Opts) -> BatchSize = maps:get(batch_size, Opts, 5), - case erlang:apply(NextFun, Args ++ [It0, BatchSize]) of + case NextFun(It0, BatchSize) of {ok, It, _Msgs = []} -> {ok, It, []}; {ok, It1, Batch} -> - {ok, It, Msgs} = consume_iter_with(NextFun, Args, It1, Opts), + {ok, It, Msgs} = consume_iter_with(NextFun, It1, Opts), {ok, It, [Msg || {_DSKey, Msg} <- Batch] ++ Msgs}; {ok, Eos = end_of_stream} -> {ok, Eos, []}; From b2a633aca13d8af9e8465cbd8f8cda22f11431e2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 23 Apr 2024 19:27:01 +0200 Subject: [PATCH 06/39] fix(ds): Use leader's clock for computing LTS safe cutoff time --- .../src/emqx_ds_builtin_sup.erl | 5 +- .../src/emqx_ds_replication_layer.erl | 50 +++++++++++++++++-- .../src/emqx_ds_replication_layer.hrl | 3 ++ .../src/emqx_ds_replication_layer_shard.erl | 1 + .../src/emqx_ds_storage_bitfield_lts.erl | 42 +++++++++++++--- .../src/emqx_ds_storage_layer.erl | 29 +++++++++-- 6 files changed, 112 insertions(+), 18 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl index 30b72e5a8..971805351 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_sup.erl @@ -77,7 +77,8 @@ stop_db(DB) -> %% @doc Set a DB-global variable. Please don't abuse this API. -spec set_gvar(emqx_ds:db(), _Key, _Val) -> ok. set_gvar(DB, Key, Val) -> - ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}). + ets:insert(?gvar_tab, #gvar{k = {DB, Key}, v = Val}), + ok. -spec get_gvar(emqx_ds:db(), _Key, Val) -> Val. get_gvar(DB, Key, Default) -> @@ -123,7 +124,7 @@ init(?top) -> type => supervisor, shutdown => infinity }, - ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), + _ = ets:new(?gvar_tab, [named_table, set, public, {keypos, #gvar.k}, {read_concurrency, true}]), %% SupFlags = #{ strategy => one_for_all, diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index c79d60d07..90a26c484 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -36,7 +36,8 @@ update_iterator/3, next/3, delete_next/4, - shard_of_message/3 + shard_of_message/3, + current_timestamp/2 ]). %% internal exports: @@ -65,6 +66,7 @@ -export([ init/1, apply/3, + tick/2, snapshot_module/0 ]). @@ -161,6 +163,8 @@ -type timestamp_us() :: non_neg_integer(). +-define(gv_timestamp(SHARD), {gv_timestamp, SHARD}). + %%================================================================================ %% API functions %%================================================================================ @@ -367,6 +371,12 @@ shard_of_message(DB, #message{from = From, topic = Topic}, SerializeBy) -> foreach_shard(DB, Fun) -> lists:foreach(Fun, list_shards(DB)). +%% @doc Messages have been replicated up to this timestamp on the +%% local server +-spec current_timestamp(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> emqx_ds:time(). +current_timestamp(DB, Shard) -> + emqx_ds_builtin_sup:get_gvar(DB, ?gv_timestamp(Shard), 0). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -491,7 +501,9 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> ShardId = {DB, Shard}, ?IF_STORAGE_RUNNING( ShardId, - emqx_ds_storage_layer:next(ShardId, Iter, BatchSize, emqx_ds:timestamp_us()) + emqx_ds_storage_layer:next( + ShardId, Iter, BatchSize, emqx_ds_replication_layer:current_timestamp(DB, Shard) + ) ). -spec do_delete_next_v4( @@ -504,7 +516,11 @@ do_next_v1(DB, Shard, Iter, BatchSize) -> emqx_ds:delete_next_result(emqx_ds_storage_layer:delete_iterator()). do_delete_next_v4(DB, Shard, Iter, Selector, BatchSize) -> emqx_ds_storage_layer:delete_next( - {DB, Shard}, Iter, Selector, BatchSize, emqx_ds:timestamp_us() + {DB, Shard}, + Iter, + Selector, + BatchSize, + emqx_ds_replication_layer:current_timestamp(DB, Shard) ). -spec do_add_generation_v2(emqx_ds:db()) -> no_return(). @@ -675,7 +691,7 @@ apply( ?tag := ?BATCH, ?batch_messages := MessagesIn }, - #{db_shard := DBShard, latest := Latest0} = State + #{db_shard := DBShard = {DB, Shard}, latest := Latest0} = State ) -> %% NOTE %% Unique timestamp tracking real time closely. @@ -686,6 +702,7 @@ apply( NState = State#{latest := Latest}, %% TODO: Need to measure effects of changing frequency of `release_cursor`. Effect = {release_cursor, RaftIdx, NState}, + emqx_ds_builtin_sup:set_gvar(DB, ?gv_timestamp(Shard), Latest), {NState, Result, Effect}; apply( _RaftMeta, @@ -711,7 +728,20 @@ apply( #{db_shard := DBShard} = State ) -> Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), - {State, Result}. + {State, Result}; +apply( + _RaftMeta, + #{?tag := storage_event, ?payload := CustomEvent}, + #{db_shard := DBShard, latest := Latest0} = State +) -> + {Timestamp, Latest} = ensure_monotonic_timestamp(emqx_ds:timestamp_us(), Latest0), + Effects = handle_custom_event(DBShard, Timestamp, CustomEvent), + {State#{latest := Latest}, ok, Effects}. + +-spec tick(integer(), ra_state()) -> ra_machine:effects(). +tick(TimeMs, #{db_shard := DBShard, latest := Latest}) -> + {Timestamp, _} = assign_timestamp(timestamp_to_timeus(TimeMs), Latest), + handle_custom_event(DBShard, Timestamp, tick). assign_timestamps(Latest, Messages) -> assign_timestamps(Latest, Messages, []). @@ -744,3 +774,13 @@ timeus_to_timestamp(TimestampUs) -> snapshot_module() -> emqx_ds_replication_snapshot. + +handle_custom_event(DBShard, Latest, Event) -> + try + Events = emqx_ds_storage_layer:handle_event(DBShard, Latest, Event), + [{append, #{?tag => storage_event, ?payload => I}} || I <- Events] + catch + EC:Err:Stacktrace -> + logger:error(#{EC => Err, stacktrace => Stacktrace, msg => "ds_storage_layer_tick"}), + [] + end. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl index 70812fa18..960824143 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.hrl @@ -41,4 +41,7 @@ %% drop_generation -define(generation, 2). +%% custom events +-define(payload, 2). + -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index e0e70596a..ac495be1c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -16,6 +16,7 @@ -module(emqx_ds_replication_layer_shard). +%% API: -export([start_link/3]). %% Static server configuration diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 5947b2300..7342b097d 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -36,7 +36,9 @@ update_iterator/4, next/5, delete_next/6, - post_creation_actions/1 + post_creation_actions/1, + + handle_event/4 ]). %% internal exports: @@ -90,7 +92,8 @@ trie :: emqx_ds_lts:trie(), keymappers :: array:array(emqx_ds_bitmask_keymapper:keymapper()), ts_bits :: non_neg_integer(), - ts_offset :: non_neg_integer() + ts_offset :: non_neg_integer(), + gvars :: ets:table() }). -type s() :: #s{}. @@ -142,6 +145,10 @@ -define(DS_LTS_COUNTERS, [?DS_LTS_SEEK_COUNTER, ?DS_LTS_NEXT_COUNTER, ?DS_LTS_COLLISION_COUNTER]). +%% GVar used for idle detection: +-define(IDLE_DETECT, idle_detect). +-define(EPOCH(S, TS), (TS bsl S#s.ts_bits)). + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. @@ -215,7 +222,8 @@ open(_Shard, DBHandle, GenId, CFRefs, Schema) -> trie = Trie, keymappers = KeymapperCache, ts_offset = TSOffsetBits, - ts_bits = TSBits + ts_bits = TSBits, + gvars = ets:new(?MODULE, [public, set, {read_concurrency, true}]) }. -spec post_creation_actions(emqx_ds_storage_layer:post_creation_context()) -> @@ -240,8 +248,9 @@ post_creation_actions( s() ) -> ok. -drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> +drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie, gvars = GVars}) -> emqx_ds_lts:destroy(Trie), + catch ets:delete(GVars), {_, DataCF} = lists:keyfind(data_cf(GenId), 1, CFRefs), {_, TrieCF} = lists:keyfind(trie_cf(GenId), 1, CFRefs), ok = rocksdb:drop_column_family(DBHandle, DataCF), @@ -255,18 +264,21 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{trie = Trie}) -> emqx_ds:message_store_opts() ) -> emqx_ds:store_batch_result(). -store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) -> +store_batch(_ShardId, S = #s{db = DB, data = Data, gvars = Gvars}, Messages, _Options) -> {ok, Batch} = rocksdb:batch(), - lists:foreach( - fun({Timestamp, Msg}) -> + MaxTs = lists:foldl( + fun({Timestamp, Msg}, Acc) -> {Key, _} = make_key(S, Timestamp, Msg), Val = serialize(Msg), - rocksdb:put(DB, Data, Key, Val, []) + ok = rocksdb:put(DB, Data, Key, Val, []), + max(Acc, Timestamp) end, + 0, Messages ), Result = rocksdb:write_batch(DB, Batch, []), rocksdb:release_batch(Batch), + ets:insert(Gvars, {?IDLE_DETECT, false, MaxTs}), %% NOTE %% Strictly speaking, `{error, incomplete}` is a valid result but should be impossible to %% observe until there's `{no_slowdown, true}` in write options. @@ -469,6 +481,20 @@ delete_next_until( rocksdb:iterator_close(ITHandle) end. +handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> + %% Cause replication layer to bump timestamp when idle + case ets:lookup(Gvars, ?IDLE_DETECT) of + [{?IDLE_DETECT, false, LastWrittenTs}] when + ?EPOCH(State, LastWrittenTs) > ?EPOCH(State, Time) + -> + ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), + [emqx_ds_storage_bitfield_lts_dummy_event]; + _ -> + [] + end; +handle_event(_ShardId, _Data, _Time, _Event) -> + []. + %%================================================================================ %% Internal functions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 36dc813e5..fff3a77f3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -42,7 +42,10 @@ %% Snapshotting take_snapshot/1, - accept_snapshot/1 + accept_snapshot/1, + + %% Custom events + handle_event/3 ]). %% gen_server @@ -79,7 +82,6 @@ %% # "Record" integer keys. We use maps with integer keys to avoid persisting and sending %% records over the wire. - %% tags: -define(STREAM, 1). -define(IT, 2). @@ -201,6 +203,7 @@ -callback open(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _Schema) -> _Data. +%% Delete the schema and data -callback drop(shard_id(), rocksdb:db_handle(), gen_id(), cf_refs(), _RuntimeData) -> ok | {error, _Reason}. @@ -231,9 +234,11 @@ ) -> {ok, DeleteIterator, _NDeleted :: non_neg_integer(), _IteratedOver :: non_neg_integer()}. +-callback handle_event(shard_id(), _Data, emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. + -callback post_creation_actions(post_creation_context()) -> _Data. --optional_callbacks([post_creation_actions/1]). +-optional_callbacks([post_creation_actions/1, handle_event/4]). %%================================================================================ %% API for the replication layer @@ -857,6 +862,24 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). +%% FIXME: currently this interface is a hack to handle safe cutoff +%% timestamp in LTS. It has many shortcomings (can lead to infinite +%% loops if the CBM is not careful; events from one generation may be +%% sent to the next one, etc.) and the API is not well thought out in +%% general. +%% +%% The mechanism of storage layer events should be refined later. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +handle_event(Shard, Time, Event) -> + #{module := Mod, data := GenData} = generation_at(Shard, Time), + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end. + %%-------------------------------------------------------------------------------- %% Schema access %%-------------------------------------------------------------------------------- From 3642bcd1b6d5967dfa31940c1273aaf54e0ea51b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 26 Apr 2024 01:03:35 +0200 Subject: [PATCH 07/39] docs(ds): Fix comment for the builtin DS metrics --- apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl index ce984db57..763d38606 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_metrics.erl @@ -190,7 +190,7 @@ prometheus_per_db(NodeOrAggr) -> %% ... %% ''' %% -%% If `NodeOrAggr' = `node' then node name is appended to the list of +%% If `NodeOrAggr' = `aggr' then node name is appended to the list of %% labels. prometheus_per_db(NodeOrAggr, DB, Acc0) -> Labels = [ From 68c601ad72b36d81bf036fb4b99c26c11f8a0d20 Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 8 May 2024 11:08:27 +0800 Subject: [PATCH 08/39] fix(plugin): add a backup for the plugin config file --- apps/emqx_plugins/include/emqx_plugins.hrl | 2 +- apps/emqx_plugins/src/emqx_plugins.erl | 107 +++++++++++++++++-- apps/emqx_plugins/src/emqx_plugins_serde.erl | 2 +- 3 files changed, 98 insertions(+), 13 deletions(-) diff --git a/apps/emqx_plugins/include/emqx_plugins.hrl b/apps/emqx_plugins/include/emqx_plugins.hrl index 95dc50e4f..f822b9c8d 100644 --- a/apps/emqx_plugins/include/emqx_plugins.hrl +++ b/apps/emqx_plugins/include/emqx_plugins.hrl @@ -25,7 +25,7 @@ -define(CONFIG_FORMAT_MAP, config_format_map). -type schema_name() :: binary(). --type avsc() :: binary(). +-type avsc_path() :: string(). -type encoded_data() :: iodata(). -type decoded_data() :: map(). diff --git a/apps/emqx_plugins/src/emqx_plugins.erl b/apps/emqx_plugins/src/emqx_plugins.erl index 67d25bf7a..7381c01c2 100644 --- a/apps/emqx_plugins/src/emqx_plugins.erl +++ b/apps/emqx_plugins/src/emqx_plugins.erl @@ -94,6 +94,8 @@ -define(RAW_BIN, binary). -define(JSON_MAP, json_map). +-define(MAX_KEEP_BACKUP_CONFIGS, 10). + %% "my_plugin-0.1.0" -type name_vsn() :: binary() | string(). %% the parse result of the JSON info file @@ -287,7 +289,7 @@ get_config(NameVsn, #{format := ?CONFIG_FORMAT_MAP}, Default) -> %% the avro Json Map and plugin config ALWAYS be valid before calling this function. put_config(NameVsn, AvroJsonMap, _DecodedPluginConfig) -> AvroJsonBin = emqx_utils_json:encode(AvroJsonMap), - ok = write_avro_bin(NameVsn, AvroJsonBin), + ok = backup_and_write_avro_bin(NameVsn, AvroJsonBin), ok = persistent_term:put(?PLUGIN_PERSIS_CONFIG_KEY(NameVsn), AvroJsonMap), ok. @@ -1057,8 +1059,69 @@ maybe_create_config_dir(NameVsn) -> {error, {mkdir_failed, ConfigDir, Reason}} end. -write_avro_bin(NameVsn, AvroBin) -> - ok = file:write_file(avro_config_file(NameVsn), AvroBin). +%% @private Backup the current config to a file with a timestamp suffix and +%% then save the new config to the config file. +backup_and_write_avro_bin(NameVsn, AvroBin) -> + %% this may fail, but we don't care + %% e.g. read-only file system + Path = avro_config_file(NameVsn), + _ = filelib:ensure_dir(Path), + TmpFile = Path ++ ".tmp", + case file:write_file(TmpFile, AvroBin) of + ok -> + backup_and_replace(Path, TmpFile); + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_save_conf_file", + hint => + "The updated cluster config is not saved on this node, please check the file system.", + filename => TmpFile, + reason => Reason + }), + %% e.g. read-only, it's not the end of the world + ok + end. + +backup_and_replace(Path, TmpPath) -> + Backup = Path ++ "." ++ now_time() ++ ".bak", + case file:rename(Path, Backup) of + ok -> + ok = file:rename(TmpPath, Path), + ok = prune_backup_files(Path); + {error, enoent} -> + %% not created yet + ok = file:rename(TmpPath, Path); + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_backup_conf_file", + filename => Backup, + reason => Reason + }), + ok + end. + +prune_backup_files(Path) -> + Files0 = filelib:wildcard(Path ++ ".*"), + Re = "\\.[0-9]{4}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}\\.[0-9]{3}\\.bak$", + Files = lists:filter(fun(F) -> re:run(F, Re) =/= nomatch end, Files0), + Sorted = lists:reverse(lists:sort(Files)), + {_Keeps, Deletes} = lists:split(min(?MAX_KEEP_BACKUP_CONFIGS, length(Sorted)), Sorted), + lists:foreach( + fun(F) -> + case file:delete(F) of + ok -> + ok; + {error, Reason} -> + ?SLOG(warning, #{ + msg => "failed_to_delete_backup_plugin_conf_file", + filename => F, + reason => Reason + }), + ok + end + end, + Deletes + ). read_file_fun(Path, ErrMsg, #{read_mode := ?RAW_BIN}) -> fun() -> @@ -1082,30 +1145,38 @@ read_file_fun(Path, ErrMsg, #{read_mode := ?JSON_MAP}) -> end. %% Directorys +-spec plugin_dir(name_vsn()) -> string(). plugin_dir(NameVsn) -> - filename:join([install_dir(), NameVsn]). + wrap_list_path(filename:join([install_dir(), NameVsn])). +-spec plugin_config_dir(name_vsn()) -> string(). plugin_config_dir(NameVsn) -> - filename:join([plugin_dir(NameVsn), "data", "configs"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "data", "configs"])). %% Files +-spec pkg_file_path(name_vsn()) -> string(). pkg_file_path(NameVsn) -> - filename:join([install_dir(), bin([NameVsn, ".tar.gz"])]). + wrap_list_path(filename:join([install_dir(), bin([NameVsn, ".tar.gz"])])). +-spec info_file_path(name_vsn()) -> string(). info_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "release.json"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "release.json"])). +-spec avsc_file_path(name_vsn()) -> string(). avsc_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "config_schema.avsc"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config_schema.avsc"])). +-spec avro_config_file(name_vsn()) -> string(). avro_config_file(NameVsn) -> - filename:join([plugin_config_dir(NameVsn), "config.avro"]). + wrap_list_path(filename:join([plugin_config_dir(NameVsn), "config.avro"])). +-spec i18n_file_path(name_vsn()) -> string(). i18n_file_path(NameVsn) -> - filename:join([plugin_dir(NameVsn), "config_i18n.json"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "config_i18n.json"])). +-spec readme_file(name_vsn()) -> string(). readme_file(NameVsn) -> - filename:join([plugin_dir(NameVsn), "README.md"]). + wrap_list_path(filename:join([plugin_dir(NameVsn), "README.md"])). running_apps() -> lists:map( @@ -1115,6 +1186,17 @@ running_apps() -> application:which_applications(infinity) ). +%% @private This is the same human-readable timestamp format as +%% hocon-cli generated app.