diff --git a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl index 28e307832..a7cc795b6 100644 --- a/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl +++ b/apps/emqx_ds_builtin_local/src/emqx_ds_builtin_local.erl @@ -202,7 +202,7 @@ store_batch(DB, Messages, Opts) -> {error, recoverable, Reason} end. --record(bs, {options :: term()}). +-record(bs, {options :: emqx_ds:create_db_opts()}). -type buffer_state() :: #bs{}. -spec init_buffer(emqx_ds:db(), shard(), _Options) -> {ok, buffer_state()}. @@ -220,24 +220,36 @@ init_buffer(DB, Shard, Options) -> -spec flush_buffer(emqx_ds:db(), shard(), [emqx_types:message()], buffer_state()) -> {buffer_state(), emqx_ds:store_batch_result()}. flush_buffer(DB, Shard, Messages, S0 = #bs{options = Options}) -> - {Latest, Batch} = assign_timestamps(current_timestamp({DB, Shard}), Messages), - Result = emqx_ds_storage_layer:store_batch({DB, Shard}, Batch, Options), - emqx_ds_builtin_local_meta:set_current_timestamp({DB, Shard}, Latest), + ShardId = {DB, Shard}, + ForceMonotonic = maps:get(force_monotonic_timestamps, Options), + {Latest, Batch} = make_batch(ForceMonotonic, current_timestamp(ShardId), Messages), + Result = emqx_ds_storage_layer:store_batch(ShardId, Batch, _Options = #{}), + emqx_ds_builtin_local_meta:set_current_timestamp(ShardId, Latest), {S0, Result}. -assign_timestamps(Latest, Messages) -> - assign_timestamps(Latest, Messages, []). +make_batch(_ForceMonotonic = true, Latest, Messages) -> + assign_monotonic_timestamps(Latest, Messages, []); +make_batch(false, Latest, Messages) -> + assign_message_timestamps(Latest, Messages, []). -assign_timestamps(Latest, [MessageIn | Rest], Acc) -> - case emqx_message:timestamp(MessageIn, microsecond) of - TimestampUs when TimestampUs > Latest -> - Message = assign_timestamp(TimestampUs, MessageIn), - assign_timestamps(TimestampUs, Rest, [Message | Acc]); +assign_monotonic_timestamps(Latest0, [Message | Rest], Acc0) -> + case emqx_message:timestamp(Message, microsecond) of + TimestampUs when TimestampUs > Latest0 -> + Latest = TimestampUs; _Earlier -> - Message = assign_timestamp(Latest + 1, MessageIn), - assign_timestamps(Latest + 1, Rest, [Message | Acc]) - end; -assign_timestamps(Latest, [], Acc) -> + Latest = Latest0 + 1 + end, + Acc = [assign_timestamp(Latest, Message) | Acc0], + assign_monotonic_timestamps(Latest, Rest, Acc); +assign_monotonic_timestamps(Latest, [], Acc) -> + {Latest, lists:reverse(Acc)}. + +assign_message_timestamps(Latest0, [Message | Rest], Acc0) -> + TimestampUs = emqx_message:timestamp(Message, microsecond), + Latest = max(TimestampUs, Latest0), + Acc = [assign_timestamp(TimestampUs, Message) | Acc0], + assign_message_timestamps(Latest, Rest, Acc); +assign_message_timestamps(Latest, [], Acc) -> {Latest, lists:reverse(Acc)}. assign_timestamp(TimestampUs, Message) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 57d8c9234..ff157d01b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -268,7 +268,7 @@ open_db(DB, Opts = #{backend := Backend}) -> Module -> persistent_term:put(?persistent_term(DB), Module), emqx_ds_sup:register_db(DB, Backend), - ?module(DB):open_db(DB, Opts) + ?module(DB):open_db(DB, set_db_defaults(Opts)) end. -spec close_db(db()) -> ok. @@ -286,7 +286,7 @@ add_generation(DB) -> -spec update_db_config(db(), create_db_opts()) -> ok. update_db_config(DB, Opts) -> - ?module(DB):update_db_config(DB, Opts). + ?module(DB):update_db_config(DB, set_db_defaults(Opts)). -spec list_generations_with_lifetimes(db()) -> #{generation_rank() => generation_info()}. list_generations_with_lifetimes(DB) -> @@ -417,6 +417,10 @@ timestamp_us() -> %% Internal functions %%================================================================================ +set_db_defaults(Opts) -> + Defaults = #{force_monotonic_timestamps => true}, + maps:merge(Defaults, Opts). + call_if_implemented(Mod, Fun, Args, Default) -> case erlang:function_exported(Mod, Fun, length(Args)) of true ->