From 88103c5f0ecd6dd2c1063e86f978e9cb9bf8d5cb Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 28 Nov 2023 19:13:14 +0300 Subject: [PATCH 01/16] refactor(session): pass config to `SessionImpl:open/3` as well * Anticipate that connection info may change during session takeover. * Avoid persisting session conf as part of persistent session state. --- apps/emqx/include/emqx_session_mem.hrl | 10 ++--- apps/emqx/src/emqx_persistent_session_ds.erl | 34 +++++++------- apps/emqx/src/emqx_session.erl | 28 +++++------- apps/emqx/src/emqx_session_mem.erl | 44 +++++++++++++------ apps/emqx/test/emqx_session_mem_SUITE.erl | 4 +- .../src/emqx_mqttsn_session.erl | 2 +- 6 files changed, 65 insertions(+), 57 deletions(-) diff --git a/apps/emqx/include/emqx_session_mem.hrl b/apps/emqx/include/emqx_session_mem.hrl index 9874a9018..ae155f766 100644 --- a/apps/emqx/include/emqx_session_mem.hrl +++ b/apps/emqx/include/emqx_session_mem.hrl @@ -26,9 +26,9 @@ %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed - max_subscriptions :: non_neg_integer() | infinity, + max_subscriptions = infinity :: non_neg_integer() | infinity, %% Upgrade QoS? - upgrade_qos :: boolean(), + upgrade_qos = false :: boolean(), %% Client <- Broker: QoS1/2 messages sent to the client but %% have not been unacked. inflight :: emqx_inflight:inflight(), @@ -40,14 +40,14 @@ %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval :: timeout(), + retry_interval = 0 :: timeout(), %% Client -> Broker: QoS2 messages received from the client, but %% have not been completely acknowledged awaiting_rel :: map(), %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel :: non_neg_integer() | infinity, + max_awaiting_rel = infinity :: non_neg_integer() | infinity, %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout :: timeout(), + await_rel_timeout = 0 :: timeout(), %% Created at created_at :: pos_integer() }). diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index b50ac8c64..5bf0a82a5 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -29,7 +29,7 @@ %% Session API -export([ create/3, - open/2, + open/3, destroy/1 ]). @@ -119,6 +119,8 @@ conninfo := emqx_types:conninfo(), %% Timers timer() => reference(), + %% Upgrade QoS? + upgrade_qos := boolean(), %% props := map() }. @@ -151,11 +153,12 @@ session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> % TODO: expiration - ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). + Session = ensure_timers(session_ensure_new(ClientID, ConnInfo)), + preserve_conf(ConnInfo, Conf, Session). --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. -open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> +open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> %% NOTE %% The fact that we need to concern about discarding all live channels here %% is essentially a consequence of the in-memory session design, where we @@ -165,20 +168,16 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - ReceiveMaximum = receive_maximum(ConnInfo), - Session = Session0#{receive_maximum => ReceiveMaximum}, + Session = preserve_conf(ConnInfo, Conf, Session0), {true, ensure_timers(Session), []}; false -> false end. -ensure_session(ClientID, ConnInfo, Conf) -> - Session = session_ensure_new(ClientID, ConnInfo, Conf), - ReceiveMaximum = receive_maximum(ConnInfo), +preserve_conf(ConnInfo, Conf, Session) -> Session#{ - conninfo => ConnInfo, - receive_maximum => ReceiveMaximum, - subscriptions => #{} + receive_maximum => receive_maximum(ConnInfo), + upgrade_qos => maps:get(upgrade_qos, Conf) }. -spec destroy(session() | clientinfo()) -> ok. @@ -644,25 +643,24 @@ session_open(SessionId, NewConnInfo) -> end end). --spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) -> +-spec session_ensure_new(id(), emqx_types:conninfo()) -> session(). -session_ensure_new(SessionId, ConnInfo, Props) -> +session_ensure_new(SessionId, ConnInfo) -> transaction(fun() -> ok = session_drop_subscriptions(SessionId), - Session = export_session(session_create(SessionId, ConnInfo, Props)), + Session = export_session(session_create(SessionId, ConnInfo)), Session#{ subscriptions => #{}, inflight => emqx_persistent_message_ds_replayer:new() } end). -session_create(SessionId, ConnInfo, Props) -> +session_create(SessionId, ConnInfo) -> Session = #session{ id = SessionId, created_at = now_ms(), last_alive_at = now_ms(), - conninfo = ConnInfo, - props = Props + conninfo = ConnInfo }, ok = mnesia:write(?SESSION_TAB, Session, write), Session. diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index f73ac812e..911bccb0a 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -102,7 +102,7 @@ -export([should_keep/1]). % Tests only --export([get_session_conf/2]). +-export([get_session_conf/1]). -export_type([ t/0, @@ -137,8 +137,6 @@ -type conf() :: #{ %% Max subscriptions allowed max_subscriptions := non_neg_integer() | infinity, - %% Max inflight messages allowed - max_inflight := non_neg_integer(), %% Maximum number of awaiting QoS2 messages allowed max_awaiting_rel := non_neg_integer() | infinity, %% Upgrade QoS? @@ -171,7 +169,7 @@ -callback create(clientinfo(), conninfo(), conf()) -> t(). --callback open(clientinfo(), conninfo()) -> +-callback open(clientinfo(), conninfo(), conf()) -> {_IsPresent :: true, t(), _ReplayContext} | false. -callback destroy(t() | clientinfo()) -> ok. @@ -181,7 +179,7 @@ -spec create(clientinfo(), conninfo()) -> t(). create(ClientInfo, ConnInfo) -> - Conf = get_session_conf(ClientInfo, ConnInfo), + Conf = get_session_conf(ClientInfo), create(ClientInfo, ConnInfo, Conf). create(ClientInfo, ConnInfo, Conf) -> @@ -198,12 +196,12 @@ create(Mod, ClientInfo, ConnInfo, Conf) -> -spec open(clientinfo(), conninfo()) -> {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. open(ClientInfo, ConnInfo) -> - Conf = get_session_conf(ClientInfo, ConnInfo), + Conf = get_session_conf(ClientInfo), Mods = [Default | _] = choose_impl_candidates(ConnInfo), %% NOTE %% Try to look the existing session up in session stores corresponding to the given %% `Mods` in order, starting from the last one. - case try_open(Mods, ClientInfo, ConnInfo) of + case try_open(Mods, ClientInfo, ConnInfo, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> @@ -212,24 +210,20 @@ open(ClientInfo, ConnInfo) -> {false, create(Default, ClientInfo, ConnInfo, Conf)} end. -try_open([Mod | Rest], ClientInfo, ConnInfo) -> - case try_open(Rest, ClientInfo, ConnInfo) of +try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) -> + case try_open(Rest, ClientInfo, ConnInfo, Conf) of {_IsPresent = true, _, _} = Present -> Present; false -> - Mod:open(ClientInfo, ConnInfo) + Mod:open(ClientInfo, ConnInfo, Conf) end; -try_open([], _ClientInfo, _ConnInfo) -> +try_open([], _ClientInfo, _ConnInfo, _Conf) -> false. --spec get_session_conf(clientinfo(), conninfo()) -> conf(). -get_session_conf( - #{zone := Zone}, - #{receive_maximum := MaxInflight} -) -> +-spec get_session_conf(clientinfo()) -> conf(). +get_session_conf(_ClientInfo = #{zone := Zone}) -> #{ max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), - max_inflight => MaxInflight, max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel), upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), retry_interval => get_mqtt_conf(Zone, retry_interval), diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 178c71e12..1d626fdb0 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -59,7 +59,7 @@ -export([ create/3, - open/2, + open/3, destroy/1 ]). @@ -152,24 +152,24 @@ -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> session(). -create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) -> +create( + #{zone := Zone, clientid := ClientId}, + #{expiry_interval := EI, receive_maximum := ReceiveMax}, + Conf +) -> QueueOpts = get_mqueue_conf(Zone), - #session{ + Session = #session{ id = emqx_guid:gen(), clientid = ClientId, created_at = erlang:system_time(millisecond), is_persistent = EI > 0, subscriptions = #{}, - inflight = emqx_inflight:new(maps:get(max_inflight, Conf)), + inflight = emqx_inflight:new(ReceiveMax), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - awaiting_rel = #{}, - max_subscriptions = maps:get(max_subscriptions, Conf), - max_awaiting_rel = maps:get(max_awaiting_rel, Conf), - upgrade_qos = maps:get(upgrade_qos, Conf), - retry_interval = maps:get(retry_interval, Conf), - await_rel_timeout = maps:get(await_rel_timeout, Conf) - }. + awaiting_rel = #{} + }, + preserve_conf(Conf, Session). get_mqueue_conf(Zone) -> #{ @@ -195,14 +195,16 @@ destroy(_Session) -> %% Open a (possibly existing) Session %%-------------------------------------------------------------------- --spec open(clientinfo(), conninfo()) -> +-spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. -open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> +open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> case emqx_cm:takeover_session_begin(ClientId) of {ok, SessionRemote, TakeoverState} -> - Session = resume(ClientInfo, SessionRemote), + Session0 = resume(ClientInfo, SessionRemote), case emqx_cm:takeover_session_end(TakeoverState) of {ok, Pendings} -> + Session1 = resize_inflight(ConnInfo, Session0), + Session = preserve_conf(Conf, Session1), clean_session(ClientInfo, Session, Pendings); {error, _} -> % TODO log error? @@ -212,6 +214,20 @@ open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> false end. +resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = Inflight}) -> + Session#session{ + inflight = emqx_inflight:resize(ReceiveMax, Inflight) + }. + +preserve_conf(Conf, Session = #session{}) -> + Session#session{ + max_subscriptions = maps:get(max_subscriptions, Conf), + max_awaiting_rel = maps:get(max_awaiting_rel, Conf), + upgrade_qos = maps:get(upgrade_qos, Conf), + retry_interval = maps:get(retry_interval, Conf), + await_rel_timeout = maps:get(await_rel_timeout, Conf) + }. + clean_session(ClientInfo, Session = #session{mqueue = Q}, Pendings) -> Q1 = emqx_mqueue:filter(fun emqx_session:should_keep/1, Q), Session1 = Session#session{mqueue = Q1}, diff --git a/apps/emqx/test/emqx_session_mem_SUITE.erl b/apps/emqx/test/emqx_session_mem_SUITE.erl index 7f10635c1..20d622941 100644 --- a/apps/emqx/test/emqx_session_mem_SUITE.erl +++ b/apps/emqx/test/emqx_session_mem_SUITE.erl @@ -67,7 +67,7 @@ t_session_init(_) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, - emqx_session:get_session_conf(ClientInfo, ConnInfo) + emqx_session:get_session_conf(ClientInfo) ), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), ?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)), @@ -531,7 +531,7 @@ session(InitFields) when is_map(InitFields) -> Session = emqx_session_mem:create( ClientInfo, ConnInfo, - emqx_session:get_session_conf(ClientInfo, ConnInfo) + emqx_session:get_session_conf(ClientInfo) ), maps:fold( fun(Field, Value, SessionAcc) -> diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl index 3621aa627..21f2c7e36 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_session.erl @@ -54,7 +54,7 @@ init(ClientInfo) -> ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, - SessionConf = emqx_session:get_session_conf(ClientInfo, ConnInfo), + SessionConf = emqx_session:get_session_conf(ClientInfo), #{ registry => emqx_mqttsn_registry:init(), session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf) From 508346f095b559adc4ff5983848c80f0b1cbfd3d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 28 Nov 2023 22:57:04 +0300 Subject: [PATCH 02/16] refactor(topicidx): split persistent term stuff off gbt-based index --- apps/emqx/src/emqx_topic_gbt.erl | 97 +++++++++++++---------- apps/emqx/src/emqx_topic_gbt_pterm.erl | 71 +++++++++++++++++ apps/emqx/test/emqx_topic_index_SUITE.erl | 2 +- 3 files changed, 128 insertions(+), 42 deletions(-) create mode 100644 apps/emqx/src/emqx_topic_gbt_pterm.erl diff --git a/apps/emqx/src/emqx_topic_gbt.erl b/apps/emqx/src/emqx_topic_gbt.erl index 063cba21d..6e9e7d2fc 100644 --- a/apps/emqx/src/emqx_topic_gbt.erl +++ b/apps/emqx/src/emqx_topic_gbt.erl @@ -14,14 +14,17 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Topic index implemetation with gb_trees stored in persistent_term. -%% This is only suitable for a static set of topic or topic-filters. +%% @doc Topic index implemetation with gb_trees as the underlying data +%% structure. -module(emqx_topic_gbt). --export([new/0, new/1]). +-export([new/0]). +-export([size/1]). -export([insert/4]). -export([delete/3]). +-export([lookup/4]). +-export([fold/3]). -export([match/2]). -export([matches/3]). @@ -29,53 +32,74 @@ -export([get_topic/1]). -export([get_record/2]). +-export_type([t/0, t/2, match/1]). + -type key(ID) :: emqx_trie_search:key(ID). -type words() :: emqx_trie_search:words(). -type match(ID) :: key(ID). --type name() :: any(). -%% @private Only for testing. --spec new() -> name(). -new() -> - new(test). +-opaque t(ID, Value) :: gb_trees:tree(key(ID), Value). +-opaque t() :: t(_ID, _Value). %% @doc Create a new gb_tree and store it in the persitent_term with the %% given name. --spec new(name()) -> name(). -new(Name) -> - T = gb_trees:from_orddict([]), - true = gbt_update(Name, T), - Name. +-spec new() -> t(). +new() -> + gb_trees:empty(). + +-spec size(t()) -> non_neg_integer(). +size(Gbt) -> + gb_trees:size(Gbt). %% @doc Insert a new entry into the index that associates given topic filter to given %% record ID, and attaches arbitrary record to the entry. This allows users to choose %% between regular and "materialized" indexes, for example. --spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true. -insert(Filter, ID, Record, Name) -> - Tree = gbt(Name), +-spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t(). +insert(Filter, ID, Record, Gbt) -> Key = key(Filter, ID), - NewTree = gb_trees:enter(Key, Record, Tree), - true = gbt_update(Name, NewTree). + gb_trees:enter(Key, Record, Gbt). %% @doc Delete an entry from the index that associates given topic filter to given %% record ID. Deleting non-existing entry is not an error. --spec delete(emqx_types:topic() | words(), _ID, name()) -> true. -delete(Filter, ID, Name) -> - Tree = gbt(Name), +-spec delete(emqx_types:topic() | words(), _ID, t()) -> t(). +delete(Filter, ID, Gbt) -> Key = key(Filter, ID), - NewTree = gb_trees:delete_any(Key, Tree), - true = gbt_update(Name, NewTree). + gb_trees:delete_any(Key, Gbt). + +-spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default. +lookup(Filter, ID, Gbt, Default) -> + Key = key(Filter, ID), + case gb_trees:lookup(Key, Gbt) of + {value, Record} -> + Record; + none -> + Default + end. + +-spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc. +fold(Fun, Acc, Gbt) -> + Iter = gb_trees:iterator(Gbt), + fold_iter(Fun, Acc, Iter). + +fold_iter(Fun, Acc, Iter) -> + case gb_trees:next(Iter) of + {Key, Record, NIter} -> + fold_iter(Fun, Fun(Key, Record, Acc), NIter); + none -> + Acc + end. %% @doc Match given topic against the index and return the first match, or `false` if %% no match is found. --spec match(emqx_types:topic(), name()) -> match(_ID) | false. -match(Topic, Name) -> - emqx_trie_search:match(Topic, make_nextf(Name)). +-spec match(emqx_types:topic(), t()) -> match(_ID) | false. +match(Topic, Gbt) -> + emqx_trie_search:match(Topic, make_nextf(Gbt)). %% @doc Match given topic against the index and return _all_ matches. %% If `unique` option is given, return only unique matches by record ID. -matches(Topic, Name, Opts) -> - emqx_trie_search:matches(Topic, make_nextf(Name), Opts). +-spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)]. +matches(Topic, Gbt, Opts) -> + emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts). %% @doc Extract record ID from the match. -spec get_id(match(ID)) -> ID. @@ -88,21 +112,13 @@ get_topic(Key) -> emqx_trie_search:get_topic(Key). %% @doc Fetch the record associated with the match. --spec get_record(match(_ID), name()) -> _Record. -get_record(Key, Name) -> - Gbt = gbt(Name), +-spec get_record(match(_ID), t()) -> _Record. +get_record(Key, Gbt) -> gb_trees:get(Key, Gbt). key(TopicOrFilter, ID) -> emqx_trie_search:make_key(TopicOrFilter, ID). -gbt(Name) -> - persistent_term:get({?MODULE, Name}). - -gbt_update(Name, Tree) -> - persistent_term:put({?MODULE, Name}, Tree), - true. - gbt_next(nil, _Input) -> '$end_of_table'; gbt_next({P, _V, _Smaller, Bigger}, K) when K >= P -> @@ -115,6 +131,5 @@ gbt_next({P, _V, Smaller, _Bigger}, K) -> NextKey end. -make_nextf(Name) -> - {_SizeWeDontCare, TheTree} = gbt(Name), - fun(Key) -> gbt_next(TheTree, Key) end. +make_nextf({_Size, Tree}) -> + fun(Key) -> gbt_next(Tree, Key) end. diff --git a/apps/emqx/src/emqx_topic_gbt_pterm.erl b/apps/emqx/src/emqx_topic_gbt_pterm.erl new file mode 100644 index 000000000..4702fcb5f --- /dev/null +++ b/apps/emqx/src/emqx_topic_gbt_pterm.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Topic index implemetation with gb_tree as a persistent term. +%% This is only suitable for a static set of topic or topic-filters. + +-module(emqx_topic_gbt_pterm). + +-export([new/0, new/1]). +-export([insert/4]). +-export([delete/3]). +-export([match/2]). +-export([matches/3]). + +-export([get_record/2]). + +-type name() :: any(). +-type match(ID) :: emqx_topic_gbt:match(ID). + +%% @private Only for testing. +-spec new() -> name(). +new() -> + new(test). + +-spec new(name()) -> name(). +new(Name) -> + true = pterm_update(Name, emqx_topic_gbt:new()), + Name. + +-spec insert(emqx_types:topic() | emqx_trie_search:words(), _ID, _Record, name()) -> true. +insert(Filter, ID, Record, Name) -> + pterm_update(Name, emqx_topic_gbt:insert(Filter, ID, Record, pterm(Name))). + +-spec delete(emqx_types:topic() | emqx_trie_search:words(), _ID, name()) -> name(). +delete(Filter, ID, Name) -> + pterm_update(Name, emqx_topic_gbt:delete(Filter, ID, pterm(Name))). + +-spec match(emqx_types:topic(), name()) -> match(_ID) | false. +match(Topic, Name) -> + emqx_topic_gbt:match(Topic, pterm(Name)). + +-spec matches(emqx_types:topic(), name(), emqx_trie_search:opts()) -> [match(_ID)]. +matches(Topic, Name, Opts) -> + emqx_topic_gbt:matches(Topic, pterm(Name), Opts). + +%% @doc Fetch the record associated with the match. +-spec get_record(match(_ID), name()) -> _Record. +get_record(Key, Name) -> + emqx_topic_gbt:get_record(Key, pterm(Name)). + +%% + +pterm(Name) -> + persistent_term:get({?MODULE, Name}). + +pterm_update(Name, Tree) -> + persistent_term:put({?MODULE, Name}, Tree), + true. diff --git a/apps/emqx/test/emqx_topic_index_SUITE.erl b/apps/emqx/test/emqx_topic_index_SUITE.erl index 71e508306..a61edced6 100644 --- a/apps/emqx/test/emqx_topic_index_SUITE.erl +++ b/apps/emqx/test/emqx_topic_index_SUITE.erl @@ -40,7 +40,7 @@ groups() -> init_per_group(ets, Config) -> [{index_module, emqx_topic_index} | Config]; init_per_group(gb_tree, Config) -> - [{index_module, emqx_topic_gbt} | Config]. + [{index_module, emqx_topic_gbt_pterm} | Config]. end_per_group(_Group, _Config) -> ok. From b5f39f89e37567b9c590609e81981028f1811761 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 29 Nov 2023 18:03:45 +0300 Subject: [PATCH 03/16] feat(utils): add `flattermap/2` as slightly more generic `flatmap/2` --- apps/emqx_utils/src/emqx_utils.erl | 31 ++++++++++++ apps/emqx_utils/test/emqx_utils_SUITE.erl | 58 ++++++++++++++++++----- 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index f827f65de..a5acfbb8a 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -60,6 +60,7 @@ safe_filename/1, diff_lists/3, merge_lists/3, + flattermap/2, tcp_keepalive_opts/4, format/1, format_mfal/1, @@ -999,6 +1000,36 @@ search(ExpectValue, KeyFunc, [Item | List]) -> false -> search(ExpectValue, KeyFunc, List) end. +%% @doc Maps over a term or a list of terms and flattens the result, giving back +%% again a term or a flat list of terms. It's similar to `lists:flatmap/2`, but +%% it works on a single term as well, both as input and `Fun` output (thus, the +%% wordplay on "flatter"). +%% The purpose of this function is to adapt to `Fun`s that return either a `[]` +%% or a term, and to avoid costs of list construction and flattening when dealing +%% with large lists. +-spec flattermap(Fun, FlatList) -> FlatList when + Fun :: fun((X) -> FlatList), + FlatList :: [X] | X. +flattermap(_Fun, []) -> + []; +flattermap(Fun, [X | Xs]) -> + flatcomb(Fun(X), flattermap(Fun, Xs)); +flattermap(Fun, X) -> + Fun(X). + +flatcomb([], Z) -> + Z; +flatcomb(Y, []) -> + Y; +flatcomb(Ys = [_ | _], Zs = [_ | _]) -> + Ys ++ Zs; +flatcomb(Ys = [_ | _], Z) -> + Ys ++ [Z]; +flatcomb(Y, Zs = [_ | _]) -> + [Y | Zs]; +flatcomb(Y, Z) -> + [Y, Z]. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 12e99c917..154e065b1 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/asserts.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SOCKOPTS, [ @@ -87,13 +88,13 @@ t_pipeline(_) -> t_start_timer(_) -> TRef = emqx_utils:start_timer(1, tmsg), timer:sleep(2), - ?assertEqual([{timeout, TRef, tmsg}], drain()), + ?assertEqual([{timeout, TRef, tmsg}], ?drainMailbox()), ok = emqx_utils:cancel_timer(TRef). t_cancel_timer(_) -> Timer = emqx_utils:start_timer(0, foo), ok = emqx_utils:cancel_timer(Timer), - ?assertEqual([], drain()), + ?assertEqual([], ?drainMailbox()), ok = emqx_utils:cancel_timer(undefined). t_proc_name(_) -> @@ -153,16 +154,6 @@ t_check(_) -> emqx_utils:check_oom(Policy) ). -drain() -> - drain([]). - -drain(Acc) -> - receive - Msg -> drain([Msg | Acc]) - after 0 -> - lists:reverse(Acc) - end. - t_rand_seed(_) -> ?assert(is_tuple(emqx_utils:rand_seed())). @@ -240,3 +231,46 @@ t_pmap_late_reply(_) -> [] ), ok. + +t_flattermap(_) -> + ?assertEqual( + [42, 42], + emqx_utils:flattermap(fun duplicate/1, 42) + ), + ?assertEqual( + [1, 1, 2, 2, 3, 3], + emqx_utils:flattermap(fun duplicate/1, [1, 2, 3]) + ), + ?assertEqual( + [], + emqx_utils:flattermap(fun nil/1, 42) + ), + ?assertEqual( + [], + emqx_utils:flattermap(fun nil/1, [1, 2, 3]) + ), + ?assertEqual( + 42, + emqx_utils:flattermap(fun identity/1, [42]) + ), + ?assertEqual( + [1, 3, 5], + emqx_utils:flattermap( + fun(X) -> + case X rem 2 of + 0 -> []; + 1 -> X + end + end, + [1, 2, 3, 4, 5] + ) + ). + +duplicate(X) -> + [X, X]. + +nil(_) -> + []. + +identity(X) -> + X. From 3265d2f2aab3f467aa8b6dc79299b2714733824e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 29 Nov 2023 12:44:35 +0300 Subject: [PATCH 04/16] fix(sessds): respect subscription options when publishing --- .../emqx_persistent_session_ds_SUITE.erl | 18 +- .../emqx_persistent_message_ds_replayer.erl | 35 ++-- apps/emqx/src/emqx_persistent_session_ds.erl | 180 ++++++++++++------ apps/emqx/src/emqx_session.erl | 5 +- .../test/emqx_persistent_session_SUITE.erl | 5 +- 5 files changed, 158 insertions(+), 85 deletions(-) diff --git a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl index 5e1297df6..0bbe24ae4 100644 --- a/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl +++ b/apps/emqx/integration_test/emqx_persistent_session_ds_SUITE.erl @@ -262,10 +262,12 @@ t_session_subscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{}, + Session = erpc:call( + Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}] + ), ?assertMatch( - #{subscriptions := #{SubTopicFilter := #{}}}, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) + #{SubTopicFilter := #{}}, + emqx_session:info(subscriptions, Session) ) end ), @@ -336,10 +338,12 @@ t_session_unsubscription_idempotency(Config) -> end, fun(Trace) -> ct:pal("trace:\n ~p", [Trace]), - ConnInfo = #{}, - ?assertMatch( - #{subscriptions := Subs = #{}} when map_size(Subs) =:= 0, - erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) + Session = erpc:call( + Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}] + ), + ?assertEqual( + #{}, + emqx_session:info(subscriptions, Session) ), ok end diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 2bd312561..32934492a 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -66,10 +66,9 @@ -opaque inflight() :: #inflight{}. --type reply_fun() :: fun( - (seqno(), emqx_types:message()) -> - emqx_session:replies() | {_AdvanceSeqno :: false, emqx_session:replies()} -). +-type replies() :: reply() | [replies()]. +-type reply() :: emqx_session:reply() | fun((emqx_types:packet_id()) -> emqx_session:replies()). +-type reply_fun() :: fun((seqno(), emqx_types:message()) -> replies()). %%================================================================================ %% API funcions @@ -422,26 +421,32 @@ get_commit_next(comp, #inflight{commits = Commits}) -> publish(ReplyFun, FirstSeqno, Messages) -> lists:mapfoldl( - fun(Message, {Seqno, TAcc}) -> - case ReplyFun(Seqno, Message) of - {_Advance = false, Reply} -> - {Reply, {Seqno, TAcc}}; - Reply -> - NextSeqno = next_seqno(Seqno), - NextTAcc = add_msg_track(Message, TAcc), - {Reply, {NextSeqno, NextTAcc}} - end + fun(Message, Acc = {Seqno, _Tracks}) -> + Reply = ReplyFun(Seqno, Message), + publish_reply(Reply, Acc) end, {FirstSeqno, 0}, Messages ). -add_msg_track(Message, Tracks) -> +publish_reply(Replies = [_ | _], Acc) -> + lists:mapfoldl(fun publish_reply/2, Acc, Replies); +publish_reply(Reply, {Seqno, Tracks}) when is_function(Reply) -> + Pub = Reply(seqno_to_packet_id(Seqno)), + NextSeqno = next_seqno(Seqno), + NextTracks = add_pub_track(Pub, Tracks), + {Pub, {NextSeqno, NextTracks}}; +publish_reply(Reply, Acc) -> + {Reply, Acc}. + +add_pub_track({PacketId, Message}, Tracks) when is_integer(PacketId) -> case emqx_message:qos(Message) of 1 -> ?TRACK_FLAG(?ACK) bor Tracks; 2 -> ?TRACK_FLAG(?COMP) bor Tracks; _ -> Tracks - end. + end; +add_pub_track(_Pub, Tracks) -> + Tracks. keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) -> Range#ds_pubrange{ diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 5bf0a82a5..a25e4da3b 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -102,6 +102,8 @@ -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). -type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. +-type subscriptions() :: emqx_topic_gbt:t(nil(), subscription()). + -type session() :: #{ %% Client ID id := id(), @@ -110,7 +112,7 @@ %% When the client was last considered alive last_alive_at := timestamp(), %% Client’s Subscriptions. - subscriptions := #{topic_filter() => subscription()}, + subscriptions := subscriptions(), %% Inflight messages inflight := emqx_persistent_message_ds_replayer:inflight(), %% Receive maximum @@ -119,8 +121,6 @@ conninfo := emqx_types:conninfo(), %% Timers timer() => reference(), - %% Upgrade QoS? - upgrade_qos := boolean(), %% props := map() }. @@ -177,7 +177,7 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> preserve_conf(ConnInfo, Conf, Session) -> Session#{ receive_maximum => receive_maximum(ConnInfo), - upgrade_qos => maps:get(upgrade_qos, Conf) + props => Conf }. -spec destroy(session() | clientinfo()) -> ok. @@ -203,10 +203,10 @@ info(created_at, #{created_at := CreatedAt}) -> CreatedAt; info(is_persistent, #{}) -> true; -info(subscriptions, #{subscriptions := Iters}) -> - maps:map(fun(_, #{props := SubOpts}) -> SubOpts end, Iters); -info(subscriptions_cnt, #{subscriptions := Iters}) -> - maps:size(Iters); +info(subscriptions, #{subscriptions := Subs}) -> + subs_to_map(Subs); +info(subscriptions_cnt, #{subscriptions := Subs}) -> + subs_size(Subs); info(subscriptions_max, #{props := Conf}) -> maps:get(max_subscriptions, Conf); info(upgrade_qos, #{props := Conf}) -> @@ -273,41 +273,40 @@ subscribe( TopicFilter, SubOpts, Session = #{id := ID, subscriptions := Subs} -) when is_map_key(TopicFilter, Subs) -> - Subscription = maps:get(TopicFilter, Subs), - NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID), - {ok, Session#{subscriptions := Subs#{TopicFilter => NSubscription}}}; -subscribe( - TopicFilter, - SubOpts, - Session = #{id := ID, subscriptions := Subs} ) -> - % TODO: max_subscriptions - Subscription = add_subscription(TopicFilter, SubOpts, ID), - {ok, Session#{subscriptions := Subs#{TopicFilter => Subscription}}}. + case subs_lookup(TopicFilter, Subs) of + Subscription = #{} -> + NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID), + NSubs = subs_insert(TopicFilter, NSubscription, Subs), + {ok, Session#{subscriptions := NSubs}}; + undefined -> + % TODO: max_subscriptions + Subscription = add_subscription(TopicFilter, SubOpts, ID), + NSubs = subs_insert(TopicFilter, Subscription, Subs), + {ok, Session#{subscriptions := NSubs}} + end. -spec unsubscribe(topic_filter(), session()) -> {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. unsubscribe( TopicFilter, Session = #{id := ID, subscriptions := Subs} -) when is_map_key(TopicFilter, Subs) -> - Subscription = maps:get(TopicFilter, Subs), - SubOpts = maps:get(props, Subscription), - ok = del_subscription(TopicFilter, ID), - {ok, Session#{subscriptions := maps:remove(TopicFilter, Subs)}, SubOpts}; -unsubscribe( - _TopicFilter, - _Session = #{} ) -> - {error, ?RC_NO_SUBSCRIPTION_EXISTED}. + case subs_lookup(TopicFilter, Subs) of + _Subscription = #{props := SubOpts} -> + ok = del_subscription(TopicFilter, ID), + NSubs = subs_delete(TopicFilter, Subs), + {ok, Session#{subscriptions := NSubs}, SubOpts}; + undefined -> + {error, ?RC_NO_SUBSCRIPTION_EXISTED} + end. -spec get_subscription(topic_filter(), session()) -> emqx_types:subopts() | undefined. get_subscription(TopicFilter, #{subscriptions := Subs}) -> - case maps:get(TopicFilter, Subs, undefined) of - Subscription = #{} -> - maps:get(props, Subscription); + case subs_lookup(TopicFilter, Subs) of + _Subscription = #{props := SubOpts} -> + SubOpts; undefined -> undefined end. @@ -328,9 +327,6 @@ publish(_PacketId, Msg, Session) -> %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- -%% FIXME: parts of the commit offset function are mocked --dialyzer({nowarn_function, puback/3}). - -spec puback(clientinfo(), emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. @@ -402,20 +398,27 @@ deliver(_ClientInfo, _Delivers, Session) -> -spec handle_timeout(clientinfo(), _Timeout, session()) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout( - _ClientInfo, - ?TIMER_PULL, - Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} + ClientInfo, + pull, + Session0 = #{ + id := Id, + inflight := Inflight0, + subscriptions := Subs, + props := Conf, + receive_maximum := ReceiveMaximum + } ) -> MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), BatchSize = min(ReceiveMaximum, MaxBatchSize), + UpgradeQoS = maps:get(upgrade_qos, Conf), + ReplyFun = make_reply_fun(ClientInfo, Subs, UpgradeQoS, fun + (_Seqno, Message = #message{qos = ?QOS_0}) -> + {undefined, Message}; + (_Seqno, Message) -> + fun(PacketId) -> {PacketId, Message} end + end), {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( - fun - (_Seqno, Message = #message{qos = ?QOS_0}) -> - {false, {undefined, Message}}; - (Seqno, Message) -> - PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno), - {PacketId, Message} - end, + ReplyFun, Id, Inflight0, BatchSize @@ -446,24 +449,27 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. -replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> +replay( + ClientInfo, + [], + Session = #{inflight := Inflight0, subscriptions := Subs, props := Conf} +) -> + UpgradeQoS = maps:get(upgrade_qos, Conf), AckedUntil = emqx_persistent_message_ds_replayer:committed_until(ack, Inflight0), RecUntil = emqx_persistent_message_ds_replayer:committed_until(rec, Inflight0), CompUntil = emqx_persistent_message_ds_replayer:committed_until(comp, Inflight0), - ReplyFun = fun + ReplyFun = make_reply_fun(ClientInfo, Subs, UpgradeQoS, fun (_Seqno, #message{qos = ?QOS_0}) -> - {false, []}; + []; (Seqno, #message{qos = ?QOS_1}) when Seqno < AckedUntil -> - []; + fun(_) -> [] end; (Seqno, #message{qos = ?QOS_2}) when Seqno < CompUntil -> - []; + fun(_) -> [] end; (Seqno, #message{qos = ?QOS_2}) when Seqno < RecUntil -> - PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno), - {pubrel, PacketId}; - (Seqno, Message) -> - PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno), - {PacketId, emqx_message:set_flag(dup, true, Message)} - end, + fun(PacketId) -> {pubrel, PacketId} end; + (_Seqno, Message) -> + fun(PacketId) -> {PacketId, emqx_message:set_flag(dup, true, Message)} end + end), {Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(ReplyFun, Inflight0), {ok, Replies, Session#{inflight := Inflight}}. @@ -480,6 +486,25 @@ terminate(_Reason, _Session = #{}) -> %%-------------------------------------------------------------------- +make_reply_fun(ClientInfo, Subs, UpgradeQoS, InnerFun) -> + fun(Seqno, Message0 = #message{topic = Topic}) -> + emqx_utils:flattermap( + fun(Match) -> + emqx_utils:flattermap( + fun(Message) -> InnerFun(Seqno, Message) end, + enrich_message(ClientInfo, Message0, Match, Subs, UpgradeQoS) + ) + end, + subs_matches(Topic, Subs) + ) + end. + +enrich_message(ClientInfo, Message, SubMatch, Subs, UpgradeQoS) -> + #{props := SubOpts} = subs_get_match(SubMatch, Subs), + emqx_session:enrich_message(ClientInfo, Message, SubOpts, UpgradeQoS). + +%%-------------------------------------------------------------------- + -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> subscription(). add_subscription(TopicFilter, SubOpts, DSSessionID) -> @@ -650,7 +675,7 @@ session_ensure_new(SessionId, ConnInfo) -> ok = session_drop_subscriptions(SessionId), Session = export_session(session_create(SessionId, ConnInfo)), Session#{ - subscriptions => #{}, + subscriptions => subs_new(), inflight => emqx_persistent_message_ds_replayer:new() } end). @@ -842,7 +867,7 @@ do_ensure_all_iterators_closed(_DSSessionID) -> renew_streams(#{id := SessionId, subscriptions := Subscriptions}) -> transaction(fun() -> ExistingStreams = mnesia:read(?SESSION_STREAM_TAB, SessionId, write), - maps:fold( + subs_fold( fun(TopicFilter, #{start_time := StartTime}, Streams) -> TopicFilterWords = emqx_topic:words(TopicFilter), renew_topic_streams(SessionId, TopicFilterWords, StartTime, Streams) @@ -924,6 +949,43 @@ session_drop_offsets(DSSessionId) -> %%-------------------------------------------------------------------------------- +subs_new() -> + emqx_topic_gbt:new(). + +subs_lookup(TopicFilter, Subs) -> + emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined). + +subs_insert(TopicFilter, Subscription, Subs) -> + emqx_topic_gbt:insert(TopicFilter, [], Subscription, Subs). + +subs_delete(TopicFilter, Subs) -> + emqx_topic_gbt:delete(TopicFilter, [], Subs). + +subs_matches(Topic, Subs) -> + emqx_topic_gbt:matches(Topic, Subs, []). + +subs_get_match(M, Subs) -> + emqx_topic_gbt:get_record(M, Subs). + +subs_size(Subs) -> + emqx_topic_gbt:size(Subs). + +subs_to_map(Subs) -> + subs_fold( + fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, + #{}, + Subs + ). + +subs_fold(Fun, AccIn, Subs) -> + emqx_topic_gbt:fold( + fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, + AccIn, + Subs + ). + +%%-------------------------------------------------------------------------------- + transaction(Fun) -> case mnesia:is_transaction() of true -> @@ -942,9 +1004,9 @@ ro_transaction(Fun) -> export_subscriptions(DSSubs) -> lists:foldl( fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) -> - Acc#{TopicFilter => export_subscription(DSSub)} + subs_insert(TopicFilter, export_subscription(DSSub), Acc) end, - #{}, + subs_new(), DSSubs ). diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 911bccb0a..bcf711a76 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -96,7 +96,10 @@ ]). % Foreign session implementations --export([enrich_delivers/3]). +-export([ + enrich_delivers/3, + enrich_message/4 +]). % Utilities -export([should_keep/1]). diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 66bb8dcf5..00a961d47 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -323,7 +323,8 @@ t_choose_impl(Config) -> ds -> emqx_persistent_session_ds end, emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid)) - ). + ), + ok = emqtt:disconnect(Client). t_connect_discards_existing_client(Config) -> ClientId = ?config(client_id, Config), @@ -1009,8 +1010,6 @@ t_unsubscribe(Config) -> ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ok = emqtt:disconnect(Client). -t_multiple_subscription_matches(init, Config) -> skip_ds_tc(Config); -t_multiple_subscription_matches('end', _Config) -> ok. t_multiple_subscription_matches(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), From 1948719eecb58137bdf2a31f91f70912c99b73e0 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 29 Nov 2023 18:36:21 +0300 Subject: [PATCH 05/16] test(sessds): unskip testcase related to QoS2 handling --- apps/emqx/test/emqx_persistent_session_SUITE.erl | 3 --- 1 file changed, 3 deletions(-) diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 00a961d47..b25c1c299 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -390,9 +390,6 @@ t_connect_session_expiry_interval(Config) -> ok = emqtt:disconnect(Client2). %% [MQTT-3.1.2-23] -%% TODO: un-skip after QoS 2 support is implemented in DS. -t_connect_session_expiry_interval_qos2(init, Config) -> skip_ds_tc(Config); -t_connect_session_expiry_interval_qos2('end', _Config) -> ok. t_connect_session_expiry_interval_qos2(Config) -> ConnFun = ?config(conn_fun, Config), Topic = ?config(topic, Config), From b5ad0f98155c4c391b5383abd8dc9477b886ed1d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 29 Nov 2023 19:10:59 +0300 Subject: [PATCH 06/16] fix(test): subscribe with QoS 2 to receive QoS 2 message --- apps/emqx/test/emqx_persistent_messages_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_persistent_messages_SUITE.erl b/apps/emqx/test/emqx_persistent_messages_SUITE.erl index 80a83c0a4..010133643 100644 --- a/apps/emqx/test/emqx_persistent_messages_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_messages_SUITE.erl @@ -262,7 +262,7 @@ t_publish_as_persistent(_Config) -> Sub = connect(<>, true, 30), Pub = connect(<>, true, 30), try - {ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Sub, <<"t/#">>, qos2), Messages = [ {<<"t/1">>, <<"1">>, 0}, {<<"t/1">>, <<"2">>, 1}, From 6255ee083350b6889c71f8823db08fca0d87db6a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 11:06:00 +0300 Subject: [PATCH 07/16] fix(sessmem): avoid defining unsafe defaults in `#session` --- apps/emqx/include/emqx_session_mem.hrl | 8 ++++---- apps/emqx/src/emqx_session_mem.erl | 16 ++++++++++------ 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/apps/emqx/include/emqx_session_mem.hrl b/apps/emqx/include/emqx_session_mem.hrl index ae155f766..88901a541 100644 --- a/apps/emqx/include/emqx_session_mem.hrl +++ b/apps/emqx/include/emqx_session_mem.hrl @@ -26,7 +26,7 @@ %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed - max_subscriptions = infinity :: non_neg_integer() | infinity, + max_subscriptions :: non_neg_integer() | infinity, %% Upgrade QoS? upgrade_qos = false :: boolean(), %% Client <- Broker: QoS1/2 messages sent to the client but @@ -40,14 +40,14 @@ %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) - retry_interval = 0 :: timeout(), + retry_interval :: timeout(), %% Client -> Broker: QoS2 messages received from the client, but %% have not been completely acknowledged awaiting_rel :: map(), %% Maximum number of awaiting QoS2 messages allowed - max_awaiting_rel = infinity :: non_neg_integer() | infinity, + max_awaiting_rel :: non_neg_integer() | infinity, %% Awaiting PUBREL Timeout (Unit: millisecond) - await_rel_timeout = 0 :: timeout(), + await_rel_timeout :: timeout(), %% Created at created_at :: pos_integer() }). diff --git a/apps/emqx/src/emqx_session_mem.erl b/apps/emqx/src/emqx_session_mem.erl index 1d626fdb0..c8affdaea 100644 --- a/apps/emqx/src/emqx_session_mem.erl +++ b/apps/emqx/src/emqx_session_mem.erl @@ -158,7 +158,7 @@ create( Conf ) -> QueueOpts = get_mqueue_conf(Zone), - Session = #session{ + #session{ id = emqx_guid:gen(), clientid = ClientId, created_at = erlang:system_time(millisecond), @@ -167,9 +167,13 @@ create( inflight = emqx_inflight:new(ReceiveMax), mqueue = emqx_mqueue:init(QueueOpts), next_pkt_id = 1, - awaiting_rel = #{} - }, - preserve_conf(Conf, Session). + awaiting_rel = #{}, + max_subscriptions = maps:get(max_subscriptions, Conf), + max_awaiting_rel = maps:get(max_awaiting_rel, Conf), + upgrade_qos = maps:get(upgrade_qos, Conf), + retry_interval = maps:get(retry_interval, Conf), + await_rel_timeout = maps:get(await_rel_timeout, Conf) + }. get_mqueue_conf(Zone) -> #{ @@ -204,7 +208,7 @@ open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) -> case emqx_cm:takeover_session_end(TakeoverState) of {ok, Pendings} -> Session1 = resize_inflight(ConnInfo, Session0), - Session = preserve_conf(Conf, Session1), + Session = apply_conf(Conf, Session1), clean_session(ClientInfo, Session, Pendings); {error, _} -> % TODO log error? @@ -219,7 +223,7 @@ resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = inflight = emqx_inflight:resize(ReceiveMax, Inflight) }. -preserve_conf(Conf, Session = #session{}) -> +apply_conf(Conf, Session = #session{}) -> Session#session{ max_subscriptions = maps:get(max_subscriptions, Conf), max_awaiting_rel = maps:get(max_awaiting_rel, Conf), From 29ec73847a7272286268a529123e7bd21d81af3d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 15:05:42 +0300 Subject: [PATCH 08/16] fix(utils): make `flattermap/2` results less variative --- apps/emqx_utils/src/emqx_utils.erl | 36 +++++++++-------------- apps/emqx_utils/test/emqx_utils_SUITE.erl | 25 ++++++++-------- 2 files changed, 27 insertions(+), 34 deletions(-) diff --git a/apps/emqx_utils/src/emqx_utils.erl b/apps/emqx_utils/src/emqx_utils.erl index a5acfbb8a..8d55af623 100644 --- a/apps/emqx_utils/src/emqx_utils.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -1000,35 +1000,27 @@ search(ExpectValue, KeyFunc, [Item | List]) -> false -> search(ExpectValue, KeyFunc, List) end. -%% @doc Maps over a term or a list of terms and flattens the result, giving back -%% again a term or a flat list of terms. It's similar to `lists:flatmap/2`, but -%% it works on a single term as well, both as input and `Fun` output (thus, the -%% wordplay on "flatter"). +%% @doc Maps over a list of terms and flattens the result, giving back a flat +%% list of terms. It's similar to `lists:flatmap/2`, but it also works on a +%% single term as `Fun` output (thus, the wordplay on "flatter"). %% The purpose of this function is to adapt to `Fun`s that return either a `[]` -%% or a term, and to avoid costs of list construction and flattening when dealing -%% with large lists. --spec flattermap(Fun, FlatList) -> FlatList when - Fun :: fun((X) -> FlatList), - FlatList :: [X] | X. +%% or a term, and to avoid costs of list construction and flattening when +%% dealing with large lists. +-spec flattermap(Fun, [X]) -> [X] when + Fun :: fun((X) -> [X] | X). flattermap(_Fun, []) -> []; flattermap(Fun, [X | Xs]) -> - flatcomb(Fun(X), flattermap(Fun, Xs)); -flattermap(Fun, X) -> - Fun(X). + flatcomb(Fun(X), flattermap(Fun, Xs)). -flatcomb([], Z) -> - Z; -flatcomb(Y, []) -> - Y; +flatcomb([], Zs) -> + Zs; +flatcomb(Ys = [_ | _], []) -> + Ys; flatcomb(Ys = [_ | _], Zs = [_ | _]) -> Ys ++ Zs; -flatcomb(Ys = [_ | _], Z) -> - Ys ++ [Z]; -flatcomb(Y, Zs = [_ | _]) -> - [Y | Zs]; -flatcomb(Y, Z) -> - [Y, Z]. +flatcomb(Y, Zs) -> + [Y | Zs]. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 154e065b1..63f253805 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -233,33 +233,34 @@ t_pmap_late_reply(_) -> ok. t_flattermap(_) -> + ?assertEqual( + [42], + emqx_utils:flattermap(fun identity/1, [42]) + ), ?assertEqual( [42, 42], - emqx_utils:flattermap(fun duplicate/1, 42) + emqx_utils:flattermap(fun duplicate/1, [42]) + ), + ?assertEqual( + [], + emqx_utils:flattermap(fun nil/1, [42]) ), ?assertEqual( [1, 1, 2, 2, 3, 3], emqx_utils:flattermap(fun duplicate/1, [1, 2, 3]) ), - ?assertEqual( - [], - emqx_utils:flattermap(fun nil/1, 42) - ), ?assertEqual( [], emqx_utils:flattermap(fun nil/1, [1, 2, 3]) ), ?assertEqual( - 42, - emqx_utils:flattermap(fun identity/1, [42]) - ), - ?assertEqual( - [1, 3, 5], + [1, 2, 2, 4, 5, 5], emqx_utils:flattermap( fun(X) -> - case X rem 2 of + case X rem 3 of 0 -> []; - 1 -> X + 1 -> X; + 2 -> [X, X] end end, [1, 2, 3, 4, 5] From fd26e690b8be23290a69a0a35a61824de76ca970 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 15:23:58 +0300 Subject: [PATCH 09/16] refactor(sessds): move parts of message processing to replayer To simplify the processing flow, reducing the number of back-and-forth between the session and the replayer. --- .../emqx_persistent_message_ds_replayer.erl | 150 +++++++++++++----- apps/emqx/src/emqx_persistent_session_ds.erl | 52 ++---- 2 files changed, 125 insertions(+), 77 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl index 32934492a..723f02a01 100644 --- a/apps/emqx/src/emqx_persistent_message_ds_replayer.erl +++ b/apps/emqx/src/emqx_persistent_message_ds_replayer.erl @@ -33,6 +33,8 @@ -export_type([inflight/0, seqno/0]). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx_utils/include/emqx_message.hrl"). -include("emqx_persistent_session_ds.hrl"). -ifdef(TEST). @@ -46,6 +48,8 @@ -define(COMP, 1). -define(TRACK_FLAG(WHICH), (1 bsl WHICH)). +-define(TRACK_FLAGS_ALL, ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)). +-define(TRACK_FLAGS_NONE, 0). %%================================================================================ %% Type declarations @@ -66,9 +70,10 @@ -opaque inflight() :: #inflight{}. --type replies() :: reply() | [replies()]. --type reply() :: emqx_session:reply() | fun((emqx_types:packet_id()) -> emqx_session:replies()). --type reply_fun() :: fun((seqno(), emqx_types:message()) -> replies()). +-type message() :: emqx_types:message(). +-type replies() :: [emqx_session:reply()]. + +-type preproc_fun() :: fun((message()) -> message() | [message()]). %%================================================================================ %% API funcions @@ -115,11 +120,11 @@ n_inflight(#inflight{offset_ranges = Ranges}) -> Ranges ). --spec replay(reply_fun(), inflight()) -> {emqx_session:replies(), inflight()}. -replay(ReplyFun, Inflight0 = #inflight{offset_ranges = Ranges0}) -> +-spec replay(preproc_fun(), inflight()) -> {emqx_session:replies(), inflight()}. +replay(PreprocFunFun, Inflight0 = #inflight{offset_ranges = Ranges0, commits = Commits}) -> {Ranges, Replies} = lists:mapfoldr( fun(Range, Acc) -> - replay_range(ReplyFun, Range, Acc) + replay_range(PreprocFunFun, Commits, Range, Acc) end, [], Ranges0 @@ -165,9 +170,9 @@ commit_offset( {false, Inflight0} end. --spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> +-spec poll(preproc_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> {emqx_session:replies(), inflight()}. -poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> +poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), FreeSpace = WindowSize - n_inflight(Inflight0), @@ -181,7 +186,7 @@ poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize true -> %% TODO: Wrap this in `mria:async_dirty/2`? Streams = shuffle(get_streams(SessionId)), - fetch(ReplyFun, SessionId, Inflight0, Streams, FreeSpace, []) + fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, []) end. %% Which seqno this track is committed until. @@ -248,22 +253,22 @@ get_ranges(SessionId) -> ), mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read). -fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> +fetch(PreprocFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, ItBegin = get_last_iterator(DSStream, Ranges), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), case Messages of [] -> - fetch(ReplyFun, SessionId, Inflight0, Streams, N, Acc); + fetch(PreprocFun, SessionId, Inflight0, Streams, N, Acc); _ -> %% We need to preserve the iterator pointing to the beginning of the %% range, so that we can replay it if needed. - {Publishes, {UntilSeqno, Tracks}} = publish(ReplyFun, FirstSeqno, Messages), + {Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages), Size = range_size(FirstSeqno, UntilSeqno), Range0 = #ds_pubrange{ id = {SessionId, FirstSeqno}, type = ?T_INFLIGHT, - tracks = Tracks, + tracks = compute_pub_tracks(Publishes), until = UntilSeqno, stream = DSStream#ds_stream.ref, iterator = ItBegin @@ -277,7 +282,7 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 - next_seqno = UntilSeqno, offset_ranges = Ranges ++ [Range] }, - fetch(ReplyFun, SessionId, Inflight, Streams, N - Size, [Publishes | Acc]) + fetch(PreprocFun, SessionId, Inflight, Streams, N - Size, [Publishes | Acc]) end; fetch(_ReplyFun, _SessionId, Inflight, _Streams, _N, Acc) -> Publishes = lists:append(lists:reverse(Acc)), @@ -374,19 +379,20 @@ discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) -> TAck bor TComp. replay_range( - ReplyFun, + PreprocFun, + Commits, Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It}, Acc ) -> Size = range_size(First, Until), {ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size), %% Asserting that range is consistent with the message storage state. - {Replies, {Until, _TracksInitial}} = publish(ReplyFun, First, MessagesUnacked), + {Replies, Until} = publish_replay(PreprocFun, Commits, First, MessagesUnacked), %% Again, we need to keep the iterator pointing past the end of the %% range, so that we can pick up where we left off. Range = keep_next_iterator(ItNext, Range0), {Range, Replies ++ Acc}; -replay_range(_ReplyFun, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) -> +replay_range(_PreprocFun, _Commits, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) -> {Range0, Acc}. validate_commit( @@ -419,33 +425,88 @@ get_commit_next(rec, #inflight{next_seqno = NextSeqno}) -> get_commit_next(comp, #inflight{commits = Commits}) -> maps:get(rec, Commits). -publish(ReplyFun, FirstSeqno, Messages) -> - lists:mapfoldl( - fun(Message, Acc = {Seqno, _Tracks}) -> - Reply = ReplyFun(Seqno, Message), - publish_reply(Reply, Acc) +publish_fetch(PreprocFun, FirstSeqno, Messages) -> + flatmapfoldl( + fun(MessageIn, Acc) -> + Message = PreprocFun(MessageIn), + publish_fetch(Message, Acc) end, - {FirstSeqno, 0}, + FirstSeqno, Messages ). -publish_reply(Replies = [_ | _], Acc) -> - lists:mapfoldl(fun publish_reply/2, Acc, Replies); -publish_reply(Reply, {Seqno, Tracks}) when is_function(Reply) -> - Pub = Reply(seqno_to_packet_id(Seqno)), - NextSeqno = next_seqno(Seqno), - NextTracks = add_pub_track(Pub, Tracks), - {Pub, {NextSeqno, NextTracks}}; -publish_reply(Reply, Acc) -> - {Reply, Acc}. +publish_fetch(#message{qos = ?QOS_0} = Message, Seqno) -> + {{undefined, Message}, Seqno}; +publish_fetch(#message{} = Message, Seqno) -> + PacketId = seqno_to_packet_id(Seqno), + {{PacketId, Message}, next_seqno(Seqno)}; +publish_fetch(Messages, Seqno) -> + flatmapfoldl(fun publish_fetch/2, Seqno, Messages). -add_pub_track({PacketId, Message}, Tracks) when is_integer(PacketId) -> - case emqx_message:qos(Message) of - 1 -> ?TRACK_FLAG(?ACK) bor Tracks; - 2 -> ?TRACK_FLAG(?COMP) bor Tracks; - _ -> Tracks +publish_replay(PreprocFun, Commits, FirstSeqno, Messages) -> + #{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits, + flatmapfoldl( + fun(MessageIn, Acc) -> + Message = PreprocFun(MessageIn), + publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) + end, + FirstSeqno, + Messages + ). + +publish_replay(#message{qos = ?QOS_0}, _, _, _, Seqno) -> + %% QoS 0 (at most once) messages should not be replayed. + {[], Seqno}; +publish_replay(#message{qos = Qos} = Message, AckedUntil, CompUntil, RecUntil, Seqno) -> + case Qos of + ?QOS_1 when Seqno < AckedUntil -> + %% This message has already been acked, so we can skip it. + %% We still need to advance seqno, because previously we assigned this message + %% a unique Packet Id. + {[], next_seqno(Seqno)}; + ?QOS_2 when Seqno < CompUntil -> + %% This message's flow has already been fully completed, so we can skip it. + %% We still need to advance seqno, because previously we assigned this message + %% a unique Packet Id. + {[], next_seqno(Seqno)}; + ?QOS_2 when Seqno < RecUntil -> + %% This message's flow has been partially completed, we need to resend a PUBREL. + PacketId = seqno_to_packet_id(Seqno), + Pub = {pubrel, PacketId}, + {Pub, next_seqno(Seqno)}; + _ -> + %% This message flow hasn't been acked and/or received, we need to resend it. + PacketId = seqno_to_packet_id(Seqno), + Pub = {PacketId, emqx_message:set_flag(dup, true, Message)}, + {Pub, next_seqno(Seqno)} end; -add_pub_track(_Pub, Tracks) -> +publish_replay([], _, _, _, Seqno) -> + {[], Seqno}; +publish_replay(Messages, AckedUntil, CompUntil, RecUntil, Seqno) -> + flatmapfoldl( + fun(Message, Acc) -> + publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc) + end, + Seqno, + Messages + ). + +-spec compute_pub_tracks(replies()) -> non_neg_integer(). +compute_pub_tracks(Pubs) -> + compute_pub_tracks(Pubs, ?TRACK_FLAGS_NONE). + +compute_pub_tracks(_Pubs, Tracks = ?TRACK_FLAGS_ALL) -> + Tracks; +compute_pub_tracks([Pub | Rest], Tracks) -> + Track = + case Pub of + {_PacketId, #message{qos = ?QOS_1}} -> ?TRACK_FLAG(?ACK); + {_PacketId, #message{qos = ?QOS_2}} -> ?TRACK_FLAG(?COMP); + {pubrel, _PacketId} -> ?TRACK_FLAG(?COMP); + _ -> ?TRACK_FLAGS_NONE + end, + compute_pub_tracks(Rest, Track bor Tracks); +compute_pub_tracks([], Tracks) -> Tracks. keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) -> @@ -550,6 +611,19 @@ shuffle(L0) -> {_, L} = lists:unzip(L2), L. +-spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}. +flatmapfoldl(_Fun, Acc, []) -> + {[], Acc}; +flatmapfoldl(Fun, Acc, [X | Xs]) -> + {Ys, NAcc} = Fun(X, Acc), + {Zs, FAcc} = flatmapfoldl(Fun, NAcc, Xs), + case is_list(Ys) of + true -> + {Ys ++ Zs, FAcc}; + _ -> + {[Ys | Zs], FAcc} + end. + ro_transaction(Fun) -> {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), Res. diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index a25e4da3b..bedd72a16 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -152,9 +152,8 @@ -spec create(clientinfo(), conninfo(), emqx_session:conf()) -> session(). create(#{clientid := ClientID}, ConnInfo, Conf) -> - % TODO: expiration - Session = ensure_timers(session_ensure_new(ClientID, ConnInfo)), - preserve_conf(ConnInfo, Conf, Session). + Session = session_ensure_new(ClientID, ConnInfo), + apply_conf(ConnInfo, Conf, ensure_timers(Session)). -spec open(clientinfo(), conninfo(), emqx_session:conf()) -> {_IsPresent :: true, session(), []} | false. @@ -168,13 +167,13 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) -> ok = emqx_cm:discard_session(ClientID), case session_open(ClientID, ConnInfo) of Session0 = #{} -> - Session = preserve_conf(ConnInfo, Conf, Session0), + Session = apply_conf(ConnInfo, Conf, Session0), {true, ensure_timers(Session), []}; false -> false end. -preserve_conf(ConnInfo, Conf, Session) -> +apply_conf(ConnInfo, Conf, Session) -> Session#{ receive_maximum => receive_maximum(ConnInfo), props => Conf @@ -399,7 +398,7 @@ deliver(_ClientInfo, _Delivers, Session) -> {ok, replies(), session()} | {ok, replies(), timeout(), session()}. handle_timeout( ClientInfo, - pull, + ?TIMER_PULL, Session0 = #{ id := Id, inflight := Inflight0, @@ -411,14 +410,9 @@ handle_timeout( MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), BatchSize = min(ReceiveMaximum, MaxBatchSize), UpgradeQoS = maps:get(upgrade_qos, Conf), - ReplyFun = make_reply_fun(ClientInfo, Subs, UpgradeQoS, fun - (_Seqno, Message = #message{qos = ?QOS_0}) -> - {undefined, Message}; - (_Seqno, Message) -> - fun(PacketId) -> {PacketId, Message} end - end), + PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS), {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( - ReplyFun, + PreprocFun, Id, Inflight0, BatchSize @@ -455,22 +449,8 @@ replay( Session = #{inflight := Inflight0, subscriptions := Subs, props := Conf} ) -> UpgradeQoS = maps:get(upgrade_qos, Conf), - AckedUntil = emqx_persistent_message_ds_replayer:committed_until(ack, Inflight0), - RecUntil = emqx_persistent_message_ds_replayer:committed_until(rec, Inflight0), - CompUntil = emqx_persistent_message_ds_replayer:committed_until(comp, Inflight0), - ReplyFun = make_reply_fun(ClientInfo, Subs, UpgradeQoS, fun - (_Seqno, #message{qos = ?QOS_0}) -> - []; - (Seqno, #message{qos = ?QOS_1}) when Seqno < AckedUntil -> - fun(_) -> [] end; - (Seqno, #message{qos = ?QOS_2}) when Seqno < CompUntil -> - fun(_) -> [] end; - (Seqno, #message{qos = ?QOS_2}) when Seqno < RecUntil -> - fun(PacketId) -> {pubrel, PacketId} end; - (_Seqno, Message) -> - fun(PacketId) -> {PacketId, emqx_message:set_flag(dup, true, Message)} end - end), - {Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(ReplyFun, Inflight0), + PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS), + {Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(PreprocFun, Inflight0), {ok, Replies, Session#{inflight := Inflight}}. %%-------------------------------------------------------------------- @@ -486,23 +466,17 @@ terminate(_Reason, _Session = #{}) -> %%-------------------------------------------------------------------- -make_reply_fun(ClientInfo, Subs, UpgradeQoS, InnerFun) -> - fun(Seqno, Message0 = #message{topic = Topic}) -> +make_preproc_fun(ClientInfo, Subs, UpgradeQoS) -> + fun(Message = #message{topic = Topic}) -> emqx_utils:flattermap( fun(Match) -> - emqx_utils:flattermap( - fun(Message) -> InnerFun(Seqno, Message) end, - enrich_message(ClientInfo, Message0, Match, Subs, UpgradeQoS) - ) + #{props := SubOpts} = subs_get_match(Match, Subs), + emqx_session:enrich_message(ClientInfo, Message, SubOpts, UpgradeQoS) end, subs_matches(Topic, Subs) ) end. -enrich_message(ClientInfo, Message, SubMatch, Subs, UpgradeQoS) -> - #{props := SubOpts} = subs_get_match(SubMatch, Subs), - emqx_session:enrich_message(ClientInfo, Message, SubOpts, UpgradeQoS). - %%-------------------------------------------------------------------- -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> From ee9a98f0a4a9f27ffbefb79fe9bf5071563e5dfa Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 16:04:34 +0300 Subject: [PATCH 10/16] chore: bump emqx_gateway_mqttsn to 0.1.7 --- apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src index 6049a7973..246566df1 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src +++ b/apps/emqx_gateway_mqttsn/src/emqx_gateway_mqttsn.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_mqttsn, [ {description, "MQTT-SN Gateway"}, - {vsn, "0.1.6"}, + {vsn, "0.1.7"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, From 47bc747323ed8279b9cd0cf1e7bfc321bfdb10d2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 19:35:05 +0300 Subject: [PATCH 11/16] fix(sessds): drop everything related to session on CleanStart --- apps/emqx/src/emqx_persistent_session_ds.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index bedd72a16..376081a70 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -646,7 +646,7 @@ session_open(SessionId, NewConnInfo) -> session(). session_ensure_new(SessionId, ConnInfo) -> transaction(fun() -> - ok = session_drop_subscriptions(SessionId), + ok = session_drop_records(SessionId), Session = export_session(session_create(SessionId, ConnInfo)), Session#{ subscriptions => subs_new(), @@ -693,13 +693,17 @@ session_set_last_alive_at(SessionRecord0, LastAliveAt) -> -spec session_drop(id()) -> ok. session_drop(DSSessionId) -> transaction(fun() -> - ok = session_drop_subscriptions(DSSessionId), - ok = session_drop_pubranges(DSSessionId), - ok = session_drop_offsets(DSSessionId), - ok = session_drop_streams(DSSessionId), + ok = session_drop_records(DSSessionId), ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) end). +-spec session_drop_records(id()) -> ok. +session_drop_records(DSSessionId) -> + ok = session_drop_subscriptions(DSSessionId), + ok = session_drop_pubranges(DSSessionId), + ok = session_drop_offsets(DSSessionId), + ok = session_drop_streams(DSSessionId). + -spec session_drop_subscriptions(id()) -> ok. session_drop_subscriptions(DSSessionId) -> Subscriptions = session_read_subscriptions(DSSessionId, write), From 627c58b07d966f56dfa2a1e0270403c804a370ad Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 1 Dec 2023 19:36:20 +0300 Subject: [PATCH 12/16] fix(sessds): handle `expire_awaiting_rel` common timer with no-op Otherwise, the channel crashes when the timer is triggered. --- apps/emqx/src/emqx_persistent_session_ds.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 376081a70..37385c12c 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -438,8 +438,10 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) -> BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), EstimatedLastAliveAt = now_ms() + BumpInterval, Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), - BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), - {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}. + {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}; +handle_timeout(_ClientInfo, expire_awaiting_rel, Session) -> + %% TODO: stub + {ok, [], Session}. -spec replay(clientinfo(), [], session()) -> {ok, replies(), session()}. From 4ad7e850453deea006edfb08d3cf67fbe2368f58 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Tue, 5 Dec 2023 15:07:50 +0100 Subject: [PATCH 13/16] ci: run scheduled packages build of release-54 --- .github/workflows/build_packages_cron.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_packages_cron.yaml b/.github/workflows/build_packages_cron.yaml index 97fb9536c..d29f2d64a 100644 --- a/.github/workflows/build_packages_cron.yaml +++ b/.github/workflows/build_packages_cron.yaml @@ -21,7 +21,7 @@ jobs: matrix: profile: - ['emqx', 'master'] - - ['emqx-enterprise', 'release-53'] + - ['emqx-enterprise', 'release-54'] otp: - 25.3.2-2 arch: From e995b3948e8c64580f6c5429346fd86d55ddbf2c Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Tue, 5 Dec 2023 15:17:53 +0100 Subject: [PATCH 14/16] chore: bump emqx_gateway_coap version --- apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src index 10dd6efef..ba43cabc8 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway_coap, [ {description, "CoAP Gateway"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, From 6f5228e99122b4e7c7ca48ce550e18fc099fbf20 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Dec 2023 19:12:53 +0100 Subject: [PATCH 15/16] test(emqx): switch select test suites to use `emqx_cth_suite` --- apps/emqx/test/emqx_mqtt_SUITE.erl | 10 +++--- apps/emqx/test/emqx_mqtt_caps_SUITE.erl | 9 +++-- .../emqx/test/emqx_mqtt_protocol_v5_SUITE.erl | 33 +++++-------------- apps/emqx/test/emqx_olp_SUITE.erl | 7 ++-- apps/emqx/test/emqx_os_mon_SUITE.erl | 9 +++-- 5 files changed, 24 insertions(+), 44 deletions(-) diff --git a/apps/emqx/test/emqx_mqtt_SUITE.erl b/apps/emqx/test/emqx_mqtt_SUITE.erl index f03c7af83..591a08e9a 100644 --- a/apps/emqx/test/emqx_mqtt_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_SUITE.erl @@ -19,7 +19,6 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -39,12 +38,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_suite(Config) -> + emqx_cth_suite:stop(proplists:get_value(apps, Config)). init_per_testcase(TestCase, Config) -> case erlang:function_exported(?MODULE, TestCase, 2) of diff --git a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl index e97684b74..8be5564b2 100644 --- a/apps/emqx/test/emqx_mqtt_caps_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_caps_SUITE.erl @@ -26,12 +26,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]), - ok. +end_per_suite(Config) -> + emqx_cth_suite:stop(proplists:get_value(apps, Config)). t_check_pub(_) -> OldConf = emqx:get_config([zones], #{}), diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index a2a2e5244..ff248a16a 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -19,7 +19,6 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -59,31 +58,17 @@ groups() -> ]. init_per_group(tcp, Config) -> - emqx_common_test_helpers:start_apps([]), - [{port, 1883}, {conn_fun, connect} | Config]; + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + [{port, 1883}, {conn_fun, connect}, {group_apps, Apps} | Config]; init_per_group(quic, Config) -> - UdpPort = 1884, - emqx_common_test_helpers:start_apps([]), - emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort), - [{port, UdpPort}, {conn_fun, quic_connect} | Config]; -init_per_group(_, Config) -> - emqx_common_test_helpers:stop_apps([]), - Config. + Apps = emqx_cth_suite:start( + [{emqx, "listeners.quic.test { enable = true, bind = 1884 }"}], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{port, 1884}, {conn_fun, quic_connect}, {group_apps, Apps} | Config]. -end_per_group(quic, _Config) -> - emqx_config:put([listeners, quic], #{}), - ok; -end_per_group(_Group, _Config) -> - ok. - -init_per_suite(Config) -> - %% Start Apps - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_group(_Group, Config) -> + emqx_cth_suite:stop(?config(group_apps, Config)). init_per_testcase(TestCase, Config) -> case erlang:function_exported(?MODULE, TestCase, 2) of diff --git a/apps/emqx/test/emqx_olp_SUITE.erl b/apps/emqx/test/emqx_olp_SUITE.erl index cd8db7a8f..7389b259c 100644 --- a/apps/emqx/test/emqx_olp_SUITE.erl +++ b/apps/emqx/test/emqx_olp_SUITE.erl @@ -26,14 +26,13 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), OldSch = erlang:system_flag(schedulers_online, 1), - [{old_sch, OldSch} | Config]. + [{apps, Apps}, {old_sch, OldSch} | Config]. end_per_suite(Config) -> erlang:system_flag(schedulers_online, ?config(old_sch, Config)), - emqx_common_test_helpers:stop_apps([]). + emqx_cth_suite:stop(?config(apps, Config)). init_per_testcase(_, Config) -> emqx_common_test_helpers:boot_modules(all), diff --git a/apps/emqx/test/emqx_os_mon_SUITE.erl b/apps/emqx/test/emqx_os_mon_SUITE.erl index 1833be48e..2d7558392 100644 --- a/apps/emqx/test/emqx_os_mon_SUITE.erl +++ b/apps/emqx/test/emqx_os_mon_SUITE.erl @@ -24,12 +24,11 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - emqx_common_test_helpers:boot_modules(all), - emqx_common_test_helpers:start_apps([]), - Config. + Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([]). +end_per_suite(Config) -> + emqx_cth_suite:stop(proplists:get_value(apps, Config)). init_per_testcase(t_cpu_check_alarm, Config) -> SysMon = emqx_config:get([sysmon, os], #{}), From 83bea2254d280ea88101d813cfd79167fd3bab11 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 5 Dec 2023 19:13:50 +0100 Subject: [PATCH 16/16] test(ocsp): switch test suite to use `emqx_cth_suite` And simplify it slightly in the process. --- apps/emqx/test/emqx_ocsp_cache_SUITE.erl | 204 +++++++----------- .../openssl_listeners.conf | 14 -- 2 files changed, 72 insertions(+), 146 deletions(-) delete mode 100644 apps/emqx/test/emqx_ocsp_cache_SUITE_data/openssl_listeners.conf diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl index aa5d78121..fce74785c 100644 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE.erl +++ b/apps/emqx/test/emqx_ocsp_cache_SUITE.erl @@ -44,14 +44,33 @@ groups() -> ]. init_per_suite(Config) -> - application:load(emqx), - emqx_config:save_schema_mod_and_names(emqx_schema), - emqx_common_test_helpers:boot_modules(all), Config. end_per_suite(_Config) -> ok. +init_per_group(openssl, Config) -> + DataDir = ?config(data_dir, Config), + ListenerConf = #{ + bind => <<"0.0.0.0:8883">>, + max_connections => 512000, + ssl_options => #{ + keyfile => filename(DataDir, "server.key"), + certfile => filename(DataDir, "server.pem"), + cacertfile => filename(DataDir, "ca.pem"), + ocsp => #{ + enable_ocsp_stapling => true, + issuer_pem => filename(DataDir, "ocsp-issuer.pem"), + responder_url => <<"http://127.0.0.1:9877">> + } + } + }, + Conf = #{listeners => #{ssl => #{default => ListenerConf}}}, + Apps = emqx_cth_suite:start( + [{emqx, #{config => Conf}}], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{group_apps, Apps} | Config]; init_per_group(tls12, Config) -> [{tls_vsn, "-tls1_2"} | Config]; init_per_group(tls13, Config) -> @@ -63,24 +82,14 @@ init_per_group(without_status_request, Config) -> init_per_group(_Group, Config) -> Config. +end_per_group(openssl, Config) -> + emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(_Group, _Config) -> ok. init_per_testcase(t_openssl_client, Config) -> ct:timetrap({seconds, 30}), - DataDir = ?config(data_dir, Config), - Handler = fun(_) -> ok end, {OCSPResponderPort, OCSPOSPid} = setup_openssl_ocsp(Config), - ConfFilePath = filename:join([DataDir, "openssl_listeners.conf"]), - emqx_common_test_helpers:start_apps( - [], - Handler, - #{ - extra_mustache_vars => #{test_data_dir => DataDir}, - conf_file_path => ConfFilePath - } - ), - ct:sleep(1_000), [ {ocsp_responder_port, OCSPResponderPort}, {ocsp_responder_os_pid, OCSPOSPid} @@ -107,15 +116,25 @@ init_per_testcase(TestCase, Config) when {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}} end ), - emqx_mgmt_api_test_util:init_suite([emqx_conf]), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + emqx, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(TestCase, Config)} + ), + _ = emqx_common_test_http:create_default_app(), snabbkaffe:start_trace(), - Config; + [{tc_apps, Apps} | Config]; false -> [{skip_does_not_apply, true} | Config] end; -init_per_testcase(t_ocsp_responder_error_responses, Config) -> +init_per_testcase(TC, Config) -> ct:timetrap({seconds, 30}), TestPid = self(), + DataDir = ?config(data_dir, Config), ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]), meck:expect( emqx_ocsp_cache, @@ -123,90 +142,44 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) -> fun(URL, _HTTPTimeout) -> ct:pal("ocsp http request ~p", [URL]), TestPid ! {http_get, URL}, - persistent_term:get({?MODULE, http_response}) + persistent_term:get( + {?MODULE, http_response}, + {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}} + ) end ), - DataDir = ?config(data_dir, Config), - Type = ssl, - Name = test_ocsp, - ListenerOpts = #{ - ssl_options => - #{ - certfile => filename:join(DataDir, "server.pem"), - ocsp => #{ - enable_ocsp_stapling => true, - responder_url => <<"http://localhost:9877/">>, - issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => <<"15s">>, - refresh_interval => <<"1s">> - } - } - }, - Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, - ConfBin = emqx_utils_maps:binary_key_map(Conf), - CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ - required => false, atom_keys => false - }), - Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), - ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), - snabbkaffe:start_trace(), - _Heir = spawn_dummy_heir(), - {ok, CachePid} = emqx_ocsp_cache:start_link(), - [ - {cache_pid, CachePid} - | Config - ]; -init_per_testcase(_TestCase, Config) -> - ct:timetrap({seconds, 10}), - TestPid = self(), - ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]), - meck:expect( - emqx_ocsp_cache, - http_get, - fun(URL, _HTTPTimeout) -> - TestPid ! {http_get, URL}, - {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}} - end - ), - snabbkaffe:start_trace(), - _Heir = spawn_dummy_heir(), - {ok, CachePid} = emqx_ocsp_cache:start_link(), - DataDir = ?config(data_dir, Config), - Type = ssl, - Name = test_ocsp, ResponderURL = <<"http://localhost:9877/">>, - ListenerOpts = #{ - ssl_options => - #{ - certfile => filename:join(DataDir, "server.pem"), - ocsp => #{ - enable_ocsp_stapling => true, - responder_url => ResponderURL, - issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), - refresh_http_timeout => <<"15s">>, - refresh_interval => <<"1s">> - } + ListenerConf = #{ + enable => false, + bind => 0, + ssl_options => #{ + certfile => filename(DataDir, "server.pem"), + ocsp => #{ + enable_ocsp_stapling => true, + responder_url => ResponderURL, + issuer_pem => filename(DataDir, "ocsp-issuer.pem"), + refresh_http_timeout => <<"15s">>, + refresh_interval => <<"1s">> } + } }, - Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, - ConfBin = emqx_utils_maps:binary_key_map(Conf), - CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ - required => false, atom_keys => false - }), - Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), - ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2), - emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2), + Conf = #{listeners => #{ssl => #{test_ocsp => ListenerConf}}}, + Apps = emqx_cth_suite:start( + [{emqx, #{config => Conf}}], + #{work_dir => emqx_cth_suite:work_dir(TC, Config)} + ), + snabbkaffe:start_trace(), [ - {cache_pid, CachePid}, - {responder_url, ResponderURL} + {responder_url, ResponderURL}, + {tc_apps, Apps} | Config ]. +filename(Dir, Name) -> + unicode:characters_to_binary(filename:join(Dir, Name)). + end_per_testcase(t_openssl_client, Config) -> - OCSPResponderOSPid = ?config(ocsp_responder_os_pid, Config), - catch kill_pid(OCSPResponderOSPid), - emqx_common_test_helpers:stop_apps([]), + catch kill_pid(?config(ocsp_responder_os_pid, Config)), ok; end_per_testcase(TestCase, Config) when TestCase =:= t_update_listener; @@ -217,19 +190,12 @@ end_per_testcase(TestCase, Config) when true -> ok; false -> - emqx_mgmt_api_test_util:end_suite([emqx_conf]), - meck:unload([emqx_ocsp_cache]), - ok + end_per_testcase(common, Config) end; -end_per_testcase(t_ocsp_responder_error_responses, Config) -> - CachePid = ?config(cache_pid, Config), - catch gen_server:stop(CachePid), - meck:unload([emqx_ocsp_cache]), - persistent_term:erase({?MODULE, http_response}), - ok; end_per_testcase(_TestCase, Config) -> - CachePid = ?config(cache_pid, Config), - catch gen_server:stop(CachePid), + snabbkaffe:stop(), + emqx_cth_suite:stop(?config(tc_apps, Config)), + persistent_term:erase({?MODULE, http_response}), meck:unload([emqx_ocsp_cache]), ok. @@ -237,24 +203,6 @@ end_per_testcase(_TestCase, Config) -> %% Helper functions %%-------------------------------------------------------------------- -%% The real cache makes `emqx_kernel_sup' the heir to its ETS table. -%% In some tests, we don't start the full supervision tree, so we need -%% this dummy process. -spawn_dummy_heir() -> - {_, {ok, _}} = - ?wait_async_action( - spawn_link(fun() -> - true = register(emqx_kernel_sup, self()), - ?tp(heir_name_registered, #{}), - receive - stop -> ok - end - end), - #{?snk_kind := heir_name_registered}, - 1_000 - ), - ok. - does_module_exist(Mod) -> case erlang:module_loaded(Mod) of true -> @@ -416,11 +364,6 @@ do_ensure_port_open(Port, N) when N > 0 -> do_ensure_port_open(Port, N - 1) end. -get_sni_fun(ListenerID) -> - #{opts := Opts} = emqx_listeners:find_by_id(ListenerID), - SSLOpts = proplists:get_value(ssl_options, Opts), - proplists:get_value(sni_fun, SSLOpts). - openssl_version() -> Res0 = string:trim(os:cmd("openssl version"), trailing), [_, Res] = string:split(Res0, " "), @@ -516,9 +459,7 @@ t_request_ocsp_response(_Config) -> end ). -t_request_ocsp_response_restart_cache(Config) -> - process_flag(trap_exit, true), - CachePid = ?config(cache_pid, Config), +t_request_ocsp_response_restart_cache(_Config) -> ListenerID = <<"ssl:test_ocsp">>, ?check_trace( begin @@ -526,6 +467,7 @@ t_request_ocsp_response_restart_cache(Config) -> {ok, _} = emqx_ocsp_cache:fetch_response(ListenerID), ?wait_async_action( begin + CachePid = whereis(emqx_ocsp_cache), Ref = monitor(process, CachePid), exit(CachePid, kill), receive @@ -533,9 +475,7 @@ t_request_ocsp_response_restart_cache(Config) -> ok after 1_000 -> error(cache_not_killed) - end, - {ok, _} = emqx_ocsp_cache:start_link(), - ok + end end, #{?snk_kind := ocsp_cache_init} ), diff --git a/apps/emqx/test/emqx_ocsp_cache_SUITE_data/openssl_listeners.conf b/apps/emqx/test/emqx_ocsp_cache_SUITE_data/openssl_listeners.conf deleted file mode 100644 index d26e12acf..000000000 --- a/apps/emqx/test/emqx_ocsp_cache_SUITE_data/openssl_listeners.conf +++ /dev/null @@ -1,14 +0,0 @@ -listeners.ssl.default { - bind = "0.0.0.0:8883" - max_connections = 512000 - ssl_options { - keyfile = "{{ test_data_dir }}/server.key" - certfile = "{{ test_data_dir }}/server.pem" - cacertfile = "{{ test_data_dir }}/ca.pem" - ocsp { - enable_ocsp_stapling = true - issuer_pem = "{{ test_data_dir }}/ocsp-issuer.pem" - responder_url = "http://127.0.0.1:9877" - } - } -}