From ca697a4e14343ce30cb78b84800451fc8a7c845d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 27 Jul 2023 11:36:29 -0300 Subject: [PATCH 01/23] fix: rename `emqx_ds{,_replay}:replay_id()` --- .../src/emqx_ds_message_storage_bitmask.erl | 8 ++++---- .../src/emqx_ds_storage_layer.erl | 12 ++++++------ .../props/emqx_ds_message_storage_bitmask_shim.erl | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 57608e5cb..74a50c302 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_ds:replay()) -> +-spec make_iterator(db(), emqx_ds_replay:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), make_iterator(DB, Replay, Options). --spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> +-spec make_iterator(db(), emqx_ds_replay:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. @@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) -> }, term_to_binary(State). --spec restore_iterator(db(), emqx_ds:replay(), binary()) -> +-spec restore_iterator(db(), emqx_ds_replay:replay(), binary()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), @@ -419,7 +419,7 @@ hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). +-spec make_keyspace_filter(emqx_ds_replay:replay(), keymapper()) -> keyspace_filter(). make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 017423b02..7c73dafaf 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -63,7 +63,7 @@ -record(it, { shard :: emqx_ds:shard(), gen :: gen_id(), - replay :: emqx_ds:replay(), + replay :: emqx_ds_replay:replay(), module :: module(), data :: term() }). @@ -104,10 +104,10 @@ -callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. --callback make_iterator(_Schema, emqx_ds:replay()) -> +-callback make_iterator(_Schema, emqx_ds_replay:replay()) -> {ok, _It} | {error, _}. --callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. +-callback restore_iterator(_Schema, emqx_ds_replay:replay(), binary()) -> {ok, _It} | {error, _}. -callback preserve_iterator(_Schema, _It) -> term(). @@ -132,7 +132,7 @@ store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> +-spec make_iterator(emqx_ds:shard(), emqx_ds_replay:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) -> {GenId, Gen} = meta_lookup_gen(Shard, StartTime), @@ -160,12 +160,12 @@ next(It = #it{module = Mod, data = ItData}) -> end end. --spec preserve_iterator(iterator(), emqx_ds:replay_id()) -> +-spec preserve_iterator(iterator(), emqx_ds_replay:replay_id()) -> ok | {error, _TODO}. preserve_iterator(It = #it{}, ReplayID) -> iterator_put_state(ReplayID, It). --spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> +-spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(Shard, ReplayID) -> case iterator_get_state(Shard, ReplayID) of diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index 59668ca01..bbe16f518 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -29,7 +29,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) -> true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), ok. --spec iterate(t(), emqx_ds:replay()) -> +-spec iterate(t(), emqx_ds_replay:replay()) -> [binary()]. iterate(Tab, {TopicFilter, StartTime}) -> ets:foldr( From 9463e271c07e5e1057dd289751e15d1b720f4014 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 25 Jul 2023 17:09:44 -0300 Subject: [PATCH 02/23] feat(ds): open iterators when handling `SUBSCRIBE` packets Fixes https://emqx.atlassian.net/browse/EMQX-9741 --- apps/emqx/include/emqx.hrl | 3 +- apps/emqx/include/emqx_session.hrl | 57 +++++ apps/emqx/priv/bpapi.versions | 1 + apps/emqx/src/emqx_cm.erl | 12 +- apps/emqx/src/emqx_persistent_session_ds.erl | 49 ++++- apps/emqx/src/emqx_session.erl | 62 ++---- apps/emqx/src/emqx_session_router.erl | 1 + .../emqx_persistent_session.erl | 16 +- .../emqx_persistent_session.hrl | 2 + .../emqx_persistent_session_gc.erl | 1 + apps/emqx/src/proto/emqx_ds_proto_v1.erl | 49 +++++ apps/emqx/test/emqx_cth_cluster.erl | 5 +- .../test/emqx_persistent_messages_SUITE.erl | 194 ++++++++++++++++-- .../test/emqx_persistent_session_SUITE.erl | 22 ++ apps/emqx/test/emqx_proper_types.erl | 45 ++-- apps/emqx_durable_storage/src/emqx_ds.erl | 62 +++++- apps/emqx_durable_storage/src/emqx_ds_int.hrl | 2 +- .../src/emqx_ds_replay.erl | 2 +- .../src/emqx_ds_storage_layer.erl | 58 +++++- .../src/emqx_durable_storage.app.src | 2 +- .../emqx_ds_message_storage_bitmask_shim.erl | 2 + .../src/emqx_mgmt_api_clients.erl | 2 +- 22 files changed, 537 insertions(+), 112 deletions(-) create mode 100644 apps/emqx/include/emqx_session.hrl create mode 100644 apps/emqx/src/proto/emqx_ds_proto_v1.erl diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index ac9d297de..e0d1685e8 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,7 +23,6 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). --define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). %% Banner %%-------------------------------------------------------------------- @@ -92,7 +91,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} | emqx_session:sessionID() + dest :: node() | {binary(), node()} | emqx_session:session_id() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl new file mode 100644 index 000000000..fba4cf911 --- /dev/null +++ b/apps/emqx/include/emqx_session.hrl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_SESSION_HRL). +-define(EMQX_SESSION_HRL, true). + +-record(session, { + %% Client's id + clientid :: emqx_types:clientid(), + id :: emqx_session:session_id(), + %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 + is_persistent :: boolean(), + %% Client’s Subscriptions. + subscriptions :: map(), + %% Max subscriptions allowed + max_subscriptions :: non_neg_integer() | infinity, + %% Upgrade QoS? + upgrade_qos :: boolean(), + %% Client <- Broker: QoS1/2 messages sent to the client but + %% have not been unacked. + inflight :: emqx_inflight:inflight(), + %% All QoS1/2 messages published to when client is disconnected, + %% or QoS1/2 messages pending transmission to the Client. + %% + %% Optionally, QoS0 messages pending transmission to the Client. + mqueue :: emqx_mqueue:mqueue(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_types:packet_id(), + %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) + retry_interval :: timeout(), + %% Client -> Broker: QoS2 messages received from the client, but + %% have not been completely acknowledged + awaiting_rel :: map(), + %% Maximum number of awaiting QoS2 messages allowed + max_awaiting_rel :: non_neg_integer() | infinity, + %% Awaiting PUBREL Timeout (Unit: millisecond) + await_rel_timeout :: timeout(), + %% Created at + created_at :: pos_integer(), + %% Durable storage iterators for existing subscriptions + iterators = [] :: [emqx_ds_replay:replay_id()] +}). + +-endif. diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index e13f60654..b6a4c6e7a 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -15,6 +15,7 @@ {emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. +{emqx_ds,1}. {emqx_eviction_agent,1}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c680560fb..b98222959 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("emqx_cm.hrl"). +-include("emqx_session.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -301,7 +302,16 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> create_session(ClientInfo, ConnInfo) -> Options = get_session_confs(ClientInfo, ConnInfo), - Session = emqx_session:init(Options), + #{clientid := ClientID} = ClientInfo, + Session0 = emqx_session:init(Options), + IteratorIDs = + case emqx_persistent_session_ds:open_session(ClientID) of + {skipped, disabled} -> + []; + {_IsNew, _DSSessionID, Iterators0} -> + Iterators0 + end, + Session = Session0#session{iterators = IteratorIDs}, ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 27b4f0950..7e9db4707 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -16,15 +16,24 @@ -module(emqx_persistent_session_ds). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + -export([init/0]). --export([persist_message/1]). +-export([ + persist_message/1, + open_session/1, + add_subscription/2 +]). -export([ serialize_message/1, deserialize_message/1 ]). +%% RPC +-export([do_open_iterator/3]). + %% FIXME -define(DS_SHARD, <<"local">>). @@ -72,6 +81,44 @@ store_message(Msg) -> find_subscribers(_Msg) -> [node()]. +open_session(ClientID) -> + ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). + +-spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) -> + {ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}. +add_subscription(TopicFilterBin, DSSessionID) -> + ?WHEN_ENABLED( + begin + TopicFilter = emqx_topic:words(TopicFilterBin), + {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( + DSSessionID, TopicFilter + ), + case IsNew of + true -> + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID); + false -> + ok + end, + {ok, IteratorID, IsNew} + end + ). + +-spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> + Nodes = emqx:running_nodes(), + Results = emqx_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), + %% TODO: handle errors + true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), + ok. + +-spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +do_open_iterator(TopicFilter, StartMS, IteratorID) -> + Replay = {TopicFilter, StartMS}, + %% FIXME: choose DS shard based on ...? + {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + ok. + %% serialize_message(Msg) -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index d838e95d0..8f1933fc1 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -44,6 +44,7 @@ -module(emqx_session). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_mqtt.hrl"). -include("logger.hrl"). -include("types.hrl"). @@ -101,49 +102,13 @@ %% Export for CT -export([set_field/3]). --type sessionID() :: emqx_guid:guid(). +-type session_id() :: emqx_guid:guid(). -export_type([ session/0, - sessionID/0 + session_id/0 ]). --record(session, { - %% Client's id - clientid :: emqx_types:clientid(), - id :: sessionID(), - %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 - is_persistent :: boolean(), - %% Client’s Subscriptions. - subscriptions :: map(), - %% Max subscriptions allowed - max_subscriptions :: non_neg_integer() | infinity, - %% Upgrade QoS? - upgrade_qos :: boolean(), - %% Client <- Broker: QoS1/2 messages sent to the client but - %% have not been unacked. - inflight :: emqx_inflight:inflight(), - %% All QoS1/2 messages published to when client is disconnected, - %% or QoS1/2 messages pending transmission to the Client. - %% - %% Optionally, QoS0 messages pending transmission to the Client. - mqueue :: emqx_mqueue:mqueue(), - %% Next packet id of the session - next_pkt_id = 1 :: emqx_types:packet_id(), - %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval :: timeout(), - %% Client -> Broker: QoS2 messages received from the client, but - %% have not been completely acknowledged - awaiting_rel :: map(), - %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel :: non_neg_integer() | infinity, - %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout :: timeout(), - %% Created at - created_at :: pos_integer() - %% Message deliver latency stats -}). - -type inflight_data_phase() :: wait_ack | wait_comp. -record(inflight_data, { @@ -297,7 +262,9 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt. + CreatedAt; +info(iterators, #session{iterators = IteratorIds}) -> + IteratorIds. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -324,11 +291,13 @@ subscribe( case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), + Session1 = Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}, + Session2 = add_persistent_subscription(TopicFilter, ClientId, Session1), ok = emqx_hooks:run( 'session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}] ), - {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}; + {ok, Session2}; true -> {error, ?RC_QUOTA_EXCEEDED} end. @@ -341,6 +310,19 @@ is_subscriptions_full(#session{ }) -> maps:size(Subs) >= MaxLimit. +-spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> + session(). +add_persistent_subscription(TopicFilterBin, ClientId, Session) -> + case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of + {skipped, disabled} -> + Session; + {ok, IteratorID, _IsNew = true} -> + Iterators = Session#session.iterators, + Session#session{iterators = [IteratorID | Iterators]}; + {ok, _IteratorID, _IsNew = false} -> + Session + end. + %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 1567f9e62..25484bdf0 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include("persistent_session/emqx_persistent_session.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.erl b/apps/emqx/src/persistent_session/emqx_persistent_session.erl index 111154571..d85e13d67 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.erl @@ -115,10 +115,10 @@ storage_backend() -> %% Session message ADT API %%-------------------------------------------------------------------- --spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term(). +-spec session_message_info('timestamp' | 'session_id', sess_msg_key()) -> term(). session_message_info(timestamp, {_, <<>>, <>, ?ABANDONED}) -> TS; session_message_info(timestamp, {_, GUID, _, _}) -> emqx_guid:timestamp(GUID); -session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID. +session_message_info(session_id, {SessionID, _, _, _}) -> SessionID. %%-------------------------------------------------------------------- %% DB API @@ -243,7 +243,7 @@ discard_opt(true, ClientID, Session) -> emqx_session_router:delete_routes(SessionID, Subscriptions), emqx_session:set_field(is_persistent, false, Session). --spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid(). +-spec mark_resume_begin(emqx_session:session_id()) -> emqx_guid:guid(). mark_resume_begin(SessionID) -> MarkerID = emqx_guid:gen(), put_session_message({SessionID, MarkerID, <<>>, ?MARKER}), @@ -396,12 +396,12 @@ do_mark_as_delivered(SessionID, [{deliver, STopic, Msg} | Left]) -> do_mark_as_delivered(_SessionID, []) -> ok. --spec pending(emqx_session:sessionID()) -> +-spec pending(emqx_session:session_id()) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID) -> pending_messages_in_db(SessionID, []). --spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) -> +-spec pending(emqx_session:session_id(), MarkerIDs :: [emqx_guid:guid()]) -> [{emqx_types:message(), STopic :: binary()}]. pending(SessionID, MarkerIds) -> %% TODO: Handle lost MarkerIDs @@ -460,8 +460,8 @@ read_pending_msgs([], Acc) -> lists:reverse(Acc). %% The keys are ordered by -%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). -%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} +%% {session_id(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). +%% {session_id(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} %% where %% <<>> < emqx_guid:guid() %% <<>> < bin_timestamp() @@ -491,7 +491,7 @@ pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, Mar false -> pending_messages(Key, Acc, MarkerIds); true -> pending_messages(Key, [{PrevMsgId, PrevSTopic} | Acc], MarkerIds) end; - %% Next sessionID or '$end_of_table' + %% Next session_id or '$end_of_table' _What -> case PrevTag =:= ?UNDELIVERED of false -> {lists:reverse(Acc), MarkerIds}; diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl index eb4224116..5476d8daf 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session.hrl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session.hrl @@ -14,6 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- +-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). + -record(session_store, { client_id :: binary(), expiry_interval :: non_neg_integer(), diff --git a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl index a4c4e5422..4aa59cdef 100644 --- a/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl +++ b/apps/emqx/src/persistent_session/emqx_persistent_session_gc.erl @@ -56,6 +56,7 @@ start_link() -> init([]) -> process_flag(trap_exit, true), + mria_rlog:ensure_shard(?PERSISTENT_SESSION_SHARD), {ok, start_message_gc_timer(start_session_gc_timer(#{}))}. handle_call(_Request, _From, State) -> diff --git a/apps/emqx/src/proto/emqx_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_ds_proto_v1.erl new file mode 100644 index 000000000..2283b7e4e --- /dev/null +++ b/apps/emqx/src/proto/emqx_ds_proto_v1.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ds_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + open_iterator/4 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 30_000). + +introduced_in() -> + %% FIXME + "5.3.0". + +-spec open_iterator( + [node()], + emqx_topic:words(), + emqx_ds:time(), + emqx_ds:iterator_id() +) -> + emqx_rpc:erpc_multicall(ok). +open_iterator(Nodes, TopicFilter, StartMS, IteratorID) -> + erpc:multicall( + Nodes, + emqx_persistent_session_ds, + do_open_iterator, + [TopicFilter, StartMS, IteratorID], + ?TIMEOUT + ). diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 5e8bd4103..ddcb3234c 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -20,6 +20,7 @@ -export([stop/1]). -export([share_load_module/2]). +-export([node_name/1]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -83,7 +84,7 @@ when }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), - ct:pal("Starting cluster: ~p", [NodeSpecs]), + ct:pal("Starting cluster:\n ~p", [NodeSpecs]), % 1. Start bare nodes with only basic applications running _ = emqx_utils:pmap(fun start_node_init/1, NodeSpecs, ?TIMEOUT_NODE_START_MS), % 2. Start applications needed to enable clustering @@ -237,6 +238,8 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; +default_appspec(emqx, Spec = #{listeners := true}, _NodeSpecs) -> + #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b818e3fec..62702b3bc 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -17,6 +17,8 @@ -module(emqx_persistent_messages_SUITE). -include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile(export_all). -compile(nowarn_export_all). @@ -24,25 +26,38 @@ -define(NOW, (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) ). +-define(DS_SHARD, <<"local">>). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(emqx_durable_storage), - ok = emqx_common_test_helpers:start_apps([], fun - (emqx) -> - emqx_common_test_helpers:boot_modules(all), - emqx_config:init_load(emqx_schema, <<"persistent_session_store.ds = true">>), - emqx_app:set_config_loader(?MODULE); - (_) -> - ok - end), + %% avoid inter-suite flakiness... + application:stop(emqx), + application:stop(emqx_durable_storage), + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => ?config(priv_dir, Config)} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_iterators, Config) -> + Cluster = cluster(), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + [{nodes, Nodes} | Config]; +init_per_testcase(_TestCase, Config) -> Config. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]), - application:stop(emqx_durable_storage), +end_per_testcase(t_session_subscription_iterators, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> ok. t_messages_persisted(_Config) -> @@ -76,7 +91,7 @@ t_messages_persisted(_Config) -> ct:pal("Results = ~p", [Results]), - Persisted = consume(<<"local">>, {['#'], 0}), + Persisted = consume(?DS_SHARD, {['#'], 0}), ct:pal("Persisted = ~p", [Persisted]), @@ -88,6 +103,98 @@ t_messages_persisted(_Config) -> ok. +%% TODO: test quic and ws too +t_session_subscription_iterators(Config) -> + [Node1, Node2] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + Topic = <<"t/topic">>, + SubTopicFilter = <<"t/+">>, + AnotherTopic = <<"u/another-topic">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + [ + Payload1, + Payload2, + Payload3, + Payload4 + ] = lists:map( + fun(N) -> <<"hello", (integer_to_binary(N))/binary>> end, + lists:seq(1, 4) + ), + ct:pal("starting"), + {ok, Client} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client), + ct:pal("publishing 1"), + Message1 = emqx_message:make(Topic, Payload1), + publish(Node1, Message1), + receive_messages(1), + ct:pal("subscribing 1"), + {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), + ct:pal("publishing 2"), + Message2 = emqx_message:make(Topic, Payload2), + publish(Node1, Message2), + receive_messages(1), + ct:pal("subscribing 2"), + {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1), + ct:pal("publishing 3"), + Message3 = emqx_message:make(Topic, Payload3), + publish(Node1, Message3), + receive_messages(1), + ct:pal("publishing 4"), + Message4 = emqx_message:make(AnotherTopic, Payload4), + publish(Node1, Message4), + IteratorIds = get_iterator_ids(Node1, ClientId), + emqtt:stop(Client), + #{ + messages => [Message1, Message2, Message3, Message4], + iterator_ids => IteratorIds + } + end, + fun(Results, Trace) -> + ct:pal("trace:\n ~p", [Trace]), + #{ + messages := [_Message1, Message2, Message3 | _], + iterator_ids := IteratorIds + } = Results, + case ?of_kind(ds_session_subscription_added, Trace) of + [] -> + %% Since `emqx_durable_storage' is a dependency of `emqx', it gets + %% compiled in "prod" mode when running emqx standalone tests. + ok; + [_ | _] -> + ?assertMatch( + [ + #{?snk_kind := ds_session_subscription_added}, + #{?snk_kind := ds_session_subscription_present} + ], + ?of_kind( + [ + ds_session_subscription_added, + ds_session_subscription_present + ], + Trace + ) + ), + ok + end, + ?assertMatch([_], IteratorIds), + [IteratorId] = IteratorIds, + ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), + ExpectedMessages = [Message2, Message3], + ?assertEqual(ExpectedMessages, ReplayMessages1), + %% Different DS shard + ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end), + ?assertEqual([], ReplayMessages2), + ok + end + ), + ok. + %% connect(ClientId, CleanStart, EI) -> @@ -103,8 +210,11 @@ connect(ClientId, CleanStart, EI) -> {ok, _} = emqtt:connect(Client), Client. -consume(Shard, Replay) -> +consume(Shard, Replay = {_TopicFiler, _StartMS}) -> {ok, It} = emqx_ds_storage_layer:make_iterator(Shard, Replay), + consume(It); +consume(Shard, IteratorId) when is_binary(IteratorId) -> + {ok, It} = emqx_ds_storage_layer:restore_iterator(Shard, IteratorId), consume(It). consume(It) -> @@ -114,3 +224,59 @@ consume(It) -> none -> [] end. + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); + {deliver, _Topic, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 5000 -> + Msgs + end. + +publish(Node, Message) -> + erpc:call(Node, emqx, publish, [Message]). + +get_iterator_ids(Node, ClientId) -> + Channel = erpc:call(Node, fun() -> + [ConnPid] = emqx_cm:lookup_channels(ClientId), + sys:get_state(ConnPid) + end), + emqx_connection:info({channel, {session, iterators}}, Channel). + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +cluster() -> + Node1 = persistent_messages_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => app_specs() + }, + [ + {Node1, Spec}, + {persistent_messages_SUITE2, Spec} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 07cfabc70..d8736b918 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -267,6 +267,8 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count - 1, [Msg | Msgs]); + {deliver, _Topic, Msg} -> + receive_messages(Count - 1, [Msg | Msgs]); _Other -> receive_messages(Count, Msgs) after 5000 -> @@ -373,6 +375,26 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> do_publish(Payload, PublishFun, WaitForUnregister) -> do_publish([Payload], PublishFun, WaitForUnregister). +get_replay_messages(ReplayID) -> + DSShard = <<"local">>, + case emqx_ds_storage_layer:restore_iterator(DSShard, ReplayID) of + {ok, It} -> + do_get_replay_messages(It, []); + Error -> + error({"error restoring iterator", #{error => Error, replay_id => ReplayID}}) + end. + +do_get_replay_messages(It, Acc) -> + case emqx_ds_storage_layer:next(It) of + {value, Val, NewIt} -> + Msg = erlang:binary_to_term(Val), + do_get_replay_messages(NewIt, [Msg | Acc]); + none -> + {ok, lists:reverse(Acc)}; + {error, Reason} -> + {error, Reason} + end. + %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 0e9d3032c..56e0b23b8 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -20,6 +20,7 @@ -include_lib("proper/include/proper.hrl"). -include("emqx.hrl"). +-include("emqx_session.hrl"). -include("emqx_access_control.hrl"). %% High level Types @@ -132,33 +133,23 @@ clientinfo() -> sessioninfo() -> ?LET( Session, - {session, clientid(), - % id - sessionid(), - % is_persistent - boolean(), - % subscriptions - subscriptions(), - % max_subscriptions - non_neg_integer(), - % upgrade_qos - boolean(), - % emqx_inflight:inflight() - inflight(), - % emqx_mqueue:mqueue() - mqueue(), - % next_pkt_id - packet_id(), - % retry_interval - safty_timeout(), - % awaiting_rel - awaiting_rel(), - % max_awaiting_rel - non_neg_integer(), - % await_rel_timeout - safty_timeout(), - % created_at - timestamp()}, + #session{ + clientid = clientid(), + id = sessionid(), + is_persistent = boolean(), + subscriptions = subscriptions(), + max_subscriptions = non_neg_integer(), + upgrade_qos = boolean(), + inflight = inflight(), + mqueue = mqueue(), + next_pkt_id = packet_id(), + retry_interval = safty_timeout(), + awaiting_rel = awaiting_rel(), + max_awaiting_rel = non_neg_integer(), + await_rel_timeout = safty_timeout(), + created_at = timestamp(), + iterators = [] + }, emqx_session:info(Session) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 9eccf8c16..a69357975 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,6 +15,8 @@ %%-------------------------------------------------------------------- -module(emqx_ds). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% API: -export([ensure_shard/2]). %% Messages: @@ -56,7 +58,7 @@ -type iterator() :: term(). --opaque iterator_id() :: binary(). +-type iterator_id() :: binary(). %%-type session() :: #session{}. @@ -73,7 +75,8 @@ %% Timestamp %% Earliest possible timestamp is 0. -%% TODO granularity? +%% TODO granularity? Currently, we should always use micro second, as that's the unit we +%% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). %%================================================================================ @@ -129,11 +132,13 @@ session_open(ClientID) -> fun() -> case mnesia:read(?SESSION_TAB, ClientID) of [#session{iterators = Iterators}] -> - {false, ClientID, Iterators}; + IteratorIDs = maps:values(Iterators), + {false, ClientID, IteratorIDs}; [] -> - Session = #session{id = ClientID, iterators = []}, + Iterators = #{}, + Session = #session{id = ClientID, iterators = Iterators}, mnesia:write(?SESSION_TAB, Session, write), - {true, ClientID, []} + {true, ClientID, _IteratorIDs = []} end end ), @@ -160,10 +165,38 @@ session_suspend(_SessionId) -> %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_iterator(session_id(), emqx_topic:words()) -> - {ok, iterator_id()} | {error, session_not_found}. -session_add_iterator(_SessionId, _TopicFilter) -> - %% TODO - {ok, <<"">>}. + {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}. +session_add_iterator(DSSessionId, TopicFilter) -> + {atomic, Ret} = + mria:transaction( + ?DS_SHARD, + fun() -> + case mnesia:wread({?SESSION_TAB, DSSessionId}) of + [] -> + {error, session_not_found}; + [#session{iterators = #{TopicFilter := IteratorId}}] -> + StartMS = get_start_ms(IteratorId, DSSessionId), + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew}; + [#session{iterators = Iterators0} = Session0] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + Iterators = Iterators0#{TopicFilter => IteratorId}, + Session = Session0#session{iterators = Iterators}, + mnesia:write(?SESSION_TAB, Session, write), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew} + end + end + ), + Ret. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't @@ -201,3 +234,14 @@ iterator_stats() -> %%================================================================================ %% Internal functions %%================================================================================ + +-spec new_iterator_id(session_id()) -> {iterator_id(), time()}. +new_iterator_id(DSSessionId) -> + NowMS = erlang:system_time(microsecond), + NowMSBin = integer_to_binary(NowMS), + {<>, NowMS}. + +-spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time(). +get_start_ms(IteratorId, SessionId) -> + <> = IteratorId, + binary_to_integer(StartMSBin). diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 96688ede6..fa11a6600 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -21,7 +21,7 @@ -record(session, { id :: emqx_ds:session_id(), - iterators :: [{emqx_topic:words(), emqx_ds:iterator_id()}] + iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} }). -endif. diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl index a66cee7fd..b9ffa32ac 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_replay.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_replay.erl @@ -15,7 +15,7 @@ -type replay_id() :: binary(). -type replay() :: { - _TopicFilter :: emqx_ds:topic(), + _TopicFilter :: emqx_ds:words(), _StartTime :: emqx_ds:time() }. diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 7c73dafaf..ac4649d94 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -13,7 +13,13 @@ -export([make_iterator/2, next/1]). --export([preserve_iterator/2, restore_iterator/2, discard_iterator/2]). +-export([ + preserve_iterator/2, + restore_iterator/2, + discard_iterator/2, + is_iterator_present/2, + discard_iterator_prefix/2 +]). %% behaviour callbacks: -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -160,10 +166,10 @@ next(It = #it{module = Mod, data = ItData}) -> end end. --spec preserve_iterator(iterator(), emqx_ds_replay:replay_id()) -> +-spec preserve_iterator(iterator(), emqx_ds:iterator_id()) -> ok | {error, _TODO}. -preserve_iterator(It = #it{}, ReplayID) -> - iterator_put_state(ReplayID, It). +preserve_iterator(It = #it{}, IteratorID) -> + iterator_put_state(IteratorID, It). -spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> {ok, iterator()} | {error, _TODO}. @@ -177,11 +183,27 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> +-spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> + boolean(). +is_iterator_present(Shard, ReplayID) -> + %% TODO: use keyMayExist after added to wrapper? + case iterator_get_state(Shard, ReplayID) of + {ok, _} -> + true; + _ -> + false + end. + +-spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). +-spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> + ok | {error, _TODO}. +discard_iterator_prefix(Shard, KeyPrefix) -> + do_discard_iterator_prefix(Shard, KeyPrefix). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -391,6 +413,32 @@ restore_iterator_state( It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, open_restore_iterator(meta_get_gen(Shard, Gen), It, State). +do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), + case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of + {ok, It} -> + NextAction = {seek, KeyPrefix}, + do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction); + Error -> + Error + end. + +do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) -> + case rocksdb:iterator_move(It, NextAction) of + {ok, K = <>, _V} -> + ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS), + do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next); + {ok, _K, _V} -> + ok = rocksdb:iterator_close(It), + ok; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(It), + ok; + Error -> + ok = rocksdb:iterator_close(It), + Error + end. + %% Functions for dealing with the metadata stored persistently in rocksdb -define(CURRENT_GEN, <<"current">>). diff --git a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src index ecf9dd270..367ade691 100644 --- a/apps/emqx_durable_storage/src/emqx_durable_storage.app.src +++ b/apps/emqx_durable_storage/src/emqx_durable_storage.app.src @@ -2,7 +2,7 @@ {application, emqx_durable_storage, [ {description, "Message persistence and subscription replays for EMQX"}, % strict semver, bump manually! - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {modules, []}, {registered, []}, {applications, [kernel, stdlib, rocksdb, gproc, mria]}, diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index bbe16f518..10431eb1a 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -14,6 +14,8 @@ -opaque t() :: ets:tid(). +-export_type([t/0]). + -spec open() -> t(). open() -> ets:new(?MODULE, [ordered_set, {keypos, 1}]). diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index 18ac65ae6..d9b6d9bd5 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -927,7 +927,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) -> retry_interval, upgrade_qos, zone, - %% sessionID, defined in emqx_session.erl + %% session_id, defined in emqx_session.erl id ], TimesKeys = [created_at, connected_at, disconnected_at], From e8d7bb9a6767898e4aee6a3d2fbf977813f852dd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 10:05:00 -0300 Subject: [PATCH 03/23] refactor: rename module --- apps/emqx/priv/bpapi.versions | 2 +- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- ..._ds_proto_v1.erl => emqx_persistent_session_ds_proto_v1.erl} | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename apps/emqx/src/proto/{emqx_ds_proto_v1.erl => emqx_persistent_session_ds_proto_v1.erl} (96%) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index b6a4c6e7a..68d42ee01 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -15,7 +15,6 @@ {emqx_conf,3}. {emqx_dashboard,1}. {emqx_delayed,1}. -{emqx_ds,1}. {emqx_eviction_agent,1}. {emqx_exhook,1}. {emqx_ft_storage_exporter_fs,1}. @@ -42,6 +41,7 @@ {emqx_node_rebalance_evacuation,1}. {emqx_node_rebalance_status,1}. {emqx_persistent_session,1}. +{emqx_persistent_session_ds,1}. {emqx_plugins,1}. {emqx_prometheus,1}. {emqx_resource,1}. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7e9db4707..7d118ccb4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -106,7 +106,7 @@ add_subscription(TopicFilterBin, DSSessionID) -> -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> Nodes = emqx:running_nodes(), - Results = emqx_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), + Results = emqx_persistent_session_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), %% TODO: handle errors true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), ok. diff --git a/apps/emqx/src/proto/emqx_ds_proto_v1.erl b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl similarity index 96% rename from apps/emqx/src/proto/emqx_ds_proto_v1.erl rename to apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl index 2283b7e4e..cd348cc2c 100644 --- a/apps/emqx/src/proto/emqx_ds_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_persistent_session_ds_proto_v1.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ds_proto_v1). +-module(emqx_persistent_session_ds_proto_v1). -behaviour(emqx_bpapi). From 33ddbe80ad5e571b682a5a3f001d2fa42169bb01 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 10:05:26 -0300 Subject: [PATCH 04/23] refactor: remove persistence leakeage from emqx_cm level --- apps/emqx/src/emqx_cm.erl | 12 ++---------- apps/emqx/src/emqx_session.erl | 14 +++++++++++++- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index b98222959..ae6efb89c 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -301,17 +301,9 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> emqx_cm_locker:trans(ClientId, ResumeStart). create_session(ClientInfo, ConnInfo) -> + #{clientid := ClientId} = ClientInfo, Options = get_session_confs(ClientInfo, ConnInfo), - #{clientid := ClientID} = ClientInfo, - Session0 = emqx_session:init(Options), - IteratorIDs = - case emqx_persistent_session_ds:open_session(ClientID) of - {skipped, disabled} -> - []; - {_IsNew, _DSSessionID, Iterators0} -> - Iterators0 - end, - Session = Session0#session{iterators = IteratorIDs}, + Session = emqx_session:init_and_open(ClientId, Options), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 8f1933fc1..9b877ae44 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -60,7 +60,7 @@ unpersist/1 ]). --export([init/1]). +-export([init/1, init_and_open/2]). -export([ info/1, @@ -166,6 +166,18 @@ %% Init a Session %%-------------------------------------------------------------------- +-spec init_and_open(emqx_types:clientid(), options()) -> session(). +init_and_open(ClientID, Options) -> + Session0 = emqx_session:init(Options), + IteratorIDs = + case emqx_persistent_session_ds:open_session(ClientID) of + {skipped, disabled} -> + []; + {_IsNew, _DSSessionID, Iterators0} -> + Iterators0 + end, + Session0#session{iterators = IteratorIDs}. + -spec init(options()) -> session(). init(Opts) -> MaxInflight = maps:get(max_inflight, Opts), From c28c6d1b7e62ced8f792ea99e9adcb89a1088bfd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 10:06:39 -0300 Subject: [PATCH 05/23] fix: ensure iterator is opened --- apps/emqx/src/emqx_persistent_session_ds.erl | 24 +++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7d118ccb4..564c99c00 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -93,12 +93,7 @@ add_subscription(TopicFilterBin, DSSessionID) -> {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter ), - case IsNew of - true -> - ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID); - false -> - ok - end, + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID), {ok, IteratorID, IsNew} end ). @@ -106,7 +101,9 @@ add_subscription(TopicFilterBin, DSSessionID) -> -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> Nodes = emqx:running_nodes(), - Results = emqx_persistent_session_ds_proto_v1:open_iterator(Nodes, TopicFilter, StartMS, IteratorID), + Results = emqx_persistent_session_ds_proto_v1:open_iterator( + Nodes, TopicFilter, StartMS, IteratorID + ), %% TODO: handle errors true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), ok. @@ -114,10 +111,15 @@ open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> -spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. do_open_iterator(TopicFilter, StartMS, IteratorID) -> Replay = {TopicFilter, StartMS}, - %% FIXME: choose DS shard based on ...? - {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - ok. + case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of + true -> + {ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID), + ok; + false -> + {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + ok + end. %% From 8eab389ae1d018961445aba6597e22ead954647f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 10:12:41 -0300 Subject: [PATCH 06/23] perf: avoid unnecessary transaction --- apps/emqx_durable_storage/src/emqx_ds.erl | 27 +++++++++-------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index a69357975..654451561 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -126,23 +126,16 @@ message_stats() -> %% the broker. -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. session_open(ClientID) -> - {atomic, Ret} = - mria:transaction( - ?DS_SHARD, - fun() -> - case mnesia:read(?SESSION_TAB, ClientID) of - [#session{iterators = Iterators}] -> - IteratorIDs = maps:values(Iterators), - {false, ClientID, IteratorIDs}; - [] -> - Iterators = #{}, - Session = #session{id = ClientID, iterators = Iterators}, - mnesia:write(?SESSION_TAB, Session, write), - {true, ClientID, _IteratorIDs = []} - end - end - ), - Ret. + case mnesia:dirty_read(?SESSION_TAB, ClientID) of + [#session{iterators = Iterators}] -> + IteratorIDs = maps:values(Iterators), + {false, ClientID, IteratorIDs}; + [] -> + Iterators = #{}, + Session = #session{id = ClientID, iterators = Iterators}, + mria:dirty_write(?SESSION_TAB, Session), + {true, ClientID, _IteratorIDs = []} + end. %% @doc Called when a client reconnects with `clean session=true' or %% during session GC From 3239f5ac5bf7e1593f666881e1e28048aa3677c1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 10:43:16 -0300 Subject: [PATCH 07/23] feat: rm unnecessary transactions, use separate table for iterator references --- apps/emqx_durable_storage/src/emqx_ds.erl | 72 ++++++++----------- apps/emqx_durable_storage/src/emqx_ds_app.erl | 13 +++- apps/emqx_durable_storage/src/emqx_ds_int.hrl | 7 ++ 3 files changed, 47 insertions(+), 45 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 654451561..297c7d857 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -141,12 +141,7 @@ session_open(ClientID) -> %% during session GC -spec session_drop(emqx_types:clientid()) -> ok. session_drop(ClientID) -> - {atomic, ok} = mria:transaction( - ?DS_SHARD, - fun() -> - mnesia:delete({?SESSION_TAB, ClientID}) - end - ), + ok = mria:dirty_delete({?SESSION_TAB, ClientID}), ok. %% @doc Called when a client disconnects. This function terminates all @@ -158,38 +153,32 @@ session_suspend(_SessionId) -> %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_iterator(session_id(), emqx_topic:words()) -> - {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}. + {ok, iterator_id(), time(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter) -> - {atomic, Ret} = - mria:transaction( - ?DS_SHARD, - fun() -> - case mnesia:wread({?SESSION_TAB, DSSessionId}) of - [] -> - {error, session_not_found}; - [#session{iterators = #{TopicFilter := IteratorId}}] -> - StartMS = get_start_ms(IteratorId, DSSessionId), - ?tp( - ds_session_subscription_present, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = false, - {ok, IteratorId, StartMS, IsNew}; - [#session{iterators = Iterators0} = Session0] -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - Iterators = Iterators0#{TopicFilter => IteratorId}, - Session = Session0#session{iterators = Iterators}, - mnesia:write(?SESSION_TAB, Session, write), - ?tp( - ds_session_subscription_added, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = true, - {ok, IteratorId, StartMS, IsNew} - end - end - ), - Ret. + IteratorRefId = {DSSessionId, TopicFilter}, + case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of + [] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = IteratorRefId, + it_id = IteratorId, + start_time = StartMS + }, + ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew}; + [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew} + end. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't @@ -231,10 +220,5 @@ iterator_stats() -> -spec new_iterator_id(session_id()) -> {iterator_id(), time()}. new_iterator_id(DSSessionId) -> NowMS = erlang:system_time(microsecond), - NowMSBin = integer_to_binary(NowMS), - {<>, NowMS}. - --spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time(). -get_start_ms(IteratorId, SessionId) -> - <> = IteratorId, - binary_to_integer(StartMSBin). + IteratorId = <>, + {IteratorId, NowMS}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 216e979ee..cbcdb0b8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -25,7 +25,18 @@ init_mnesia() -> {record_name, session}, {attributes, record_info(fields, session)} ] - ). + ), + ok = mria:create_table( + ?ITERATOR_REF_TAB, + [ + {rlog_shard, ?DS_SHARD}, + {type, ordered_set}, + {storage, storage()}, + {record_name, iterator_ref}, + {attributes, record_info(fields, iterator_ref)} + ] + ), + ok. storage() -> case mria:rocksdb_backend_available() of diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index fa11a6600..55223068f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -17,6 +17,7 @@ -define(EMQX_DS_HRL, true). -define(SESSION_TAB, emqx_ds_session). +-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). -define(DS_SHARD, emqx_ds_shard). -record(session, { @@ -24,4 +25,10 @@ iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} }). +-record(iterator_ref, { + ref_id :: {emqx_ds:session_id(), emqx_topic:words()}, + it_id :: emqx_ds:iterator_id(), + start_time :: emqx_ds:time() +}). + -endif. From e4e88ebf36b57a43f2a4327578971f086f6601f1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 14:11:28 -0300 Subject: [PATCH 08/23] test: add scenario for node stopping midway during subscribe --- apps/emqx/src/emqx_persistent_session_ds.erl | 16 +- apps/emqx/test/emqx_cth_cluster.erl | 18 +- apps/emqx/test/emqx_cth_suite.erl | 2 + .../test/emqx_persistent_messages_SUITE.erl | 8 + .../src/emqx_ds_storage_layer.erl | 32 +++- .../test/emqx_ds_SUITE.erl | 178 ++++++++++++++++++ 6 files changed, 241 insertions(+), 13 deletions(-) create mode 100644 apps/emqx_durable_storage/test/emqx_ds_SUITE.erl diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 564c99c00..0120f09d4 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -93,13 +93,27 @@ add_subscription(TopicFilterBin, DSSessionID) -> {ok, IteratorID, StartMS, IsNew} = emqx_ds:session_add_iterator( DSSessionID, TopicFilter ), - ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID), + Ctx = #{ + iterator_id => IteratorID, + start_time => StartMS, + is_new => IsNew + }, + ?tp(persistent_session_ds_iterator_added, Ctx), + ?tp_span( + persistent_session_ds_open_iterators, + Ctx, + ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) + ), {ok, IteratorID, IsNew} end ). -spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> + ?tp(persistent_session_ds_will_open_iterators, #{ + iterator_id => IteratorID, + start_time => StartMS + }), Nodes = emqx:running_nodes(), Results = emqx_persistent_session_ds_proto_v1:open_iterator( Nodes, TopicFilter, StartMS, IteratorID diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index ddcb3234c..cbbad0aa2 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -17,7 +17,7 @@ -module(emqx_cth_cluster). -export([start/2]). --export([stop/1]). +-export([stop/1, stop_node/1]). -export([share_load_module/2]). -export([node_name/1]). @@ -80,7 +80,12 @@ when %% Working directory %% Everything a test produces should go here. Each node's stuff should go in its %% own directory. - work_dir := file:name() + work_dir := file:name(), + %% Usually, we want to ensure the node / test suite starts from a clean slate. + %% However, sometimes, we may want to test restarting a node. For such + %% situations, we need to disable this check to allow resuming from an existing + %% state. + skip_clean_suite_state_check => boolean() }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), @@ -124,12 +129,14 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) -> Node = node_name(Name), BasePort = base_port(N), WorkDir = maps:get(work_dir, ClusterOpts), + SkipCleanSuiteStateCheck = maps:get(skip_clean_suite_state_check, ClusterOpts, false), Defaults = #{ name => Node, role => core, apps => [], base_port => BasePort, work_dir => filename:join([WorkDir, Node]), + skip_clean_suite_state_check => SkipCleanSuiteStateCheck, driver => ct_slave }, maps:merge(Defaults, NodeOpts). @@ -288,17 +295,20 @@ load_apps(Node, #{apps := Apps}) -> erpc:call(Node, emqx_cth_suite, load_apps, [Apps]). start_apps_clustering(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsClustering = [lists:keyfind(App, 1, Apps) || App <- ?APPS_CLUSTERING], _Started = erpc:call(Node, emqx_cth_suite, start, [AppsClustering, SuiteOpts]), ok. start_apps(Node, #{apps := Apps} = Spec) -> - SuiteOpts = maps:with([work_dir], Spec), + SuiteOpts = suite_opts(Spec), AppsRest = [AppSpec || AppSpec = {App, _} <- Apps, not lists:member(App, ?APPS_CLUSTERING)], _Started = erpc:call(Node, emqx_cth_suite, start_apps, [AppsRest, SuiteOpts]), ok. +suite_opts(Spec) -> + maps:with([work_dir, skip_clean_suite_state_check], Spec). + maybe_join_cluster(_Node, #{role := replicant}) -> ok; maybe_join_cluster(Node, Spec) -> diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 9b3e58da4..dbe9423da 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -358,6 +358,8 @@ stop_apps(Apps) -> %% +verify_clean_suite_state(#{skip_clean_suite_state_check := true}) -> + ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none), diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 62702b3bc..b669be889 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -183,6 +183,8 @@ t_session_subscription_iterators(Config) -> ok end, ?assertMatch([_], IteratorIds), + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)), [IteratorId] = IteratorIds, ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), ExpectedMessages = [Message2, Message3], @@ -280,3 +282,9 @@ cluster() -> get_mqtt_port(Node, Type) -> {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), Port. + +get_all_iterator_ids(Node) -> + Fn = fun(K, _V, Acc) -> [K | Acc] end, + erpc:call(Node, fun() -> + emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) + end). diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index ac4649d94..9bc7924e8 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -18,7 +18,8 @@ restore_iterator/2, discard_iterator/2, is_iterator_present/2, - discard_iterator_prefix/2 + discard_iterator_prefix/2, + foldl_iterator_prefix/4 ]). %% behaviour callbacks: @@ -204,6 +205,16 @@ discard_iterator(Shard, ReplayID) -> discard_iterator_prefix(Shard, KeyPrefix) -> do_discard_iterator_prefix(Shard, KeyPrefix). +-spec foldl_iterator_prefix( + emqx_ds:shard(), + binary(), + fun((_Key :: binary(), _Value :: binary(), Acc) -> Acc), + Acc +) -> {ok, Acc} | {error, _TODO} when + Acc :: term(). +foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc). + %%================================================================================ %% behaviour callbacks %%================================================================================ @@ -414,26 +425,31 @@ restore_iterator_state( open_restore_iterator(meta_get_gen(Shard, Gen), It, State). do_discard_iterator_prefix(Shard, KeyPrefix) -> + #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), + Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, ok). + +do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, Acc) -> #db{handle = Handle, cf_iterator = CF} = meta_lookup(Shard, db), case rocksdb:iterator(Handle, CF, ?ITERATION_READ_OPTS) of {ok, It} -> NextAction = {seek, KeyPrefix}, - do_discard_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction); + do_foldl_iterator_prefix(Handle, CF, It, KeyPrefix, NextAction, Fn, Acc); Error -> Error end. -do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction) -> +do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, NextAction, Fn, Acc) -> case rocksdb:iterator_move(It, NextAction) of - {ok, K = <>, _V} -> - ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS), - do_discard_iterator_prefix(DBHandle, CF, It, KeyPrefix, next); + {ok, K = <>, V} -> + NewAcc = Fn(K, V, Acc), + do_foldl_iterator_prefix(DBHandle, CF, It, KeyPrefix, next, Fn, NewAcc); {ok, _K, _V} -> ok = rocksdb:iterator_close(It), - ok; + {ok, Acc}; {error, invalid_iterator} -> ok = rocksdb:iterator_close(It), - ok; + {ok, Acc}; Error -> ok = rocksdb:iterator_close(It), Error diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl new file mode 100644 index 000000000..8a2d18c0d --- /dev/null +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -0,0 +1,178 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ds_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-define(DS_SHARD, <<"local">>). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + %% avoid inter-suite flakiness... + application:stop(emqx), + application:stop(emqx_durable_storage), + TCApps = emqx_cth_suite:start( + app_specs(), + #{work_dir => ?config(priv_dir, Config)} + ), + [{tc_apps, TCApps} | Config]. + +end_per_suite(Config) -> + TCApps = ?config(tc_apps, Config), + emqx_cth_suite:stop(TCApps), + ok. + +init_per_testcase(t_session_subscription_idempotency, Config) -> + Cluster = cluster(#{n => 1}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), + [{cluster, Cluster}, {nodes, Nodes} | Config]; +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(t_session_subscription_idempotency, Config) -> + Nodes = ?config(nodes, Config), + ok = emqx_cth_cluster:stop(Nodes), + ok; +end_per_testcase(_TestCase, _Config) -> + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +cluster(#{n := N}) -> + Node1 = ds_SUITE1, + Spec = #{ + role => core, + join_to => emqx_cth_cluster:node_name(Node1), + listeners => true, + apps => app_specs() + }, + [ + {Node1, Spec} + | lists:map( + fun(M) -> + Name = binary_to_atom(<<"ds_SUITE", (integer_to_binary(M))/binary>>), + {Name, Spec} + end, + lists:seq(2, N) + ) + ]. + +app_specs() -> + [ + emqx_durable_storage, + {emqx, #{ + before_start => fun() -> + emqx_app:set_config_loader(?MODULE) + end, + config => #{persistent_session_store => #{ds => true}}, + override_env => [{boot_modules, [broker, listeners]}] + }} + ]. + +get_mqtt_port(Node, Type) -> + {_IP, Port} = erpc:call(Node, emqx_config, get, [[listeners, Type, default, bind]]), + Port. + +get_all_iterator_ids(Node) -> + Fn = fun(K, _V, Acc) -> [K | Acc] end, + erpc:call(Node, fun() -> + emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) + end). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_session_subscription_idempotency(Config) -> + Cluster = ?config(cluster, Config), + [Node1] = ?config(nodes, Config), + Port = get_mqtt_port(Node1, tcp), + SubTopicFilter = <<"t/+">>, + ClientId = <<"myclientid">>, + ?check_trace( + begin + ?force_ordering( + #{?snk_kind := persistent_session_ds_iterator_added}, + _NEvents0 = 1, + #{?snk_kind := will_restart_node}, + _Guard0 = true + ), + ?force_ordering( + #{?snk_kind := restarted_node}, + _NEvents1 = 1, + #{?snk_kind := persistent_session_ds_open_iterators, ?snk_span := start}, + _Guard1 = true + ), + + spawn_link(fun() -> + ?tp(will_restart_node, #{}), + ct:pal("stopping node ~p", [Node1]), + ok = emqx_cth_cluster:stop_node(Node1), + ct:pal("stopped node ~p; restarting...", [Node1]), + [Node1] = emqx_cth_cluster:start(Cluster, #{ + work_dir => ?config(priv_dir, Config), + skip_clean_suite_state_check => true + }), + ct:pal("node ~p restarted", [Node1]), + ?tp(restarted_node, #{}), + ok + end), + + ct:pal("starting 1"), + {ok, Client0} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client0), + ct:pal("subscribing 1"), + process_flag(trap_exit, true), + catch emqtt:subscribe(Client0, SubTopicFilter, qos2), + receive + {'EXIT', {shutdown, _}} -> + ok + after 0 -> ok + end, + process_flag(trap_exit, false), + + {ok, _} = ?block_until(#{?snk_kind := restarted_node}, 15_000), + ct:pal("starting 2"), + {ok, Client1} = emqtt:start_link([ + {port, Port}, + {clientid, ClientId}, + {proto_ver, v5} + ]), + {ok, _} = emqtt:connect(Client1), + ct:pal("subscribing 2"), + {ok, _, [2]} = emqtt:subscribe(Client1, SubTopicFilter, qos2), + + ok = emqtt:stop(Client1), + + ok + end, + fun(Trace) -> + ct:pal("trace:\n ~p", [Trace]), + %% Exactly one iterator should have been opened. + ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), + ?assertMatch( + {_IsNew = false, ClientId, _}, + erpc:call(Node1, emqx_ds, session_open, [ClientId]) + ), + ok + end + ), + ok. From 021755b82bb1d5293620cacd6b324e669c9b2555 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 11 Aug 2023 15:08:38 -0300 Subject: [PATCH 09/23] refactor: rm iterators from DS `#session{}` record --- apps/emqx/include/emqx_session.hrl | 4 +-- apps/emqx/src/emqx_persistent_session_ds.erl | 2 +- apps/emqx/src/emqx_session.erl | 25 ++++-------------- .../test/emqx_persistent_messages_SUITE.erl | 17 ++++-------- apps/emqx/test/emqx_proper_types.erl | 3 +-- apps/emqx_durable_storage/src/emqx_ds.erl | 13 +++++----- apps/emqx_durable_storage/src/emqx_ds_int.hrl | 4 ++- .../src/emqx_ds_storage_layer.erl | 26 +++++++++++++++++-- .../test/emqx_ds_SUITE.erl | 2 +- 9 files changed, 47 insertions(+), 49 deletions(-) diff --git a/apps/emqx/include/emqx_session.hrl b/apps/emqx/include/emqx_session.hrl index fba4cf911..3fea157ed 100644 --- a/apps/emqx/include/emqx_session.hrl +++ b/apps/emqx/include/emqx_session.hrl @@ -49,9 +49,7 @@ %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at - created_at :: pos_integer(), - %% Durable storage iterators for existing subscriptions - iterators = [] :: [emqx_ds_replay:replay_id()] + created_at :: pos_integer() }). -endif. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0120f09d4..19e11b1a3 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -85,7 +85,7 @@ open_session(ClientID) -> ?WHEN_ENABLED(emqx_ds:session_open(ClientID)). -spec add_subscription(emqx_types:topic(), emqx_ds:session_id()) -> - {ok, emqx_ds:iterator_id(), _IsNew :: boolean()} | {skipped, disabled}. + {ok, emqx_ds:iterator_id(), IsNew :: boolean()} | {skipped, disabled}. add_subscription(TopicFilterBin, DSSessionID) -> ?WHEN_ENABLED( begin diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 9b877ae44..32c98290a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -169,14 +169,8 @@ -spec init_and_open(emqx_types:clientid(), options()) -> session(). init_and_open(ClientID, Options) -> Session0 = emqx_session:init(Options), - IteratorIDs = - case emqx_persistent_session_ds:open_session(ClientID) of - {skipped, disabled} -> - []; - {_IsNew, _DSSessionID, Iterators0} -> - Iterators0 - end, - Session0#session{iterators = IteratorIDs}. + _ = emqx_persistent_session_ds:open_session(ClientID), + Session0. -spec init(options()) -> session(). init(Opts) -> @@ -274,9 +268,7 @@ info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(created_at, #session{created_at = CreatedAt}) -> - CreatedAt; -info(iterators, #session{iterators = IteratorIds}) -> - IteratorIds. + CreatedAt. %% @doc Get stats of the session. -spec stats(session()) -> emqx_types:stats(). @@ -325,15 +317,8 @@ is_subscriptions_full(#session{ -spec add_persistent_subscription(emqx_types:topic(), emqx_types:clientid(), session()) -> session(). add_persistent_subscription(TopicFilterBin, ClientId, Session) -> - case emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId) of - {skipped, disabled} -> - Session; - {ok, IteratorID, _IsNew = true} -> - Iterators = Session#session.iterators, - Session#session{iterators = [IteratorID | Iterators]}; - {ok, _IteratorID, _IsNew = false} -> - Session - end. + _ = emqx_persistent_session_ds:add_subscription(TopicFilterBin, ClientId), + Session. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index b669be889..7ca1f3f15 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -148,18 +148,15 @@ t_session_subscription_iterators(Config) -> ct:pal("publishing 4"), Message4 = emqx_message:make(AnotherTopic, Payload4), publish(Node1, Message4), - IteratorIds = get_iterator_ids(Node1, ClientId), emqtt:stop(Client), #{ - messages => [Message1, Message2, Message3, Message4], - iterator_ids => IteratorIds + messages => [Message1, Message2, Message3, Message4] } end, fun(Results, Trace) -> ct:pal("trace:\n ~p", [Trace]), #{ - messages := [_Message1, Message2, Message3 | _], - iterator_ids := IteratorIds + messages := [_Message1, Message2, Message3 | _] } = Results, case ?of_kind(ds_session_subscription_added, Trace) of [] -> @@ -182,10 +179,9 @@ t_session_subscription_iterators(Config) -> ), ok end, - ?assertMatch([_], IteratorIds), ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), - ?assertMatch({ok, [_]}, get_all_iterator_ids(Node2)), - [IteratorId] = IteratorIds, + {ok, [IteratorId]} = get_all_iterator_ids(Node1), + ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)), ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), ExpectedMessages = [Message2, Message3], ?assertEqual(ExpectedMessages, ReplayMessages1), @@ -284,7 +280,4 @@ get_mqtt_port(Node, Type) -> Port. get_all_iterator_ids(Node) -> - Fn = fun(K, _V, Acc) -> [K | Acc] end, - erpc:call(Node, fun() -> - emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) - end). + erpc:call(Node, emqx_ds_storage_layer, list_iterator_prefix, [?DS_SHARD, <<>>]). diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 56e0b23b8..ab1720754 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -147,8 +147,7 @@ sessioninfo() -> awaiting_rel = awaiting_rel(), max_awaiting_rel = non_neg_integer(), await_rel_timeout = safty_timeout(), - created_at = timestamp(), - iterators = [] + created_at = timestamp() }, emqx_session:info(Session) ). diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 297c7d857..3cc7ca886 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -15,6 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_ds). +-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API: @@ -124,17 +125,15 @@ message_stats() -> %% %% Note: session API doesn't handle session takeovers, it's the job of %% the broker. --spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id(), [iterator_id()]}. +-spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> case mnesia:dirty_read(?SESSION_TAB, ClientID) of - [#session{iterators = Iterators}] -> - IteratorIDs = maps:values(Iterators), - {false, ClientID, IteratorIDs}; + [#session{}] -> + {false, ClientID}; [] -> - Iterators = #{}, - Session = #session{id = ClientID, iterators = Iterators}, + Session = #session{id = ClientID}, mria:dirty_write(?SESSION_TAB, Session), - {true, ClientID, _IteratorIDs = []} + {true, ClientID} end. %% @doc Called when a client reconnects with `clean session=true' or diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index 55223068f..47493bd0b 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -21,8 +21,10 @@ -define(DS_SHARD, emqx_ds_shard). -record(session, { + %% same as clientid id :: emqx_ds:session_id(), - iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} + %% for future usage + props = #{} :: map() }). -record(iterator_ref, { diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 9bc7924e8..adede5322 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -19,6 +19,7 @@ discard_iterator/2, is_iterator_present/2, discard_iterator_prefix/2, + list_iterator_prefix/2, foldl_iterator_prefix/4 ]). @@ -203,7 +204,17 @@ discard_iterator(Shard, ReplayID) -> -spec discard_iterator_prefix(emqx_ds:shard(), binary()) -> ok | {error, _TODO}. discard_iterator_prefix(Shard, KeyPrefix) -> - do_discard_iterator_prefix(Shard, KeyPrefix). + case do_discard_iterator_prefix(Shard, KeyPrefix) of + {ok, _} -> ok; + Error -> Error + end. + +-spec list_iterator_prefix( + emqx_ds:shard(), + binary() +) -> {ok, [emqx_ds:iterator_id()]} | {error, _TODO}. +list_iterator_prefix(Shard, KeyPrefix) -> + do_list_iterator_prefix(Shard, KeyPrefix). -spec foldl_iterator_prefix( emqx_ds:shard(), @@ -377,7 +388,11 @@ open_restore_iterator(#{module := Mod, data := Data}, It = #it{replay = Replay}, %% --define(KEY_REPLAY_STATE(ReplayID), <<(ReplayID)/binary, "rs">>). +-define(KEY_REPLAY_STATE(IteratorId), <<(IteratorId)/binary, "rs">>). +-define(KEY_REPLAY_STATE_PAT(KeyReplayState), begin + <> = (KeyReplayState), + IteratorId +end). -define(ITERATION_WRITE_OPTS, []). -define(ITERATION_READ_OPTS, []). @@ -424,6 +439,13 @@ restore_iterator_state( It = #it{shard = Shard, gen = Gen, replay = {TopicFilter, StartTime}}, open_restore_iterator(meta_get_gen(Shard, Gen), It, State). +do_list_iterator_prefix(Shard, KeyPrefix) -> + Fn = fun(K0, _V, Acc) -> + K = ?KEY_REPLAY_STATE_PAT(K0), + [K | Acc] + end, + do_foldl_iterator_prefix(Shard, KeyPrefix, Fn, []). + do_discard_iterator_prefix(Shard, KeyPrefix) -> #db{handle = DBHandle, cf_iterator = CF} = meta_lookup(Shard, db), Fn = fun(K, _V, _Acc) -> ok = rocksdb:delete(DBHandle, CF, K, ?ITERATION_WRITE_OPTS) end, diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 8a2d18c0d..73eb28d85 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -169,7 +169,7 @@ t_session_subscription_idempotency(Config) -> %% Exactly one iterator should have been opened. ?assertMatch({ok, [_]}, get_all_iterator_ids(Node1)), ?assertMatch( - {_IsNew = false, ClientId, _}, + {_IsNew = false, ClientId}, erpc:call(Node1, emqx_ds, session_open, [ClientId]) ), ok From c74abe79d072bb1a9309e6c812434a21cbab86b1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 15 Aug 2023 16:07:30 -0300 Subject: [PATCH 10/23] refactor: reduce arity --- apps/emqx/src/emqx_cm.erl | 3 +-- apps/emqx/src/emqx_session.erl | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index ae6efb89c..2cc2b72b4 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -301,9 +301,8 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> emqx_cm_locker:trans(ClientId, ResumeStart). create_session(ClientInfo, ConnInfo) -> - #{clientid := ClientId} = ClientInfo, Options = get_session_confs(ClientInfo, ConnInfo), - Session = emqx_session:init_and_open(ClientId, Options), + Session = emqx_session:init_and_open(Options), ok = emqx_metrics:inc('session.created'), ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 32c98290a..0c051f002 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -60,7 +60,7 @@ unpersist/1 ]). --export([init/1, init_and_open/2]). +-export([init/1, init_and_open/1]). -export([ info/1, @@ -166,8 +166,9 @@ %% Init a Session %%-------------------------------------------------------------------- --spec init_and_open(emqx_types:clientid(), options()) -> session(). -init_and_open(ClientID, Options) -> +-spec init_and_open(options()) -> session(). +init_and_open(Options) -> + #{clientid := ClientID} = Options, Session0 = emqx_session:init(Options), _ = emqx_persistent_session_ds:open_session(ClientID), Session0. From a15405a800ed1406ffb2e82cc2f0ef43e1d9fb34 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 15 Aug 2023 16:17:37 -0300 Subject: [PATCH 11/23] test: fix assertions --- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 7ca1f3f15..9d814aad6 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -132,19 +132,18 @@ t_session_subscription_iterators(Config) -> ct:pal("publishing 1"), Message1 = emqx_message:make(Topic, Payload1), publish(Node1, Message1), - receive_messages(1), ct:pal("subscribing 1"), {ok, _, [2]} = emqtt:subscribe(Client, SubTopicFilter, qos2), ct:pal("publishing 2"), Message2 = emqx_message:make(Topic, Payload2), publish(Node1, Message2), - receive_messages(1), + [_] = receive_messages(1), ct:pal("subscribing 2"), {ok, _, [1]} = emqtt:subscribe(Client, SubTopicFilter, qos1), ct:pal("publishing 3"), Message3 = emqx_message:make(Topic, Payload3), publish(Node1, Message3), - receive_messages(1), + [_] = receive_messages(1), ct:pal("publishing 4"), Message4 = emqx_message:make(AnotherTopic, Payload4), publish(Node1, Message4), From 6de0bbe76a6611c52d5b59204ce718c03ca1c135 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 15 Aug 2023 16:20:36 -0300 Subject: [PATCH 12/23] test(refactor): always allocate listeners for emqx app --- apps/emqx/test/emqx_cth_cluster.erl | 2 +- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 1 - apps/emqx_durable_storage/test/emqx_ds_SUITE.erl | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index cbbad0aa2..1a83056cb 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -245,7 +245,7 @@ default_appspec(emqx_conf, Spec, _NodeSpecs) -> listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec) } }; -default_appspec(emqx, Spec = #{listeners := true}, _NodeSpecs) -> +default_appspec(emqx, Spec, _NodeSpecs) -> #{config => #{listeners => allocate_listener_ports([tcp, ssl, ws, wss], Spec)}}; default_appspec(_App, _, _) -> #{}. diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 9d814aad6..81cc3dade 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -266,7 +266,6 @@ cluster() -> Spec = #{ role => core, join_to => emqx_cth_cluster:node_name(Node1), - listeners => true, apps => app_specs() }, [ diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index 73eb28d85..c79856fc7 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -57,7 +57,6 @@ cluster(#{n := N}) -> Spec = #{ role => core, join_to => emqx_cth_cluster:node_name(Node1), - listeners => true, apps => app_specs() }, [ From e8c73b06e14c256a2f29eaf6536f37ee6877c99e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 15 Aug 2023 16:28:42 -0300 Subject: [PATCH 13/23] docs: add comment about future test failure --- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 81cc3dade..e055ce794 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -183,6 +183,8 @@ t_session_subscription_iterators(Config) -> ?assertMatch({ok, [IteratorId]}, get_all_iterator_ids(Node2)), ReplayMessages1 = erpc:call(Node1, fun() -> consume(?DS_SHARD, IteratorId) end), ExpectedMessages = [Message2, Message3], + %% Note: it is expected that this will break after replayers are in place. + %% They might have consumed all the messages by this time. ?assertEqual(ExpectedMessages, ReplayMessages1), %% Different DS shard ReplayMessages2 = erpc:call(Node2, fun() -> consume(?DS_SHARD, IteratorId) end), From c46b8de9382454a91fdb0beeb71f0efb4dc9d77f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 15:50:01 -0300 Subject: [PATCH 14/23] test: remove unused things, refactor some functions --- .../test/emqx_persistent_messages_SUITE.erl | 14 ++---------- .../test/emqx_persistent_session_SUITE.erl | 22 ------------------- 2 files changed, 2 insertions(+), 34 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index e055ce794..dbd4df0ae 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -23,18 +23,12 @@ -compile(export_all). -compile(nowarn_export_all). --define(NOW, - (calendar:system_time_to_rfc3339(erlang:system_time(millisecond), [{unit, millisecond}])) -). -define(DS_SHARD, <<"local">>). all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% avoid inter-suite flakiness... - application:stop(emqx), - application:stop(emqx_durable_storage), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => ?config(priv_dir, Config)} @@ -232,12 +226,8 @@ receive_messages(0, Msgs) -> receive_messages(Count, Msgs) -> receive {publish, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]); - {deliver, _Topic, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]); - _Other -> - receive_messages(Count, Msgs) - after 5000 -> + receive_messages(Count - 1, [Msg | Msgs]) + after 5_000 -> Msgs end. diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index d8736b918..07cfabc70 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -267,8 +267,6 @@ receive_messages(Count, Msgs) -> receive {publish, Msg} -> receive_messages(Count - 1, [Msg | Msgs]); - {deliver, _Topic, Msg} -> - receive_messages(Count - 1, [Msg | Msgs]); _Other -> receive_messages(Count, Msgs) after 5000 -> @@ -375,26 +373,6 @@ do_publish(Payloads = [_ | _], PublishFun, WaitForUnregister) -> do_publish(Payload, PublishFun, WaitForUnregister) -> do_publish([Payload], PublishFun, WaitForUnregister). -get_replay_messages(ReplayID) -> - DSShard = <<"local">>, - case emqx_ds_storage_layer:restore_iterator(DSShard, ReplayID) of - {ok, It} -> - do_get_replay_messages(It, []); - Error -> - error({"error restoring iterator", #{error => Error, replay_id => ReplayID}}) - end. - -do_get_replay_messages(It, Acc) -> - case emqx_ds_storage_layer:next(It) of - {value, Val, NewIt} -> - Msg = erlang:binary_to_term(Val), - do_get_replay_messages(NewIt, [Msg | Acc]); - none -> - {ok, lists:reverse(Acc)}; - {error, Reason} -> - {error, Reason} - end. - %%-------------------------------------------------------------------- %% Test Cases %%-------------------------------------------------------------------- From 3344bfb0bd74c5fa9542a54e278f72eb39424cdf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 15:56:29 -0300 Subject: [PATCH 15/23] refactor: rm `emqx_ds_replay` --- apps/emqx_durable_storage/src/emqx_ds.erl | 9 +++++ .../src/emqx_ds_message_storage_bitmask.erl | 8 ++--- .../src/emqx_ds_replay.erl | 36 ------------------- .../src/emqx_ds_storage_layer.erl | 14 ++++---- .../emqx_ds_message_storage_bitmask_shim.erl | 2 +- 5 files changed, 21 insertions(+), 48 deletions(-) delete mode 100644 apps/emqx_durable_storage/src/emqx_ds_replay.erl diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 3cc7ca886..7ec9f3801 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -42,6 +42,8 @@ message_stats/0, message_store_opts/0, session_id/0, + replay/0, + replay_id/0, iterator_id/0, iterator/0, shard/0, @@ -80,6 +82,13 @@ %% use in emqx_guid. Otherwise, the iterators won't match the message timestamps. -type time() :: non_neg_integer(). +-type replay_id() :: binary(). + +-type replay() :: { + _TopicFilter :: emqx_topic:words(), + _StartTime :: time() +}. + %%================================================================================ %% API funcions %%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl index 74a50c302..57608e5cb 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_message_storage_bitmask.erl @@ -277,13 +277,13 @@ store(DB = #db{handle = DBHandle, cf = CFHandle}, MessageID, PublishedAt, Topic, Value = make_message_value(Topic, MessagePayload), rocksdb:put(DBHandle, CFHandle, Key, Value, DB#db.write_options). --spec make_iterator(db(), emqx_ds_replay:replay()) -> +-spec make_iterator(db(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(DB, Replay) -> Options = emqx_ds_conf:shard_iteration_options(DB#db.shard), make_iterator(DB, Replay, Options). --spec make_iterator(db(), emqx_ds_replay:replay(), iteration_options()) -> +-spec make_iterator(db(), emqx_ds:replay(), iteration_options()) -> % {error, invalid_start_time}? might just start from the beginning of time % and call it a day: client violated the contract anyway. {ok, iterator()} | {error, _TODO}. @@ -337,7 +337,7 @@ preserve_iterator(#it{cursor = Cursor}) -> }, term_to_binary(State). --spec restore_iterator(db(), emqx_ds_replay:replay(), binary()) -> +-spec restore_iterator(db(), emqx_ds:replay(), binary()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(DB, Replay, Serial) when is_binary(Serial) -> State = binary_to_term(Serial), @@ -419,7 +419,7 @@ hash(Input, Bits) -> % at most 32 bits erlang:phash2(Input, 1 bsl Bits). --spec make_keyspace_filter(emqx_ds_replay:replay(), keymapper()) -> keyspace_filter(). +-spec make_keyspace_filter(emqx_ds:replay(), keymapper()) -> keyspace_filter(). make_keyspace_filter({TopicFilter, StartTime}, Keymapper) -> Bitstring = compute_bitstring(TopicFilter, StartTime, Keymapper), HashBitmask = compute_topic_bitmask(TopicFilter, Keymapper), diff --git a/apps/emqx_durable_storage/src/emqx_ds_replay.erl b/apps/emqx_durable_storage/src/emqx_ds_replay.erl deleted file mode 100644 index b9ffa32ac..000000000 --- a/apps/emqx_durable_storage/src/emqx_ds_replay.erl +++ /dev/null @@ -1,36 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- --module(emqx_ds_replay). - -%% API: --export([]). - --export_type([replay_id/0, replay/0]). - -%%================================================================================ -%% Type declarations -%%================================================================================ - --type replay_id() :: binary(). - --type replay() :: { - _TopicFilter :: emqx_ds:words(), - _StartTime :: emqx_ds:time() -}. - -%%================================================================================ -%% API funcions -%%================================================================================ - -%%================================================================================ -%% behaviour callbacks -%%================================================================================ - -%%================================================================================ -%% Internal exports -%%================================================================================ - -%%================================================================================ -%% Internal functions -%%================================================================================ diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index adede5322..69c0e008c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -71,7 +71,7 @@ -record(it, { shard :: emqx_ds:shard(), gen :: gen_id(), - replay :: emqx_ds_replay:replay(), + replay :: emqx_ds:replay(), module :: module(), data :: term() }). @@ -112,10 +112,10 @@ -callback store(_Schema, binary(), emqx_ds:time(), emqx_ds:topic(), binary()) -> ok | {error, _}. --callback make_iterator(_Schema, emqx_ds_replay:replay()) -> +-callback make_iterator(_Schema, emqx_ds:replay()) -> {ok, _It} | {error, _}. --callback restore_iterator(_Schema, emqx_ds_replay:replay(), binary()) -> {ok, _It} | {error, _}. +-callback restore_iterator(_Schema, emqx_ds:replay(), binary()) -> {ok, _It} | {error, _}. -callback preserve_iterator(_Schema, _It) -> term(). @@ -140,7 +140,7 @@ store(Shard, GUID, Time, Topic, Msg) -> {_GenId, #{module := Mod, data := Data}} = meta_lookup_gen(Shard, Time), Mod:store(Data, GUID, Time, Topic, Msg). --spec make_iterator(emqx_ds:shard(), emqx_ds_replay:replay()) -> +-spec make_iterator(emqx_ds:shard(), emqx_ds:replay()) -> {ok, iterator()} | {error, _TODO}. make_iterator(Shard, Replay = {_, StartTime}) -> {GenId, Gen} = meta_lookup_gen(Shard, StartTime), @@ -173,7 +173,7 @@ next(It = #it{module = Mod, data = ItData}) -> preserve_iterator(It = #it{}, IteratorID) -> iterator_put_state(IteratorID, It). --spec restore_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec restore_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> {ok, iterator()} | {error, _TODO}. restore_iterator(Shard, ReplayID) -> case iterator_get_state(Shard, ReplayID) of @@ -185,7 +185,7 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec is_iterator_present(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) -> boolean(). is_iterator_present(Shard, ReplayID) -> %% TODO: use keyMayExist after added to wrapper? @@ -196,7 +196,7 @@ is_iterator_present(Shard, ReplayID) -> false end. --spec discard_iterator(emqx_ds:shard(), emqx_ds_replay:replay_id()) -> +-spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) -> ok | {error, _TODO}. discard_iterator(Shard, ReplayID) -> iterator_delete(Shard, ReplayID). diff --git a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl index 10431eb1a..e9daf2581 100644 --- a/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl +++ b/apps/emqx_durable_storage/test/props/emqx_ds_message_storage_bitmask_shim.erl @@ -31,7 +31,7 @@ store(Tab, MessageID, PublishedAt, Topic, Payload) -> true = ets:insert(Tab, {{PublishedAt, MessageID}, Topic, Payload}), ok. --spec iterate(t(), emqx_ds_replay:replay()) -> +-spec iterate(t(), emqx_ds:replay()) -> [binary()]. iterate(Tab, {TopicFilter, StartTime}) -> ets:foldr( From 65085d012b430d6192ecf5be4047ae713a39f65d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 15:57:56 -0300 Subject: [PATCH 16/23] refactor: rename fn --- apps/emqx/src/emqx_persistent_session_ds.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 19e11b1a3..b862ef0d9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -102,18 +102,19 @@ add_subscription(TopicFilterBin, DSSessionID) -> ?tp_span( persistent_session_ds_open_iterators, Ctx, - ok = open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) + ok = open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) ), {ok, IteratorID, IsNew} end ). --spec open_iterator_on_all_nodes(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. -open_iterator_on_all_nodes(TopicFilter, StartMS, IteratorID) -> +-spec open_iterator_on_all_shards(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. +open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) -> ?tp(persistent_session_ds_will_open_iterators, #{ iterator_id => IteratorID, start_time => StartMS }), + %% Note: currently, shards map 1:1 to nodes, but this will change in the future. Nodes = emqx:running_nodes(), Results = emqx_persistent_session_ds_proto_v1:open_iterator( Nodes, TopicFilter, StartMS, IteratorID From dbfacae283a81bc51c306fdf20b1d41015fa94c6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 16:05:08 -0300 Subject: [PATCH 17/23] fix: reinstate transactions --- apps/emqx_durable_storage/src/emqx_ds.erl | 77 +++++++++++++---------- 1 file changed, 45 insertions(+), 32 deletions(-) diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 7ec9f3801..889e7ea24 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -136,20 +136,29 @@ message_stats() -> %% the broker. -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> - case mnesia:dirty_read(?SESSION_TAB, ClientID) of - [#session{}] -> - {false, ClientID}; - [] -> - Session = #session{id = ClientID}, - mria:dirty_write(?SESSION_TAB, Session), - {true, ClientID} - end. + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?SESSION_TAB, ClientID, write) of + [#session{}] -> + {false, ClientID}; + [] -> + Session = #session{id = ClientID}, + mnesia:write(?SESSION_TAB, Session, write), + {true, ClientID} + end + end), + Res. %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(emqx_types:clientid()) -> ok. session_drop(ClientID) -> - ok = mria:dirty_delete({?SESSION_TAB, ClientID}), + {atomic, ok} = mria:transaction( + ?DS_SHARD, + fun() -> + mnesia:delete({?SESSION_TAB, ClientID}) + end + ), ok. %% @doc Called when a client disconnects. This function terminates all @@ -164,29 +173,33 @@ session_suspend(_SessionId) -> {ok, iterator_id(), time(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, - case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of - [] -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - IteratorRef = #iterator_ref{ - ref_id = IteratorRefId, - it_id = IteratorId, - start_time = StartMS - }, - ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef), - ?tp( - ds_session_subscription_added, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = true, - {ok, IteratorId, StartMS, IsNew}; - [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> - ?tp( - ds_session_subscription_present, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = false, - {ok, IteratorId, StartMS, IsNew} - end. + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of + [] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = IteratorRefId, + it_id = IteratorId, + start_time = StartMS + }, + ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew}; + [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew} + end + end), + Res. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't From dad27091bea87062c6e2630bdbcdd3fcdc8c67b6 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 17:43:33 -0300 Subject: [PATCH 18/23] test: rm custom option --- apps/emqx/test/emqx_cth_cluster.erl | 14 ++--- apps/emqx/test/emqx_cth_suite.erl | 2 - .../test/emqx_ds_SUITE.erl | 59 +++++++++++++++---- 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/apps/emqx/test/emqx_cth_cluster.erl b/apps/emqx/test/emqx_cth_cluster.erl index 1a83056cb..e24600181 100644 --- a/apps/emqx/test/emqx_cth_cluster.erl +++ b/apps/emqx/test/emqx_cth_cluster.erl @@ -20,7 +20,8 @@ -export([stop/1, stop_node/1]). -export([share_load_module/2]). --export([node_name/1]). +-export([node_name/1, mk_nodespecs/2]). +-export([start_apps/2, set_node_opts/2]). -define(APPS_CLUSTERING, [gen_rpc, mria, ekka]). @@ -80,12 +81,7 @@ when %% Working directory %% Everything a test produces should go here. Each node's stuff should go in its %% own directory. - work_dir := file:name(), - %% Usually, we want to ensure the node / test suite starts from a clean slate. - %% However, sometimes, we may want to test restarting a node. For such - %% situations, we need to disable this check to allow resuming from an existing - %% state. - skip_clean_suite_state_check => boolean() + work_dir := file:name() }. start(Nodes, ClusterOpts) -> NodeSpecs = mk_nodespecs(Nodes, ClusterOpts), @@ -129,14 +125,12 @@ mk_init_nodespec(N, Name, NodeOpts, ClusterOpts) -> Node = node_name(Name), BasePort = base_port(N), WorkDir = maps:get(work_dir, ClusterOpts), - SkipCleanSuiteStateCheck = maps:get(skip_clean_suite_state_check, ClusterOpts, false), Defaults = #{ name => Node, role => core, apps => [], base_port => BasePort, work_dir => filename:join([WorkDir, Node]), - skip_clean_suite_state_check => SkipCleanSuiteStateCheck, driver => ct_slave }, maps:merge(Defaults, NodeOpts). @@ -307,7 +301,7 @@ start_apps(Node, #{apps := Apps} = Spec) -> ok. suite_opts(Spec) -> - maps:with([work_dir, skip_clean_suite_state_check], Spec). + maps:with([work_dir], Spec). maybe_join_cluster(_Node, #{role := replicant}) -> ok; diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index dbe9423da..9b3e58da4 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -358,8 +358,6 @@ stop_apps(Apps) -> %% -verify_clean_suite_state(#{skip_clean_suite_state_check := true}) -> - ok; verify_clean_suite_state(#{work_dir := WorkDir}) -> {ok, []} = file:list_dir(WorkDir), none = persistent_term:get(?EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY, none), diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl index c79856fc7..842782e35 100644 --- a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl +++ b/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl @@ -20,9 +20,6 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - %% avoid inter-suite flakiness... - application:stop(emqx), - application:stop(emqx_durable_storage), TCApps = emqx_cth_suite:start( app_specs(), #{work_dir => ?config(priv_dir, Config)} @@ -36,8 +33,16 @@ end_per_suite(Config) -> init_per_testcase(t_session_subscription_idempotency, Config) -> Cluster = cluster(#{n => 1}), - Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => ?config(priv_dir, Config)}), - [{cluster, Cluster}, {nodes, Nodes} | Config]; + ClusterOpts = #{work_dir => ?config(priv_dir, Config)}, + NodeSpecs = emqx_cth_cluster:mk_nodespecs(Cluster, ClusterOpts), + Nodes = emqx_cth_cluster:start(Cluster, ClusterOpts), + [ + {cluster, Cluster}, + {node_specs, NodeSpecs}, + {cluster_opts, ClusterOpts}, + {nodes, Nodes} + | Config + ]; init_per_testcase(_TestCase, Config) -> Config. @@ -92,12 +97,28 @@ get_all_iterator_ids(Node) -> emqx_ds_storage_layer:foldl_iterator_prefix(?DS_SHARD, <<>>, Fn, []) end). +wait_nodeup(Node) -> + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + pong = net_adm:ping(Node) + ). + +wait_gen_rpc_down(_NodeSpec = #{apps := Apps}) -> + #{override_env := Env} = proplists:get_value(gen_rpc, Apps), + Port = proplists:get_value(tcp_server_port, Env), + ?retry( + _Sleep0 = 500, + _Attempts0 = 50, + false = emqx_common_test_helpers:is_tcp_server_available("127.0.0.1", Port) + ). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ t_session_subscription_idempotency(Config) -> - Cluster = ?config(cluster, Config), + [Node1Spec | _] = ?config(node_specs, Config), [Node1] = ?config(nodes, Config), Port = get_mqtt_port(Node1, tcp), SubTopicFilter = <<"t/+">>, @@ -119,13 +140,25 @@ t_session_subscription_idempotency(Config) -> spawn_link(fun() -> ?tp(will_restart_node, #{}), - ct:pal("stopping node ~p", [Node1]), - ok = emqx_cth_cluster:stop_node(Node1), - ct:pal("stopped node ~p; restarting...", [Node1]), - [Node1] = emqx_cth_cluster:start(Cluster, #{ - work_dir => ?config(priv_dir, Config), - skip_clean_suite_state_check => true - }), + ct:pal("restarting node ~p", [Node1]), + true = monitor_node(Node1, true), + ok = erpc:call(Node1, init, restart, []), + receive + {nodedown, Node1} -> + ok + after 10_000 -> + ct:fail("node ~p didn't stop", [Node1]) + end, + ct:pal("waiting for nodeup ~p", [Node1]), + wait_nodeup(Node1), + wait_gen_rpc_down(Node1Spec), + ct:pal("restarting apps on ~p", [Node1]), + Apps = maps:get(apps, Node1Spec), + ok = erpc:call(Node1, emqx_cth_suite, load_apps, [Apps]), + _ = erpc:call(Node1, emqx_cth_suite, start_apps, [Apps, Node1Spec]), + %% have to re-inject this so that we may stop the node succesfully at the + %% end.... + ok = emqx_cth_cluster:set_node_opts(Node1, Node1Spec), ct:pal("node ~p restarted", [Node1]), ?tp(restarted_node, #{}), ok From c1f49abad226d933fae928fdfd5720937673c6f1 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 17 Aug 2023 14:54:57 -0300 Subject: [PATCH 19/23] test: fix inter-suite flakiness --- apps/emqx/test/emqx_crl_cache_SUITE.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/test/emqx_crl_cache_SUITE.erl b/apps/emqx/test/emqx_crl_cache_SUITE.erl index 6c6337038..248013ce9 100644 --- a/apps/emqx/test/emqx_crl_cache_SUITE.erl +++ b/apps/emqx/test/emqx_crl_cache_SUITE.erl @@ -41,6 +41,7 @@ init_per_suite(Config) -> Config. end_per_suite(_Config) -> + emqx_config:erase_all(), ok. init_per_testcase(TestCase, Config) when From ee2897e5dedfb671de6221bd498f33b35497885e Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 16 Aug 2023 17:54:49 -0300 Subject: [PATCH 20/23] test(refactor): move test to integration tests dir --- .../test => emqx/integration_test}/emqx_ds_SUITE.erl | 0 apps/emqx/test/emqx_cth_suite.erl | 8 +++++++- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 7 ++++++- 3 files changed, 13 insertions(+), 2 deletions(-) rename apps/{emqx_durable_storage/test => emqx/integration_test}/emqx_ds_SUITE.erl (100%) diff --git a/apps/emqx_durable_storage/test/emqx_ds_SUITE.erl b/apps/emqx/integration_test/emqx_ds_SUITE.erl similarity index 100% rename from apps/emqx_durable_storage/test/emqx_ds_SUITE.erl rename to apps/emqx/integration_test/emqx_ds_SUITE.erl diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 9b3e58da4..80b3a578c 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -101,7 +101,13 @@ when %% function will raise an error. work_dir := file:name() }. -start(Apps, SuiteOpts = #{work_dir := WorkDir}) -> +start(Apps, SuiteOpts0 = #{work_dir := WorkDir0}) -> + %% when running CT on the whole app, it seems like `priv_dir` is the same on all + %% suites and leads to the "clean slate" verificatin to fail. + WorkDir = binary_to_list( + filename:join([WorkDir0, emqx_guid:to_hexstr(emqx_guid:gen())]) + ), + SuiteOpts = SuiteOpts0#{work_dir := WorkDir}, % 1. Prepare appspec instructions AppSpecs = [mk_appspec(App, SuiteOpts) || App <- Apps], % 2. Load every app so that stuff scanning attributes of loaded modules works diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index dbd4df0ae..bb2921de1 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -29,9 +29,14 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + %% avoid inter-suite flakiness... + %% TODO: remove after other suites start to use `emx_cth_suite' + application:stop(emqx), + application:stop(emqx_durable_storage), + WorkDir = ?config(priv_dir, Config), TCApps = emqx_cth_suite:start( app_specs(), - #{work_dir => ?config(priv_dir, Config)} + #{work_dir => WorkDir} ), [{tc_apps, TCApps} | Config]. From f007b4442670e6274fa438257fbff15484c373a5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 21 Aug 2023 10:58:03 -0300 Subject: [PATCH 21/23] fix(data_import): rm duplicate import call and fix test --- apps/emqx_management/src/emqx_mgmt_data_backup.erl | 1 - apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index b83a46903..b677da2b2 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -525,7 +525,6 @@ do_import_conf(RawConf, Opts) -> Errors = lists:foldr( fun(Module, ErrorsAcc) -> - Module:import_config(RawConf), case Module:import_config(RawConf) of {ok, #{changed := Changed}} -> maybe_print_changed(Changed, Opts), diff --git a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl index f9b9ef766..381862995 100644 --- a/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_data_backup_SUITE.erl @@ -440,8 +440,8 @@ create_test_tab(Attributes) -> apps_to_start() -> [ - {emqx_conf, "dashboard.listeners.http.bind = 0"}, {emqx, #{override_env => [{boot_modules, [broker, router]}]}}, + {emqx_conf, #{config => #{dashboard => #{listeners => #{http => #{bind => <<"0">>}}}}}}, emqx_psk, emqx_management, emqx_dashboard, From f15f59650d833f82e5f037688ef01c0ceb2e6edf Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Aug 2023 13:49:33 -0300 Subject: [PATCH 22/23] test: rm obselete workaround code --- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index bb2921de1..db22b19e6 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -250,9 +250,6 @@ app_specs() -> [ emqx_durable_storage, {emqx, #{ - before_start => fun() -> - emqx_app:set_config_loader(?MODULE) - end, config => #{persistent_session_store => #{ds => true}}, override_env => [{boot_modules, [broker, listeners]}] }} From 33a0048155b66bc78bbdfac7502553caa701b279 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 25 Aug 2023 15:24:53 -0300 Subject: [PATCH 23/23] refactor: move logic to `ensure_iterator` --- apps/emqx/src/emqx_persistent_session_ds.erl | 12 +++------- .../src/emqx_ds_storage_layer.erl | 23 +++++++++++-------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b862ef0d9..dc615fd5b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -123,18 +123,12 @@ open_iterator_on_all_shards(TopicFilter, StartMS, IteratorID) -> true = lists:all(fun(Res) -> Res =:= {ok, ok} end, Results), ok. +%% RPC target. -spec do_open_iterator(emqx_topic:words(), emqx_ds:time(), emqx_ds:iterator_id()) -> ok. do_open_iterator(TopicFilter, StartMS, IteratorID) -> Replay = {TopicFilter, StartMS}, - case emqx_ds_storage_layer:is_iterator_present(?DS_SHARD, IteratorID) of - true -> - {ok, _It} = emqx_ds_storage_layer:restore_iterator(?DS_SHARD, IteratorID), - ok; - false -> - {ok, It} = emqx_ds_storage_layer:make_iterator(?DS_SHARD, Replay), - ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), - ok - end. + {ok, _It} = emqx_ds_storage_layer:ensure_iterator(?DS_SHARD, IteratorID, Replay), + ok. %% diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl index 69c0e008c..47c29e170 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_layer.erl @@ -17,7 +17,7 @@ preserve_iterator/2, restore_iterator/2, discard_iterator/2, - is_iterator_present/2, + ensure_iterator/3, discard_iterator_prefix/2, list_iterator_prefix/2, foldl_iterator_prefix/4 @@ -185,15 +185,18 @@ restore_iterator(Shard, ReplayID) -> Error end. --spec is_iterator_present(emqx_ds:shard(), emqx_ds:replay_id()) -> - boolean(). -is_iterator_present(Shard, ReplayID) -> - %% TODO: use keyMayExist after added to wrapper? - case iterator_get_state(Shard, ReplayID) of - {ok, _} -> - true; - _ -> - false +-spec ensure_iterator(emqx_ds:shard(), emqx_ds:iterator_id(), emqx_ds:replay()) -> + {ok, iterator()} | {error, _TODO}. +ensure_iterator(Shard, IteratorID, Replay = {_TopicFilter, _StartMS}) -> + case restore_iterator(Shard, IteratorID) of + {ok, It} -> + {ok, It}; + {error, not_found} -> + {ok, It} = make_iterator(Shard, Replay), + ok = emqx_ds_storage_layer:preserve_iterator(It, IteratorID), + {ok, It}; + Error -> + Error end. -spec discard_iterator(emqx_ds:shard(), emqx_ds:replay_id()) ->