diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 57930fa72..d36d8e96f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -297,13 +297,14 @@ store_batch(Shard, Messages, Options) -> [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> +prepare_batch(Shard, Messages = [_ | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. ?tp(emqx_ds_storage_layer_prepare_batch, #{ shard => Shard, messages => Messages, options => Options }), - {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), + GenId = generation_current(Shard), + #{module := Mod, data := GenData} = generation_get(Shard, GenId), T0 = erlang:monotonic_time(microsecond), Result = case Mod:prepare_batch(Shard, GenData, Messages, Options) of diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index 39158c7ef..dad18f89e 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -27,25 +27,6 @@ opts() -> %% -t_idempotent_store_batch(_Config) -> - Shard = {?FUNCTION_NAME, _ShardId = <<"42">>}, - {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), - %% Push some messages to the shard. - Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)], - GenTs = 30, - Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)], - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})), - %% Add new generation and push the same batch + some more. - ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)), - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})), - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})), - %% First batch should have been handled idempotently. - ?assertEqual( - Msgs1 ++ Msgs2, - lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) - ), - ok = stop_shard(Pid). - t_snapshot_take_restore(_Config) -> Shard = {?FUNCTION_NAME, _ShardId = <<"42">>}, {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), @@ -77,7 +58,7 @@ t_snapshot_take_restore(_Config) -> %% Verify that the restored shard contains the messages up until the snapshot. {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), - ?assertEqual( + snabbkaffe_diff:assert_lists_eq( Msgs1 ++ Msgs2, lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) ).