From 74cb43f8b19ecf79bc20136ff594c61b00669594 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 2 Nov 2023 11:47:28 +0100 Subject: [PATCH] fix(ds): Add unique ID to the key --- apps/emqx/src/emqx_persistent_session_ds.erl | 10 ++++++++-- .../test/emqx_persistent_messages_SUITE.erl | 18 ++++++++++-------- .../test/emqx_persistent_session_SUITE.erl | 13 ++++++++----- apps/emqx_durable_storage/include/emqx_ds.hrl | 19 +++++++++++++++++++ apps/emqx_durable_storage/src/emqx_ds.erl | 1 + .../src/emqx_ds_bitmask_keymapper.erl | 5 +++-- .../src/emqx_ds_storage_bitfield_lts.erl | 15 +++++++++++---- .../src/emqx_ds_storage_layer.erl | 6 ++++-- 8 files changed, 64 insertions(+), 23 deletions(-) create mode 100644 apps/emqx_durable_storage/include/emqx_ds.hrl diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index abecb72a2..9a9e05a7a 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -115,6 +115,7 @@ session(). create(#{clientid := ClientID}, _ConnInfo, Conf) -> % TODO: expiration + ensure_timers(), ensure_session(ClientID, Conf). -spec open(clientinfo(), conninfo()) -> @@ -127,10 +128,9 @@ open(#{clientid := ClientID}, _ConnInfo) -> %% somehow isolate those idling not-yet-expired sessions into a separate process %% space, and move this call back into `emqx_cm` where it belongs. ok = emqx_cm:discard_session(ClientID), - ensure_timer(pull), - ensure_timer(get_streams), case open_session(ClientID) of Session = #{} -> + ensure_timers(), {true, Session, []}; false -> false @@ -705,6 +705,12 @@ export_record(Record, I, [Field | Rest], Acc) -> export_record(_, _, [], Acc) -> Acc. +%% TODO: find a more reliable way to perform actions that have side +%% effects. Add `CBM:init' callback to the session behavior? +ensure_timers() -> + ensure_timer(pull), + ensure_timer(get_streams). + -spec ensure_timer(pull | get_streams) -> ok. ensure_timer(Type) -> _ = emqx_utils:start_timer(100, {emqx_session, Type}), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index db025a457..52ba090b5 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -26,9 +26,6 @@ -import(emqx_common_test_helpers, [on_exit/1]). --define(DEFAULT_KEYSPACE, default). --define(DS_SHARD_ID, <<"local">>). --define(DS_SHARD, {?DEFAULT_KEYSPACE, ?DS_SHARD_ID}). -define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). all() -> @@ -49,6 +46,7 @@ init_per_testcase(t_session_subscription_iterators = TestCase, Config) -> Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}), [{nodes, Nodes} | Config]; init_per_testcase(TestCase, Config) -> + ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), Apps = emqx_cth_suite:start( app_specs(), #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} @@ -59,9 +57,9 @@ end_per_testcase(t_session_subscription_iterators, Config) -> Nodes = ?config(nodes, Config), emqx_common_test_helpers:call_janitor(60_000), ok = emqx_cth_cluster:stop(Nodes), - ok; + end_per_testcase(common, Config); end_per_testcase(_TestCase, Config) -> - Apps = ?config(apps, Config), + Apps = proplists:get_value(apps, Config, []), emqx_common_test_helpers:call_janitor(60_000), clear_db(), emqx_cth_suite:stop(Apps), @@ -97,6 +95,7 @@ t_messages_persisted(_Config) -> Results = [emqtt:publish(CP, Topic, Payload, 1) || {Topic, Payload} <- Messages], ct:pal("Results = ~p", [Results]), + timer:sleep(2000), Persisted = consume(['#'], 0), @@ -141,6 +140,8 @@ t_messages_persisted_2(_Config) -> {ok, #{reason_code := ?RC_NO_MATCHING_SUBSCRIBERS}} = emqtt:publish(CP, T(<<"client/2/topic">>), <<"8">>, 1), + timer:sleep(2000), + Persisted = consume(['#'], 0), ct:pal("Persisted = ~p", [Persisted]), @@ -251,13 +252,14 @@ connect(Opts0 = #{}) -> {ok, _} = emqtt:connect(Client), Client. -consume(TopicFiler, StartMS) -> +consume(TopicFilter, StartMS) -> + Streams = emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFilter, StartMS), lists:flatmap( fun({_Rank, Stream}) -> - {ok, It} = emqx_ds:make_iterator(Stream, StartMS, 0), + {ok, It} = emqx_ds:make_iterator(Stream, TopicFilter, StartMS), consume(It) end, - emqx_ds:get_streams(?PERSISTENT_MESSAGE_DB, TopicFiler, StartMS) + Streams ). consume(It) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 008305671..5a14e0bc9 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -24,6 +24,8 @@ -compile(export_all). -compile(nowarn_export_all). +-define(PERSISTENT_MESSAGE_DB, emqx_persistent_message). + %%-------------------------------------------------------------------- %% SUITE boilerplate %%-------------------------------------------------------------------- @@ -131,6 +133,7 @@ get_listener_port(Type, Name) -> end_per_group(Group, Config) when Group == tcp; Group == ws; Group == quic -> ok = emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(_, _Config) -> + ok = emqx_ds:drop_db(?PERSISTENT_MESSAGE_DB), ok. init_per_testcase(TestCase, Config) -> @@ -188,7 +191,7 @@ receive_messages(Count, Msgs) -> receive_messages(Count - 1, [Msg | Msgs]); _Other -> receive_messages(Count, Msgs) - after 5000 -> + after 15000 -> Msgs end. @@ -227,11 +230,11 @@ wait_for_cm_unregister(ClientId, N) -> end. publish(Topic, Payloads) -> - publish(Topic, Payloads, false). + publish(Topic, Payloads, false, 2). -publish(Topic, Payloads, WaitForUnregister) -> +publish(Topic, Payloads, WaitForUnregister, QoS) -> Fun = fun(Client, Payload) -> - {ok, _} = emqtt:publish(Client, Topic, Payload, 2) + {ok, _} = emqtt:publish(Client, Topic, Payload, QoS) end, do_publish(Payloads, Fun, WaitForUnregister). @@ -532,7 +535,7 @@ t_publish_while_client_is_gone_qos1(Config) -> ok = emqtt:disconnect(Client1), maybe_kill_connection_process(ClientId, Config), - ok = publish(Topic, [Payload1, Payload2]), + ok = publish(Topic, [Payload1, Payload2], false, 1), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, diff --git a/apps/emqx_durable_storage/include/emqx_ds.hrl b/apps/emqx_durable_storage/include/emqx_ds.hrl new file mode 100644 index 000000000..c9ee4b7f7 --- /dev/null +++ b/apps/emqx_durable_storage/include/emqx_ds.hrl @@ -0,0 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-ifndef(EMQX_DS_HRL_HRL). +-define(EMQX_DS_HRL_HRL, true). + +-endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 1e7f88367..27a0745bc 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -111,6 +111,7 @@ open_db(DB, Opts = #{backend := builtin}) -> emqx_ds_replication_layer:open_db(DB, Opts). %% @doc TODO: currently if one or a few shards are down, they won't be + %% deleted. -spec drop_db(db()) -> ok. drop_db(DB) -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl index ee2173000..5666b45ae 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_bitmask_keymapper.erl @@ -214,7 +214,7 @@ vector_to_key(#keymapper{scanner = [Actions | Scanner]}, [Coord | Vector]) -> bin_vector_to_key(Keymapper = #keymapper{dim_sizeof = DimSizeof, size = Size}, Binaries) -> Vec = lists:zipwith( fun(Bin, SizeOf) -> - <> = Bin, + <> = Bin, Int end, Binaries, @@ -402,7 +402,8 @@ bin_increment( Filter = #filter{size = Size, bitmask = Bitmask, bitfilter = Bitfilter, range_max = RangeMax}, KeyBin ) -> - <> = KeyBin, + %% The key may contain random suffix, skip it: + <> = KeyBin, Key1 = Key0 + 1, if Key1 band Bitmask =:= Bitfilter, Key1 =< RangeMax -> diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 129c2500e..fe198c207 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -206,7 +206,10 @@ next_until(#s{db = DB, data = CF, keymappers = Keymappers}, It, SafeCutoffTime, %% Make filter: Inequations = [ {'=', TopicIndex}, - {StartTime, '..', SafeCutoffTime - 1} + {StartTime, '..', SafeCutoffTime - 1}, + %% Unique integer: + any + %% Varying topic levels: | lists:map( fun ('+') -> @@ -337,9 +340,12 @@ make_key(#s{keymappers = KeyMappers, trie = Trie}, #message{timestamp = Timestam ]) -> binary(). make_key(KeyMapper, TopicIndex, Timestamp, Varying) -> + UniqueInteger = erlang:unique_integer([monotonic, positive]), emqx_ds_bitmask_keymapper:key_to_bitstring( KeyMapper, - emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [TopicIndex, Timestamp | Varying]) + emqx_ds_bitmask_keymapper:vector_to_key(KeyMapper, [ + TopicIndex, Timestamp, UniqueInteger | Varying + ]) ). %% TODO: don't hardcode the thresholds @@ -366,9 +372,10 @@ make_keymapper(TopicIndexBytes, BitsPerTopicLevel, TSBits, TSOffsetBits, N) -> %% Dimension Offset Bitsize [{1, 0, TopicIndexBytes * ?BYTE_SIZE}, %% Topic index {2, TSOffsetBits, TSBits - TSOffsetBits }] ++ %% Timestamp epoch - [{2 + I, 0, BitsPerTopicLevel } %% Varying topic levels + [{3 + I, 0, BitsPerTopicLevel } %% Varying topic levels || I <- lists:seq(1, N)] ++ - [{2, 0, TSOffsetBits }], %% Timestamp offset + [{2, 0, TSOffsetBits }, %% Timestamp offset + {3, 0, 64 }], %% Unique integer Keymapper = emqx_ds_bitmask_keymapper:make_keymapper(lists:reverse(Bitsources)), %% Assert: case emqx_ds_bitmask_keymapper:bitsize(Keymapper) rem 8 of diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 4140c0ed7..57af33d61 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -24,6 +24,8 @@ -export([start_link/2, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). %% internal exports: +-export([db_dir/1]). + -export_type([gen_id/0, generation/0, cf_refs/0, stream/0, iterator/0]). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -132,7 +134,7 @@ open_shard(Shard, Options) -> -spec drop_shard(shard_id()) -> ok. drop_shard(Shard) -> - emqx_ds_storage_layer_sup:stop_shard(Shard), + catch emqx_ds_storage_layer_sup:stop_shard(Shard), ok = rocksdb:destroy(db_dir(Shard), []). -spec store_batch(shard_id(), [emqx_types:message()], emqx_ds:message_store_opts()) -> @@ -368,7 +370,7 @@ rocksdb_open(Shard, Options) -> -spec db_dir(shard_id()) -> file:filename(). db_dir({DB, ShardId}) -> - filename:join(["data", atom_to_list(DB), atom_to_list(ShardId)]). + filename:join([emqx:data_dir(), atom_to_list(DB), atom_to_list(ShardId)]). %%-------------------------------------------------------------------------------- %% Schema access