From 4580906405757caae438cc5260c2e0b18f651cb4 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 May 2024 17:40:18 +0200 Subject: [PATCH 01/15] fix(ds): Use erpc instead of gen_rpc for `delete_next' --- apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl index 62ea33c3e..a53791feb 100644 --- a/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl +++ b/apps/emqx_durable_storage/src/proto/emqx_ds_proto_v4.erl @@ -179,8 +179,7 @@ make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, StartTime) -> | {ok, end_of_stream} | {error, _}. delete_next(Node, DB, Shard, Iter, Selector, BatchSize) -> - emqx_rpc:call( - Shard, + erpc:call( Node, emqx_ds_replication_layer, do_delete_next_v4, From c6fc76e3355ab3caa5ec694440e396740a96dffe Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 18 May 2024 15:53:21 +0200 Subject: [PATCH 02/15] fix(ds): Perform read operations on the leader. --- apps/emqx_durable_storage/README.md | 2 + .../src/emqx_ds_replication_layer.erl | 98 ++++++++++++++----- .../src/emqx_ds_replication_layer_shard.erl | 48 ++++----- .../test/emqx_ds_replication_SUITE.erl | 15 ++- 4 files changed, 104 insertions(+), 59 deletions(-) diff --git a/apps/emqx_durable_storage/README.md b/apps/emqx_durable_storage/README.md index f67cc3e24..362ad47a3 100644 --- a/apps/emqx_durable_storage/README.md +++ b/apps/emqx_durable_storage/README.md @@ -124,6 +124,8 @@ The following application environment variables are available: - `emqx_durable_storage.egress_flush_interval`: period at which the batches of messages are committed to the durable storage. +- `emqx_durable_storage.reads`: `leader_preferred` | `local_preferred`. + Runtime settings for the durable storages can be modified via CLI as well as the REST API. The following CLI commands are available: diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index 9330e0b1a..f3b5fdedf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -561,12 +561,27 @@ list_nodes() -> %% Too large for normal operation, need better backpressure mechanism. -define(RA_TIMEOUT, 60 * 1000). --define(SAFERPC(EXPR), +-define(SAFE_ERPC(EXPR), try EXPR catch - error:RPCError = {erpc, _} -> - {error, recoverable, RPCError} + error:RPCError__ = {erpc, _} -> + {error, recoverable, RPCError__} + end +). + +-define(SHARD_RPC(DB, SHARD, NODE, BODY), + case + emqx_ds_replication_layer_shard:servers( + DB, SHARD, application:get_env(emqx_durable_storage, reads, leader_preferred) + ) + of + [{_, NODE} | _] -> + begin + BODY + end; + [] -> + {error, recoverable, replica_offline} end ). @@ -623,44 +638,79 @@ ra_drop_generation(DB, Shard, GenId) -> end. ra_get_streams(DB, Shard, TopicFilter, Time) -> - {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimestampUs = timestamp_to_timeus(Time), - ?SAFERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)) + ). 
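Note: `?SHARD_RPC(DB, SHARD, NODE, BODY)` binds `NODE` by pattern-matching the head of the preferred-server list and then evaluates `BODY`; when the list is empty it short-circuits to `{error, recoverable, replica_offline}`. A hand-expanded sketch of the `ra_get_streams/4` call site above (illustrative only, not compiler output):

    %% Rough expansion of ?SHARD_RPC + ?SAFE_ERPC as used in ra_get_streams/4.
    ra_get_streams_expanded(DB, Shard, TopicFilter, Time) ->
        TimestampUs = timestamp_to_timeus(Time),
        Preference = application:get_env(emqx_durable_storage, reads, leader_preferred),
        case emqx_ds_replication_layer_shard:servers(DB, Shard, Preference) of
            [{_ServerName, Node} | _] ->
                %% ?SAFE_ERPC converts erpc exceptions into recoverable errors:
                try
                    emqx_ds_proto_v4:get_streams(Node, DB, Shard, TopicFilter, TimestampUs)
                catch
                    error:RPCError = {erpc, _} ->
                        {error, recoverable, RPCError}
                end;
            [] ->
                {error, recoverable, replica_offline}
        end.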
ra_get_delete_streams(DB, Shard, TopicFilter, Time) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - ?SAFERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:get_delete_streams(Node, DB, Shard, TopicFilter, Time)) + ). ra_make_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimeUs = timestamp_to_timeus(StartTime), - ?SAFERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:make_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)) + ). ra_make_delete_iterator(DB, Shard, Stream, TopicFilter, StartTime) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), TimeUs = timestamp_to_timeus(StartTime), - ?SAFERPC(emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC( + emqx_ds_proto_v4:make_delete_iterator(Node, DB, Shard, Stream, TopicFilter, TimeUs) + ) + ). ra_update_iterator(DB, Shard, Iter, DSKey) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - ?SAFERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:update_iterator(Node, DB, Shard, Iter, DSKey)) + ). ra_next(DB, Shard, Iter, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of - RPCError = {badrpc, _} -> - {error, recoverable, RPCError}; - Other -> - Other - end. + ?SHARD_RPC( + DB, + Shard, + Node, + case emqx_ds_proto_v4:next(Node, DB, Shard, Iter, BatchSize) of + Err = {badrpc, _} -> + {error, recoverable, Err}; + Ret -> + Ret + end + ). ra_delete_next(DB, Shard, Iter, Selector, BatchSize) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize). + ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:delete_next(Node, DB, Shard, Iter, Selector, BatchSize)) + ). ra_list_generations_with_lifetimes(DB, Shard) -> - {_Name, Node} = emqx_ds_replication_layer_shard:server(DB, Shard, local_preferred), - case ?SAFERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) of + Reply = ?SHARD_RPC( + DB, + Shard, + Node, + ?SAFE_ERPC(emqx_ds_proto_v4:list_generations_with_lifetimes(Node, DB, Shard)) + ), + case Reply of Gens = #{} -> maps:map( fun(_GenId, Data = #{since := Since, until := Until}) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 0bfa89e95..518e1e630 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -28,8 +28,7 @@ %% Dynamic server location API -export([ - servers/3, - server/3 + servers/3 ]). %% Membership @@ -83,16 +82,15 @@ server_name(DB, Shard, Site) -> %% --spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server(), ...] when - Order :: leader_preferred | undefined. 
-servers(DB, Shard, _Order = leader_preferred) -> +-spec servers(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), Order) -> [server()] when + Order :: leader_preferred | local_preferred | undefined. +servers(DB, Shard, leader_preferred) -> get_servers_leader_preferred(DB, Shard); +servers(DB, Shard, local_preferred) -> + get_servers_local_preferred(DB, Shard); servers(DB, Shard, _Order = undefined) -> get_shard_servers(DB, Shard). -server(DB, Shard, _Which = local_preferred) -> - get_server_local_preferred(DB, Shard). - get_servers_leader_preferred(DB, Shard) -> %% NOTE: Contact last known leader first, then rest of shard servers. ClusterName = get_cluster_name(DB, Shard), @@ -104,17 +102,24 @@ get_servers_leader_preferred(DB, Shard) -> get_online_servers(DB, Shard) end. -get_server_local_preferred(DB, Shard) -> - %% NOTE: Contact either local server or a random replica. +get_servers_local_preferred(DB, Shard) -> + %% Return list of servers, where the local replica (if exists) is + %% the first element. Note: result is _NOT_ shuffled. This can be + %% bad for the load balancing, but it makes results more + %% deterministic. Caller that doesn't care about that can shuffle + %% the results by itself. ClusterName = get_cluster_name(DB, Shard), case ra_leaderboard:lookup_members(ClusterName) of - Servers when is_list(Servers) -> - pick_local(Servers); undefined -> - %% TODO - %% Leader is unkonwn if there are no servers of this group on the - %% local node. We want to pick a replica in that case as well. - pick_random(get_online_servers(DB, Shard)) + Servers = get_online_servers(DB, Shard); + Servers when is_list(Servers) -> + ok + end, + case lists:keyfind(node(), 2, Servers) of + false -> + Servers; + Local when is_tuple(Local) -> + [Local | lists:delete(Local, Servers)] end. lookup_leader(DB, Shard) -> @@ -139,17 +144,6 @@ filter_online(Servers) -> is_server_online({_Name, Node}) -> Node == node() orelse lists:member(Node, nodes()). -pick_local(Servers) -> - case lists:keyfind(node(), 2, Servers) of - Local when is_tuple(Local) -> - Local; - false -> - pick_random(Servers) - end. - -pick_random(Servers) -> - lists:nth(rand:uniform(length(Servers)), Servers). - get_cluster_name(DB, Shard) -> memoize(fun cluster_name/2, [DB, Shard]). diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index 8303ff861..eacf7e301 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -479,11 +479,13 @@ t_rebalance_offline_restarts(Config) -> %% shard_server_info(Node, DB, Shard, Site, Info) -> - Server = shard_server(Node, DB, Shard, Site), - {Server, ds_repl_shard(Node, server_info, [Info, Server])}. - -shard_server(Node, DB, Shard, Site) -> - ds_repl_shard(Node, shard_server, [DB, Shard, Site]). + ?ON( + Node, + begin + Server = emqx_ds_replication_layer_shard:shard_server(DB, Shard, Site), + {Server, emqx_ds_replication_layer_shard:server_info(Info, Server)} + end + ). ds_repl_meta(Node, Fun) -> ds_repl_meta(Node, Fun, []). @@ -499,9 +501,6 @@ ds_repl_meta(Node, Fun, Args) -> error(meta_op_failed) end. -ds_repl_shard(Node, Fun, Args) -> - erpc:call(Node, emqx_ds_replication_layer_shard, Fun, Args). - shards(Node, DB) -> erpc:call(Node, emqx_ds_replication_layer_meta, shards, [DB]). 
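Note: together with the README entry above, this patch makes read routing configurable per node. Since `?SHARD_RPC` reads the key on every call, flipping it takes effect immediately; a minimal sketch (assumes the application is loaded, and `leader_preferred` is the default used when the key is unset):

    %% Route reads through the last known Ra leader (the default):
    ok = application:set_env(emqx_durable_storage, reads, leader_preferred),
    %% ...or prefer the local replica when one exists:
    ok = application:set_env(emqx_durable_storage, reads, local_preferred).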
From aca2d9586c0d91d81ff1714f0b4d9fe71c12f759 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 18 May 2024 15:53:53 +0200 Subject: [PATCH 03/15] fix(ds): Fix return type of drop_generation --- apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index fdba6335f..438955367 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -513,7 +513,7 @@ add_generation(ShardId, Since) -> list_generations_with_lifetimes(ShardId) -> gen_server:call(?REF(ShardId), #call_list_generations_with_lifetimes{}, infinity). --spec drop_generation(shard_id(), gen_id()) -> ok. +-spec drop_generation(shard_id(), gen_id()) -> ok | {error, _}. drop_generation(ShardId, GenId) -> gen_server:call(?REF(ShardId), #call_drop_generation{gen_id = GenId}, infinity). From 1526c527d0287ae671d5e1593196a10241988546 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sat, 18 May 2024 15:54:25 +0200 Subject: [PATCH 04/15] fix(ds): Log generation operations --- .../src/emqx_ds_replication_layer.erl | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl index f3b5fdedf..ee4e71812 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer.erl @@ -761,6 +761,14 @@ apply( #{?tag := add_generation, ?since := Since}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> + ?tp( + info, + ds_replication_layer_add_generation, + #{ + shard => DBShard, + since => Since + } + ), {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:add_generation(DBShard, Timestamp), State = State0#{latest := Latest}, @@ -771,6 +779,15 @@ apply( #{?tag := update_config, ?since := Since, ?config := Opts}, #{db_shard := DBShard, latest := Latest0} = State0 ) -> + ?tp( + notice, + ds_replication_layer_update_config, + #{ + shard => DBShard, + config => Opts, + since => Since + } + ), {Timestamp, Latest} = ensure_monotonic_timestamp(Since, Latest0), Result = emqx_ds_storage_layer:update_config(DBShard, Timestamp, Opts), State = State0#{latest := Latest}, @@ -780,6 +797,14 @@ apply( #{?tag := drop_generation, ?generation := GenId}, #{db_shard := DBShard} = State ) -> + ?tp( + info, + ds_replication_layer_drop_generation, + #{ + shard => DBShard, + generation => GenId + } + ), Result = emqx_ds_storage_layer:drop_generation(DBShard, GenId), {State, Result}; apply( From e4a73f003a8f7ee5272de3f5c8fb6a9a6c4eca44 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 09:20:16 +0200 Subject: [PATCH 05/15] feat(ds): Implement format_status callback Reduce volume of logs and crash reports from DS --- .../src/emqx_ds_replication_layer_egress.erl | 9 ++++++++- .../src/emqx_ds_storage_layer.erl | 20 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index 9201ccf04..dc76aecf0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ 
b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -33,7 +33,7 @@ -export([start_link/2, store_batch/3]). %% behavior callbacks: --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: -export([]). @@ -129,6 +129,13 @@ init([DB, Shard]) -> }, {ok, S}. +format_status(#s{db = DB, shard = Shard, queue = Q}) -> + #{ + db => DB, + shard => Shard, + queue => queue:len(Q) + }. + handle_call( #enqueue_req{ messages = Msgs, diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 438955367..57930fa72 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -52,7 +52,7 @@ ]). %% gen_server --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). +-export([init/1, format_status/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: -export([db_dir/1]). @@ -586,6 +586,24 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. +format_status(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> + #{ + id => ShardId, + db => DB, + cf_refs => CFRefs, + schema => Schema, + shard => + maps:map( + fun + (?GEN_KEY(_), _Schema) -> + '...'; + (_K, Val) -> + Val + end, + Shard + ) + }. + handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> case handle_update_config(S0, Since, Options) of S = #s{} -> From 074d98a14a53c51ff94170847560ee00fc38bed6 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 09:24:02 +0200 Subject: [PATCH 06/15] test(ds): Refactor ds_SUITE --- apps/emqx_durable_storage/test/emqx_ds_SUITE.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 49742fdc6..eb14456cb 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -67,10 +67,16 @@ t_00_smoke_open_drop(_Config) -> %% A simple smoke test that verifies that storing the messages doesn't %% crash t_01_smoke_store(_Config) -> - DB = default, - ?assertMatch(ok, emqx_ds:open_db(DB, opts())), - Msg = message(<<"foo/bar">>, <<"foo">>, 0), - ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])). + ?check_trace( + #{timetrap => 10_000}, + begin + DB = default, + ?assertMatch(ok, emqx_ds:open_db(DB, opts())), + Msg = message(<<"foo/bar">>, <<"foo">>, 0), + ?assertMatch(ok, emqx_ds:store_batch(DB, [Msg])) + end, + [] + ). %% A simple smoke test that verifies that getting the list of streams %% doesn't crash and that iterators can be opened. 
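Note: the refactored test wraps the body in snabbkaffe's `?check_trace`, so a hung run is aborted by the 10 s timetrap, and trace-based checks can later be added in the (currently empty) third argument. Skeleton of the idiom for further cases in this suite (hypothetical test name; `opts/0` and `message/3` are the suite's existing helpers):

    t_smoke_example(_Config) ->
        ?check_trace(
            #{timetrap => 10_000},
            begin
                DB = default,
                ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
                ?assertMatch(ok, emqx_ds:store_batch(DB, [message(<<"t">>, <<"payload">>, 0)]))
            end,
            %% Trace checks go here, e.g. fun(Trace) -> ... end:
            []
        ).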
From eb7c43ee9df3014ddda865e20f06b74230615508 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 13:20:12 +0200 Subject: [PATCH 07/15] fix(ds): Always store messages in the current generation --- .../src/emqx_ds_storage_layer.erl | 5 +++-- .../test/emqx_ds_storage_SUITE.erl | 21 +------------------ 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 57930fa72..d36d8e96f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -297,13 +297,14 @@ store_batch(Shard, Messages, Options) -> [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [{Time, _Msg} | _], Options) -> +prepare_batch(Shard, Messages = [_ | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. ?tp(emqx_ds_storage_layer_prepare_batch, #{ shard => Shard, messages => Messages, options => Options }), - {GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), + GenId = generation_current(Shard), + #{module := Mod, data := GenData} = generation_get(Shard, GenId), T0 = erlang:monotonic_time(microsecond), Result = case Mod:prepare_batch(Shard, GenData, Messages, Options) of diff --git a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl index 39158c7ef..dad18f89e 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_storage_SUITE.erl @@ -27,25 +27,6 @@ opts() -> %% -t_idempotent_store_batch(_Config) -> - Shard = {?FUNCTION_NAME, _ShardId = <<"42">>}, - {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), - %% Push some messages to the shard. - Msgs1 = [gen_message(N) || N <- lists:seq(10, 20)], - GenTs = 30, - Msgs2 = [gen_message(N) || N <- lists:seq(40, 50)], - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})), - %% Add new generation and push the same batch + some more. - ?assertEqual(ok, emqx_ds_storage_layer:add_generation(Shard, GenTs)), - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs1), #{})), - ?assertEqual(ok, emqx_ds_storage_layer:store_batch(Shard, batch(Msgs2), #{})), - %% First batch should have been handled idempotently. - ?assertEqual( - Msgs1 ++ Msgs2, - lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) - ), - ok = stop_shard(Pid). - t_snapshot_take_restore(_Config) -> Shard = {?FUNCTION_NAME, _ShardId = <<"42">>}, {ok, Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), @@ -77,7 +58,7 @@ t_snapshot_take_restore(_Config) -> %% Verify that the restored shard contains the messages up until the snapshot. {ok, _Pid} = emqx_ds_storage_layer:start_link(Shard, opts()), - ?assertEqual( + snabbkaffe_diff:assert_lists_eq( Msgs1 ++ Msgs2, lists:keysort(#message.timestamp, emqx_ds_test_helpers:storage_consume(Shard, ['#'])) ). 
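Note: after this patch the target generation is chosen by `generation_current/1` rather than by message timestamps, so re-sending a batch after `add_generation` writes it into the new generation; that is why `t_idempotent_store_batch` no longer holds and is removed. A condensed illustration (shard setup elided; `batch/1` and `gen_message/1` are the removed test's helpers):

    Msgs = [gen_message(N) || N <- lists:seq(10, 20)],
    ok = emqx_ds_storage_layer:store_batch(Shard, batch(Msgs), #{}),
    ok = emqx_ds_storage_layer:add_generation(Shard, 30),
    %% The same batch now lands in the new current generation, i.e. the
    %% messages are duplicated instead of being overwritten idempotently:
    ok = emqx_ds_storage_layer:store_batch(Shard, batch(Msgs), #{}).

(Patch 15 later relaxes this again to allow writes to older generations.)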
From acdae4fba38f235cf860b3801f5532bd82678498 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 13:21:10 +0200 Subject: [PATCH 08/15] fix(ds): Workaround for the idempotency error when dropping gens --- .../src/emqx_ds_storage_layer.erl | 70 ++++++++++++------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index d36d8e96f..68e2f4597 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -564,6 +564,7 @@ start_link(Shard = {_, _}, Options) -> init({ShardId, Options}) -> process_flag(trap_exit, true), + ?tp(info, ds_storage_init, #{shard => ShardId}), logger:set_process_metadata(#{shard_id => ShardId, domain => [ds, storage_layer, shard]}), erase_schema_runtime(ShardId), clear_all_checkpoints(ShardId), @@ -777,18 +778,31 @@ handle_drop_generation(S0, GenId) -> } = S0, #{module := Mod, cf_refs := GenCFRefs} = GenSchema, #{?GEN_KEY(GenId) := #{data := RuntimeData}} = OldShard, - case Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) of - ok -> - CFRefs = OldCFRefs -- GenCFRefs, - Shard = maps:remove(?GEN_KEY(GenId), OldShard), - Schema = maps:remove(?GEN_KEY(GenId), OldSchema), - S = S0#s{ - cf_refs = CFRefs, - shard = Shard, - schema = Schema - }, - {ok, S} - end. + try + Mod:drop(ShardId, DB, GenId, GenCFRefs, RuntimeData) + catch + EC:Err:Stack -> + ?tp( + error, + ds_storage_layer_failed_to_drop_generation, + #{ + shard => ShardId, + EC => Err, + stacktrace => Stack, + generation => GenId, + s => format_status(S0) + } + ) + end, + CFRefs = OldCFRefs -- GenCFRefs, + Shard = maps:remove(?GEN_KEY(GenId), OldShard), + Schema = maps:remove(?GEN_KEY(GenId), OldSchema), + S = S0#s{ + cf_refs = CFRefs, + shard = Shard, + schema = Schema + }, + {ok, S}. -spec open_generation(shard_id(), rocksdb:db_handle(), cf_refs(), gen_id(), generation_schema()) -> generation(). @@ -940,14 +954,18 @@ handle_accept_snapshot(ShardId) -> %% general. %% %% The mechanism of storage layer events should be refined later. --spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> [CustomEvent]. +-spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}. handle_event(Shard, Time, Event) -> - {_GenId, #{module := Mod, data := GenData}} = generation_at(Shard, Time), - ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), - case erlang:function_exported(Mod, handle_event, 4) of - true -> - Mod:handle_event(Shard, GenData, Time, Event); - false -> + case generation_at(Shard, Time) of + {_GenId, #{module := Mod, data := GenData}} -> + ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), + case erlang:function_exported(Mod, handle_event, 4) of + true -> + Mod:handle_event(Shard, GenData, Time, Event); + false -> + [] + end; + _ -> [] end. @@ -989,12 +1007,16 @@ generation_at(Shard, Time) -> generation_at(Time, Current, Schema). 
generation_at(Time, GenId, Schema) -> - #{?GEN_KEY(GenId) := Gen} = Schema, - case Gen of - #{since := Since} when Time < Since andalso GenId > 0 -> - generation_at(Time, prev_generation_id(GenId), Schema); + case Schema of + #{?GEN_KEY(GenId) := Gen} -> + case Gen of + #{since := Since} when Time < Since andalso GenId > 0 -> + generation_at(Time, prev_generation_id(GenId), Schema); + _ -> + {GenId, Gen} + end; _ -> - {GenId, Gen} + not_found end. -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). From 0ff307e7895a1eef2ae12dd02ed087e59994f37e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 21:19:57 +0200 Subject: [PATCH 09/15] fix(ds): Include generation ID in the storage events Make sure storage events originating from generation X are handled in the context of the same generation. --- .../src/emqx_ds_storage_layer.erl | 56 ++++++------------- 1 file changed, 17 insertions(+), 39 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 68e2f4597..e93780ba2 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -80,6 +80,10 @@ -define(stream_v2(GENERATION, INNER), [GENERATION | INNER]). -define(delete_stream(GENERATION, INNER), [GENERATION | INNER]). +%% Wrappers for the storage events: +-define(storage_event(GEN_ID, PAYLOAD), #{0 := 3333, 1 := GEN_ID, 2 := PAYLOAD}). +-define(mk_storage_event(GEN_ID, PAYLOAD), #{0 => 3333, 1 => GEN_ID, 2 => PAYLOAD}). + %%================================================================================ %% Type declarations %%================================================================================ @@ -848,10 +852,6 @@ new_generation(ShardId, DB, Schema0, Since) -> next_generation_id(GenId) -> GenId + 1. --spec prev_generation_id(gen_id()) -> gen_id(). -prev_generation_id(GenId) when GenId > 0 -> - GenId - 1. - %% @doc Commit current state of the server to both rocksdb and the persistent term -spec commit_metadata(server_state()) -> ok. commit_metadata(#s{shard_id = ShardId, schema = Schema, shard = Runtime, db = DB}) -> @@ -947,27 +947,23 @@ handle_accept_snapshot(ShardId) -> Dir = db_dir(ShardId), emqx_ds_storage_snapshot:new_writer(Dir). -%% FIXME: currently this interface is a hack to handle safe cutoff -%% timestamp in LTS. It has many shortcomings (can lead to infinite -%% loops if the CBM is not careful; events from one generation may be -%% sent to the next one, etc.) and the API is not well thought out in -%% general. -%% -%% The mechanism of storage layer events should be refined later. --spec handle_event(shard_id(), emqx_ds:time(), CustomEvent | tick) -> {gen_id(), [CustomEvent]}. -handle_event(Shard, Time, Event) -> - case generation_at(Shard, Time) of - {_GenId, #{module := Mod, data := GenData}} -> - ?tp(emqx_ds_storage_layer_event, #{mod => Mod, time => Time, event => Event}), +-spec handle_event(shard_id(), emqx_ds:time(), Event) -> [Event]. +handle_event(Shard, Time, ?storage_event(GenId, Event)) -> + case generation_get(Shard, GenId) of + not_found -> + []; + #{module := Mod, data := GenData} -> case erlang:function_exported(Mod, handle_event, 4) of true -> - Mod:handle_event(Shard, GenData, Time, Event); + NewEvents = Mod:handle_event(Shard, GenData, Time, Event), + [?mk_storage_event(GenId, E) || E <- NewEvents]; false -> [] - end; - _ -> - [] - end. 
+ end + end; +handle_event(Shard, Time, Event) -> + GenId = generation_current(Shard), + handle_event(Shard, Time, ?mk_storage_event(GenId, Event)). %%-------------------------------------------------------------------------------- %% Schema access @@ -1001,24 +997,6 @@ generations_since(Shard, Since) -> Schema ). --spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()}. -generation_at(Shard, Time) -> - Schema = #{current_generation := Current} = get_schema_runtime(Shard), - generation_at(Time, Current, Schema). - -generation_at(Time, GenId, Schema) -> - case Schema of - #{?GEN_KEY(GenId) := Gen} -> - case Gen of - #{since := Since} when Time < Since andalso GenId > 0 -> - generation_at(Time, prev_generation_id(GenId), Schema); - _ -> - {GenId, Gen} - end; - _ -> - not_found - end. - -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -spec get_schema_runtime(shard_id()) -> shard(). From 60edf5e9b8cf16f02657840a3289b32c960b4fb7 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Sun, 19 May 2024 23:12:46 +0200 Subject: [PATCH 10/15] fix(ds): Move responsibility of returning end_of_stream to the CBM --- .../src/emqx_ds_storage_bitfield_lts.erl | 39 +++++++++++++------ .../src/emqx_ds_storage_layer.erl | 15 +++---- .../src/emqx_ds_storage_reference.erl | 11 ++++-- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index ebbcde17c..8acb6e529 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -35,7 +35,7 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/5, + next/6, delete_next/6, post_creation_actions/1, @@ -424,23 +424,21 @@ next( Schema = #s{ts_offset = TSOffset, ts_bits = TSBits}, It = #{?storage_key := Stream}, BatchSize, - Now + Now, + IsCurrent ) -> init_counters(), %% Compute safe cutoff time. It's the point in time where the last %% complete epoch ends, so we need to know the current time to %% compute it. This is needed because new keys can be added before %% the iterator. - IsWildcard = + %% + %% This is needed to avoid situations when the iterator advances + %% to position k1, and then a new message with k2, such that k2 < + %% k1 is inserted. k2 would be missed. + HasCutoff = case Stream of - {_StaticKey, []} -> false; - _ -> true - end, - SafeCutoffTime = - case IsWildcard of - true -> - (Now bsr TSOffset) bsl TSOffset; - false -> + {_StaticKey, []} -> %% Iterators scanning streams without varying topic %% levels can operate on incomplete epochs, since new %% matching keys for the single topic are added in @@ -450,10 +448,27 @@ next( %% filters operating on streams with varying parts: %% iterator can jump to the next topic and then it %% won't backtrack. + false; + _ -> + %% New batches are only added to the current + %% generation. We can ignore cutoff time for old + %% generations: + IsCurrent + end, + SafeCutoffTime = + case HasCutoff of + true -> + (Now bsr TSOffset) bsl TSOffset; + false -> 1 bsl TSBits - 1 end, try - next_until(Schema, It, SafeCutoffTime, BatchSize) + case next_until(Schema, It, SafeCutoffTime, BatchSize) of + {ok, _, []} when not IsCurrent -> + {ok, end_of_stream}; + Result -> + Result + end after report_counters(Shard) end. 
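Note: with this change `next` becomes a 6-arity callback and `end_of_stream` detection moves from `emqx_ds_storage_layer` into the layout module, guided by the new `IsCurrent` flag. The contract a layout has to honor now looks roughly like this (shape sketch only; `scan/1` is a placeholder for the layout's own page read):

    %% Sketch of the next/6 contract: only sealed (non-current) generations
    %% may report end_of_stream, since the current one can still grow.
    next(_Shard, _Data, It0, _BatchSize, _Now, IsCurrent) ->
        case scan(It0) of
            {_It, []} when not IsCurrent ->
                {ok, end_of_stream};
            {It, Batch} ->
                {ok, It, Batch}
        end.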
diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index e93780ba2..71bf6fa6e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -248,8 +248,8 @@ ) -> emqx_ds:make_delete_iterator_result(_Iterator). --callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time()) -> - {ok, Iter, [emqx_types:message()]} | {error, _}. +-callback next(shard_id(), _Data, Iter, pos_integer(), emqx_ds:time(), _IsCurrent :: boolean()) -> + {ok, Iter, [emqx_types:message()]} | {ok, end_of_stream} | {error, _}. -callback delete_next( shard_id(), _Data, DeleteIterator, emqx_ds:delete_selector(), pos_integer(), emqx_ds:time() @@ -449,15 +449,12 @@ update_iterator( next(Shard, Iter = #{?tag := ?IT, ?generation := GenId, ?enc := GenIter0}, BatchSize, Now) -> case generation_get(Shard, GenId) of #{module := Mod, data := GenData} -> - Current = generation_current(Shard), - case Mod:next(Shard, GenData, GenIter0, BatchSize, Now) of - {ok, _GenIter, []} when GenId < Current -> - %% This is a past generation. Storage layer won't write - %% any more messages here. The iterator reached the end: - %% the stream has been fully replayed. - {ok, end_of_stream}; + IsCurrent = GenId =:= generation_current(Shard), + case Mod:next(Shard, GenData, GenIter0, BatchSize, Now, IsCurrent) of {ok, GenIter, Batch} -> {ok, Iter#{?enc := GenIter}, Batch}; + {ok, end_of_stream} -> + {ok, end_of_stream}; Error = {error, _, _} -> Error end; diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl index 10007488c..1c506390e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_reference.erl @@ -38,7 +38,7 @@ make_iterator/5, make_delete_iterator/5, update_iterator/4, - next/5, + next/6, delete_next/6 ]). @@ -148,7 +148,7 @@ update_iterator(_Shard, _Data, OldIter, DSKey) -> last_seen_message_key = DSKey }}. -next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> +next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now, IsCurrent) -> #it{topic_filter = TopicFilter, start_time = StartTime, last_seen_message_key = Key0} = It0, {ok, ITHandle} = rocksdb:iterator(DB, CF, []), Action = @@ -162,7 +162,12 @@ next(_Shard, #s{db = DB, cf = CF}, It0, BatchSize, _Now) -> {Key, Messages} = do_next(TopicFilter, StartTime, ITHandle, Action, BatchSize, Key0, []), rocksdb:iterator_close(ITHandle), It = It0#it{last_seen_message_key = Key}, - {ok, It, lists:reverse(Messages)}. + case Messages of + [] when not IsCurrent -> + {ok, end_of_stream}; + _ -> + {ok, It, lists:reverse(Messages)} + end. 
delete_next(_Shard, #s{db = DB, cf = CF}, It0, Selector, BatchSize, _Now) -> #delete_it{ From b3ded7edce091f7d8e7905feb14a3f226ed24180 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 20 May 2024 12:56:53 +0200 Subject: [PATCH 11/15] fix(ds): Fix code review remark --- .../src/emqx_ds_builtin_db_sup.erl | 28 +++++++++++ .../src/emqx_ds_replication_layer_egress.erl | 20 +++++--- .../src/emqx_ds_storage_layer.erl | 47 ++++++++++++------- 3 files changed, 71 insertions(+), 24 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl index 06e925c1b..b2a461e7a 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_builtin_db_sup.erl @@ -33,6 +33,12 @@ ]). -export([which_dbs/0, which_shards/1]). +%% Debug: +-export([ + get_egress_workers/1, + get_shard_workers/1 +]). + %% behaviour callbacks: -export([init/1]). @@ -111,6 +117,28 @@ which_dbs() -> Key = {n, l, #?db_sup{_ = '_', db = '$1'}}, gproc:select({local, names}, [{{Key, '_', '_'}, [], ['$1']}]). +%% @doc Get pids of all local egress servers for the given DB. +-spec get_egress_workers(emqx_ds:db()) -> #{_Shard => pid()}. +get_egress_workers(DB) -> + Children = supervisor:which_children(?via(#?egress_sup{db = DB})), + L = [{Shard, Child} || {Shard, Child, _, _} <- Children, is_pid(Child)], + maps:from_list(L). + +%% @doc Get pids of all local shard servers for the given DB. +-spec get_shard_workers(emqx_ds:db()) -> #{_Shard => pid()}. +get_shard_workers(DB) -> + Shards = supervisor:which_children(?via(#?shards_sup{db = DB})), + L = lists:flatmap( + fun + ({_Shard, Sup, _, _}) when is_pid(Sup) -> + [{Id, Pid} || {Id, Pid, _, _} <- supervisor:which_children(Sup), is_pid(Pid)]; + (_) -> + [] + end, + Shards + ), + maps:from_list(L). + %%================================================================================ %% behaviour callbacks %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl index dc76aecf0..1d0efca6f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_egress.erl @@ -129,12 +129,20 @@ init([DB, Shard]) -> }, {ok, S}. -format_status(#s{db = DB, shard = Shard, queue = Q}) -> - #{ - db => DB, - shard => Shard, - queue => queue:len(Q) - }. +format_status(Status) -> + maps:map( + fun + (state, #s{db = DB, shard = Shard, queue = Q}) -> + #{ + db => DB, + shard => Shard, + queue => queue:len(Q) + }; + (_, Val) -> + Val + end, + Status + ). handle_call( #enqueue_req{ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 71bf6fa6e..7d1fffbcb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -589,23 +589,16 @@ init({ShardId, Options}) -> commit_metadata(S), {ok, S}. -format_status(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> - #{ - id => ShardId, - db => DB, - cf_refs => CFRefs, - schema => Schema, - shard => - maps:map( - fun - (?GEN_KEY(_), _Schema) -> - '...'; - (_K, Val) -> - Val - end, - Shard - ) - }. 
+format_status(Status) -> + maps:map( + fun + (state, State) -> + format_state(State); + (_, Val) -> + Val + end, + Status + ). handle_call(#call_update_config{since = Since, options = Options}, _From, S0) -> case handle_update_config(S0, Since, Options) of @@ -791,7 +784,7 @@ handle_drop_generation(S0, GenId) -> EC => Err, stacktrace => Stack, generation => GenId, - s => format_status(S0) + s => format_state(S0) } ) end, @@ -994,6 +987,24 @@ generations_since(Shard, Since) -> Schema ). +format_state(#s{shard_id = ShardId, db = DB, cf_refs = CFRefs, schema = Schema, shard = Shard}) -> + #{ + id => ShardId, + db => DB, + cf_refs => CFRefs, + schema => Schema, + shard => + maps:map( + fun + (?GEN_KEY(_), _Schema) -> + '...'; + (_K, Val) -> + Val + end, + Shard + ) + }. + -define(PERSISTENT_TERM(SHARD), {emqx_ds_storage_layer, SHARD}). -spec get_schema_runtime(shard_id()) -> shard(). From 29345aaa30e7e5f931b12abd55165c2d1ca6489e Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 20 May 2024 15:53:11 +0200 Subject: [PATCH 12/15] fix(ds): Fix idle event generation in bitfield_lts layout --- apps/emqx/src/emqx_ds_schema.erl | 7 +------ .../src/emqx_ds_storage_bitfield_lts.erl | 8 +++++--- .../test/emqx_ds_replication_SUITE.erl | 2 +- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index 87f87aa7f..dc395b291 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -246,7 +246,7 @@ fields(layout_builtin_reference) -> reference, #{ 'readOnly' => true, - importance => ?IMPORTANCE_HIDDEN + importance => ?IMPORTANCE_LOW } )} ]. @@ -273,17 +273,12 @@ ds_schema(Options) -> Options ). --ifndef(TEST). -builtin_layouts() -> - [ref(layout_builtin_wildcard_optimized)]. --else. builtin_layouts() -> %% Reference layout stores everything in one stream, so it's not %% suitable for production use. However, it's very simple and %% produces a very predictabale replay order, which can be useful %% for testing and debugging: [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)]. --endif. sc(Type, Meta) -> hoconsc:mk(Type, Meta). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 8acb6e529..7d058109e 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -161,7 +161,7 @@ %% GVar used for idle detection: -define(IDLE_DETECT, idle_detect). --define(EPOCH(S, TS), (TS bsl S#s.ts_bits)). +-define(EPOCH(S, TS), (TS bsr S#s.ts_offset)). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -458,7 +458,7 @@ next( SafeCutoffTime = case HasCutoff of true -> - (Now bsr TSOffset) bsl TSOffset; + ?EPOCH(Schema, Now) bsl TSOffset; false -> 1 bsl TSBits - 1 end, @@ -561,13 +561,15 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> LastWrittenTs = 0 end, case Latch of - false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) -> + false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 -> ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), [dummy_event]; _ -> [] end; handle_event(_ShardId, _Data, _Time, _Event) -> + %% `dummy_event' goes here and does nothing. But it forces update + %% of `Time' in the replication layer. []. 
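Note: the one-line `?EPOCH' fix above is easy to miss: the old definition shifted the timestamp left by `ts_bits', which does not yield an epoch number at all, whereas `TS bsr S#s.ts_offset' does, so the latch condition in `handle_event' (now with a one-epoch safety margin, the `+ 1') finally compares epoch counters. Quick sanity check with illustrative values (`ts_offset = 20', i.e. roughly 1 s epochs at microsecond resolution):

    TsOffset = 20,
    Epoch = fun(Ts) -> Ts bsr TsOffset end,
    %% Nearby timestamps share an epoch...
    true = Epoch(1_000_000) =:= Epoch(1_000_001),
    %% ...timestamps ~2s apart do not:
    false = Epoch(1_000_000) =:= Epoch(3_000_000).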
%%================================================================================ diff --git a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl index eacf7e301..941e20641 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_replication_SUITE.erl @@ -183,7 +183,7 @@ t_rebalance(Config) -> ], Stream1 = emqx_utils_stream:interleave( [ - {50, Stream0}, + {10, Stream0}, emqx_utils_stream:const(add_generation) ], false From 5c78ecba40abd90e737889ebaf8da8e4016539d2 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 May 2024 19:07:11 +0200 Subject: [PATCH 13/15] docs(ds): Update documentation for the storage layouts --- apps/emqx/src/emqx_ds_schema.erl | 5 ++++- rel/i18n/emqx_ds_schema.hocon | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_ds_schema.erl b/apps/emqx/src/emqx_ds_schema.erl index dc395b291..5902bcfb7 100644 --- a/apps/emqx/src/emqx_ds_schema.erl +++ b/apps/emqx/src/emqx_ds_schema.erl @@ -246,7 +246,8 @@ fields(layout_builtin_reference) -> reference, #{ 'readOnly' => true, - importance => ?IMPORTANCE_LOW + importance => ?IMPORTANCE_LOW, + desc => ?DESC(layout_builtin_reference_type) } )} ]. @@ -257,6 +258,8 @@ desc(builtin_local_write_buffer) -> ?DESC(builtin_local_write_buffer); desc(layout_builtin_wildcard_optimized) -> ?DESC(layout_builtin_wildcard_optimized); +desc(layout_builtin_reference) -> + ?DESC(layout_builtin_reference); desc(_) -> undefined. diff --git a/rel/i18n/emqx_ds_schema.hocon b/rel/i18n/emqx_ds_schema.hocon index 11d25ebe4..65b76b6fa 100644 --- a/rel/i18n/emqx_ds_schema.hocon +++ b/rel/i18n/emqx_ds_schema.hocon @@ -90,11 +90,20 @@ wildcard_optimized_epoch_bits.desc: Time span covered by each epoch grows exponentially with the value of `epoch_bits`: - - `epoch_bits = 1`: epoch time = 1 millisecond - - `epoch_bits = 2`: 2 milliseconds + - `epoch_bits = 1`: epoch time = 2 microseconds + - `epoch_bits = 2`: 4 microseconds ... - - `epoch_bits = 10`: 1024 milliseconds - - `epoch_bits = 13`: ~8 seconds + - `epoch_bits = 20`: ~1s ...~""" +layout_builtin_reference.label: "Reference layout" +layout_builtin_reference.desc: + """~ + A simplistic layout type that stores all messages from all topics in chronological order in a single stream. + + Not recommended for production use.~""" + +layout_builtin_reference_type.label: "Layout type" +layout_builtin_reference_type.desc: "Reference layout type." 
+ } From 59a09fb86fa4fcadd624a461f2bae2e3df04065a Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Tue, 21 May 2024 22:07:03 +0200 Subject: [PATCH 14/15] fix(ds): Apply review remarks --- .../src/emqx_ds_replication_layer_shard.erl | 6 +++--- .../src/emqx_ds_storage_bitfield_lts.erl | 13 +++++++++++++ changes/ce/fix-13072.en.md | 10 ++++++++++ 3 files changed, 26 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-13072.en.md diff --git a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl index 518e1e630..1070fbde0 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replication_layer_shard.erl @@ -115,11 +115,11 @@ get_servers_local_preferred(DB, Shard) -> Servers when is_list(Servers) -> ok end, - case lists:keyfind(node(), 2, Servers) of + case lists:keytake(node(), 2, Servers) of false -> Servers; - Local when is_tuple(Local) -> - [Local | lists:delete(Local, Servers)] + {value, Local, Rest} -> + [Local | Rest] end. lookup_leader(DB, Shard) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 7d058109e..3b62fbfdf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -553,6 +553,17 @@ delete_next_until( end. handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> + %% If the last message was published more than one epoch ago, and + %% the shard remains idle, we need to advance safety cutoff + %% interval to make sure the last epoch becomes visible to the + %% readers. + %% + %% We do so by emitting a dummy event that will be persisted by + %% the replication layer. Processing it will advance the + %% replication layer's clock. + %% + %% This operation is latched to avoid publishing events on every + %% tick. case ets:lookup(Gvars, ?IDLE_DETECT) of [{?IDLE_DETECT, Latch, LastWrittenTs}] -> ok; @@ -562,6 +573,8 @@ handle_event(_ShardId, State = #s{gvars = Gvars}, Time, tick) -> end, case Latch of false when ?EPOCH(State, Time) > ?EPOCH(State, LastWrittenTs) + 1 -> + %% Note: + 1 above delays the event by one epoch to add a + %% safety margin. ets:insert(Gvars, {?IDLE_DETECT, true, LastWrittenTs}), [dummy_event]; _ -> diff --git a/changes/ce/fix-13072.en.md b/changes/ce/fix-13072.en.md new file mode 100644 index 000000000..da4a4253f --- /dev/null +++ b/changes/ce/fix-13072.en.md @@ -0,0 +1,10 @@ +Various fixes related to the `durable_sessions` feature: + +- Add an option to execute read operations on the leader. +- `drop_generation` operation can be replayed multiple times by the replication layer, but it's not idempotent. This PR adds a workaround that avoids a crash when `drop_generation` doesn't succeed. In the future, however, we want to make `drop_generation` idempotent in a nicer way. +- Wrap storage layer events in a small structure containing the generation ID, to make sure events are handled by the same layout CBM & context that produced them. +- Fix crash when storage event arrives to the dropped generation (now removed `storage_layer:generation_at` function didn't handle the case of dropped generations). +- Implement `format_status` callback for several workers to minimize log spam +- Move the responsibility of `end_of_stream` detection to the layout CBM. 
Previously storage layer used a heuristic: old generations that return an empty batch won't produce more data. This was, obviously, incorrect: for example, bitfield-LTS layout MAY return empty batch while waiting for safe cutoff time. +- `reference` layout has been enabled in prod build. It could be useful for integration testing. +- Fix incorrect epoch calculation in `bitfield_lts:handle_event` callback that lead to missed safe cutoff time updates, and effectively, subscribers being unable to fetch messages until a fresh batch was published. From 6eb04f90a3c4077a9a60a0c1fdbd03ecac7fbf7b Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Wed, 22 May 2024 20:28:16 +0200 Subject: [PATCH 15/15] fix(ds): Allow to write batches to older generations --- .../src/emqx_ds_storage_layer.erl | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 7d1fffbcb..2245e81c5 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -301,26 +301,30 @@ store_batch(Shard, Messages, Options) -> [{emqx_ds:time(), emqx_types:message()}], emqx_ds:message_store_opts() ) -> {ok, cooked_batch()} | ignore | emqx_ds:error(_). -prepare_batch(Shard, Messages = [_ | _], Options) -> +prepare_batch(Shard, Messages = [{Time, _} | _], Options) -> %% NOTE %% We assume that batches do not span generations. Callers should enforce this. ?tp(emqx_ds_storage_layer_prepare_batch, #{ shard => Shard, messages => Messages, options => Options }), - GenId = generation_current(Shard), - #{module := Mod, data := GenData} = generation_get(Shard, GenId), - T0 = erlang:monotonic_time(microsecond), - Result = - case Mod:prepare_batch(Shard, GenData, Messages, Options) of - {ok, CookedBatch} -> - {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; - Error = {error, _, _} -> - Error - end, - T1 = erlang:monotonic_time(microsecond), - %% TODO store->prepare - emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), - Result; + %% FIXME: always store messages in the current generation + case generation_at(Shard, Time) of + {GenId, #{module := Mod, data := GenData}} -> + T0 = erlang:monotonic_time(microsecond), + Result = + case Mod:prepare_batch(Shard, GenData, Messages, Options) of + {ok, CookedBatch} -> + {ok, #{?tag => ?COOKED_BATCH, ?generation => GenId, ?enc => CookedBatch}}; + Error = {error, _, _} -> + Error + end, + T1 = erlang:monotonic_time(microsecond), + %% TODO store->prepare + emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0), + Result; + not_found -> + ignore + end; prepare_batch(_Shard, [], _Options) -> ignore. @@ -964,6 +968,25 @@ generation_current(Shard) -> #{current_generation := Current} = get_schema_runtime(Shard), Current. +%% TODO: remove me +-spec generation_at(shard_id(), emqx_ds:time()) -> {gen_id(), generation()} | not_found. +generation_at(Shard, Time) -> + Schema = #{current_generation := Current} = get_schema_runtime(Shard), + generation_at(Time, Current, Schema). + +generation_at(Time, GenId, Schema) -> + case Schema of + #{?GEN_KEY(GenId) := Gen} -> + case Gen of + #{since := Since} when Time < Since andalso GenId > 0 -> + generation_at(Time, GenId - 1, Schema); + _ -> + {GenId, Gen} + end; + _ -> + not_found + end. + -spec generation_get(shard_id(), gen_id()) -> generation() | not_found. 
generation_get(Shard, GenId) -> case get_schema_runtime(Shard) of