diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 7d1fffbcb..2245e81c5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -301,26 +301,30 @@ store_batch(Shard, Messages, Options) -> [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [_ | _], Options) -> +prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. ?tp(emqx_ds_storage_layer_prepare_batch, #{ shard => Shard, messages => Messages, options => Options }), - GenId = generation_current(Shard), - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - T0 = erlang:monotonic_time(microsecond), - Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of - {ok, CookedBatch} -> - {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; - Error = {error, _, _} -> - Error - end, - T1 = erlang:monotonic_time(microsecond), - %% TODO store->prepare - emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), - Result; + %% FIXME: always store messages in the current generation + case generation_at(Shard, Time) of + {GenId, #{module := Mod, data := GenData}} -> + T0 = erlang:monotonic_time(microsecond), + Result = + case Mod:prepare_batch(Shard, GenData, Messages, Options) of + {ok, CookedBatch} -> + {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; + Error = {error, _, _} -> + Error + end, + T1 = erlang:monotonic_time(microsecond), + %% TODO store->prepare + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result; + not_found -> + ignore + end; prepare_batch(_Shard, [], _Options) -> ignore. @@ -964,6 +968,25 @@ generation_current(Shard) -> #{current_generation := Current} = get_schema_runtime(Shard), Current. +%% TODO: remove me +-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()} | not_found. +generation_at(Shard, Time) -> + Schema = #{current_generation := Current} = get_schema_runtime(Shard), + generation_at(Time, Current, Schema). + +generation_at(Time, GenId, Schema) -> + case Schema of + #{?GEN_KEY(GenId) := Gen} -> + case Gen of + #{since := Since} when Time < Since andalso GenId > 0 -> + generation_at(Time, GenId - 1, Schema); + _ -> + {GenId, Gen} + end; + _ -> + not_found + end. + -spec generation_get(shard_id(), gen_id()) -> generation() | not_found. generation_get(Shard, GenId) -> case get_schema_runtime(Shard) of