diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
index 518e1e630..1070fbde0 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl
@@ -115,11 +115,11 @@ get_servers_local_preferred(DB, Shard) ->
         Servers when is_list(Servers) ->
             ok
     end,
-    case lists:keyfind(node(), 2, Servers) of
+    case lists:keytake(node(), 2, Servers) of
         false ->
             Servers;
-        Local when is_tuple(Local) ->
-            [Local | lists:delete(Local, Servers)]
+        {value, Local, Rest} ->
+            [Local | Rest]
     end.
 
 lookup_leader(DB, Shard) ->
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
index 7d058109e..3b62fbfdf 100644
--- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
+++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl
@@ -553,6 +553,17 @@ delete_next_until(
     end.
 
 handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
+    %% If the last message was published more than one epoch ago, and
+    %% the shard remains idle, we need to advance the safety cutoff
+    %% interval to make sure the last epoch becomes visible to the
+    %% readers.
+    %%
+    %% We do so by emitting a dummy event that will be persisted by
+    %% the replication layer. Processing it will advance the
+    %% replication layer's clock.
+    %%
+    %% This operation is latched to avoid publishing events on every
+    %% tick.
     case ets:lookup(Gvars, ?IDLE_DETECT) of
         [{?IDLE_DETECT, Latch, LastWrittenTs}] ->
             ok;
@@ -562,6 +573,8 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) ->
     end,
     case Latch of
         false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 ->
+            %% Note: the + 1 above delays the event by one epoch to
+            %% add a safety margin.
             ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}),
             [dummy_event];
         _ ->
diff --git a/changes/ce/fix-13072.en.md b/changes/ce/fix-13072.en.md
new file mode 100644
index 000000000..da4a4253f
--- /dev/null
+++ b/changes/ce/fix-13072.en.md
@@ -0,0 +1,10 @@
+Various fixes related to the `durable_sessions` feature:
+
+- Add an option to execute read operations on the leader.
+- The `drop_generation` operation can be replayed multiple times by the replication layer, but it is not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed; in the future, we want to make `drop_generation` idempotent in a cleaner way.
+- Wrap storage layer events in a small structure containing the generation ID, to make sure events are handled by the same layout CBM & context that produced them.
+- Fix a crash when a storage event arrives at a dropped generation (the now-removed `storage_layer:generation_at` function didn't handle dropped generations).
+- Implement the `format_status` callback for several workers to minimize log spam.
+- Move the responsibility for `end_of_stream` detection to the layout CBM. Previously, the storage layer used a heuristic: old generations that return an empty batch won't produce more data. This heuristic was incorrect: for example, the bitfield-LTS layout may return an empty batch while waiting for the safe cutoff time.
+- The `reference` layout has been enabled in the prod build; it can be useful for integration testing.
+- Fix an incorrect epoch calculation in the `bitfield_lts:handle_event` callback that led to missed safe cutoff time updates and, effectively, subscribers being unable to fetch messages until a fresh batch was published.
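
For reference, here is a minimal standalone sketch of the list-reordering pattern adopted in `get_servers_local_preferred` above. The module and function names are hypothetical, not part of the EMQX codebase:

```erlang
%% Hypothetical sketch: move the server tuple whose second element
%% matches Node (if any) to the front of the list, in one traversal.
-module(local_pref_sketch).
-export([prefer_local/2]).

-spec prefer_local(node(), [tuple()]) -> [tuple()].
prefer_local(Node, Servers) ->
    case lists:keytake(Node, 2, Servers) of
        false ->
            %% No server is running on Node: keep the original order.
            Servers;
        {value, Local, Rest} ->
            %% lists:keytake/3 has already removed Local from Rest,
            %% so no separate lists:delete/2 pass is needed.
            [Local | Rest]
    end.
```

`lists:keytake/3` locates and removes the matching tuple in a single traversal, whereas the old `lists:keyfind/3` + `lists:delete/2` pair walked the list twice.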
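And a sketch of the latched idle-detection guard touched in `emqx_ds_storage_bitfield_lts`. The epoch computation shown here (timestamp integer-divided by a fixed epoch width) is an assumed stand-in for the `?EPOCH` macro, and the module name is hypothetical:

```erlang
%% Hypothetical sketch of the idle-detection condition; epoch/2 is an
%% assumed stand-in for the ?EPOCH macro.
-module(idle_detect_sketch).
-export([should_emit_dummy/3]).

%% Epoch number of a timestamp Ts, given the epoch width in the same
%% time units.
epoch(Ts, EpochWidth) ->
    Ts div EpochWidth.

%% True when the shard has been idle long enough that a dummy event
%% must be emitted to advance the safe cutoff time. The `+ 1` delays
%% the event by one full epoch as a safety margin, mirroring the
%% guard in handle_event/4 above.
should_emit_dummy(Now, LastWrittenTs, EpochWidth) ->
    epoch(Now, EpochWidth) > epoch(LastWrittenTs, EpochWidth) + 1.
```

Once the dummy event is emitted, the latch stored under `?IDLE_DETECT` keeps the condition from firing again on every subsequent tick.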