Merge pull request #12109 from keynslug/test/emqx-cth-suite

test(emqx): switch select test suites to use `emqx_cth_suite`
This commit is contained in:
Andrew Mayorov 2023-12-06 09:48:53 +01:00 committed by GitHub
commit 3798060543
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 599 additions and 413 deletions

View File

@ -21,7 +21,7 @@ jobs:
matrix: matrix:
profile: profile:
- ['emqx', 'master'] - ['emqx', 'master']
- ['emqx-enterprise', 'release-53'] - ['emqx-enterprise', 'release-54']
otp: otp:
- 25.3.2-2 - 25.3.2-2
arch: arch:

View File

@ -28,7 +28,7 @@
%% Max subscriptions allowed %% Max subscriptions allowed
max_subscriptions :: non_neg_integer() | infinity, max_subscriptions :: non_neg_integer() | infinity,
%% Upgrade QoS? %% Upgrade QoS?
upgrade_qos :: boolean(), upgrade_qos = false :: boolean(),
%% Client <- Broker: QoS1/2 messages sent to the client but %% Client <- Broker: QoS1/2 messages sent to the client but
%% have not been unacked. %% have not been unacked.
inflight :: emqx_inflight:inflight(), inflight :: emqx_inflight:inflight(),

View File

@ -262,10 +262,12 @@ t_session_subscription_idempotency(Config) ->
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
ConnInfo = #{}, Session = erpc:call(
Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
),
?assertMatch( ?assertMatch(
#{subscriptions := #{SubTopicFilter := #{}}}, #{SubTopicFilter := #{}},
erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) emqx_session:info(subscriptions, Session)
) )
end end
), ),
@ -336,10 +338,12 @@ t_session_unsubscription_idempotency(Config) ->
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]), ct:pal("trace:\n ~p", [Trace]),
ConnInfo = #{}, Session = erpc:call(
?assertMatch( Node1, emqx_persistent_session_ds, session_open, [ClientId, _ConnInfo = #{}]
#{subscriptions := Subs = #{}} when map_size(Subs) =:= 0, ),
erpc:call(Node1, emqx_persistent_session_ds, session_open, [ClientId, ConnInfo]) ?assertEqual(
#{},
emqx_session:info(subscriptions, Session)
), ),
ok ok
end end

View File

@ -33,6 +33,8 @@
-export_type([inflight/0, seqno/0]). -export_type([inflight/0, seqno/0]).
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_utils/include/emqx_message.hrl").
-include("emqx_persistent_session_ds.hrl"). -include("emqx_persistent_session_ds.hrl").
-ifdef(TEST). -ifdef(TEST).
@ -46,6 +48,8 @@
-define(COMP, 1). -define(COMP, 1).
-define(TRACK_FLAG(WHICH), (1 bsl WHICH)). -define(TRACK_FLAG(WHICH), (1 bsl WHICH)).
-define(TRACK_FLAGS_ALL, ?TRACK_FLAG(?ACK) bor ?TRACK_FLAG(?COMP)).
-define(TRACK_FLAGS_NONE, 0).
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
@ -66,10 +70,10 @@
-opaque inflight() :: #inflight{}. -opaque inflight() :: #inflight{}.
-type reply_fun() :: fun( -type message() :: emqx_types:message().
(seqno(), emqx_types:message()) -> -type replies() :: [emqx_session:reply()].
emqx_session:replies() | {_AdvanceSeqno :: false, emqx_session:replies()}
). -type preproc_fun() :: fun((message()) -> message() | [message()]).
%%================================================================================ %%================================================================================
%% API funcions %% API funcions
@ -116,11 +120,11 @@ n_inflight(#inflight{offset_ranges = Ranges}) ->
Ranges Ranges
). ).
-spec replay(reply_fun(), inflight()) -> {emqx_session:replies(), inflight()}. -spec replay(preproc_fun(), inflight()) -> {emqx_session:replies(), inflight()}.
replay(ReplyFun, Inflight0 = #inflight{offset_ranges = Ranges0}) -> replay(PreprocFunFun, Inflight0 = #inflight{offset_ranges = Ranges0, commits = Commits}) ->
{Ranges, Replies} = lists:mapfoldr( {Ranges, Replies} = lists:mapfoldr(
fun(Range, Acc) -> fun(Range, Acc) ->
replay_range(ReplyFun, Range, Acc) replay_range(PreprocFunFun, Commits, Range, Acc)
end, end,
[], [],
Ranges0 Ranges0
@ -166,9 +170,9 @@ commit_offset(
{false, Inflight0} {false, Inflight0}
end. end.
-spec poll(reply_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) -> -spec poll(preproc_fun(), emqx_persistent_session_ds:id(), inflight(), pos_integer()) ->
{emqx_session:replies(), inflight()}. {emqx_session:replies(), inflight()}.
poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE -> poll(PreprocFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize < ?EPOCH_SIZE ->
MinBatchSize = emqx_config:get([session_persistence, min_batch_size]), MinBatchSize = emqx_config:get([session_persistence, min_batch_size]),
FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)), FetchThreshold = min(MinBatchSize, ceil(WindowSize / 2)),
FreeSpace = WindowSize - n_inflight(Inflight0), FreeSpace = WindowSize - n_inflight(Inflight0),
@ -182,7 +186,7 @@ poll(ReplyFun, SessionId, Inflight0, WindowSize) when WindowSize > 0, WindowSize
true -> true ->
%% TODO: Wrap this in `mria:async_dirty/2`? %% TODO: Wrap this in `mria:async_dirty/2`?
Streams = shuffle(get_streams(SessionId)), Streams = shuffle(get_streams(SessionId)),
fetch(ReplyFun, SessionId, Inflight0, Streams, FreeSpace, []) fetch(PreprocFun, SessionId, Inflight0, Streams, FreeSpace, [])
end. end.
%% Which seqno this track is committed until. %% Which seqno this track is committed until.
@ -249,22 +253,22 @@ get_ranges(SessionId) ->
), ),
mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read). mnesia:match_object(?SESSION_PUBRANGE_TAB, Pat, read).
fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -> fetch(PreprocFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 ->
#inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0, #inflight{next_seqno = FirstSeqno, offset_ranges = Ranges} = Inflight0,
ItBegin = get_last_iterator(DSStream, Ranges), ItBegin = get_last_iterator(DSStream, Ranges),
{ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N), {ok, ItEnd, Messages} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, N),
case Messages of case Messages of
[] -> [] ->
fetch(ReplyFun, SessionId, Inflight0, Streams, N, Acc); fetch(PreprocFun, SessionId, Inflight0, Streams, N, Acc);
_ -> _ ->
%% We need to preserve the iterator pointing to the beginning of the %% We need to preserve the iterator pointing to the beginning of the
%% range, so that we can replay it if needed. %% range, so that we can replay it if needed.
{Publishes, {UntilSeqno, Tracks}} = publish(ReplyFun, FirstSeqno, Messages), {Publishes, UntilSeqno} = publish_fetch(PreprocFun, FirstSeqno, Messages),
Size = range_size(FirstSeqno, UntilSeqno), Size = range_size(FirstSeqno, UntilSeqno),
Range0 = #ds_pubrange{ Range0 = #ds_pubrange{
id = {SessionId, FirstSeqno}, id = {SessionId, FirstSeqno},
type = ?T_INFLIGHT, type = ?T_INFLIGHT,
tracks = Tracks, tracks = compute_pub_tracks(Publishes),
until = UntilSeqno, until = UntilSeqno,
stream = DSStream#ds_stream.ref, stream = DSStream#ds_stream.ref,
iterator = ItBegin iterator = ItBegin
@ -278,7 +282,7 @@ fetch(ReplyFun, SessionId, Inflight0, [DSStream | Streams], N, Acc) when N > 0 -
next_seqno = UntilSeqno, next_seqno = UntilSeqno,
offset_ranges = Ranges ++ [Range] offset_ranges = Ranges ++ [Range]
}, },
fetch(ReplyFun, SessionId, Inflight, Streams, N - Size, [Publishes | Acc]) fetch(PreprocFun, SessionId, Inflight, Streams, N - Size, [Publishes | Acc])
end; end;
fetch(_ReplyFun, _SessionId, Inflight, _Streams, _N, Acc) -> fetch(_ReplyFun, _SessionId, Inflight, _Streams, _N, Acc) ->
Publishes = lists:append(lists:reverse(Acc)), Publishes = lists:append(lists:reverse(Acc)),
@ -375,19 +379,20 @@ discard_tracks(#{ack := AckedUntil, comp := CompUntil}, Until, Tracks) ->
TAck bor TComp. TAck bor TComp.
replay_range( replay_range(
ReplyFun, PreprocFun,
Commits,
Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It}, Range0 = #ds_pubrange{type = ?T_INFLIGHT, id = {_, First}, until = Until, iterator = It},
Acc Acc
) -> ) ->
Size = range_size(First, Until), Size = range_size(First, Until),
{ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size), {ok, ItNext, MessagesUnacked} = emqx_ds:next(?PERSISTENT_MESSAGE_DB, It, Size),
%% Asserting that range is consistent with the message storage state. %% Asserting that range is consistent with the message storage state.
{Replies, {Until, _TracksInitial}} = publish(ReplyFun, First, MessagesUnacked), {Replies, Until} = publish_replay(PreprocFun, Commits, First, MessagesUnacked),
%% Again, we need to keep the iterator pointing past the end of the %% Again, we need to keep the iterator pointing past the end of the
%% range, so that we can pick up where we left off. %% range, so that we can pick up where we left off.
Range = keep_next_iterator(ItNext, Range0), Range = keep_next_iterator(ItNext, Range0),
{Range, Replies ++ Acc}; {Range, Replies ++ Acc};
replay_range(_ReplyFun, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) -> replay_range(_PreprocFun, _Commits, Range0 = #ds_pubrange{type = ?T_CHECKPOINT}, Acc) ->
{Range0, Acc}. {Range0, Acc}.
validate_commit( validate_commit(
@ -420,28 +425,89 @@ get_commit_next(rec, #inflight{next_seqno = NextSeqno}) ->
get_commit_next(comp, #inflight{commits = Commits}) -> get_commit_next(comp, #inflight{commits = Commits}) ->
maps:get(rec, Commits). maps:get(rec, Commits).
publish(ReplyFun, FirstSeqno, Messages) -> publish_fetch(PreprocFun, FirstSeqno, Messages) ->
lists:mapfoldl( flatmapfoldl(
fun(Message, {Seqno, TAcc}) -> fun(MessageIn, Acc) ->
case ReplyFun(Seqno, Message) of Message = PreprocFun(MessageIn),
{_Advance = false, Reply} -> publish_fetch(Message, Acc)
{Reply, {Seqno, TAcc}};
Reply ->
NextSeqno = next_seqno(Seqno),
NextTAcc = add_msg_track(Message, TAcc),
{Reply, {NextSeqno, NextTAcc}}
end
end, end,
{FirstSeqno, 0}, FirstSeqno,
Messages Messages
). ).
add_msg_track(Message, Tracks) -> publish_fetch(#message{qos = ?QOS_0} = Message, Seqno) ->
case emqx_message:qos(Message) of {{undefined, Message}, Seqno};
1 -> ?TRACK_FLAG(?ACK) bor Tracks; publish_fetch(#message{} = Message, Seqno) ->
2 -> ?TRACK_FLAG(?COMP) bor Tracks; PacketId = seqno_to_packet_id(Seqno),
_ -> Tracks {{PacketId, Message}, next_seqno(Seqno)};
end. publish_fetch(Messages, Seqno) ->
flatmapfoldl(fun publish_fetch/2, Seqno, Messages).
publish_replay(PreprocFun, Commits, FirstSeqno, Messages) ->
#{ack := AckedUntil, comp := CompUntil, rec := RecUntil} = Commits,
flatmapfoldl(
fun(MessageIn, Acc) ->
Message = PreprocFun(MessageIn),
publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
end,
FirstSeqno,
Messages
).
publish_replay(#message{qos = ?QOS_0}, _, _, _, Seqno) ->
%% QoS 0 (at most once) messages should not be replayed.
{[], Seqno};
publish_replay(#message{qos = Qos} = Message, AckedUntil, CompUntil, RecUntil, Seqno) ->
case Qos of
?QOS_1 when Seqno < AckedUntil ->
%% This message has already been acked, so we can skip it.
%% We still need to advance seqno, because previously we assigned this message
%% a unique Packet Id.
{[], next_seqno(Seqno)};
?QOS_2 when Seqno < CompUntil ->
%% This message's flow has already been fully completed, so we can skip it.
%% We still need to advance seqno, because previously we assigned this message
%% a unique Packet Id.
{[], next_seqno(Seqno)};
?QOS_2 when Seqno < RecUntil ->
%% This message's flow has been partially completed, we need to resend a PUBREL.
PacketId = seqno_to_packet_id(Seqno),
Pub = {pubrel, PacketId},
{Pub, next_seqno(Seqno)};
_ ->
%% This message flow hasn't been acked and/or received, we need to resend it.
PacketId = seqno_to_packet_id(Seqno),
Pub = {PacketId, emqx_message:set_flag(dup, true, Message)},
{Pub, next_seqno(Seqno)}
end;
publish_replay([], _, _, _, Seqno) ->
{[], Seqno};
publish_replay(Messages, AckedUntil, CompUntil, RecUntil, Seqno) ->
flatmapfoldl(
fun(Message, Acc) ->
publish_replay(Message, AckedUntil, CompUntil, RecUntil, Acc)
end,
Seqno,
Messages
).
-spec compute_pub_tracks(replies()) -> non_neg_integer().
compute_pub_tracks(Pubs) ->
compute_pub_tracks(Pubs, ?TRACK_FLAGS_NONE).
compute_pub_tracks(_Pubs, Tracks = ?TRACK_FLAGS_ALL) ->
Tracks;
compute_pub_tracks([Pub | Rest], Tracks) ->
Track =
case Pub of
{_PacketId, #message{qos = ?QOS_1}} -> ?TRACK_FLAG(?ACK);
{_PacketId, #message{qos = ?QOS_2}} -> ?TRACK_FLAG(?COMP);
{pubrel, _PacketId} -> ?TRACK_FLAG(?COMP);
_ -> ?TRACK_FLAGS_NONE
end,
compute_pub_tracks(Rest, Track bor Tracks);
compute_pub_tracks([], Tracks) ->
Tracks.
keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) -> keep_next_iterator(ItNext, Range = #ds_pubrange{iterator = ItFirst, misc = Misc}) ->
Range#ds_pubrange{ Range#ds_pubrange{
@ -545,6 +611,19 @@ shuffle(L0) ->
{_, L} = lists:unzip(L2), {_, L} = lists:unzip(L2),
L. L.
-spec flatmapfoldl(fun((X, Acc) -> {Y | [Y], Acc}), Acc, [X]) -> {[Y], Acc}.
flatmapfoldl(_Fun, Acc, []) ->
{[], Acc};
flatmapfoldl(Fun, Acc, [X | Xs]) ->
{Ys, NAcc} = Fun(X, Acc),
{Zs, FAcc} = flatmapfoldl(Fun, NAcc, Xs),
case is_list(Ys) of
true ->
{Ys ++ Zs, FAcc};
_ ->
{[Ys | Zs], FAcc}
end.
ro_transaction(Fun) -> ro_transaction(Fun) ->
{atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun), {atomic, Res} = mria:ro_transaction(?DS_MRIA_SHARD, Fun),
Res. Res.

View File

@ -29,7 +29,7 @@
%% Session API %% Session API
-export([ -export([
create/3, create/3,
open/2, open/3,
destroy/1 destroy/1
]). ]).
@ -102,6 +102,8 @@
-define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at). -define(TIMER_BUMP_LAST_ALIVE_AT, timer_bump_last_alive_at).
-type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT. -type timer() :: ?TIMER_PULL | ?TIMER_GET_STREAMS | ?TIMER_BUMP_LAST_ALIVE_AT.
-type subscriptions() :: emqx_topic_gbt:t(nil(), subscription()).
-type session() :: #{ -type session() :: #{
%% Client ID %% Client ID
id := id(), id := id(),
@ -110,7 +112,7 @@
%% When the client was last considered alive %% When the client was last considered alive
last_alive_at := timestamp(), last_alive_at := timestamp(),
%% Clients Subscriptions. %% Clients Subscriptions.
subscriptions := #{topic_filter() => subscription()}, subscriptions := subscriptions(),
%% Inflight messages %% Inflight messages
inflight := emqx_persistent_message_ds_replayer:inflight(), inflight := emqx_persistent_message_ds_replayer:inflight(),
%% Receive maximum %% Receive maximum
@ -150,12 +152,12 @@
-spec create(clientinfo(), conninfo(), emqx_session:conf()) -> -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
session(). session().
create(#{clientid := ClientID}, ConnInfo, Conf) -> create(#{clientid := ClientID}, ConnInfo, Conf) ->
% TODO: expiration Session = session_ensure_new(ClientID, ConnInfo),
ensure_timers(ensure_session(ClientID, ConnInfo, Conf)). apply_conf(ConnInfo, Conf, ensure_timers(Session)).
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
{_IsPresent :: true, session(), []} | false. {_IsPresent :: true, session(), []} | false.
open(#{clientid := ClientID} = _ClientInfo, ConnInfo) -> open(#{clientid := ClientID} = _ClientInfo, ConnInfo, Conf) ->
%% NOTE %% NOTE
%% The fact that we need to concern about discarding all live channels here %% The fact that we need to concern about discarding all live channels here
%% is essentially a consequence of the in-memory session design, where we %% is essentially a consequence of the in-memory session design, where we
@ -165,20 +167,16 @@ open(#{clientid := ClientID} = _ClientInfo, ConnInfo) ->
ok = emqx_cm:discard_session(ClientID), ok = emqx_cm:discard_session(ClientID),
case session_open(ClientID, ConnInfo) of case session_open(ClientID, ConnInfo) of
Session0 = #{} -> Session0 = #{} ->
ReceiveMaximum = receive_maximum(ConnInfo), Session = apply_conf(ConnInfo, Conf, Session0),
Session = Session0#{receive_maximum => ReceiveMaximum},
{true, ensure_timers(Session), []}; {true, ensure_timers(Session), []};
false -> false ->
false false
end. end.
ensure_session(ClientID, ConnInfo, Conf) -> apply_conf(ConnInfo, Conf, Session) ->
Session = session_ensure_new(ClientID, ConnInfo, Conf),
ReceiveMaximum = receive_maximum(ConnInfo),
Session#{ Session#{
conninfo => ConnInfo, receive_maximum => receive_maximum(ConnInfo),
receive_maximum => ReceiveMaximum, props => Conf
subscriptions => #{}
}. }.
-spec destroy(session() | clientinfo()) -> ok. -spec destroy(session() | clientinfo()) -> ok.
@ -204,10 +202,10 @@ info(created_at, #{created_at := CreatedAt}) ->
CreatedAt; CreatedAt;
info(is_persistent, #{}) -> info(is_persistent, #{}) ->
true; true;
info(subscriptions, #{subscriptions := Iters}) -> info(subscriptions, #{subscriptions := Subs}) ->
maps:map(fun(_, #{props := SubOpts}) -> SubOpts end, Iters); subs_to_map(Subs);
info(subscriptions_cnt, #{subscriptions := Iters}) -> info(subscriptions_cnt, #{subscriptions := Subs}) ->
maps:size(Iters); subs_size(Subs);
info(subscriptions_max, #{props := Conf}) -> info(subscriptions_max, #{props := Conf}) ->
maps:get(max_subscriptions, Conf); maps:get(max_subscriptions, Conf);
info(upgrade_qos, #{props := Conf}) -> info(upgrade_qos, #{props := Conf}) ->
@ -274,41 +272,40 @@ subscribe(
TopicFilter, TopicFilter,
SubOpts, SubOpts,
Session = #{id := ID, subscriptions := Subs} Session = #{id := ID, subscriptions := Subs}
) when is_map_key(TopicFilter, Subs) ->
Subscription = maps:get(TopicFilter, Subs),
NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID),
{ok, Session#{subscriptions := Subs#{TopicFilter => NSubscription}}};
subscribe(
TopicFilter,
SubOpts,
Session = #{id := ID, subscriptions := Subs}
) -> ) ->
% TODO: max_subscriptions case subs_lookup(TopicFilter, Subs) of
Subscription = add_subscription(TopicFilter, SubOpts, ID), Subscription = #{} ->
{ok, Session#{subscriptions := Subs#{TopicFilter => Subscription}}}. NSubscription = update_subscription(TopicFilter, Subscription, SubOpts, ID),
NSubs = subs_insert(TopicFilter, NSubscription, Subs),
{ok, Session#{subscriptions := NSubs}};
undefined ->
% TODO: max_subscriptions
Subscription = add_subscription(TopicFilter, SubOpts, ID),
NSubs = subs_insert(TopicFilter, Subscription, Subs),
{ok, Session#{subscriptions := NSubs}}
end.
-spec unsubscribe(topic_filter(), session()) -> -spec unsubscribe(topic_filter(), session()) ->
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
unsubscribe( unsubscribe(
TopicFilter, TopicFilter,
Session = #{id := ID, subscriptions := Subs} Session = #{id := ID, subscriptions := Subs}
) when is_map_key(TopicFilter, Subs) ->
Subscription = maps:get(TopicFilter, Subs),
SubOpts = maps:get(props, Subscription),
ok = del_subscription(TopicFilter, ID),
{ok, Session#{subscriptions := maps:remove(TopicFilter, Subs)}, SubOpts};
unsubscribe(
_TopicFilter,
_Session = #{}
) -> ) ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}. case subs_lookup(TopicFilter, Subs) of
_Subscription = #{props := SubOpts} ->
ok = del_subscription(TopicFilter, ID),
NSubs = subs_delete(TopicFilter, Subs),
{ok, Session#{subscriptions := NSubs}, SubOpts};
undefined ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}
end.
-spec get_subscription(topic_filter(), session()) -> -spec get_subscription(topic_filter(), session()) ->
emqx_types:subopts() | undefined. emqx_types:subopts() | undefined.
get_subscription(TopicFilter, #{subscriptions := Subs}) -> get_subscription(TopicFilter, #{subscriptions := Subs}) ->
case maps:get(TopicFilter, Subs, undefined) of case subs_lookup(TopicFilter, Subs) of
Subscription = #{} -> _Subscription = #{props := SubOpts} ->
maps:get(props, Subscription); SubOpts;
undefined -> undefined ->
undefined undefined
end. end.
@ -329,9 +326,6 @@ publish(_PacketId, Msg, Session) ->
%% Client -> Broker: PUBACK %% Client -> Broker: PUBACK
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% FIXME: parts of the commit offset function are mocked
-dialyzer({nowarn_function, puback/3}).
-spec puback(clientinfo(), emqx_types:packet_id(), session()) -> -spec puback(clientinfo(), emqx_types:packet_id(), session()) ->
{ok, emqx_types:message(), replies(), session()} {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}. | {error, emqx_types:reason_code()}.
@ -403,20 +397,22 @@ deliver(_ClientInfo, _Delivers, Session) ->
-spec handle_timeout(clientinfo(), _Timeout, session()) -> -spec handle_timeout(clientinfo(), _Timeout, session()) ->
{ok, replies(), session()} | {ok, replies(), timeout(), session()}. {ok, replies(), session()} | {ok, replies(), timeout(), session()}.
handle_timeout( handle_timeout(
_ClientInfo, ClientInfo,
?TIMER_PULL, ?TIMER_PULL,
Session0 = #{id := Id, inflight := Inflight0, receive_maximum := ReceiveMaximum} Session0 = #{
id := Id,
inflight := Inflight0,
subscriptions := Subs,
props := Conf,
receive_maximum := ReceiveMaximum
}
) -> ) ->
MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]), MaxBatchSize = emqx_config:get([session_persistence, max_batch_size]),
BatchSize = min(ReceiveMaximum, MaxBatchSize), BatchSize = min(ReceiveMaximum, MaxBatchSize),
UpgradeQoS = maps:get(upgrade_qos, Conf),
PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS),
{Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll( {Publishes, Inflight} = emqx_persistent_message_ds_replayer:poll(
fun PreprocFun,
(_Seqno, Message = #message{qos = ?QOS_0}) ->
{false, {undefined, Message}};
(Seqno, Message) ->
PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno),
{PacketId, Message}
end,
Id, Id,
Inflight0, Inflight0,
BatchSize BatchSize
@ -442,30 +438,21 @@ handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0) ->
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]),
EstimatedLastAliveAt = now_ms() + BumpInterval, EstimatedLastAliveAt = now_ms() + BumpInterval,
Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt), Session = session_set_last_alive_at_trans(Session0, EstimatedLastAliveAt),
BumpInterval = emqx_config:get([session_persistence, last_alive_update_interval]), {ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)};
{ok, [], emqx_session:ensure_timer(?TIMER_BUMP_LAST_ALIVE_AT, BumpInterval, Session)}. handle_timeout(_ClientInfo, expire_awaiting_rel, Session) ->
%% TODO: stub
{ok, [], Session}.
-spec replay(clientinfo(), [], session()) -> -spec replay(clientinfo(), [], session()) ->
{ok, replies(), session()}. {ok, replies(), session()}.
replay(_ClientInfo, [], Session = #{inflight := Inflight0}) -> replay(
AckedUntil = emqx_persistent_message_ds_replayer:committed_until(ack, Inflight0), ClientInfo,
RecUntil = emqx_persistent_message_ds_replayer:committed_until(rec, Inflight0), [],
CompUntil = emqx_persistent_message_ds_replayer:committed_until(comp, Inflight0), Session = #{inflight := Inflight0, subscriptions := Subs, props := Conf}
ReplyFun = fun ) ->
(_Seqno, #message{qos = ?QOS_0}) -> UpgradeQoS = maps:get(upgrade_qos, Conf),
{false, []}; PreprocFun = make_preproc_fun(ClientInfo, Subs, UpgradeQoS),
(Seqno, #message{qos = ?QOS_1}) when Seqno < AckedUntil -> {Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(PreprocFun, Inflight0),
[];
(Seqno, #message{qos = ?QOS_2}) when Seqno < CompUntil ->
[];
(Seqno, #message{qos = ?QOS_2}) when Seqno < RecUntil ->
PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno),
{pubrel, PacketId};
(Seqno, Message) ->
PacketId = emqx_persistent_message_ds_replayer:seqno_to_packet_id(Seqno),
{PacketId, emqx_message:set_flag(dup, true, Message)}
end,
{Replies, Inflight} = emqx_persistent_message_ds_replayer:replay(ReplyFun, Inflight0),
{ok, Replies, Session#{inflight := Inflight}}. {ok, Replies, Session#{inflight := Inflight}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -481,6 +468,19 @@ terminate(_Reason, _Session = #{}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
make_preproc_fun(ClientInfo, Subs, UpgradeQoS) ->
fun(Message = #message{topic = Topic}) ->
emqx_utils:flattermap(
fun(Match) ->
#{props := SubOpts} = subs_get_match(Match, Subs),
emqx_session:enrich_message(ClientInfo, Message, SubOpts, UpgradeQoS)
end,
subs_matches(Topic, Subs)
)
end.
%%--------------------------------------------------------------------
-spec add_subscription(topic_filter(), emqx_types:subopts(), id()) -> -spec add_subscription(topic_filter(), emqx_types:subopts(), id()) ->
subscription(). subscription().
add_subscription(TopicFilter, SubOpts, DSSessionID) -> add_subscription(TopicFilter, SubOpts, DSSessionID) ->
@ -644,25 +644,24 @@ session_open(SessionId, NewConnInfo) ->
end end
end). end).
-spec session_ensure_new(id(), emqx_types:conninfo(), _Props :: map()) -> -spec session_ensure_new(id(), emqx_types:conninfo()) ->
session(). session().
session_ensure_new(SessionId, ConnInfo, Props) -> session_ensure_new(SessionId, ConnInfo) ->
transaction(fun() -> transaction(fun() ->
ok = session_drop_subscriptions(SessionId), ok = session_drop_records(SessionId),
Session = export_session(session_create(SessionId, ConnInfo, Props)), Session = export_session(session_create(SessionId, ConnInfo)),
Session#{ Session#{
subscriptions => #{}, subscriptions => subs_new(),
inflight => emqx_persistent_message_ds_replayer:new() inflight => emqx_persistent_message_ds_replayer:new()
} }
end). end).
session_create(SessionId, ConnInfo, Props) -> session_create(SessionId, ConnInfo) ->
Session = #session{ Session = #session{
id = SessionId, id = SessionId,
created_at = now_ms(), created_at = now_ms(),
last_alive_at = now_ms(), last_alive_at = now_ms(),
conninfo = ConnInfo, conninfo = ConnInfo
props = Props
}, },
ok = mnesia:write(?SESSION_TAB, Session, write), ok = mnesia:write(?SESSION_TAB, Session, write),
Session. Session.
@ -696,13 +695,17 @@ session_set_last_alive_at(SessionRecord0, LastAliveAt) ->
-spec session_drop(id()) -> ok. -spec session_drop(id()) -> ok.
session_drop(DSSessionId) -> session_drop(DSSessionId) ->
transaction(fun() -> transaction(fun() ->
ok = session_drop_subscriptions(DSSessionId), ok = session_drop_records(DSSessionId),
ok = session_drop_pubranges(DSSessionId),
ok = session_drop_offsets(DSSessionId),
ok = session_drop_streams(DSSessionId),
ok = mnesia:delete(?SESSION_TAB, DSSessionId, write) ok = mnesia:delete(?SESSION_TAB, DSSessionId, write)
end). end).
-spec session_drop_records(id()) -> ok.
session_drop_records(DSSessionId) ->
ok = session_drop_subscriptions(DSSessionId),
ok = session_drop_pubranges(DSSessionId),
ok = session_drop_offsets(DSSessionId),
ok = session_drop_streams(DSSessionId).
-spec session_drop_subscriptions(id()) -> ok. -spec session_drop_subscriptions(id()) -> ok.
session_drop_subscriptions(DSSessionId) -> session_drop_subscriptions(DSSessionId) ->
Subscriptions = session_read_subscriptions(DSSessionId, write), Subscriptions = session_read_subscriptions(DSSessionId, write),
@ -844,7 +847,7 @@ do_ensure_all_iterators_closed(_DSSessionID) ->
renew_streams(#{id := SessionId, subscriptions := Subscriptions}) -> renew_streams(#{id := SessionId, subscriptions := Subscriptions}) ->
transaction(fun() -> transaction(fun() ->
ExistingStreams = mnesia:read(?SESSION_STREAM_TAB, SessionId, write), ExistingStreams = mnesia:read(?SESSION_STREAM_TAB, SessionId, write),
maps:fold( subs_fold(
fun(TopicFilter, #{start_time := StartTime}, Streams) -> fun(TopicFilter, #{start_time := StartTime}, Streams) ->
TopicFilterWords = emqx_topic:words(TopicFilter), TopicFilterWords = emqx_topic:words(TopicFilter),
renew_topic_streams(SessionId, TopicFilterWords, StartTime, Streams) renew_topic_streams(SessionId, TopicFilterWords, StartTime, Streams)
@ -926,6 +929,43 @@ session_drop_offsets(DSSessionId) ->
%%-------------------------------------------------------------------------------- %%--------------------------------------------------------------------------------
subs_new() ->
emqx_topic_gbt:new().
subs_lookup(TopicFilter, Subs) ->
emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined).
subs_insert(TopicFilter, Subscription, Subs) ->
emqx_topic_gbt:insert(TopicFilter, [], Subscription, Subs).
subs_delete(TopicFilter, Subs) ->
emqx_topic_gbt:delete(TopicFilter, [], Subs).
subs_matches(Topic, Subs) ->
emqx_topic_gbt:matches(Topic, Subs, []).
subs_get_match(M, Subs) ->
emqx_topic_gbt:get_record(M, Subs).
subs_size(Subs) ->
emqx_topic_gbt:size(Subs).
subs_to_map(Subs) ->
subs_fold(
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end,
#{},
Subs
).
subs_fold(Fun, AccIn, Subs) ->
emqx_topic_gbt:fold(
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end,
AccIn,
Subs
).
%%--------------------------------------------------------------------------------
transaction(Fun) -> transaction(Fun) ->
case mnesia:is_transaction() of case mnesia:is_transaction() of
true -> true ->
@ -944,9 +984,9 @@ ro_transaction(Fun) ->
export_subscriptions(DSSubs) -> export_subscriptions(DSSubs) ->
lists:foldl( lists:foldl(
fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) -> fun(DSSub = #ds_sub{id = {_DSSessionId, TopicFilter}}, Acc) ->
Acc#{TopicFilter => export_subscription(DSSub)} subs_insert(TopicFilter, export_subscription(DSSub), Acc)
end, end,
#{}, subs_new(),
DSSubs DSSubs
). ).

View File

@ -96,13 +96,16 @@
]). ]).
% Foreign session implementations % Foreign session implementations
-export([enrich_delivers/3]). -export([
enrich_delivers/3,
enrich_message/4
]).
% Utilities % Utilities
-export([should_keep/1]). -export([should_keep/1]).
% Tests only % Tests only
-export([get_session_conf/2]). -export([get_session_conf/1]).
-export_type([ -export_type([
t/0, t/0,
@ -137,8 +140,6 @@
-type conf() :: #{ -type conf() :: #{
%% Max subscriptions allowed %% Max subscriptions allowed
max_subscriptions := non_neg_integer() | infinity, max_subscriptions := non_neg_integer() | infinity,
%% Max inflight messages allowed
max_inflight := non_neg_integer(),
%% Maximum number of awaiting QoS2 messages allowed %% Maximum number of awaiting QoS2 messages allowed
max_awaiting_rel := non_neg_integer() | infinity, max_awaiting_rel := non_neg_integer() | infinity,
%% Upgrade QoS? %% Upgrade QoS?
@ -171,7 +172,7 @@
-callback create(clientinfo(), conninfo(), conf()) -> -callback create(clientinfo(), conninfo(), conf()) ->
t(). t().
-callback open(clientinfo(), conninfo()) -> -callback open(clientinfo(), conninfo(), conf()) ->
{_IsPresent :: true, t(), _ReplayContext} | false. {_IsPresent :: true, t(), _ReplayContext} | false.
-callback destroy(t() | clientinfo()) -> ok. -callback destroy(t() | clientinfo()) -> ok.
@ -181,7 +182,7 @@
-spec create(clientinfo(), conninfo()) -> t(). -spec create(clientinfo(), conninfo()) -> t().
create(ClientInfo, ConnInfo) -> create(ClientInfo, ConnInfo) ->
Conf = get_session_conf(ClientInfo, ConnInfo), Conf = get_session_conf(ClientInfo),
create(ClientInfo, ConnInfo, Conf). create(ClientInfo, ConnInfo, Conf).
create(ClientInfo, ConnInfo, Conf) -> create(ClientInfo, ConnInfo, Conf) ->
@ -198,12 +199,12 @@ create(Mod, ClientInfo, ConnInfo, Conf) ->
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo()) ->
{_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}. {_IsPresent :: true, t(), _ReplayContext} | {_IsPresent :: false, t()}.
open(ClientInfo, ConnInfo) -> open(ClientInfo, ConnInfo) ->
Conf = get_session_conf(ClientInfo, ConnInfo), Conf = get_session_conf(ClientInfo),
Mods = [Default | _] = choose_impl_candidates(ConnInfo), Mods = [Default | _] = choose_impl_candidates(ConnInfo),
%% NOTE %% NOTE
%% Try to look the existing session up in session stores corresponding to the given %% Try to look the existing session up in session stores corresponding to the given
%% `Mods` in order, starting from the last one. %% `Mods` in order, starting from the last one.
case try_open(Mods, ClientInfo, ConnInfo) of case try_open(Mods, ClientInfo, ConnInfo, Conf) of
{_IsPresent = true, _, _} = Present -> {_IsPresent = true, _, _} = Present ->
Present; Present;
false -> false ->
@ -212,24 +213,20 @@ open(ClientInfo, ConnInfo) ->
{false, create(Default, ClientInfo, ConnInfo, Conf)} {false, create(Default, ClientInfo, ConnInfo, Conf)}
end. end.
try_open([Mod | Rest], ClientInfo, ConnInfo) -> try_open([Mod | Rest], ClientInfo, ConnInfo, Conf) ->
case try_open(Rest, ClientInfo, ConnInfo) of case try_open(Rest, ClientInfo, ConnInfo, Conf) of
{_IsPresent = true, _, _} = Present -> {_IsPresent = true, _, _} = Present ->
Present; Present;
false -> false ->
Mod:open(ClientInfo, ConnInfo) Mod:open(ClientInfo, ConnInfo, Conf)
end; end;
try_open([], _ClientInfo, _ConnInfo) -> try_open([], _ClientInfo, _ConnInfo, _Conf) ->
false. false.
-spec get_session_conf(clientinfo(), conninfo()) -> conf(). -spec get_session_conf(clientinfo()) -> conf().
get_session_conf( get_session_conf(_ClientInfo = #{zone := Zone}) ->
#{zone := Zone},
#{receive_maximum := MaxInflight}
) ->
#{ #{
max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), max_subscriptions => get_mqtt_conf(Zone, max_subscriptions),
max_inflight => MaxInflight,
max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel), max_awaiting_rel => get_mqtt_conf(Zone, max_awaiting_rel),
upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), upgrade_qos => get_mqtt_conf(Zone, upgrade_qos),
retry_interval => get_mqtt_conf(Zone, retry_interval), retry_interval => get_mqtt_conf(Zone, retry_interval),

View File

@ -59,7 +59,7 @@
-export([ -export([
create/3, create/3,
open/2, open/3,
destroy/1 destroy/1
]). ]).
@ -152,7 +152,11 @@
-spec create(clientinfo(), conninfo(), emqx_session:conf()) -> -spec create(clientinfo(), conninfo(), emqx_session:conf()) ->
session(). session().
create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) -> create(
#{zone := Zone, clientid := ClientId},
#{expiry_interval := EI, receive_maximum := ReceiveMax},
Conf
) ->
QueueOpts = get_mqueue_conf(Zone), QueueOpts = get_mqueue_conf(Zone),
#session{ #session{
id = emqx_guid:gen(), id = emqx_guid:gen(),
@ -160,7 +164,7 @@ create(#{zone := Zone, clientid := ClientId}, #{expiry_interval := EI}, Conf) ->
created_at = erlang:system_time(millisecond), created_at = erlang:system_time(millisecond),
is_persistent = EI > 0, is_persistent = EI > 0,
subscriptions = #{}, subscriptions = #{},
inflight = emqx_inflight:new(maps:get(max_inflight, Conf)), inflight = emqx_inflight:new(ReceiveMax),
mqueue = emqx_mqueue:init(QueueOpts), mqueue = emqx_mqueue:init(QueueOpts),
next_pkt_id = 1, next_pkt_id = 1,
awaiting_rel = #{}, awaiting_rel = #{},
@ -195,14 +199,16 @@ destroy(_Session) ->
%% Open a (possibly existing) Session %% Open a (possibly existing) Session
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec open(clientinfo(), conninfo()) -> -spec open(clientinfo(), conninfo(), emqx_session:conf()) ->
{_IsPresent :: true, session(), replayctx()} | _IsPresent :: false. {_IsPresent :: true, session(), replayctx()} | _IsPresent :: false.
open(ClientInfo = #{clientid := ClientId}, _ConnInfo) -> open(ClientInfo = #{clientid := ClientId}, ConnInfo, Conf) ->
case emqx_cm:takeover_session_begin(ClientId) of case emqx_cm:takeover_session_begin(ClientId) of
{ok, SessionRemote, TakeoverState} -> {ok, SessionRemote, TakeoverState} ->
Session = resume(ClientInfo, SessionRemote), Session0 = resume(ClientInfo, SessionRemote),
case emqx_cm:takeover_session_end(TakeoverState) of case emqx_cm:takeover_session_end(TakeoverState) of
{ok, Pendings} -> {ok, Pendings} ->
Session1 = resize_inflight(ConnInfo, Session0),
Session = apply_conf(Conf, Session1),
clean_session(ClientInfo, Session, Pendings); clean_session(ClientInfo, Session, Pendings);
{error, _} -> {error, _} ->
% TODO log error? % TODO log error?
@ -212,6 +218,20 @@ open(ClientInfo = #{clientid := ClientId}, _ConnInfo) ->
false false
end. end.
resize_inflight(#{receive_maximum := ReceiveMax}, Session = #session{inflight = Inflight}) ->
Session#session{
inflight = emqx_inflight:resize(ReceiveMax, Inflight)
}.
apply_conf(Conf, Session = #session{}) ->
Session#session{
max_subscriptions = maps:get(max_subscriptions, Conf),
max_awaiting_rel = maps:get(max_awaiting_rel, Conf),
upgrade_qos = maps:get(upgrade_qos, Conf),
retry_interval = maps:get(retry_interval, Conf),
await_rel_timeout = maps:get(await_rel_timeout, Conf)
}.
clean_session(ClientInfo, Session = #session{mqueue = Q}, Pendings) -> clean_session(ClientInfo, Session = #session{mqueue = Q}, Pendings) ->
Q1 = emqx_mqueue:filter(fun emqx_session:should_keep/1, Q), Q1 = emqx_mqueue:filter(fun emqx_session:should_keep/1, Q),
Session1 = Session#session{mqueue = Q1}, Session1 = Session#session{mqueue = Q1},

View File

@ -14,14 +14,17 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Topic index implemetation with gb_trees stored in persistent_term. %% @doc Topic index implemetation with gb_trees as the underlying data
%% This is only suitable for a static set of topic or topic-filters. %% structure.
-module(emqx_topic_gbt). -module(emqx_topic_gbt).
-export([new/0, new/1]). -export([new/0]).
-export([size/1]).
-export([insert/4]). -export([insert/4]).
-export([delete/3]). -export([delete/3]).
-export([lookup/4]).
-export([fold/3]).
-export([match/2]). -export([match/2]).
-export([matches/3]). -export([matches/3]).
@ -29,53 +32,74 @@
-export([get_topic/1]). -export([get_topic/1]).
-export([get_record/2]). -export([get_record/2]).
-export_type([t/0, t/2, match/1]).
-type key(ID) :: emqx_trie_search:key(ID). -type key(ID) :: emqx_trie_search:key(ID).
-type words() :: emqx_trie_search:words(). -type words() :: emqx_trie_search:words().
-type match(ID) :: key(ID). -type match(ID) :: key(ID).
-type name() :: any().
%% @private Only for testing. -opaque t(ID, Value) :: gb_trees:tree(key(ID), Value).
-spec new() -> name(). -opaque t() :: t(_ID, _Value).
new() ->
new(test).
%% @doc Create a new gb_tree and store it in the persitent_term with the %% @doc Create a new gb_tree and store it in the persitent_term with the
%% given name. %% given name.
-spec new(name()) -> name(). -spec new() -> t().
new(Name) -> new() ->
T = gb_trees:from_orddict([]), gb_trees:empty().
true = gbt_update(Name, T),
Name. -spec size(t()) -> non_neg_integer().
size(Gbt) ->
gb_trees:size(Gbt).
%% @doc Insert a new entry into the index that associates given topic filter to given %% @doc Insert a new entry into the index that associates given topic filter to given
%% record ID, and attaches arbitrary record to the entry. This allows users to choose %% record ID, and attaches arbitrary record to the entry. This allows users to choose
%% between regular and "materialized" indexes, for example. %% between regular and "materialized" indexes, for example.
-spec insert(emqx_types:topic() | words(), _ID, _Record, name()) -> true. -spec insert(emqx_types:topic() | words(), _ID, _Record, t()) -> t().
insert(Filter, ID, Record, Name) -> insert(Filter, ID, Record, Gbt) ->
Tree = gbt(Name),
Key = key(Filter, ID), Key = key(Filter, ID),
NewTree = gb_trees:enter(Key, Record, Tree), gb_trees:enter(Key, Record, Gbt).
true = gbt_update(Name, NewTree).
%% @doc Delete an entry from the index that associates given topic filter to given %% @doc Delete an entry from the index that associates given topic filter to given
%% record ID. Deleting non-existing entry is not an error. %% record ID. Deleting non-existing entry is not an error.
-spec delete(emqx_types:topic() | words(), _ID, name()) -> true. -spec delete(emqx_types:topic() | words(), _ID, t()) -> t().
delete(Filter, ID, Name) -> delete(Filter, ID, Gbt) ->
Tree = gbt(Name),
Key = key(Filter, ID), Key = key(Filter, ID),
NewTree = gb_trees:delete_any(Key, Tree), gb_trees:delete_any(Key, Gbt).
true = gbt_update(Name, NewTree).
-spec lookup(emqx_types:topic() | words(), _ID, t(), Default) -> _Record | Default.
lookup(Filter, ID, Gbt, Default) ->
Key = key(Filter, ID),
case gb_trees:lookup(Key, Gbt) of
{value, Record} ->
Record;
none ->
Default
end.
-spec fold(fun((key(_ID), _Record, Acc) -> Acc), Acc, t()) -> Acc.
fold(Fun, Acc, Gbt) ->
Iter = gb_trees:iterator(Gbt),
fold_iter(Fun, Acc, Iter).
fold_iter(Fun, Acc, Iter) ->
case gb_trees:next(Iter) of
{Key, Record, NIter} ->
fold_iter(Fun, Fun(Key, Record, Acc), NIter);
none ->
Acc
end.
%% @doc Match given topic against the index and return the first match, or `false` if %% @doc Match given topic against the index and return the first match, or `false` if
%% no match is found. %% no match is found.
-spec match(emqx_types:topic(), name()) -> match(_ID) | false. -spec match(emqx_types:topic(), t()) -> match(_ID) | false.
match(Topic, Name) -> match(Topic, Gbt) ->
emqx_trie_search:match(Topic, make_nextf(Name)). emqx_trie_search:match(Topic, make_nextf(Gbt)).
%% @doc Match given topic against the index and return _all_ matches. %% @doc Match given topic against the index and return _all_ matches.
%% If `unique` option is given, return only unique matches by record ID. %% If `unique` option is given, return only unique matches by record ID.
matches(Topic, Name, Opts) -> -spec matches(emqx_types:topic(), t(), emqx_trie_search:opts()) -> [match(_ID)].
emqx_trie_search:matches(Topic, make_nextf(Name), Opts). matches(Topic, Gbt, Opts) ->
emqx_trie_search:matches(Topic, make_nextf(Gbt), Opts).
%% @doc Extract record ID from the match. %% @doc Extract record ID from the match.
-spec get_id(match(ID)) -> ID. -spec get_id(match(ID)) -> ID.
@ -88,21 +112,13 @@ get_topic(Key) ->
emqx_trie_search:get_topic(Key). emqx_trie_search:get_topic(Key).
%% @doc Fetch the record associated with the match. %% @doc Fetch the record associated with the match.
-spec get_record(match(_ID), name()) -> _Record. -spec get_record(match(_ID), t()) -> _Record.
get_record(Key, Name) -> get_record(Key, Gbt) ->
Gbt = gbt(Name),
gb_trees:get(Key, Gbt). gb_trees:get(Key, Gbt).
key(TopicOrFilter, ID) -> key(TopicOrFilter, ID) ->
emqx_trie_search:make_key(TopicOrFilter, ID). emqx_trie_search:make_key(TopicOrFilter, ID).
gbt(Name) ->
persistent_term:get({?MODULE, Name}).
gbt_update(Name, Tree) ->
persistent_term:put({?MODULE, Name}, Tree),
true.
gbt_next(nil, _Input) -> gbt_next(nil, _Input) ->
'$end_of_table'; '$end_of_table';
gbt_next({P, _V, _Smaller, Bigger}, K) when K >= P -> gbt_next({P, _V, _Smaller, Bigger}, K) when K >= P ->
@ -115,6 +131,5 @@ gbt_next({P, _V, Smaller, _Bigger}, K) ->
NextKey NextKey
end. end.
make_nextf(Name) -> make_nextf({_Size, Tree}) ->
{_SizeWeDontCare, TheTree} = gbt(Name), fun(Key) -> gbt_next(Tree, Key) end.
fun(Key) -> gbt_next(TheTree, Key) end.

View File

@ -0,0 +1,71 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Topic index implemetation with gb_tree as a persistent term.
%% This is only suitable for a static set of topic or topic-filters.
-module(emqx_topic_gbt_pterm).
-export([new/0, new/1]).
-export([insert/4]).
-export([delete/3]).
-export([match/2]).
-export([matches/3]).
-export([get_record/2]).
-type name() :: any().
-type match(ID) :: emqx_topic_gbt:match(ID).
%% @private Only for testing.
-spec new() -> name().
new() ->
new(test).
-spec new(name()) -> name().
new(Name) ->
true = pterm_update(Name, emqx_topic_gbt:new()),
Name.
-spec insert(emqx_types:topic() | emqx_trie_search:words(), _ID, _Record, name()) -> true.
insert(Filter, ID, Record, Name) ->
pterm_update(Name, emqx_topic_gbt:insert(Filter, ID, Record, pterm(Name))).
-spec delete(emqx_types:topic() | emqx_trie_search:words(), _ID, name()) -> name().
delete(Filter, ID, Name) ->
pterm_update(Name, emqx_topic_gbt:delete(Filter, ID, pterm(Name))).
-spec match(emqx_types:topic(), name()) -> match(_ID) | false.
match(Topic, Name) ->
emqx_topic_gbt:match(Topic, pterm(Name)).
-spec matches(emqx_types:topic(), name(), emqx_trie_search:opts()) -> [match(_ID)].
matches(Topic, Name, Opts) ->
emqx_topic_gbt:matches(Topic, pterm(Name), Opts).
%% @doc Fetch the record associated with the match.
-spec get_record(match(_ID), name()) -> _Record.
get_record(Key, Name) ->
emqx_topic_gbt:get_record(Key, pterm(Name)).
%%
pterm(Name) ->
persistent_term:get({?MODULE, Name}).
pterm_update(Name, Tree) ->
persistent_term:put({?MODULE, Name}, Tree),
true.

View File

@ -19,7 +19,6 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -39,12 +38,11 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
emqx_common_test_helpers:start_apps([]), [{apps, Apps} | Config].
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
case erlang:function_exported(?MODULE, TestCase, 2) of case erlang:function_exported(?MODULE, TestCase, 2) of

View File

@ -26,12 +26,11 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([]), Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
Config. [{apps, Apps} | Config].
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps([]), emqx_cth_suite:stop(proplists:get_value(apps, Config)).
ok.
t_check_pub(_) -> t_check_pub(_) ->
OldConf = emqx:get_config([zones], #{}), OldConf = emqx:get_config([zones], #{}),

View File

@ -19,7 +19,6 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@ -59,31 +58,17 @@ groups() ->
]. ].
init_per_group(tcp, Config) -> init_per_group(tcp, Config) ->
emqx_common_test_helpers:start_apps([]), Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
[{port, 1883}, {conn_fun, connect} | Config]; [{port, 1883}, {conn_fun, connect}, {group_apps, Apps} | Config];
init_per_group(quic, Config) -> init_per_group(quic, Config) ->
UdpPort = 1884, Apps = emqx_cth_suite:start(
emqx_common_test_helpers:start_apps([]), [{emqx, "listeners.quic.test { enable = true, bind = 1884 }"}],
emqx_common_test_helpers:ensure_quic_listener(?MODULE, UdpPort), #{work_dir => emqx_cth_suite:work_dir(Config)}
[{port, UdpPort}, {conn_fun, quic_connect} | Config]; ),
init_per_group(_, Config) -> [{port, 1884}, {conn_fun, quic_connect}, {group_apps, Apps} | Config].
emqx_common_test_helpers:stop_apps([]),
Config.
end_per_group(quic, _Config) -> end_per_group(_Group, Config) ->
emqx_config:put([listeners, quic], #{}), emqx_cth_suite:stop(?config(group_apps, Config)).
ok;
end_per_group(_Group, _Config) ->
ok.
init_per_suite(Config) ->
%% Start Apps
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(TestCase, Config) -> init_per_testcase(TestCase, Config) ->
case erlang:function_exported(?MODULE, TestCase, 2) of case erlang:function_exported(?MODULE, TestCase, 2) of

View File

@ -44,14 +44,33 @@ groups() ->
]. ].
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx),
emqx_config:save_schema_mod_and_names(emqx_schema),
emqx_common_test_helpers:boot_modules(all),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok. ok.
init_per_group(openssl, Config) ->
DataDir = ?config(data_dir, Config),
ListenerConf = #{
bind => <<"0.0.0.0:8883">>,
max_connections => 512000,
ssl_options => #{
keyfile => filename(DataDir, "server.key"),
certfile => filename(DataDir, "server.pem"),
cacertfile => filename(DataDir, "ca.pem"),
ocsp => #{
enable_ocsp_stapling => true,
issuer_pem => filename(DataDir, "ocsp-issuer.pem"),
responder_url => <<"http://127.0.0.1:9877">>
}
}
},
Conf = #{listeners => #{ssl => #{default => ListenerConf}}},
Apps = emqx_cth_suite:start(
[{emqx, #{config => Conf}}],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{group_apps, Apps} | Config];
init_per_group(tls12, Config) -> init_per_group(tls12, Config) ->
[{tls_vsn, "-tls1_2"} | Config]; [{tls_vsn, "-tls1_2"} | Config];
init_per_group(tls13, Config) -> init_per_group(tls13, Config) ->
@ -63,24 +82,14 @@ init_per_group(without_status_request, Config) ->
init_per_group(_Group, Config) -> init_per_group(_Group, Config) ->
Config. Config.
end_per_group(openssl, Config) ->
emqx_cth_suite:stop(?config(group_apps, Config));
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
init_per_testcase(t_openssl_client, Config) -> init_per_testcase(t_openssl_client, Config) ->
ct:timetrap({seconds, 30}), ct:timetrap({seconds, 30}),
DataDir = ?config(data_dir, Config),
Handler = fun(_) -> ok end,
{OCSPResponderPort, OCSPOSPid} = setup_openssl_ocsp(Config), {OCSPResponderPort, OCSPOSPid} = setup_openssl_ocsp(Config),
ConfFilePath = filename:join([DataDir, "openssl_listeners.conf"]),
emqx_common_test_helpers:start_apps(
[],
Handler,
#{
extra_mustache_vars => #{test_data_dir => DataDir},
conf_file_path => ConfFilePath
}
),
ct:sleep(1_000),
[ [
{ocsp_responder_port, OCSPResponderPort}, {ocsp_responder_port, OCSPResponderPort},
{ocsp_responder_os_pid, OCSPOSPid} {ocsp_responder_os_pid, OCSPOSPid}
@ -107,15 +116,25 @@ init_per_testcase(TestCase, Config) when
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}} {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
end end
), ),
emqx_mgmt_api_test_util:init_suite([emqx_conf]), Apps = emqx_cth_suite:start(
[
emqx_conf,
emqx,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
),
_ = emqx_common_test_http:create_default_app(),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
Config; [{tc_apps, Apps} | Config];
false -> false ->
[{skip_does_not_apply, true} | Config] [{skip_does_not_apply, true} | Config]
end; end;
init_per_testcase(t_ocsp_responder_error_responses, Config) -> init_per_testcase(TC, Config) ->
ct:timetrap({seconds, 30}), ct:timetrap({seconds, 30}),
TestPid = self(), TestPid = self(),
DataDir = ?config(data_dir, Config),
ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]), ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect( meck:expect(
emqx_ocsp_cache, emqx_ocsp_cache,
@ -123,90 +142,44 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) ->
fun(URL, _HTTPTimeout) -> fun(URL, _HTTPTimeout) ->
ct:pal("ocsp http request ~p", [URL]), ct:pal("ocsp http request ~p", [URL]),
TestPid ! {http_get, URL}, TestPid ! {http_get, URL},
persistent_term:get({?MODULE, http_response}) persistent_term:get(
{?MODULE, http_response},
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
)
end end
), ),
DataDir = ?config(data_dir, Config),
Type = ssl,
Name = test_ocsp,
ListenerOpts = #{
ssl_options =>
#{
certfile => filename:join(DataDir, "server.pem"),
ocsp => #{
enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => <<"15s">>,
refresh_interval => <<"1s">>
}
}
},
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf),
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
[
{cache_pid, CachePid}
| Config
];
init_per_testcase(_TestCase, Config) ->
ct:timetrap({seconds, 10}),
TestPid = self(),
ok = meck:new(emqx_ocsp_cache, [non_strict, passthrough, no_history, no_link]),
meck:expect(
emqx_ocsp_cache,
http_get,
fun(URL, _HTTPTimeout) ->
TestPid ! {http_get, URL},
{ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"ocsp response">>}}
end
),
snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(),
DataDir = ?config(data_dir, Config),
Type = ssl,
Name = test_ocsp,
ResponderURL = <<"http://localhost:9877/">>, ResponderURL = <<"http://localhost:9877/">>,
ListenerOpts = #{ ListenerConf = #{
ssl_options => enable => false,
#{ bind => 0,
certfile => filename:join(DataDir, "server.pem"), ssl_options => #{
ocsp => #{ certfile => filename(DataDir, "server.pem"),
enable_ocsp_stapling => true, ocsp => #{
responder_url => ResponderURL, enable_ocsp_stapling => true,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), responder_url => ResponderURL,
refresh_http_timeout => <<"15s">>, issuer_pem => filename(DataDir, "ocsp-issuer.pem"),
refresh_interval => <<"1s">> refresh_http_timeout => <<"15s">>,
} refresh_interval => <<"1s">>
} }
}
}, },
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, Conf = #{listeners => #{ssl => #{test_ocsp => ListenerConf}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf), Apps = emqx_cth_suite:start(
CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{ [{emqx, #{config => Conf}}],
required => false, atom_keys => false #{work_dir => emqx_cth_suite:work_dir(TC, Config)}
}), ),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf), snabbkaffe:start_trace(),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
[ [
{cache_pid, CachePid}, {responder_url, ResponderURL},
{responder_url, ResponderURL} {tc_apps, Apps}
| Config | Config
]. ].
filename(Dir, Name) ->
unicode:characters_to_binary(filename:join(Dir, Name)).
end_per_testcase(t_openssl_client, Config) -> end_per_testcase(t_openssl_client, Config) ->
OCSPResponderOSPid = ?config(ocsp_responder_os_pid, Config), catch kill_pid(?config(ocsp_responder_os_pid, Config)),
catch kill_pid(OCSPResponderOSPid),
emqx_common_test_helpers:stop_apps([]),
ok; ok;
end_per_testcase(TestCase, Config) when end_per_testcase(TestCase, Config) when
TestCase =:= t_update_listener; TestCase =:= t_update_listener;
@ -217,19 +190,12 @@ end_per_testcase(TestCase, Config) when
true -> true ->
ok; ok;
false -> false ->
emqx_mgmt_api_test_util:end_suite([emqx_conf]), end_per_testcase(common, Config)
meck:unload([emqx_ocsp_cache]),
ok
end; end;
end_per_testcase(t_ocsp_responder_error_responses, Config) ->
CachePid = ?config(cache_pid, Config),
catch gen_server:stop(CachePid),
meck:unload([emqx_ocsp_cache]),
persistent_term:erase({?MODULE, http_response}),
ok;
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
CachePid = ?config(cache_pid, Config), snabbkaffe:stop(),
catch gen_server:stop(CachePid), emqx_cth_suite:stop(?config(tc_apps, Config)),
persistent_term:erase({?MODULE, http_response}),
meck:unload([emqx_ocsp_cache]), meck:unload([emqx_ocsp_cache]),
ok. ok.
@ -237,24 +203,6 @@ end_per_testcase(_TestCase, Config) ->
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% The real cache makes `emqx_kernel_sup' the heir to its ETS table.
%% In some tests, we don't start the full supervision tree, so we need
%% this dummy process.
spawn_dummy_heir() ->
{_, {ok, _}} =
?wait_async_action(
spawn_link(fun() ->
true = register(emqx_kernel_sup, self()),
?tp(heir_name_registered, #{}),
receive
stop -> ok
end
end),
#{?snk_kind := heir_name_registered},
1_000
),
ok.
does_module_exist(Mod) -> does_module_exist(Mod) ->
case erlang:module_loaded(Mod) of case erlang:module_loaded(Mod) of
true -> true ->
@ -416,11 +364,6 @@ do_ensure_port_open(Port, N) when N > 0 ->
do_ensure_port_open(Port, N - 1) do_ensure_port_open(Port, N - 1)
end. end.
get_sni_fun(ListenerID) ->
#{opts := Opts} = emqx_listeners:find_by_id(ListenerID),
SSLOpts = proplists:get_value(ssl_options, Opts),
proplists:get_value(sni_fun, SSLOpts).
openssl_version() -> openssl_version() ->
Res0 = string:trim(os:cmd("openssl version"), trailing), Res0 = string:trim(os:cmd("openssl version"), trailing),
[_, Res] = string:split(Res0, " "), [_, Res] = string:split(Res0, " "),
@ -516,9 +459,7 @@ t_request_ocsp_response(_Config) ->
end end
). ).
t_request_ocsp_response_restart_cache(Config) -> t_request_ocsp_response_restart_cache(_Config) ->
process_flag(trap_exit, true),
CachePid = ?config(cache_pid, Config),
ListenerID = <<"ssl:test_ocsp">>, ListenerID = <<"ssl:test_ocsp">>,
?check_trace( ?check_trace(
begin begin
@ -526,6 +467,7 @@ t_request_ocsp_response_restart_cache(Config) ->
{ok, _} = emqx_ocsp_cache:fetch_response(ListenerID), {ok, _} = emqx_ocsp_cache:fetch_response(ListenerID),
?wait_async_action( ?wait_async_action(
begin begin
CachePid = whereis(emqx_ocsp_cache),
Ref = monitor(process, CachePid), Ref = monitor(process, CachePid),
exit(CachePid, kill), exit(CachePid, kill),
receive receive
@ -533,9 +475,7 @@ t_request_ocsp_response_restart_cache(Config) ->
ok ok
after 1_000 -> after 1_000 ->
error(cache_not_killed) error(cache_not_killed)
end, end
{ok, _} = emqx_ocsp_cache:start_link(),
ok
end, end,
#{?snk_kind := ocsp_cache_init} #{?snk_kind := ocsp_cache_init}
), ),

View File

@ -1,14 +0,0 @@
listeners.ssl.default {
bind = "0.0.0.0:8883"
max_connections = 512000
ssl_options {
keyfile = "{{ test_data_dir }}/server.key"
certfile = "{{ test_data_dir }}/server.pem"
cacertfile = "{{ test_data_dir }}/ca.pem"
ocsp {
enable_ocsp_stapling = true
issuer_pem = "{{ test_data_dir }}/ocsp-issuer.pem"
responder_url = "http://127.0.0.1:9877"
}
}
}

View File

@ -26,14 +26,13 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
emqx_common_test_helpers:start_apps([]),
OldSch = erlang:system_flag(schedulers_online, 1), OldSch = erlang:system_flag(schedulers_online, 1),
[{old_sch, OldSch} | Config]. [{apps, Apps}, {old_sch, OldSch} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
erlang:system_flag(schedulers_online, ?config(old_sch, Config)), erlang:system_flag(schedulers_online, ?config(old_sch, Config)),
emqx_common_test_helpers:stop_apps([]). emqx_cth_suite:stop(?config(apps, Config)).
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all), emqx_common_test_helpers:boot_modules(all),

View File

@ -24,12 +24,11 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all), Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
emqx_common_test_helpers:start_apps([]), [{apps, Apps} | Config].
Config.
end_per_suite(_Config) -> end_per_suite(Config) ->
emqx_common_test_helpers:stop_apps([]). emqx_cth_suite:stop(proplists:get_value(apps, Config)).
init_per_testcase(t_cpu_check_alarm, Config) -> init_per_testcase(t_cpu_check_alarm, Config) ->
SysMon = emqx_config:get([sysmon, os], #{}), SysMon = emqx_config:get([sysmon, os], #{}),

View File

@ -262,7 +262,7 @@ t_publish_as_persistent(_Config) ->
Sub = connect(<<?MODULE_STRING "1">>, true, 30), Sub = connect(<<?MODULE_STRING "1">>, true, 30),
Pub = connect(<<?MODULE_STRING "2">>, true, 30), Pub = connect(<<?MODULE_STRING "2">>, true, 30),
try try
{ok, _, [1]} = emqtt:subscribe(Sub, <<"t/#">>, qos1), {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(Sub, <<"t/#">>, qos2),
Messages = [ Messages = [
{<<"t/1">>, <<"1">>, 0}, {<<"t/1">>, <<"1">>, 0},
{<<"t/1">>, <<"2">>, 1}, {<<"t/1">>, <<"2">>, 1},

View File

@ -323,7 +323,8 @@ t_choose_impl(Config) ->
ds -> emqx_persistent_session_ds ds -> emqx_persistent_session_ds
end, end,
emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid)) emqx_connection:info({channel, {session, impl}}, sys:get_state(ChanPid))
). ),
ok = emqtt:disconnect(Client).
t_connect_discards_existing_client(Config) -> t_connect_discards_existing_client(Config) ->
ClientId = ?config(client_id, Config), ClientId = ?config(client_id, Config),
@ -389,9 +390,6 @@ t_connect_session_expiry_interval(Config) ->
ok = emqtt:disconnect(Client2). ok = emqtt:disconnect(Client2).
%% [MQTT-3.1.2-23] %% [MQTT-3.1.2-23]
%% TODO: un-skip after QoS 2 support is implemented in DS.
t_connect_session_expiry_interval_qos2(init, Config) -> skip_ds_tc(Config);
t_connect_session_expiry_interval_qos2('end', _Config) -> ok.
t_connect_session_expiry_interval_qos2(Config) -> t_connect_session_expiry_interval_qos2(Config) ->
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config), Topic = ?config(topic, Config),
@ -1009,8 +1007,6 @@ t_unsubscribe(Config) ->
?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]),
ok = emqtt:disconnect(Client). ok = emqtt:disconnect(Client).
t_multiple_subscription_matches(init, Config) -> skip_ds_tc(Config);
t_multiple_subscription_matches('end', _Config) -> ok.
t_multiple_subscription_matches(Config) -> t_multiple_subscription_matches(Config) ->
ConnFun = ?config(conn_fun, Config), ConnFun = ?config(conn_fun, Config),
Topic = ?config(topic, Config), Topic = ?config(topic, Config),

View File

@ -67,7 +67,7 @@ t_session_init(_) ->
Session = emqx_session_mem:create( Session = emqx_session_mem:create(
ClientInfo, ClientInfo,
ConnInfo, ConnInfo,
emqx_session:get_session_conf(ClientInfo, ConnInfo) emqx_session:get_session_conf(ClientInfo)
), ),
?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)), ?assertEqual(#{}, emqx_session_mem:info(subscriptions, Session)),
?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)), ?assertEqual(0, emqx_session_mem:info(subscriptions_cnt, Session)),
@ -531,7 +531,7 @@ session(InitFields) when is_map(InitFields) ->
Session = emqx_session_mem:create( Session = emqx_session_mem:create(
ClientInfo, ClientInfo,
ConnInfo, ConnInfo,
emqx_session:get_session_conf(ClientInfo, ConnInfo) emqx_session:get_session_conf(ClientInfo)
), ),
maps:fold( maps:fold(
fun(Field, Value, SessionAcc) -> fun(Field, Value, SessionAcc) ->

View File

@ -40,7 +40,7 @@ groups() ->
init_per_group(ets, Config) -> init_per_group(ets, Config) ->
[{index_module, emqx_topic_index} | Config]; [{index_module, emqx_topic_index} | Config];
init_per_group(gb_tree, Config) -> init_per_group(gb_tree, Config) ->
[{index_module, emqx_topic_gbt} | Config]. [{index_module, emqx_topic_gbt_pterm} | Config].
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_gateway_mqttsn, [ {application, emqx_gateway_mqttsn, [
{description, "MQTT-SN Gateway"}, {description, "MQTT-SN Gateway"},
{vsn, "0.1.6"}, {vsn, "0.1.7"},
{registered, []}, {registered, []},
{applications, [kernel, stdlib, emqx, emqx_gateway]}, {applications, [kernel, stdlib, emqx, emqx_gateway]},
{env, []}, {env, []},

View File

@ -54,7 +54,7 @@
init(ClientInfo) -> init(ClientInfo) ->
ConnInfo = #{receive_maximum => 1, expiry_interval => 0}, ConnInfo = #{receive_maximum => 1, expiry_interval => 0},
SessionConf = emqx_session:get_session_conf(ClientInfo, ConnInfo), SessionConf = emqx_session:get_session_conf(ClientInfo),
#{ #{
registry => emqx_mqttsn_registry:init(), registry => emqx_mqttsn_registry:init(),
session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf) session => emqx_session_mem:create(ClientInfo, ConnInfo, SessionConf)

View File

@ -60,6 +60,7 @@
safe_filename/1, safe_filename/1,
diff_lists/3, diff_lists/3,
merge_lists/3, merge_lists/3,
flattermap/2,
tcp_keepalive_opts/4, tcp_keepalive_opts/4,
format/1, format/1,
format_mfal/1, format_mfal/1,
@ -1002,6 +1003,28 @@ search(ExpectValue, KeyFunc, [Item | List]) ->
false -> search(ExpectValue, KeyFunc, List) false -> search(ExpectValue, KeyFunc, List)
end. end.
%% @doc Maps over a list of terms and flattens the result, giving back a flat
%% list of terms. It's similar to `lists:flatmap/2`, but it also works on a
%% single term as `Fun` output (thus, the wordplay on "flatter").
%% The purpose of this function is to adapt to `Fun`s that return either a `[]`
%% or a term, and to avoid costs of list construction and flattening when
%% dealing with large lists.
-spec flattermap(Fun, [X]) -> [X] when
Fun :: fun((X) -> [X] | X).
flattermap(_Fun, []) ->
[];
flattermap(Fun, [X | Xs]) ->
flatcomb(Fun(X), flattermap(Fun, Xs)).
flatcomb([], Zs) ->
Zs;
flatcomb(Ys = [_ | _], []) ->
Ys;
flatcomb(Ys = [_ | _], Zs = [_ | _]) ->
Ys ++ Zs;
flatcomb(Y, Zs) ->
[Y | Zs].
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SOCKOPTS, [ -define(SOCKOPTS, [
@ -87,13 +88,13 @@ t_pipeline(_) ->
t_start_timer(_) -> t_start_timer(_) ->
TRef = emqx_utils:start_timer(1, tmsg), TRef = emqx_utils:start_timer(1, tmsg),
timer:sleep(2), timer:sleep(2),
?assertEqual([{timeout, TRef, tmsg}], drain()), ?assertEqual([{timeout, TRef, tmsg}], ?drainMailbox()),
ok = emqx_utils:cancel_timer(TRef). ok = emqx_utils:cancel_timer(TRef).
t_cancel_timer(_) -> t_cancel_timer(_) ->
Timer = emqx_utils:start_timer(0, foo), Timer = emqx_utils:start_timer(0, foo),
ok = emqx_utils:cancel_timer(Timer), ok = emqx_utils:cancel_timer(Timer),
?assertEqual([], drain()), ?assertEqual([], ?drainMailbox()),
ok = emqx_utils:cancel_timer(undefined). ok = emqx_utils:cancel_timer(undefined).
t_proc_name(_) -> t_proc_name(_) ->
@ -153,16 +154,6 @@ t_check(_) ->
emqx_utils:check_oom(Policy) emqx_utils:check_oom(Policy)
). ).
drain() ->
drain([]).
drain(Acc) ->
receive
Msg -> drain([Msg | Acc])
after 0 ->
lists:reverse(Acc)
end.
t_rand_seed(_) -> t_rand_seed(_) ->
?assert(is_tuple(emqx_utils:rand_seed())). ?assert(is_tuple(emqx_utils:rand_seed())).
@ -240,3 +231,47 @@ t_pmap_late_reply(_) ->
[] []
), ),
ok. ok.
t_flattermap(_) ->
?assertEqual(
[42],
emqx_utils:flattermap(fun identity/1, [42])
),
?assertEqual(
[42, 42],
emqx_utils:flattermap(fun duplicate/1, [42])
),
?assertEqual(
[],
emqx_utils:flattermap(fun nil/1, [42])
),
?assertEqual(
[1, 1, 2, 2, 3, 3],
emqx_utils:flattermap(fun duplicate/1, [1, 2, 3])
),
?assertEqual(
[],
emqx_utils:flattermap(fun nil/1, [1, 2, 3])
),
?assertEqual(
[1, 2, 2, 4, 5, 5],
emqx_utils:flattermap(
fun(X) ->
case X rem 3 of
0 -> [];
1 -> X;
2 -> [X, X]
end
end,
[1, 2, 3, 4, 5]
)
).
duplicate(X) ->
[X, X].
nil(_) ->
[].
identity(X) ->
X.