fix(ds): Always store messages in the current generation

This commit is contained in:
ieQu1 2024-05-19 13:20:12 +02:00
parent 074d98a14a
commit eb7c43ee9d
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
2 changed files with 4 additions and 22 deletions

View File

@ -297,13 +297,14 @@ store_batch(Shard, Messages, Options) ->
[{emqx_ds:time(), emqx_types:message()}],
emqx_ds:message_store_opts()
) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_).
prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
prepare_batch(Shard, Messages = [_ | _], Options) ->
%% NOTE
%% We assume that batches do not span generations. Callers should enforce this.
?tp(emqx_ds_storage_layer_prepare_batch, #{
shard => Shard, messages => Messages, options => Options
}),
{GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time),
GenId = generation_current(Shard),
#{module := Mod, data := GenData} = generation_get(Shard, GenId),
T0 = erlang:monotonic_time(microsecond),
Result =
case Mod:prepare_batch(Shard, GenData, Messages, Options) of

View File

@ -27,25 +27,6 @@ opts() ->
%%
t_idempotent_store_batch(_Config) ->
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
%% Push some messages to the shard.
Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)],
GenTs = 30,
Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)],
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
%% Add new generation and push the same batch + some more.
?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)),
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})),
?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})),
%% First batch should have been handled idempotently.
?assertEqual(
Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
),
ok = stop_shard(Pid).
t_snapshot_take_restore(_Config) ->
Shard = {?FUNCTION_NAME, _ShardId = <<"42">>},
{ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
@ -77,7 +58,7 @@ t_snapshot_take_restore(_Config) ->
%% Verify that the restored shard contains the messages up until the snapshot.
{ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()),
?assertEqual(
snabbkaffe_diff:assert_lists_eq(
Msgs1 ++ Msgs2,
lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#']))
).