Merge pull request #12874 from ieQu1/dev/EMQX-12030-subscriptions-api

API for durable subscriptions
This commit is contained in:
ieQu1 2024-04-19 17:11:53 +02:00 committed by GitHub
commit bac5100635
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 990 additions and 424 deletions

View File

@ -184,7 +184,7 @@ list_all_pubranges(Node) ->
session_open(Node, ClientId) -> session_open(Node, ClientId) ->
ClientInfo = #{}, ClientInfo = #{},
ConnInfo = #{peername => {undefined, undefined}}, ConnInfo = #{peername => {undefined, undefined}, proto_name => <<"MQTT">>, proto_ver => 5},
WillMsg = undefined, WillMsg = undefined,
erpc:call( erpc:call(
Node, Node,
@ -252,7 +252,6 @@ t_session_subscription_idempotency(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
Session = session_open(Node1, ClientId), Session = session_open(Node1, ClientId),
?assertMatch( ?assertMatch(
#{SubTopicFilter := #{}}, #{SubTopicFilter := #{}},
@ -326,7 +325,6 @@ t_session_unsubscription_idempotency(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
Session = session_open(Node1, ClientId), Session = session_open(Node1, ClientId),
?assertEqual( ?assertEqual(
#{}, #{},
@ -415,10 +413,7 @@ do_t_session_discard(Params) ->
ok ok
end, end,
fun(Trace) -> []
ct:pal("trace:\n ~p", [Trace]),
ok
end
), ),
ok. ok.

View File

@ -75,7 +75,8 @@
%% Managment APIs: %% Managment APIs:
-export([ -export([
list_client_subscriptions/1 list_client_subscriptions/1,
get_client_subscription/2
]). ]).
%% session table operations %% session table operations
@ -116,15 +117,42 @@
%% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be %% Currently, this is the clientid. We avoid `emqx_types:clientid()' because that can be
%% an atom, in theory (?). %% an atom, in theory (?).
-type id() :: binary(). -type id() :: binary().
-type topic_filter() :: emqx_types:topic(). -type topic_filter() :: emqx_types:topic() | #share{}.
%% Subscription and subscription states:
%%
%% Persistent sessions cannot simply update or delete subscriptions,
%% since subscription parameters must be exactly the same during
%% replay.
%%
%% To solve this problem, we store subscriptions in a twofold manner:
%%
%% - `subscription' is an object that holds up-to-date information
%% about the client's subscription and a reference to the latest
%% subscription state id
%%
%% - `subscription_state' is an immutable object that holds
%% information about the subcription parameters at a certain point of
%% time
%%
%% New subscription states are created whenever the client subscribes
%% to a topics, or updates an existing subscription.
%%
%% Stream replay states contain references to the subscription states.
%%
%% Outdated subscription states are discarded when they are not
%% referenced by either subscription or stream replay state objects.
-type subscription_id() :: integer(). -type subscription_id() :: integer().
%% This type is a result of merging
%% `emqx_persistent_session_ds_subs:subscription()' with its current
%% state.
-type subscription() :: #{ -type subscription() :: #{
id := subscription_id(), id := subscription_id(),
start_time := emqx_ds:time(), start_time := emqx_ds:time(),
props := map(), current_state := emqx_persistent_session_ds_subs:subscription_state_id(),
deleted := boolean() subopts := map()
}. }.
-define(TIMER_PULL, timer_pull). -define(TIMER_PULL, timer_pull).
@ -184,7 +212,9 @@
seqno_q2_dup, seqno_q2_dup,
seqno_q2_rec, seqno_q2_rec,
seqno_q2_next, seqno_q2_next,
n_streams n_streams,
awaiting_rel_cnt,
awaiting_rel_max
]). ]).
%% %%
@ -206,7 +236,8 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
ok = emqx_cm:takeover_kick(ClientID), ok = emqx_cm:takeover_kick(ClientID),
case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of
Session0 = #{} -> Session0 = #{} ->
Session = Session0#{props => Conf}, Session1 = Session0#{props => Conf},
Session = do_expire(ClientInfo, Session1),
{true, ensure_timers(Session), []}; {true, ensure_timers(Session), []};
false -> false ->
false false
@ -249,7 +280,7 @@ info(is_persistent, #{}) ->
info(subscriptions, #{s := S}) -> info(subscriptions, #{s := S}) ->
emqx_persistent_session_ds_subs:to_map(S); emqx_persistent_session_ds_subs:to_map(S);
info(subscriptions_cnt, #{s := S}) -> info(subscriptions_cnt, #{s := S}) ->
emqx_topic_gbt:size(emqx_persistent_session_ds_state:get_subscriptions(S)); emqx_persistent_session_ds_state:n_subscriptions(S);
info(subscriptions_max, #{props := Conf}) -> info(subscriptions_max, #{props := Conf}) ->
maps:get(max_subscriptions, Conf); maps:get(max_subscriptions, Conf);
info(upgrade_qos, #{props := Conf}) -> info(upgrade_qos, #{props := Conf}) ->
@ -262,21 +293,21 @@ info(inflight_max, #{inflight := Inflight}) ->
emqx_persistent_session_ds_inflight:receive_maximum(Inflight); emqx_persistent_session_ds_inflight:receive_maximum(Inflight);
info(retry_interval, #{props := Conf}) -> info(retry_interval, #{props := Conf}) ->
maps:get(retry_interval, Conf); maps:get(retry_interval, Conf);
% info(mqueue, #sessmem{mqueue = MQueue}) ->
% MQueue;
info(mqueue_len, #{inflight := Inflight}) -> info(mqueue_len, #{inflight := Inflight}) ->
emqx_persistent_session_ds_inflight:n_buffered(all, Inflight); emqx_persistent_session_ds_inflight:n_buffered(all, Inflight);
% info(mqueue_max, #sessmem{mqueue = MQueue}) ->
% emqx_mqueue:max_len(MQueue);
info(mqueue_dropped, _Session) -> info(mqueue_dropped, _Session) ->
0; 0;
%% info(next_pkt_id, #{s := S}) -> %% info(next_pkt_id, #{s := S}) ->
%% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S),
%% PacketId; %% PacketId;
% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> info(awaiting_rel, #{s := S}) ->
% AwaitingRel; emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, S);
%% info(awaiting_rel_cnt, #{s := S}) -> info(awaiting_rel_max, #{props := Conf}) ->
%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); maps:get(max_awaiting_rel, Conf);
info(awaiting_rel_cnt, #{s := S}) ->
emqx_persistent_session_ds_state:n_awaiting_rel(S);
info(await_rel_timeout, #{props := Conf}) ->
maps:get(await_rel_timeout, Conf);
info(seqno_q1_comm, #{s := S}) -> info(seqno_q1_comm, #{s := S}) ->
emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S); emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S);
info(seqno_q1_dup, #{s := S}) -> info(seqno_q1_dup, #{s := S}) ->
@ -292,17 +323,7 @@ info(seqno_q2_rec, #{s := S}) ->
info(seqno_q2_next, #{s := S}) -> info(seqno_q2_next, #{s := S}) ->
emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S); emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S);
info(n_streams, #{s := S}) -> info(n_streams, #{s := S}) ->
emqx_persistent_session_ds_state:fold_streams( emqx_persistent_session_ds_state:n_streams(S);
fun(_, _, Acc) -> Acc + 1 end,
0,
S
);
info(awaiting_rel_max, #{props := Conf}) ->
maps:get(max_awaiting_rel, Conf);
info(await_rel_timeout, #{props := _Conf}) ->
%% TODO: currently this setting is ignored:
%% maps:get(await_rel_timeout, Conf).
0;
info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs -> info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs ->
{error, not_implemented}. {error, not_implemented}.
@ -337,93 +358,49 @@ print_session(ClientId) ->
-spec subscribe(topic_filter(), emqx_types:subopts(), session()) -> -spec subscribe(topic_filter(), emqx_types:subopts(), session()) ->
{ok, session()} | {error, emqx_types:reason_code()}. {ok, session()} | {error, emqx_types:reason_code()}.
subscribe(
#share{},
_SubOpts,
_Session
) ->
%% TODO: Shared subscriptions are not supported yet:
{error, ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED};
subscribe( subscribe(
TopicFilter, TopicFilter,
SubOpts, SubOpts,
Session = #{id := ID, s := S0} Session
) -> ) ->
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of case emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, SubOpts, Session) of
undefined -> {ok, S1} ->
%% TODO: max subscriptions S = emqx_persistent_session_ds_state:commit(S1),
{ok, Session#{s => S}};
%% N.B.: we chose to update the router before adding the Error = {error, _} ->
%% subscription to the session/iterator table. The Error
%% reasoning for this is as follows: end.
%%
%% Messages matching this topic filter should start to be
%% persisted as soon as possible to avoid missing
%% messages. If this is the first such persistent session
%% subscription, it's important to do so early on.
%%
%% This could, in turn, lead to some inconsistency: if
%% such a route gets created but the session/iterator data
%% fails to be updated accordingly, we have a dangling
%% route. To remove such dangling routes, we may have a
%% periodic GC process that removes routes that do not
%% have a matching persistent subscription. Also, route
%% operations use dirty mnesia operations, which
%% inherently have room for inconsistencies.
%%
%% In practice, we use the iterator reference table as a
%% source of truth, since it is guarded by a transaction
%% context: we consider a subscription operation to be
%% successful if it ended up changing this table. Both
%% router and iterator information can be reconstructed
%% from this table, if needed.
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, ID),
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
Subscription = #{
start_time => now_ms(),
props => SubOpts,
id => SubId,
deleted => false
},
IsNew = true;
Subscription0 = #{} ->
Subscription = Subscription0#{props => SubOpts},
IsNew = false,
S1 = S0
end,
S = emqx_persistent_session_ds_subs:on_subscribe(TopicFilter, Subscription, S1),
?tp(persistent_session_ds_subscription_added, #{
topic_filter => TopicFilter, sub => Subscription, is_new => IsNew
}),
{ok, Session#{s => S}}.
-spec unsubscribe(topic_filter(), session()) -> -spec unsubscribe(topic_filter(), session()) ->
{ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}. {ok, session(), emqx_types:subopts()} | {error, emqx_types:reason_code()}.
unsubscribe( unsubscribe(
TopicFilter, TopicFilter,
Session = #{id := ID, s := S0} Session = #{id := SessionId, s := S0}
) -> ) ->
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S0) of case emqx_persistent_session_ds_subs:on_unsubscribe(SessionId, TopicFilter, S0) of
undefined -> {ok, S1, #{id := SubId, subopts := SubOpts}} ->
{error, ?RC_NO_SUBSCRIPTION_EXISTED}; S2 = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
Subscription = #{props := SubOpts} -> S = emqx_persistent_session_ds_state:commit(S2),
S = do_unsubscribe(ID, TopicFilter, Subscription, S0), {ok, Session#{s => S}, SubOpts};
{ok, Session#{s => S}, SubOpts} Error = {error, _} ->
Error
end. end.
-spec do_unsubscribe(id(), topic_filter(), subscription(), emqx_persistent_session_ds_state:t()) ->
emqx_persistent_session_ds_state:t().
do_unsubscribe(SessionId, TopicFilter, Subscription = #{id := SubId}, S0) ->
S1 = emqx_persistent_session_ds_subs:on_unsubscribe(TopicFilter, Subscription, S0),
?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
S = emqx_persistent_session_ds_stream_scheduler:on_unsubscribe(SubId, S1),
?tp_span(
persistent_session_ds_subscription_route_delete,
#{session_id => SessionId, topic_filter => TopicFilter},
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
),
S.
-spec get_subscription(topic_filter(), session()) -> -spec get_subscription(topic_filter(), session()) ->
emqx_types:subopts() | undefined. emqx_types:subopts() | undefined.
get_subscription(#share{}, _) ->
%% TODO: shared subscriptions are not supported yet:
undefined;
get_subscription(TopicFilter, #{s := S}) -> get_subscription(TopicFilter, #{s := S}) ->
case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of case emqx_persistent_session_ds_subs:lookup(TopicFilter, S) of
_Subscription = #{props := SubOpts} -> #{subopts := SubOpts} ->
SubOpts; SubOpts;
undefined -> undefined ->
undefined undefined
@ -436,11 +413,72 @@ get_subscription(TopicFilter, #{s := S}) ->
-spec publish(emqx_types:packet_id(), emqx_types:message(), session()) -> -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) ->
{ok, emqx_types:publish_result(), session()} {ok, emqx_types:publish_result(), session()}
| {error, emqx_types:reason_code()}. | {error, emqx_types:reason_code()}.
publish(
PacketId,
Msg = #message{qos = ?QOS_2, timestamp = Ts},
Session = #{s := S0}
) ->
case is_awaiting_full(Session) of
false ->
case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of
undefined ->
Results = emqx_broker:publish(Msg),
S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0),
{ok, Results, Session#{s => S}};
_Ts ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end;
true ->
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
end;
publish(_PacketId, Msg, Session) -> publish(_PacketId, Msg, Session) ->
%% TODO: QoS2
Result = emqx_broker:publish(Msg), Result = emqx_broker:publish(Msg),
{ok, Result, Session}. {ok, Result, Session}.
is_awaiting_full(#{s := S, props := Props}) ->
emqx_persistent_session_ds_state:n_awaiting_rel(S) >=
maps:get(max_awaiting_rel, Props, infinity).
-spec expire(emqx_types:clientinfo(), session()) ->
{ok, [], timeout(), session()} | {ok, [], session()}.
expire(ClientInfo, Session0 = #{props := Props}) ->
Session = #{s := S} = do_expire(ClientInfo, Session0),
case emqx_persistent_session_ds_state:n_awaiting_rel(S) of
0 ->
{ok, [], Session};
_ ->
AwaitRelTimeout = maps:get(await_rel_timeout, Props),
{ok, [], AwaitRelTimeout, Session}
end.
do_expire(ClientInfo, Session = #{s := S0, props := Props}) ->
%% 1. Find expired packet IDs:
Now = erlang:system_time(millisecond),
AwaitRelTimeout = maps:get(await_rel_timeout, Props),
ExpiredPacketIds =
emqx_persistent_session_ds_state:fold_awaiting_rel(
fun(PacketId, Ts, Acc) ->
Age = Now - Ts,
case Age > AwaitRelTimeout of
true ->
[PacketId | Acc];
false ->
Acc
end
end,
[],
S0
),
%% 2. Perform side effects:
_ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(ExpiredPacketIds)}),
%% 3. Update state:
S = lists:foldl(
fun emqx_persistent_session_ds_state:del_awaiting_rel/2,
S0,
ExpiredPacketIds
),
Session#{s => S}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: PUBACK %% Client -> Broker: PUBACK
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -477,9 +515,14 @@ pubrec(PacketId, Session0) ->
-spec pubrel(emqx_types:packet_id(), session()) -> -spec pubrel(emqx_types:packet_id(), session()) ->
{ok, session()} | {error, emqx_types:reason_code()}. {ok, session()} | {error, emqx_types:reason_code()}.
pubrel(_PacketId, Session = #{}) -> pubrel(PacketId, Session = #{s := S0}) ->
% TODO: stub case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of
{ok, Session}. undefined ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND};
_TS ->
S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0),
{ok, Session#{s => S}}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client -> Broker: PUBCOMP %% Client -> Broker: PUBCOMP
@ -552,6 +595,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s :=
S = emqx_persistent_session_ds_state:commit(S0), S = emqx_persistent_session_ds_state:commit(S0),
From ! Ref, From ! Ref,
{ok, [], Session#{s => S}}; {ok, [], Session#{s => S}};
handle_timeout(ClientInfo, expire_awaiting_rel, Session) ->
expire(ClientInfo, Session);
handle_timeout(_ClientInfo, Timeout, Session) -> handle_timeout(_ClientInfo, Timeout, Session) ->
?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}),
{ok, [], Session}. {ok, [], Session}.
@ -645,7 +690,7 @@ list_client_subscriptions(ClientId) ->
%% TODO: this is not the most optimal implementation, since it %% TODO: this is not the most optimal implementation, since it
%% should be possible to avoid reading extra data (streams, etc.) %% should be possible to avoid reading extra data (streams, etc.)
case print_session(ClientId) of case print_session(ClientId) of
Sess = #{s := #{subscriptions := Subs}} -> Sess = #{s := #{subscriptions := Subs, subscription_states := SStates}} ->
Node = Node =
case Sess of case Sess of
#{'_alive' := {true, Pid}} -> #{'_alive' := {true, Pid}} ->
@ -655,8 +700,9 @@ list_client_subscriptions(ClientId) ->
end, end,
SubList = SubList =
maps:fold( maps:fold(
fun(Topic, #{props := SubProps}, Acc) -> fun(Topic, #{current_state := CS}, Acc) ->
Elem = {Topic, SubProps}, #{subopts := SubOpts} = maps:get(CS, SStates),
Elem = {Topic, SubOpts},
[Elem | Acc] [Elem | Acc]
end, end,
[], [],
@ -670,6 +716,11 @@ list_client_subscriptions(ClientId) ->
{error, not_found} {error, not_found}
end. end.
-spec get_client_subscription(emqx_types:clientid(), emqx_types:topic()) ->
subscription() | undefined.
get_client_subscription(ClientId, Topic) ->
emqx_persistent_session_ds_subs:cold_get_subscription(ClientId, Topic).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Session tables operations %% Session tables operations
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -701,7 +752,12 @@ sync(ClientId) ->
%% the broker. %% the broker.
-spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) -> -spec session_open(id(), emqx_types:clientinfo(), emqx_types:conninfo(), emqx_maybe:t(message())) ->
session() | false. session() | false.
session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) -> session_open(
SessionId,
ClientInfo,
NewConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer},
MaybeWillMsg
) ->
NowMS = now_ms(), NowMS = now_ms(),
case emqx_persistent_session_ds_state:open(SessionId) of case emqx_persistent_session_ds_state:open(SessionId) of
{ok, S0} -> {ok, S0} ->
@ -720,8 +776,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
maps:get(peername, NewConnInfo), S2 maps:get(peername, NewConnInfo), S2
), ),
S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3), S4 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S3),
S5 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S4), S5 = set_clientinfo(ClientInfo, S4),
S = emqx_persistent_session_ds_state:commit(S5), S6 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S5),
S = emqx_persistent_session_ds_state:commit(S6),
Inflight = emqx_persistent_session_ds_inflight:new( Inflight = emqx_persistent_session_ds_inflight:new(
receive_maximum(NewConnInfo) receive_maximum(NewConnInfo)
), ),
@ -744,7 +801,9 @@ session_open(SessionId, ClientInfo, NewConnInfo, MaybeWillMsg) ->
emqx_session:conf() emqx_session:conf()
) -> ) ->
session(). session().
session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> session_ensure_new(
Id, ClientInfo, ConnInfo = #{proto_name := ProtoName, proto_ver := ProtoVer}, MaybeWillMsg, Conf
) ->
?tp(debug, persistent_session_ds_ensure_new, #{id => Id}), ?tp(debug, persistent_session_ds_ensure_new, #{id => Id}),
Now = now_ms(), Now = now_ms(),
S0 = emqx_persistent_session_ds_state:create_new(Id), S0 = emqx_persistent_session_ds_state:create_new(Id),
@ -767,8 +826,9 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
] ]
), ),
S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4), S5 = emqx_persistent_session_ds_state:set_will_message(MaybeWillMsg, S4),
S6 = emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S5), S6 = set_clientinfo(ClientInfo, S5),
S = emqx_persistent_session_ds_state:commit(S6), S7 = emqx_persistent_session_ds_state:set_protocol({ProtoName, ProtoVer}, S6),
S = emqx_persistent_session_ds_state:commit(S7),
#{ #{
id => Id, id => Id,
props => Conf, props => Conf,
@ -779,18 +839,12 @@ session_ensure_new(Id, ClientInfo, ConnInfo, MaybeWillMsg, Conf) ->
%% @doc Called when a client reconnects with `clean session=true' or %% @doc Called when a client reconnects with `clean session=true' or
%% during session GC %% during session GC
-spec session_drop(id(), _Reason) -> ok. -spec session_drop(id(), _Reason) -> ok.
session_drop(ID, Reason) -> session_drop(SessionId, Reason) ->
case emqx_persistent_session_ds_state:open(ID) of case emqx_persistent_session_ds_state:open(SessionId) of
{ok, S0} -> {ok, S0} ->
?tp(debug, drop_persistent_session, #{client_id => ID, reason => Reason}), ?tp(debug, drop_persistent_session, #{client_id => SessionId, reason => Reason}),
_S = emqx_persistent_session_ds_subs:fold( emqx_persistent_session_ds_subs:on_session_drop(SessionId, S0),
fun(TopicFilter, Subscription, S) -> emqx_persistent_session_ds_state:delete(SessionId);
do_unsubscribe(ID, TopicFilter, Subscription, S)
end,
S0,
S0
),
emqx_persistent_session_ds_state:delete(ID);
undefined -> undefined ->
ok ok
end. end.
@ -798,6 +852,11 @@ session_drop(ID, Reason) ->
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).
set_clientinfo(ClientInfo0, S) ->
%% Remove unnecessary fields from the clientinfo:
ClientInfo = maps:without([cn, dn, auth_result], ClientInfo0),
emqx_persistent_session_ds_state:set_clientinfo(ClientInfo, S).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% RPC targets (v1) %% RPC targets (v1)
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -874,22 +933,31 @@ new_batch({StreamKey, Srs0}, BatchSize, Session0 = #{s := S0}, ClientInfo) ->
Session0 Session0
end. end.
enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, ClientInfo) -> enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0, s := S}, ClientInfo) ->
#srs{ #srs{
it_begin = ItBegin0, it_begin = ItBegin0,
it_end = ItEnd0, it_end = ItEnd0,
first_seqno_qos1 = FirstSeqnoQos1, first_seqno_qos1 = FirstSeqnoQos1,
first_seqno_qos2 = FirstSeqnoQos2 first_seqno_qos2 = FirstSeqnoQos2,
sub_state_id = SubStateId
} = Srs0, } = Srs0,
ItBegin = ItBegin =
case IsReplay of case IsReplay of
true -> ItBegin0; true -> ItBegin0;
false -> ItEnd0 false -> ItEnd0
end, end,
SubState = #{} = emqx_persistent_session_ds_state:get_subscription_state(SubStateId, S),
case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of case emqx_ds:next(?PERSISTENT_MESSAGE_DB, ItBegin, BatchSize) of
{ok, ItEnd, Messages} -> {ok, ItEnd, Messages} ->
{Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch( {Inflight, LastSeqnoQos1, LastSeqnoQos2} = process_batch(
IsReplay, Session, ClientInfo, FirstSeqnoQos1, FirstSeqnoQos2, Messages, Inflight0 IsReplay,
Session,
SubState,
ClientInfo,
FirstSeqnoQos1,
FirstSeqnoQos2,
Messages,
Inflight0
), ),
Srs = Srs0#srs{ Srs = Srs0#srs{
it_begin = ItBegin, it_begin = ItBegin,
@ -913,27 +981,29 @@ enqueue_batch(IsReplay, BatchSize, Srs0, Session = #{inflight := Inflight0}, Cli
%% key_of_iter(#{3 := #{3 := #{5 := K}}}) -> %% key_of_iter(#{3 := #{3 := #{5 := K}}}) ->
%% K. %% K.
process_batch(_IsReplay, _Session, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight) -> process_batch(
_IsReplay, _Session, _SubState, _ClientInfo, LastSeqNoQos1, LastSeqNoQos2, [], Inflight
) ->
{Inflight, LastSeqNoQos1, LastSeqNoQos2}; {Inflight, LastSeqNoQos1, LastSeqNoQos2};
process_batch( process_batch(
IsReplay, Session, ClientInfo, FirstSeqNoQos1, FirstSeqNoQos2, [KV | Messages], Inflight0 IsReplay,
Session,
SubState,
ClientInfo,
FirstSeqNoQos1,
FirstSeqNoQos2,
[KV | Messages],
Inflight0
) -> ) ->
#{s := S, props := #{upgrade_qos := UpgradeQoS}} = Session, #{s := S} = Session,
{_DsMsgKey, Msg0 = #message{topic = Topic}} = KV, #{upgrade_qos := UpgradeQoS, subopts := SubOpts} = SubState,
{_DsMsgKey, Msg0} = KV,
Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S), Comm1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S),
Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S), Comm2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S),
Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S), Dup1 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_1), S),
Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S), Dup2 = emqx_persistent_session_ds_state:get_seqno(?dup(?QOS_2), S),
Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S), Rec = emqx_persistent_session_ds_state:get_seqno(?rec, S),
Subs = emqx_persistent_session_ds_state:get_subscriptions(S), Msgs = emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS),
Msgs = [
Msg
|| SubMatch <- emqx_topic_gbt:matches(Topic, Subs, []),
Msg <- begin
#{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
end
],
{Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl( {Inflight, LastSeqNoQos1, LastSeqNoQos2} = lists:foldl(
fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) -> fun(Msg = #message{qos = Qos}, {Acc, SeqNoQos10, SeqNoQos20}) ->
case Qos of case Qos of
@ -989,14 +1059,16 @@ process_batch(
Msgs Msgs
), ),
process_batch( process_batch(
IsReplay, Session, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight IsReplay, Session, SubState, ClientInfo, LastSeqNoQos1, LastSeqNoQos2, Messages, Inflight
). ).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Transient messages %% Transient messages
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos := UpgradeQoS}}) -> enqueue_transient(
_ClientInfo, Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}
) ->
%% TODO: Such messages won't be retransmitted, should the session %% TODO: Such messages won't be retransmitted, should the session
%% reconnect before transient messages are acked. %% reconnect before transient messages are acked.
%% %%
@ -1006,18 +1078,6 @@ enqueue_transient(ClientInfo, Msg0, Session = #{s := S, props := #{upgrade_qos :
%% queued messages. Since streams in this DB are exclusive to the %% queued messages. Since streams in this DB are exclusive to the
%% session, messages from the queue can be dropped as soon as they %% session, messages from the queue can be dropped as soon as they
%% are acked. %% are acked.
Subs = emqx_persistent_session_ds_state:get_subscriptions(S),
Msgs = [
Msg
|| SubMatch <- emqx_topic_gbt:matches(Msg0#message.topic, Subs, []),
Msg <- begin
#{props := SubOpts} = emqx_topic_gbt:get_record(SubMatch, Subs),
emqx_session:enrich_message(ClientInfo, Msg0, SubOpts, UpgradeQoS)
end
],
lists:foldl(fun do_enqueue_transient/2, Session, Msgs).
do_enqueue_transient(Msg = #message{qos = Qos}, Session = #{inflight := Inflight0, s := S0}) ->
case Qos of case Qos of
?QOS_0 -> ?QOS_0 ->
S = S0, S = S0,

View File

@ -65,17 +65,21 @@
last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(),
%% This stream belongs to an unsubscribed topic-filter, and is %% This stream belongs to an unsubscribed topic-filter, and is
%% marked for deletion: %% marked for deletion:
unsubscribed = false :: boolean() unsubscribed = false :: boolean(),
%% Reference to the subscription state:
sub_state_id :: emqx_persistent_session_ds_subs:subscription_state_id()
}). }).
%% Session metadata keys: %% Session metadata keys:
-define(created_at, created_at). -define(created_at, created_at).
-define(last_alive_at, last_alive_at). -define(last_alive_at, last_alive_at).
-define(expiry_interval, expiry_interval). -define(expiry_interval, expiry_interval).
%% Unique integer used to create unique identities %% Unique integer used to create unique identities:
-define(last_id, last_id). -define(last_id, last_id).
%% Connection info (relevent for the dashboard):
-define(peername, peername). -define(peername, peername).
-define(will_message, will_message). -define(will_message, will_message).
-define(clientinfo, clientinfo). -define(clientinfo, clientinfo).
-define(protocol, protocol).
-endif. -endif.

View File

@ -22,6 +22,9 @@
%% It is responsible for saving, caching, and restoring session state. %% It is responsible for saving, caching, and restoring session state.
%% It is completely devoid of business logic. Not even the default %% It is completely devoid of business logic. Not even the default
%% values should be set in this module. %% values should be set in this module.
%%
%% Session process MUST NOT use `cold_*' functions! They are reserved
%% for use in the management APIs.
-module(emqx_persistent_session_ds_state). -module(emqx_persistent_session_ds_state).
-export([create_tables/0]). -export([create_tables/0]).
@ -33,22 +36,44 @@
-export([get_clientinfo/1, set_clientinfo/2]). -export([get_clientinfo/1, set_clientinfo/2]).
-export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]).
-export([get_peername/1, set_peername/2]). -export([get_peername/1, set_peername/2]).
-export([get_protocol/1, set_protocol/2]).
-export([new_id/1]). -export([new_id/1]).
-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). -export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]).
-export([get_seqno/2, put_seqno/3]). -export([get_seqno/2, put_seqno/3]).
-export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]).
-export([get_subscriptions/1, put_subscription/4, del_subscription/3]). -export([
get_subscription_state/2,
cold_get_subscription_state/2,
fold_subscription_states/3,
put_subscription_state/3,
del_subscription_state/2
]).
-export([
get_subscription/2,
cold_get_subscription/2,
fold_subscriptions/3,
n_subscriptions/1,
put_subscription/3,
del_subscription/2
]).
-export([
get_awaiting_rel/2,
put_awaiting_rel/3,
del_awaiting_rel/2,
fold_awaiting_rel/3,
n_awaiting_rel/1
]).
-export([make_session_iterator/0, session_iterator_next/2]). -export([make_session_iterator/0, session_iterator_next/2]).
-export_type([ -export_type([
t/0, t/0,
metadata/0, metadata/0,
subscriptions/0,
seqno_type/0, seqno_type/0,
stream_key/0, stream_key/0,
rank_key/0, rank_key/0,
session_iterator/0 session_iterator/0,
protocol/0
]). ]).
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
@ -62,8 +87,6 @@
-type message() :: emqx_types:message(). -type message() :: emqx_types:message().
-type subscriptions() :: emqx_topic_gbt:t(_SubId, emqx_persistent_session_ds:subscription()).
-opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'. -opaque session_iterator() :: emqx_persistent_session_ds:id() | '$end_of_table'.
%% Generic key-value wrapper that is used for exporting arbitrary %% Generic key-value wrapper that is used for exporting arbitrary
@ -92,13 +115,16 @@
dirty :: #{K => dirty | del} dirty :: #{K => dirty | del}
}. }.
-type protocol() :: {binary(), emqx_types:proto_ver()}.
-type metadata() :: -type metadata() ::
#{ #{
?created_at => emqx_persistent_session_ds:timestamp(), ?created_at => emqx_persistent_session_ds:timestamp(),
?last_alive_at => emqx_persistent_session_ds:timestamp(), ?last_alive_at => emqx_persistent_session_ds:timestamp(),
?expiry_interval => non_neg_integer(), ?expiry_interval => non_neg_integer(),
?last_id => integer(), ?last_id => integer(),
?peername => emqx_types:peername() ?peername => emqx_types:peername(),
?protocol => protocol()
}. }.
-type seqno_type() :: -type seqno_type() ::
@ -110,22 +136,49 @@
| ?rec | ?rec
| ?committed(?QOS_2). | ?committed(?QOS_2).
-define(id, id).
-define(dirty, dirty).
-define(metadata, metadata).
-define(subscriptions, subscriptions).
-define(subscription_states, subscription_states).
-define(seqnos, seqnos).
-define(streams, streams).
-define(ranks, ranks).
-define(awaiting_rel, awaiting_rel).
-opaque t() :: #{ -opaque t() :: #{
id := emqx_persistent_session_ds:id(), ?id := emqx_persistent_session_ds:id(),
dirty := boolean(), ?dirty := boolean(),
metadata := metadata(), ?metadata := metadata(),
subscriptions := subscriptions(), ?subscriptions := pmap(
seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_subs:subscription()
streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), ),
ranks := pmap(term(), integer()) ?subscription_states := pmap(
emqx_persistent_session_ds_subs:subscription_state_id(),
emqx_persistent_session_ds_subs:subscription_state()
),
?seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()),
?streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()),
?ranks := pmap(term(), integer()),
?awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer())
}. }.
-define(session_tab, emqx_ds_session_tab). -define(session_tab, emqx_ds_session_tab).
-define(subscription_tab, emqx_ds_session_subscriptions). -define(subscription_tab, emqx_ds_session_subscriptions).
-define(subscription_states_tab, emqx_ds_session_subscription_states).
-define(stream_tab, emqx_ds_session_streams). -define(stream_tab, emqx_ds_session_streams).
-define(seqno_tab, emqx_ds_session_seqnos). -define(seqno_tab, emqx_ds_session_seqnos).
-define(rank_tab, emqx_ds_session_ranks). -define(rank_tab, emqx_ds_session_ranks).
-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). -define(awaiting_rel_tab, emqx_ds_session_awaiting_rel).
-define(pmaps, [
{?subscriptions, ?subscription_tab},
{?subscription_states, ?subscription_states_tab},
{?streams, ?stream_tab},
{?seqnos, ?seqno_tab},
{?ranks, ?rank_tab},
{?awaiting_rel, ?awaiting_rel_tab}
]).
%% Enable this flag if you suspect some code breaks the sequence: %% Enable this flag if you suspect some code breaks the sequence:
-ifndef(CHECK_SEQNO). -ifndef(CHECK_SEQNO).
@ -152,23 +205,25 @@ create_tables() ->
{attributes, record_info(fields, kv)} {attributes, record_info(fields, kv)}
] ]
), ),
[create_kv_pmap_table(Table) || Table <- ?pmap_tables], {_, PmapTables} = lists:unzip(?pmaps),
mria:wait_for_tables([?session_tab | ?pmap_tables]). [create_kv_pmap_table(Table) || Table <- PmapTables],
mria:wait_for_tables([?session_tab | PmapTables]).
-spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined. -spec open(emqx_persistent_session_ds:id()) -> {ok, t()} | undefined.
open(SessionId) -> open(SessionId) ->
ro_transaction(fun() -> ro_transaction(fun() ->
case kv_restore(?session_tab, SessionId) of case kv_restore(?session_tab, SessionId) of
[Metadata] -> [Metadata] ->
Rec = #{ Rec = update_pmaps(
id => SessionId, fun(_Pmap, Table) ->
metadata => Metadata, pmap_open(Table, SessionId)
subscriptions => read_subscriptions(SessionId), end,
streams => pmap_open(?stream_tab, SessionId), #{
seqnos => pmap_open(?seqno_tab, SessionId), id => SessionId,
ranks => pmap_open(?rank_tab, SessionId), metadata => Metadata,
?unset_dirty ?unset_dirty
}, }
),
{ok, Rec}; {ok, Rec};
[] -> [] ->
undefined undefined
@ -185,27 +240,13 @@ print_session(SessionId) ->
end. end.
-spec format(t()) -> map(). -spec format(t()) -> map().
format(#{ format(Rec) ->
metadata := Metadata, update_pmaps(
subscriptions := SubsGBT, fun(Pmap, _Table) ->
streams := Streams, pmap_format(Pmap)
seqnos := Seqnos,
ranks := Ranks
}) ->
Subs = emqx_topic_gbt:fold(
fun(Key, Sub, Acc) ->
maps:put(emqx_topic_gbt:get_topic(Key), Sub, Acc)
end, end,
#{}, maps:without([id, dirty], Rec)
SubsGBT ).
),
#{
metadata => Metadata,
subscriptions => Subs,
streams => pmap_format(Streams),
seqnos => pmap_format(Seqnos),
ranks => pmap_format(Ranks)
}.
-spec list_sessions() -> [emqx_persistent_session_ds:id()]. -spec list_sessions() -> [emqx_persistent_session_ds:id()].
list_sessions() -> list_sessions() ->
@ -215,7 +256,7 @@ list_sessions() ->
delete(Id) -> delete(Id) ->
transaction( transaction(
fun() -> fun() ->
[kv_pmap_delete(Table, Id) || Table <- ?pmap_tables], [kv_pmap_delete(Table, Id) || {_, Table} <- ?pmaps],
mnesia:delete(?session_tab, Id, write) mnesia:delete(?session_tab, Id, write)
end end
). ).
@ -226,36 +267,34 @@ commit(Rec = #{dirty := false}) ->
commit( commit(
Rec = #{ Rec = #{
id := SessionId, id := SessionId,
metadata := Metadata, metadata := Metadata
streams := Streams,
seqnos := SeqNos,
ranks := Ranks
} }
) -> ) ->
check_sequence(Rec), check_sequence(Rec),
transaction(fun() -> transaction(fun() ->
kv_persist(?session_tab, SessionId, Metadata), kv_persist(?session_tab, SessionId, Metadata),
Rec#{ update_pmaps(
streams => pmap_commit(SessionId, Streams), fun(Pmap, _Table) ->
seqnos => pmap_commit(SessionId, SeqNos), pmap_commit(SessionId, Pmap)
ranks => pmap_commit(SessionId, Ranks), end,
?unset_dirty Rec#{?unset_dirty}
} )
end). end).
-spec create_new(emqx_persistent_session_ds:id()) -> t(). -spec create_new(emqx_persistent_session_ds:id()) -> t().
create_new(SessionId) -> create_new(SessionId) ->
transaction(fun() -> transaction(fun() ->
delete(SessionId), delete(SessionId),
#{ update_pmaps(
id => SessionId, fun(_Pmap, Table) ->
metadata => #{}, pmap_open(Table, SessionId)
subscriptions => emqx_topic_gbt:new(), end,
streams => pmap_open(?stream_tab, SessionId), #{
seqnos => pmap_open(?seqno_tab, SessionId), id => SessionId,
ranks => pmap_open(?rank_tab, SessionId), metadata => #{},
?set_dirty ?set_dirty
} }
)
end). end).
%% %%
@ -292,6 +331,14 @@ get_peername(Rec) ->
set_peername(Val, Rec) -> set_peername(Val, Rec) ->
set_meta(?peername, Val, Rec). set_meta(?peername, Val, Rec).
-spec get_protocol(t()) -> protocol() | undefined.
get_protocol(Rec) ->
get_meta(?protocol, Rec).
-spec set_protocol(protocol(), t()) -> t().
set_protocol(Val, Rec) ->
set_meta(?protocol, Val, Rec).
-spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()). -spec get_clientinfo(t()) -> emqx_maybe:t(emqx_types:clientinfo()).
get_clientinfo(Rec) -> get_clientinfo(Rec) ->
get_meta(?clientinfo, Rec). get_meta(?clientinfo, Rec).
@ -336,30 +383,65 @@ new_id(Rec) ->
%% %%
-spec get_subscriptions(t()) -> subscriptions(). -spec get_subscription(emqx_persistent_session_ds:topic_filter(), t()) ->
get_subscriptions(#{subscriptions := Subs}) -> emqx_persistent_session_ds_subs:subscription() | undefined.
Subs. get_subscription(TopicFilter, Rec) ->
gen_get(?subscriptions, TopicFilter, Rec).
-spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
[emqx_persistent_session_ds_subs:subscription()].
cold_get_subscription(SessionId, Topic) ->
kv_pmap_read(?subscription_tab, SessionId, Topic).
-spec fold_subscriptions(fun(), Acc, t()) -> Acc.
fold_subscriptions(Fun, Acc, Rec) ->
gen_fold(?subscriptions, Fun, Acc, Rec).
-spec n_subscriptions(t()) -> non_neg_integer().
n_subscriptions(Rec) ->
gen_size(?subscriptions, Rec).
-spec put_subscription( -spec put_subscription(
emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds:topic_filter(),
_SubId, emqx_persistent_session_ds_subs:subscription(),
emqx_persistent_session_ds:subscription(),
t() t()
) -> t(). ) -> t().
put_subscription(TopicFilter, SubId, Subscription, Rec = #{id := Id, subscriptions := Subs0}) -> put_subscription(TopicFilter, Subscription, Rec) ->
%% Note: currently changes to the subscriptions are persisted immediately. gen_put(?subscriptions, TopicFilter, Subscription, Rec).
Key = {TopicFilter, SubId},
transaction(fun() -> kv_pmap_persist(?subscription_tab, Id, Key, Subscription) end),
Subs = emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Subs0),
Rec#{subscriptions => Subs}.
-spec del_subscription(emqx_persistent_session_ds:topic_filter(), _SubId, t()) -> t(). -spec del_subscription(emqx_persistent_session_ds:topic_filter(), t()) -> t().
del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0}) -> del_subscription(TopicFilter, Rec) ->
%% Note: currently the subscriptions are persisted immediately. gen_del(?subscriptions, TopicFilter, Rec).
Key = {TopicFilter, SubId},
transaction(fun() -> kv_pmap_delete(?subscription_tab, Id, Key) end), %%
Subs = emqx_topic_gbt:delete(TopicFilter, SubId, Subs0),
Rec#{subscriptions => Subs}. -spec get_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) ->
emqx_persistent_session_ds_subs:subscription_state() | undefined.
get_subscription_state(SStateId, Rec) ->
gen_get(?subscription_states, SStateId, Rec).
-spec cold_get_subscription_state(
emqx_persistent_session_ds:id(), emqx_persistent_session_ds_subs:subscription_state_id()
) ->
[emqx_persistent_session_ds_subs:subscription_state()].
cold_get_subscription_state(SessionId, SStateId) ->
kv_pmap_read(?subscription_states_tab, SessionId, SStateId).
-spec fold_subscription_states(fun(), Acc, t()) -> Acc.
fold_subscription_states(Fun, Acc, Rec) ->
gen_fold(?subscription_states, Fun, Acc, Rec).
-spec put_subscription_state(
emqx_persistent_session_ds_subs:subscription_state_id(),
emqx_persistent_session_ds_subs:subscription_state(),
t()
) -> t().
put_subscription_state(SStateId, SState, Rec) ->
gen_put(?subscription_states, SStateId, SState, Rec).
-spec del_subscription_state(emqx_persistent_session_ds_subs:subscription_state_id(), t()) -> t().
del_subscription_state(SStateId, Rec) ->
gen_del(?subscription_states, SStateId, Rec).
%% %%
@ -368,29 +450,33 @@ del_subscription(TopicFilter, SubId, Rec = #{id := Id, subscriptions := Subs0})
-spec get_stream(stream_key(), t()) -> -spec get_stream(stream_key(), t()) ->
emqx_persistent_session_ds:stream_state() | undefined. emqx_persistent_session_ds:stream_state() | undefined.
get_stream(Key, Rec) -> get_stream(Key, Rec) ->
gen_get(streams, Key, Rec). gen_get(?streams, Key, Rec).
-spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t(). -spec put_stream(stream_key(), emqx_persistent_session_ds:stream_state(), t()) -> t().
put_stream(Key, Val, Rec) -> put_stream(Key, Val, Rec) ->
gen_put(streams, Key, Val, Rec). gen_put(?streams, Key, Val, Rec).
-spec del_stream(stream_key(), t()) -> t(). -spec del_stream(stream_key(), t()) -> t().
del_stream(Key, Rec) -> del_stream(Key, Rec) ->
gen_del(streams, Key, Rec). gen_del(?streams, Key, Rec).
-spec fold_streams(fun(), Acc, t()) -> Acc. -spec fold_streams(fun(), Acc, t()) -> Acc.
fold_streams(Fun, Acc, Rec) -> fold_streams(Fun, Acc, Rec) ->
gen_fold(streams, Fun, Acc, Rec). gen_fold(?streams, Fun, Acc, Rec).
-spec n_streams(t()) -> non_neg_integer().
n_streams(Rec) ->
gen_size(?streams, Rec).
%% %%
-spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined.
get_seqno(Key, Rec) -> get_seqno(Key, Rec) ->
gen_get(seqnos, Key, Rec). gen_get(?seqnos, Key, Rec).
-spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t(). -spec put_seqno(seqno_type(), emqx_persistent_session_ds:seqno(), t()) -> t().
put_seqno(Key, Val, Rec) -> put_seqno(Key, Val, Rec) ->
gen_put(seqnos, Key, Val, Rec). gen_put(?seqnos, Key, Val, Rec).
%% %%
@ -398,19 +484,43 @@ put_seqno(Key, Val, Rec) ->
-spec get_rank(rank_key(), t()) -> integer() | undefined. -spec get_rank(rank_key(), t()) -> integer() | undefined.
get_rank(Key, Rec) -> get_rank(Key, Rec) ->
gen_get(ranks, Key, Rec). gen_get(?ranks, Key, Rec).
-spec put_rank(rank_key(), integer(), t()) -> t(). -spec put_rank(rank_key(), integer(), t()) -> t().
put_rank(Key, Val, Rec) -> put_rank(Key, Val, Rec) ->
gen_put(ranks, Key, Val, Rec). gen_put(?ranks, Key, Val, Rec).
-spec del_rank(rank_key(), t()) -> t(). -spec del_rank(rank_key(), t()) -> t().
del_rank(Key, Rec) -> del_rank(Key, Rec) ->
gen_del(ranks, Key, Rec). gen_del(?ranks, Key, Rec).
-spec fold_ranks(fun(), Acc, t()) -> Acc. -spec fold_ranks(fun(), Acc, t()) -> Acc.
fold_ranks(Fun, Acc, Rec) -> fold_ranks(Fun, Acc, Rec) ->
gen_fold(ranks, Fun, Acc, Rec). gen_fold(?ranks, Fun, Acc, Rec).
%%
-spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined.
get_awaiting_rel(Key, Rec) ->
gen_get(?awaiting_rel, Key, Rec).
-spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t().
put_awaiting_rel(Key, Val, Rec) ->
gen_put(?awaiting_rel, Key, Val, Rec).
-spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t().
del_awaiting_rel(Key, Rec) ->
gen_del(?awaiting_rel, Key, Rec).
-spec fold_awaiting_rel(fun(), Acc, t()) -> Acc.
fold_awaiting_rel(Fun, Acc, Rec) ->
gen_fold(?awaiting_rel, Fun, Acc, Rec).
-spec n_awaiting_rel(t()) -> non_neg_integer().
n_awaiting_rel(Rec) ->
gen_size(?awaiting_rel, Rec).
%%
-spec make_session_iterator() -> session_iterator(). -spec make_session_iterator() -> session_iterator().
make_session_iterator() -> make_session_iterator() ->
@ -475,16 +585,20 @@ gen_del(Field, Key, Rec) ->
Rec#{?set_dirty} Rec#{?set_dirty}
). ).
%% gen_size(Field, Rec) ->
check_sequence(Rec),
pmap_size(maps:get(Field, Rec)).
read_subscriptions(SessionId) -> -spec update_pmaps(fun((pmap(_K, _V) | undefined, atom()) -> term()), map()) -> map().
Records = kv_pmap_restore(?subscription_tab, SessionId), update_pmaps(Fun, Map) ->
lists:foldl( lists:foldl(
fun({{TopicFilter, SubId}, Subscription}, Acc) -> fun({MapKey, Table}, Acc) ->
emqx_topic_gbt:insert(TopicFilter, SubId, Subscription, Acc) OldVal = maps:get(MapKey, Map, undefined),
Val = Fun(OldVal, Table),
maps:put(MapKey, Val, Acc)
end, end,
emqx_topic_gbt:new(), Map,
Records ?pmaps
). ).
%% %%
@ -547,6 +661,10 @@ pmap_commit(
pmap_format(#pmap{cache = Cache}) -> pmap_format(#pmap{cache = Cache}) ->
Cache. Cache.
-spec pmap_size(pmap(_K, _V)) -> non_neg_integer().
pmap_size(#pmap{cache = Cache}) ->
maps:size(Cache).
%% Functions dealing with set tables: %% Functions dealing with set tables:
kv_persist(Tab, SessionId, Val0) -> kv_persist(Tab, SessionId, Val0) ->
@ -574,6 +692,14 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) ->
Val = encoder(encode, Tab, Val0), Val = encoder(encode, Tab, Val0),
mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write).
kv_pmap_read(Table, SessionId, Key) ->
lists:map(
fun(#kv{v = Val}) ->
encoder(decode, Table, Val)
end,
mnesia:dirty_read(Table, {SessionId, Key})
).
kv_pmap_restore(Table, SessionId) -> kv_pmap_restore(Table, SessionId) ->
MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}],
Objs = mnesia:select(Table, MS, read), Objs = mnesia:select(Table, MS, read),

View File

@ -126,9 +126,10 @@ find_new_streams(S) ->
renew_streams(S0) -> renew_streams(S0) ->
S1 = remove_unsubscribed_streams(S0), S1 = remove_unsubscribed_streams(S0),
S2 = remove_fully_replayed_streams(S1), S2 = remove_fully_replayed_streams(S1),
S3 = update_stream_subscription_state_ids(S2),
emqx_persistent_session_ds_subs:fold( emqx_persistent_session_ds_subs:fold(
fun fun
(Key, #{start_time := StartTime, id := SubId, deleted := false}, Acc) -> (Key, #{start_time := StartTime, id := SubId, current_state := SStateId}, Acc) ->
TopicFilter = emqx_topic:words(Key), TopicFilter = emqx_topic:words(Key),
Streams = select_streams( Streams = select_streams(
SubId, SubId,
@ -137,7 +138,7 @@ renew_streams(S0) ->
), ),
lists:foldl( lists:foldl(
fun(I, Acc1) -> fun(I, Acc1) ->
ensure_iterator(TopicFilter, StartTime, SubId, I, Acc1) ensure_iterator(TopicFilter, StartTime, SubId, SStateId, I, Acc1)
end, end,
Acc, Acc,
Streams Streams
@ -145,8 +146,8 @@ renew_streams(S0) ->
(_Key, _DeletedSubscription, Acc) -> (_Key, _DeletedSubscription, Acc) ->
Acc Acc
end, end,
S2, S3,
S2 S3
). ).
-spec on_unsubscribe( -spec on_unsubscribe(
@ -201,7 +202,7 @@ is_fully_acked(Srs, S) ->
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> ensure_iterator(TopicFilter, StartTime, SubId, SStateId, {{RankX, RankY}, Stream}, S) ->
Key = {SubId, Stream}, Key = {SubId, Stream},
case emqx_persistent_session_ds_state:get_stream(Key, S) of case emqx_persistent_session_ds_state:get_stream(Key, S) of
undefined -> undefined ->
@ -214,7 +215,8 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
rank_x = RankX, rank_x = RankX,
rank_y = RankY, rank_y = RankY,
it_begin = Iterator, it_begin = Iterator,
it_end = Iterator it_end = Iterator,
sub_state_id = SStateId
}, },
emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S); emqx_persistent_session_ds_state:put_stream(Key, NewStreamState, S);
{error, recoverable, Reason} -> {error, recoverable, Reason} ->
@ -350,6 +352,38 @@ remove_fully_replayed_streams(S0) ->
S1 S1
). ).
%% @doc Update subscription state IDs for all streams that don't have unacked messages
-spec update_stream_subscription_state_ids(emqx_persistent_session_ds_state:t()) ->
emqx_persistent_session_ds_state:t().
update_stream_subscription_state_ids(S0) ->
CommQos1 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S0),
CommQos2 = emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_2), S0),
%% Find the latest state IDs for each subscription:
LastSStateIds = emqx_persistent_session_ds_state:fold_subscriptions(
fun(_, #{id := SubId, current_state := SStateId}, Acc) ->
Acc#{SubId => SStateId}
end,
#{},
S0
),
%% Update subscription state IDs for fully acked streams:
emqx_persistent_session_ds_state:fold_streams(
fun
(_, #srs{unsubscribed = true}, S) ->
S;
(Key = {SubId, _Stream}, SRS0, S) ->
case is_fully_acked(CommQos1, CommQos2, SRS0) of
true ->
SRS = SRS0#srs{sub_state_id = maps:get(SubId, LastSStateIds)},
emqx_persistent_session_ds_state:put_stream(Key, SRS, S);
false ->
S
end
end,
S0,
S0
).
%% @doc Compare the streams by the order in which they were replayed. %% @doc Compare the streams by the order in which they were replayed.
compare_streams( compare_streams(
{_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}}, {_KeyA, #srs{first_seqno_qos1 = A1, first_seqno_qos2 = A2}},

View File

@ -24,14 +24,56 @@
-module(emqx_persistent_session_ds_subs). -module(emqx_persistent_session_ds_subs).
%% API: %% API:
-export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). -export([
on_subscribe/3,
on_unsubscribe/3,
on_session_drop/2,
gc/1,
lookup/2,
to_map/1,
fold/3
]).
-export_type([]). %% Management API:
-export([
cold_get_subscription/2
]).
-export_type([subscription_state_id/0, subscription/0, subscription_state/0]).
-include("emqx_persistent_session_ds.hrl").
-include("emqx_mqtt.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
%%================================================================================ %%================================================================================
%% Type declarations %% Type declarations
%%================================================================================ %%================================================================================
-type subscription() :: #{
%% Session-unique identifier of the subscription. Other objects
%% can use it as a compact reference:
id := emqx_persistent_session_ds:subscription_id(),
%% Reference to the current subscription state:
current_state := subscription_state_id(),
%% Time when the subscription was added:
start_time := emqx_ds:time()
}.
-type subscription_state_id() :: integer().
-type subscription_state() :: #{
parent_subscription := emqx_persistent_session_ds:subscription_id(),
upgrade_qos := boolean(),
%% SubOpts:
subopts := #{
nl => _,
qos => _,
rap => _,
subid => _,
_ => _
}
}.
%%================================================================================ %%================================================================================
%% API functions %% API functions
%%================================================================================ %%================================================================================
@ -39,41 +81,131 @@
%% @doc Process a new subscription %% @doc Process a new subscription
-spec on_subscribe( -spec on_subscribe(
emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds:subscription(), emqx_types:subopts(),
emqx_persistent_session_ds_state:t() emqx_persistent_session_ds:session()
) -> ) ->
emqx_persistent_session_ds_state:t(). {ok, emqx_persistent_session_ds_state:t()} | {error, ?RC_QUOTA_EXCEEDED}.
on_subscribe(TopicFilter, Subscription, S) -> on_subscribe(TopicFilter, SubOpts, #{id := SessionId, s := S0, props := Props}) ->
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S). #{upgrade_qos := UpgradeQoS, max_subscriptions := MaxSubscriptions} = Props,
case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S0) of
undefined ->
%% This is a new subscription:
case emqx_persistent_session_ds_state:n_subscriptions(S0) < MaxSubscriptions of
true ->
ok = emqx_persistent_session_ds_router:do_add_route(TopicFilter, SessionId),
{SubId, S1} = emqx_persistent_session_ds_state:new_id(S0),
{SStateId, S2} = emqx_persistent_session_ds_state:new_id(S1),
SState = #{
parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts
},
S3 = emqx_persistent_session_ds_state:put_subscription_state(
SStateId, SState, S2
),
Subscription = #{
id => SubId,
current_state => SStateId,
start_time => now_ms()
},
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, Subscription, S3
),
?tp(persistent_session_ds_subscription_added, #{
topic_filter => TopicFilter, session => SessionId
}),
{ok, S};
false ->
{error, ?RC_QUOTA_EXCEEDED}
end;
Sub0 = #{current_state := SStateId0, id := SubId} ->
SState = #{parent_subscription => SubId, upgrade_qos => UpgradeQoS, subopts => SubOpts},
case emqx_persistent_session_ds_state:get_subscription_state(SStateId0, S0) of
SState ->
%% Client resubscribed with the same parameters:
{ok, S0};
_ ->
%% Subsription parameters changed:
{SStateId, S1} = emqx_persistent_session_ds_state:new_id(S0),
S2 = emqx_persistent_session_ds_state:put_subscription_state(
SStateId, SState, S1
),
Sub = Sub0#{current_state => SStateId},
S = emqx_persistent_session_ds_state:put_subscription(TopicFilter, Sub, S2),
{ok, S}
end
end.
%% @doc Process UNSUBSCRIBE %% @doc Process UNSUBSCRIBE
-spec on_unsubscribe( -spec on_unsubscribe(
emqx_persistent_session_ds:id(),
emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds:topic_filter(),
emqx_persistent_session_ds:subscription(),
emqx_persistent_session_ds_state:t() emqx_persistent_session_ds_state:t()
) -> ) ->
emqx_persistent_session_ds_state:t(). {ok, emqx_persistent_session_ds_state:t(), emqx_persistent_session_ds:subscription()}
on_unsubscribe(TopicFilter, Subscription0, S0) -> | {error, ?RC_NO_SUBSCRIPTION_EXISTED}.
%% Note: we cannot delete the subscription immediately, since its on_unsubscribe(SessionId, TopicFilter, S0) ->
%% metadata can be used during replay (see `process_batch'). We case lookup(TopicFilter, S0) of
%% instead mark it as deleted, and let `subscription_gc' function undefined ->
%% dispatch it later: {error, ?RC_NO_SUBSCRIPTION_EXISTED};
Subscription = Subscription0#{deleted => true}, Subscription ->
emqx_persistent_session_ds_state:put_subscription(TopicFilter, [], Subscription, S0). ?tp(persistent_session_ds_subscription_delete, #{
session_id => SessionId, topic_filter => TopicFilter
}),
?tp_span(
persistent_session_ds_subscription_route_delete,
#{session_id => SessionId, topic_filter => TopicFilter},
ok = emqx_persistent_session_ds_router:do_delete_route(TopicFilter, SessionId)
),
{ok, emqx_persistent_session_ds_state:del_subscription(TopicFilter, S0), Subscription}
end.
%% @doc Remove subscriptions that have been marked for deletion, and -spec on_session_drop(emqx_persistent_session_ds:id(), emqx_persistent_session_ds_state:t()) -> ok.
%% that don't have any unacked messages: on_session_drop(SessionId, S0) ->
fold(
fun(TopicFilter, _Subscription, S) ->
case on_unsubscribe(SessionId, TopicFilter, S) of
{ok, S1, _} -> S1;
_ -> S
end
end,
S0,
S0
).
%% @doc Remove subscription states that don't have a parent, and that
%% don't have any unacked messages:
-spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t(). -spec gc(emqx_persistent_session_ds_state:t()) -> emqx_persistent_session_ds_state:t().
gc(S0) -> gc(S0) ->
fold_all( %% Create a set of subscription states IDs referenced either by a
fun(TopicFilter, #{id := SubId, deleted := Deleted}, Acc) -> %% subscription or a stream replay state:
case Deleted andalso has_no_unacked_streams(SubId, S0) of AliveSet0 = emqx_persistent_session_ds_state:fold_subscriptions(
true -> fun(_TopicFilter, #{current_state := SStateId}, Acc) ->
emqx_persistent_session_ds_state:del_subscription(TopicFilter, [], Acc); Acc#{SStateId => true}
end,
#{},
S0
),
AliveSet = emqx_persistent_session_ds_state:fold_streams(
fun(_StreamId, SRS = #srs{sub_state_id = SStateId}, Acc) ->
case emqx_persistent_session_ds_stream_scheduler:is_fully_acked(SRS, S0) of
false -> false ->
Acc#{SStateId => true};
true ->
Acc Acc
end end
end, end,
AliveSet0,
S0
),
%% Delete dangling subscription states:
emqx_persistent_session_ds_state:fold_subscription_states(
fun(SStateId, _, S) ->
case maps:is_key(SStateId, AliveSet) of
true ->
S;
false ->
emqx_persistent_session_ds_state:del_subscription_state(SStateId, S)
end
end,
S0, S0,
S0 S0
). ).
@ -82,12 +214,16 @@ gc(S0) ->
-spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) -> -spec lookup(emqx_persistent_session_ds:topic_filter(), emqx_persistent_session_ds_state:t()) ->
emqx_persistent_session_ds:subscription() | undefined. emqx_persistent_session_ds:subscription() | undefined.
lookup(TopicFilter, S) -> lookup(TopicFilter, S) ->
Subs = emqx_persistent_session_ds_state:get_subscriptions(S), case emqx_persistent_session_ds_state:get_subscription(TopicFilter, S) of
case emqx_topic_gbt:lookup(TopicFilter, [], Subs, undefined) of Sub = #{current_state := SStateId} ->
#{deleted := true} -> case emqx_persistent_session_ds_state:get_subscription_state(SStateId, S) of
undefined; #{subopts := SubOpts} ->
Sub -> Sub#{subopts => SubOpts};
Sub undefined ->
undefined
end;
undefined ->
undefined
end. end.
%% @doc Convert active subscriptions to a map, for information %% @doc Convert active subscriptions to a map, for information
@ -95,7 +231,7 @@ lookup(TopicFilter, S) ->
-spec to_map(emqx_persistent_session_ds_state:t()) -> map(). -spec to_map(emqx_persistent_session_ds_state:t()) -> map().
to_map(S) -> to_map(S) ->
fold( fold(
fun(TopicFilter, #{props := Props}, Acc) -> Acc#{TopicFilter => Props} end, fun(TopicFilter, _, Acc) -> Acc#{TopicFilter => lookup(TopicFilter, S)} end,
#{}, #{},
S S
). ).
@ -107,48 +243,29 @@ to_map(S) ->
emqx_persistent_session_ds_state:t() emqx_persistent_session_ds_state:t()
) -> ) ->
Acc. Acc.
fold(Fun, AccIn, S) -> fold(Fun, Acc, S) ->
fold_all( emqx_persistent_session_ds_state:fold_subscriptions(Fun, Acc, S).
fun(TopicFilter, Sub = #{deleted := Deleted}, Acc) ->
case Deleted of
true -> Acc;
false -> Fun(TopicFilter, Sub, Acc)
end
end,
AccIn,
S
).
%% @doc Fold over all subscriptions, including inactive ones: -spec cold_get_subscription(emqx_persistent_session_ds:id(), emqx_types:topic()) ->
-spec fold_all( emqx_persistent_session_ds:subscription() | undefined.
fun((emqx_types:topic(), emqx_persistent_session_ds:subscription(), Acc) -> Acc), cold_get_subscription(SessionId, Topic) ->
Acc, case emqx_persistent_session_ds_state:cold_get_subscription(SessionId, Topic) of
emqx_persistent_session_ds_state:t() [Sub = #{current_state := SStateId}] ->
) -> case
Acc. emqx_persistent_session_ds_state:cold_get_subscription_state(SessionId, SStateId)
fold_all(Fun, AccIn, S) -> of
Subs = emqx_persistent_session_ds_state:get_subscriptions(S), [#{subopts := Subopts}] ->
emqx_topic_gbt:fold( Sub#{subopts => Subopts};
fun(Key, Sub, Acc) -> Fun(emqx_topic_gbt:get_topic(Key), Sub, Acc) end, _ ->
AccIn, undefined
Subs end;
). _ ->
undefined
end.
%%================================================================================ %%================================================================================
%% Internal functions %% Internal functions
%%================================================================================ %%================================================================================
-spec has_no_unacked_streams( now_ms() ->
emqx_persistent_session_ds:subscription_id(), emqx_persistent_session_ds_state:t() erlang:system_time(millisecond).
) -> boolean().
has_no_unacked_streams(SubId, S) ->
emqx_persistent_session_ds_state:fold_streams(
fun
({SID, _Stream}, Srs, Acc) when SID =:= SubId ->
emqx_persistent_session_ds_stream_scheduler:is_fully_acked(Srs, S) andalso Acc;
(_StreamKey, _Srs, Acc) ->
Acc
end,
true,
S
).

View File

@ -429,6 +429,11 @@ enrich_deliver(ClientInfo, {deliver, Topic, Msg}, UpgradeQoS, Session) ->
end, end,
enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS). enrich_message(ClientInfo, Msg, SubOpts, UpgradeQoS).
%% Caution: updating this function _may_ break consistency of replay
%% for persistent sessions. Persistent sessions expect it to return
%% the same result during replay. If it changes the behavior between
%% releases, sessions restored from the cold storage may end up
%% replaying messages with different QoS, etc.
enrich_message( enrich_message(
ClientInfo = #{clientid := ClientId}, ClientInfo = #{clientid := ClientId},
Msg = #message{from = ClientId}, Msg = #message{from = ClientId},

View File

@ -74,9 +74,6 @@ session_id() ->
topic() -> topic() ->
oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]). oneof([<<"foo">>, <<"bar">>, <<"foo/#">>, <<"//+/#">>]).
subid() ->
oneof([[]]).
subscription() -> subscription() ->
oneof([#{}]). oneof([#{}]).
@ -129,18 +126,25 @@ put_req() ->
{Track, Seqno}, {Track, Seqno},
{seqno_track(), seqno()}, {seqno_track(), seqno()},
{#s.seqno, put_seqno, Track, Seqno} {#s.seqno, put_seqno, Track, Seqno}
),
?LET(
{Topic, Subscription},
{topic(), subscription()},
{#s.subs, put_subscription, Topic, Subscription}
) )
]). ]).
get_req() -> get_req() ->
oneof([ oneof([
{#s.streams, get_stream, stream_id()}, {#s.streams, get_stream, stream_id()},
{#s.seqno, get_seqno, seqno_track()} {#s.seqno, get_seqno, seqno_track()},
{#s.subs, get_subscription, topic()}
]). ]).
del_req() -> del_req() ->
oneof([ oneof([
{#s.streams, del_stream, stream_id()} {#s.streams, del_stream, stream_id()},
{#s.subs, del_subscription, topic()}
]). ]).
command(S) -> command(S) ->
@ -153,13 +157,6 @@ command(S) ->
{2, {call, ?MODULE, reopen, [session_id(S)]}}, {2, {call, ?MODULE, reopen, [session_id(S)]}},
{2, {call, ?MODULE, commit, [session_id(S)]}}, {2, {call, ?MODULE, commit, [session_id(S)]}},
%% Subscriptions:
{3,
{call, ?MODULE, put_subscription, [
session_id(S), topic(), subid(), subscription()
]}},
{3, {call, ?MODULE, del_subscription, [session_id(S), topic(), subid()]}},
%% Metadata: %% Metadata:
{3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}}, {3, {call, ?MODULE, put_metadata, [session_id(S), put_metadata()]}},
{3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}}, {3, {call, ?MODULE, get_metadata, [session_id(S), get_metadata()]}},
@ -170,7 +167,6 @@ command(S) ->
{3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}}, {3, {call, ?MODULE, gen_del, [session_id(S), del_req()]}},
%% Getters: %% Getters:
{4, {call, ?MODULE, get_subscriptions, [session_id(S)]}},
{1, {call, ?MODULE, iterate_sessions, [batch_size()]}} {1, {call, ?MODULE, iterate_sessions, [batch_size()]}}
]); ]);
false -> false ->
@ -207,19 +203,6 @@ postcondition(S, {call, ?MODULE, gen_get, [SessionId, {Idx, Fun, Key}]}, Result)
#{session_id => SessionId, key => Key, 'fun' => Fun} #{session_id => SessionId, key => Key, 'fun' => Fun}
), ),
true; true;
postcondition(S, {call, ?MODULE, get_subscriptions, [SessionId]}, Result) ->
#{SessionId := #s{subs = Subs}} = S,
?assertEqual(maps:size(Subs), emqx_topic_gbt:size(Result)),
maps:foreach(
fun({TopicFilter, Id}, Expected) ->
?assertEqual(
Expected,
emqx_topic_gbt:lookup(TopicFilter, Id, Result, default)
)
end,
Subs
),
true;
postcondition(_, _, _) -> postcondition(_, _, _) ->
true. true.
@ -227,22 +210,6 @@ next_state(S, _V, {call, ?MODULE, create_new, [SessionId]}) ->
S#{SessionId => #s{}}; S#{SessionId => #s{}};
next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) -> next_state(S, _V, {call, ?MODULE, delete, [SessionId]}) ->
maps:remove(SessionId, S); maps:remove(SessionId, S);
next_state(S, _V, {call, ?MODULE, put_subscription, [SessionId, TopicFilter, SubId, Subscription]}) ->
Key = {TopicFilter, SubId},
update(
SessionId,
#s.subs,
fun(Subs) -> Subs#{Key => Subscription} end,
S
);
next_state(S, _V, {call, ?MODULE, del_subscription, [SessionId, TopicFilter, SubId]}) ->
Key = {TopicFilter, SubId},
update(
SessionId,
#s.subs,
fun(Subs) -> maps:remove(Key, Subs) end,
S
);
next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) -> next_state(S, _V, {call, ?MODULE, put_metadata, [SessionId, {Key, _Fun, Val}]}) ->
update( update(
SessionId, SessionId,
@ -296,19 +263,6 @@ reopen(SessionId) ->
{ok, S} = emqx_persistent_session_ds_state:open(SessionId), {ok, S} = emqx_persistent_session_ds_state:open(SessionId),
put_state(SessionId, S). put_state(SessionId, S).
put_subscription(SessionId, TopicFilter, SubId, Subscription) ->
S = emqx_persistent_session_ds_state:put_subscription(
TopicFilter, SubId, Subscription, get_state(SessionId)
),
put_state(SessionId, S).
del_subscription(SessionId, TopicFilter, SubId) ->
S = emqx_persistent_session_ds_state:del_subscription(TopicFilter, SubId, get_state(SessionId)),
put_state(SessionId, S).
get_subscriptions(SessionId) ->
emqx_persistent_session_ds_state:get_subscriptions(get_state(SessionId)).
put_metadata(SessionId, {_MetaKey, Fun, Value}) -> put_metadata(SessionId, {_MetaKey, Fun, Value}) ->
S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]), S = apply(emqx_persistent_session_ds_state, Fun, [Value, get_state(SessionId)]),
put_state(SessionId, S). put_state(SessionId, S).

View File

@ -581,7 +581,6 @@ t_write_failure(Config) ->
) )
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind( Trace = ?of_kind(
[buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0 [buffer_worker_flush_nack, buffer_worker_retry_inflight_failed], Trace0
), ),

View File

@ -1929,7 +1929,6 @@ t_bad_attributes(Config) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
ct:pal("trace:\n ~p", [Trace]),
?assertMatch( ?assertMatch(
[ [
#{placeholder := [<<"payload">>, <<"ok">>], value := #{}}, #{placeholder := [<<"payload">>, <<"ok">>], value := #{}},

View File

@ -517,7 +517,6 @@ t_write_failure(Config) ->
ok ok
end, end,
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(buffer_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,

View File

@ -520,7 +520,6 @@ t_write_failure(Config) ->
) )
end), end),
fun(Trace0) -> fun(Trace0) ->
ct:pal("trace: ~p", [Trace0]),
Trace = ?of_kind(buffer_worker_flush_nack, Trace0), Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
?assertMatch([#{result := {error, _}} | _], Trace), ?assertMatch([#{result := {error, _}} | _], Trace),
[#{result := {error, Error}} | _] = Trace, [#{result := {error, Error}} | _] = Trace,

View File

@ -13,7 +13,7 @@ This makes the storage disk requirements very predictable: only the number of _p
DS _backend_ is a callback module that implements `emqx_ds` behavior. DS _backend_ is a callback module that implements `emqx_ds` behavior.
EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses RocksDB as the main storage. EMQX repository contains the "builtin" backend, implemented in `emqx_ds_replication_layer` module, that uses Raft algorithm for data replication, and RocksDB as the main storage.
Note that builtin backend introduces the concept of **site** to alleviate the problem of changing node names. Note that builtin backend introduces the concept of **site** to alleviate the problem of changing node names.
Site IDs are persistent, and they are randomly generated at the first startup of the node. Site IDs are persistent, and they are randomly generated at the first startup of the node.
@ -95,10 +95,10 @@ Consumption of messages is done in several stages:
# Limitation # Limitation
- Builtin backend currently doesn't replicate data across different sites
- There is no local cache of messages, which may result in transferring the same data multiple times - There is no local cache of messages, which may result in transferring the same data multiple times
# Documentation links # Documentation links
TBD TBD
# Usage # Usage

View File

@ -1747,6 +1747,7 @@ format_channel_info(undefined, {ClientId, PSInfo0 = #{}}, _Opts) ->
format_persistent_session_info(ClientId, PSInfo0) -> format_persistent_session_info(ClientId, PSInfo0) ->
Metadata = maps:get(metadata, PSInfo0, #{}), Metadata = maps:get(metadata, PSInfo0, #{}),
{ProtoName, ProtoVer} = maps:get(protocol, Metadata),
PSInfo1 = maps:with([created_at, expiry_interval], Metadata), PSInfo1 = maps:with([created_at, expiry_interval], Metadata),
CreatedAt = maps:get(created_at, PSInfo1), CreatedAt = maps:get(created_at, PSInfo1),
case Metadata of case Metadata of
@ -1763,7 +1764,12 @@ format_persistent_session_info(ClientId, PSInfo0) ->
connected_at => CreatedAt, connected_at => CreatedAt,
ip_address => IpAddress, ip_address => IpAddress,
is_persistent => true, is_persistent => true,
port => Port port => Port,
heap_size => 0,
mqueue_len => 0,
proto_name => ProtoName,
proto_ver => ProtoVer,
subscriptions_cnt => maps:size(maps:get(subscriptions, PSInfo0, #{}))
}, },
PSInfo = lists:foldl( PSInfo = lists:foldl(
fun result_format_time_fun/2, fun result_format_time_fun/2,

View File

@ -86,7 +86,8 @@ fields(subscription) ->
{qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>, example => 0})}, {qos, hoconsc:mk(emqx_schema:qos(), #{desc => <<"QoS">>, example => 0})},
{nl, hoconsc:mk(integer(), #{desc => <<"No Local">>, example => 0})}, {nl, hoconsc:mk(integer(), #{desc => <<"No Local">>, example => 0})},
{rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>, example => 0})}, {rap, hoconsc:mk(integer(), #{desc => <<"Retain as Published">>, example => 0})},
{rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})} {rh, hoconsc:mk(integer(), #{desc => <<"Retain Handling">>, example => 0})},
{durable, hoconsc:mk(boolean(), #{desc => <<"Durable subscription">>, example => false})}
]. ].
parameters() -> parameters() ->
@ -141,6 +142,14 @@ parameters() ->
required => false, required => false,
desc => <<"Shared subscription group name">> desc => <<"Shared subscription group name">>
}) })
},
{
durable,
hoconsc:mk(boolean(), #{
in => query,
required => false,
desc => <<"Filter subscriptions by durability">>
})
} }
]. ].
@ -167,7 +176,8 @@ format(WhichNode, {{Topic, _Subscriber}, SubOpts}) ->
#{ #{
topic => emqx_topic:maybe_format_share(Topic), topic => emqx_topic:maybe_format_share(Topic),
clientid => maps:get(subid, SubOpts, null), clientid => maps:get(subid, SubOpts, null),
node => WhichNode node => WhichNode,
durable => false
}, },
maps:with([qos, nl, rap, rh], SubOpts) maps:with([qos, nl, rap, rh], SubOpts)
). ).
@ -187,7 +197,22 @@ check_match_topic(#{<<"match_topic">> := MatchTopic}) ->
check_match_topic(_) -> check_match_topic(_) ->
ok. ok.
do_subscriptions_query(QString) -> do_subscriptions_query(QString0) ->
{IsDurable, QString} = maps:take(
<<"durable">>, maps:merge(#{<<"durable">> => undefined}, QString0)
),
case emqx_persistent_message:is_persistence_enabled() andalso IsDurable of
false ->
do_subscriptions_query_mem(QString);
true ->
do_subscriptions_query_persistent(QString);
undefined ->
merge_queries(
QString, fun do_subscriptions_query_mem/1, fun do_subscriptions_query_persistent/1
)
end.
do_subscriptions_query_mem(QString) ->
Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2], Args = [?SUBOPTION, QString, ?SUBS_QSCHEMA, fun ?MODULE:qs2ms/2, fun ?MODULE:format/2],
case maps:get(<<"node">>, QString, undefined) of case maps:get(<<"node">>, QString, undefined) of
undefined -> undefined ->
@ -201,8 +226,196 @@ do_subscriptions_query(QString) ->
end end
end. end.
do_subscriptions_query_persistent(#{<<"page">> := Page, <<"limit">> := Limit} = QString) ->
Count = emqx_persistent_session_ds_router:stats(n_routes),
%% TODO: filtering by client ID can be implemented more efficiently:
FilterTopic = maps:get(<<"topic">>, QString, '_'),
Stream0 = emqx_persistent_session_ds_router:stream(FilterTopic),
SubPred = fun(Sub) ->
compare_optional(<<"topic">>, QString, topic, Sub) andalso
compare_optional(<<"clientid">>, QString, clientid, Sub) andalso
compare_optional(<<"qos">>, QString, qos, Sub) andalso
compare_match_topic_optional(<<"match_topic">>, QString, topic, Sub)
end,
NDropped = (Page - 1) * Limit,
{_, Stream} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, NDropped, Stream0
),
{Subscriptions, Stream1} = consume_n_matching(
fun persistent_route_to_subscription/1, SubPred, Limit, Stream
),
HasNext = Stream1 =/= [],
Meta =
case maps:is_key(<<"match_topic">>, QString) orelse maps:is_key(<<"qos">>, QString) of
true ->
%% Fuzzy searches shouldn't return count:
#{
limit => Limit,
page => Page,
hasnext => HasNext
};
false ->
#{
count => Count,
limit => Limit,
page => Page,
hasnext => HasNext
}
end,
#{
meta => Meta,
data => Subscriptions
}.
compare_optional(QField, Query, SField, Subscription) ->
case Query of
#{QField := Expected} ->
maps:get(SField, Subscription) =:= Expected;
_ ->
true
end.
compare_match_topic_optional(QField, Query, SField, Subscription) ->
case Query of
#{QField := TopicFilter} ->
Topic = maps:get(SField, Subscription),
emqx_topic:match(Topic, TopicFilter);
_ ->
true
end.
%% @doc Drop elements from the stream until encountered N elements
%% matching the predicate function.
-spec consume_n_matching(
fun((T) -> Q),
fun((Q) -> boolean()),
non_neg_integer(),
emqx_utils_stream:stream(T)
) -> {[Q], emqx_utils_stream:stream(T) | empty}.
consume_n_matching(Map, Pred, N, S) ->
consume_n_matching(Map, Pred, N, S, []).
consume_n_matching(_Map, _Pred, _N, [], Acc) ->
{lists:reverse(Acc), []};
consume_n_matching(_Map, _Pred, 0, S, Acc) ->
{lists:reverse(Acc), S};
consume_n_matching(Map, Pred, N, S0, Acc) ->
case emqx_utils_stream:next(S0) of
[] ->
consume_n_matching(Map, Pred, N, [], Acc);
[Elem | S] ->
Mapped = Map(Elem),
case Pred(Mapped) of
true -> consume_n_matching(Map, Pred, N - 1, S, [Mapped | Acc]);
false -> consume_n_matching(Map, Pred, N, S, Acc)
end
end.
persistent_route_to_subscription(#route{topic = Topic, dest = SessionId}) ->
case emqx_persistent_session_ds:get_client_subscription(SessionId, Topic) of
#{subopts := SubOpts} ->
#{qos := Qos, nl := Nl, rh := Rh, rap := Rap} = SubOpts,
#{
topic => Topic,
clientid => SessionId,
node => all,
qos => Qos,
nl => Nl,
rh => Rh,
rap => Rap,
durable => true
};
undefined ->
#{
topic => Topic,
clientid => SessionId,
node => all,
durable => true
}
end.
%% @private This function merges paginated results from two sources.
%%
%% Note: this implementation is far from ideal: `count' for the
%% queries may be missing, it may be larger than the actual number of
%% elements. This may lead to empty pages that can confuse the user.
%%
%% Not much can be done to mitigate that, though: since the count may
%% be incorrect, we cannot run simple math to determine when one
%% stream begins and another ends: it requires actual iteration.
%%
%% Ideally, the dashboard must be split between durable and mem
%% subscriptions, and this function should be removed for good.
merge_queries(QString0, Q1, Q2) ->
#{<<"limit">> := Limit, <<"page">> := Page} = QString0,
C1 = resp_count(QString0, Q1),
C2 = resp_count(QString0, Q2),
Meta =
case is_number(C1) andalso is_number(C2) of
true ->
#{
count => C1 + C2,
limit => Limit,
page => Page
};
false ->
#{
limit => Limit,
page => Page
}
end,
case {C1, C2} of
{_, 0} ->
%% The second query is empty. Just return the result of Q1 as usual:
Q1(QString0);
{0, _} ->
%% The first query is empty. Just return the result of Q2 as usual:
Q2(QString0);
_ when is_number(C1) ->
%% Both queries are potentially non-empty, but we at least
%% have the page number for the first query. We try to
%% stich the pages together and thus respect the limit
%% (except for the page where the results switch from Q1
%% to Q2).
%% Page where data from the second query is estimated to
%% begin:
Q2Page = ceil(C1 / Limit),
case Page =< Q2Page of
true ->
#{data := Data, meta := #{hasnext := HN}} = Q1(QString0),
#{
data => Data,
meta => Meta#{hasnext => HN orelse C2 > 0}
};
false ->
QString = QString0#{<<"page">> => Page - Q2Page},
#{data := Data, meta := #{hasnext := HN}} = Q2(QString),
#{data => Data, meta => Meta#{hasnext => HN}}
end;
_ ->
%% We don't know how many items is there in the first
%% query, and the second query is not empty (this includes
%% the case where `C2' is `undefined'). Best we can do is
%% to interleave the queries. This may produce less
%% results per page than `Limit'.
QString = QString0#{<<"limit">> => ceil(Limit / 2)},
#{data := D1, meta := #{hasnext := HN1}} = Q1(QString),
#{data := D2, meta := #{hasnext := HN2}} = Q2(QString),
#{
meta => Meta#{hasnext => HN1 or HN2},
data => D1 ++ D2
}
end.
resp_count(Query, QFun) ->
#{meta := Meta} = QFun(Query#{<<"limit">> => 1, <<"page">> => 1}),
maps:get(count, Meta, undefined).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% QueryString to MatchSpec %% QueryString to MatchSpec (mem sessions)
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter(). -spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().

View File

@ -36,17 +36,72 @@
-define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}). -define(TOPIC_SORT, #{?TOPIC1 => 1, ?TOPIC2 => 2}).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). [
{group, mem},
{group, persistent}
].
groups() ->
CommonTCs = emqx_common_test_helpers:all(?MODULE),
[
{mem, CommonTCs},
%% Shared subscriptions are currently not supported:
{persistent, CommonTCs -- [t_list_with_shared_sub, t_subscription_api]}
].
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite(), Apps = emqx_cth_suite:start(
[
{emqx,
"session_persistence {\n"
" enable = true\n"
" renew_streams_interval = 10ms\n"
"}"},
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)).
init_per_group(persistent, Config) ->
ClientConfig = #{
username => ?USERNAME,
clientid => ?CLIENTID,
proto_ver => v5,
clean_start => true,
properties => #{'Session-Expiry-Interval' => 300}
},
[{client_config, ClientConfig}, {durable, true} | Config];
init_per_group(mem, Config) ->
ClientConfig = #{
username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5, clean_start => true
},
[{client_config, ClientConfig}, {durable, false} | Config].
end_per_group(_, Config) ->
Config. Config.
end_per_suite(_) -> init_per_testcase(_TC, Config) ->
emqx_mgmt_api_test_util:end_suite(). case ?config(client_config, Config) of
ClientConfig when is_map(ClientConfig) ->
{ok, Client} = emqtt:start_link(ClientConfig),
{ok, _} = emqtt:connect(Client),
[{client, Client} | Config];
_ ->
Config
end.
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client).
t_subscription_api(Config) -> t_subscription_api(Config) ->
Client = proplists:get_value(client, Config), Client = proplists:get_value(client, Config),
Durable = atom_to_list(?config(durable, Config)),
{ok, _, _} = emqtt:subscribe( {ok, _, _} = emqtt:subscribe(
Client, [ Client, [
{?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]} {?TOPIC1, [{rh, ?TOPIC1RH}, {rap, ?TOPIC1RAP}, {nl, ?TOPIC1NL}, {qos, ?TOPIC1QOS}]}
@ -54,12 +109,13 @@ t_subscription_api(Config) ->
), ),
{ok, _, _} = emqtt:subscribe(Client, ?TOPIC2), {ok, _, _} = emqtt:subscribe(Client, ?TOPIC2),
Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]), Path = emqx_mgmt_api_test_util:api_path(["subscriptions"]),
timer:sleep(100),
{ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path), {ok, Response} = emqx_mgmt_api_test_util:request_api(get, Path),
Data = emqx_utils_json:decode(Response, [return_maps]), Data = emqx_utils_json:decode(Response, [return_maps]),
Meta = maps:get(<<"meta">>, Data), Meta = maps:get(<<"meta">>, Data),
?assertEqual(1, maps:get(<<"page">>, Meta)), ?assertEqual(1, maps:get(<<"page">>, Meta)),
?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)), ?assertEqual(emqx_mgmt:default_row_limit(), maps:get(<<"limit">>, Meta)),
?assertEqual(2, maps:get(<<"count">>, Meta)), ?assertEqual(2, maps:get(<<"count">>, Meta), Data),
Subscriptions = maps:get(<<"data">>, Data), Subscriptions = maps:get(<<"data">>, Data),
?assertEqual(length(Subscriptions), 2), ?assertEqual(length(Subscriptions), 2),
Sort = Sort =
@ -90,7 +146,8 @@ t_subscription_api(Config) ->
{"node", atom_to_list(node())}, {"node", atom_to_list(node())},
{"qos", "0"}, {"qos", "0"},
{"share_group", "test_group"}, {"share_group", "test_group"},
{"match_topic", "t/#"} {"match_topic", "t/#"},
{"durable", Durable}
], ],
Headers = emqx_mgmt_api_test_util:auth_header_(), Headers = emqx_mgmt_api_test_util:auth_header_(),
@ -103,6 +160,7 @@ t_subscription_api(Config) ->
t_subscription_fuzzy_search(Config) -> t_subscription_fuzzy_search(Config) ->
Client = proplists:get_value(client, Config), Client = proplists:get_value(client, Config),
Durable = atom_to_list(?config(durable, Config)),
Topics = [ Topics = [
<<"t/foo">>, <<"t/foo">>,
<<"t/foo/bar">>, <<"t/foo/bar">>,
@ -116,7 +174,8 @@ t_subscription_fuzzy_search(Config) ->
MatchQs = [ MatchQs = [
{"clientid", ?CLIENTID}, {"clientid", ?CLIENTID},
{"node", atom_to_list(node())}, {"node", atom_to_list(node())},
{"match_topic", "t/#"} {"match_topic", "t/#"},
{"durable", Durable}
], ],
MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers), MatchData1 = #{<<"meta">> := MatchMeta1} = request_json(get, MatchQs, Headers),
@ -130,12 +189,13 @@ t_subscription_fuzzy_search(Config) ->
LimitMatchQuery = [ LimitMatchQuery = [
{"clientid", ?CLIENTID}, {"clientid", ?CLIENTID},
{"match_topic", "+/+/+"}, {"match_topic", "+/+/+"},
{"limit", "3"} {"limit", "3"},
{"durable", Durable}
], ],
MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers), MatchData2 = #{<<"meta">> := MatchMeta2} = request_json(get, LimitMatchQuery, Headers),
?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2), ?assertEqual(#{<<"page">> => 1, <<"limit">> => 3, <<"hasnext">> => true}, MatchMeta2),
?assertEqual(3, length(maps:get(<<"data">>, MatchData2))), ?assertEqual(3, length(maps:get(<<"data">>, MatchData2)), MatchData2),
MatchData2P2 = MatchData2P2 =
#{<<"meta">> := MatchMeta2P2} = #{<<"meta">> := MatchMeta2P2} =
@ -176,8 +236,8 @@ t_list_with_shared_sub(_Config) ->
ok. ok.
t_list_with_invalid_match_topic(_Config) -> t_list_with_invalid_match_topic(Config) ->
Client = proplists:get_value(client, _Config), Client = proplists:get_value(client, Config),
RealTopic = <<"t/+">>, RealTopic = <<"t/+">>,
Topic = <<"$share/g1/", RealTopic/binary>>, Topic = <<"$share/g1/", RealTopic/binary>>,
@ -212,12 +272,3 @@ request_json(Method, Query, Headers) when is_list(Query) ->
path() -> path() ->
emqx_mgmt_api_test_util:api_path(["subscriptions"]). emqx_mgmt_api_test_util:api_path(["subscriptions"]).
init_per_testcase(_TC, Config) ->
{ok, Client} = emqtt:start_link(#{username => ?USERNAME, clientid => ?CLIENTID, proto_ver => v5}),
{ok, _} = emqtt:connect(Client),
[{client, Client} | Config].
end_per_testcase(_TC, Config) ->
Client = proplists:get_value(client, Config),
emqtt:disconnect(Client).

View File

@ -3345,7 +3345,6 @@ wait_n_events(NEvents, Timeout, EventName) ->
end. end.
assert_sync_retry_fail_then_succeed_inflight(Trace) -> assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert( ?assert(
?strict_causality( ?strict_causality(
#{?snk_kind := buffer_worker_flush_nack, ref := _Ref}, #{?snk_kind := buffer_worker_flush_nack, ref := _Ref},
@ -3365,7 +3364,6 @@ assert_sync_retry_fail_then_succeed_inflight(Trace) ->
ok. ok.
assert_async_retry_fail_then_succeed_inflight(Trace) -> assert_async_retry_fail_then_succeed_inflight(Trace) ->
ct:pal(" ~p", [Trace]),
?assert( ?assert(
?strict_causality( ?strict_causality(
#{?snk_kind := handle_async_reply, action := nack}, #{?snk_kind := handle_async_reply, action := nack},

View File

@ -0,0 +1,7 @@
- Ensure consistency of the durable message replay when the subscriptions are modified before session reconnects
- Persistent sessions save inflight packet IDs for the received QoS2 messages
- Make behavior of the persistent sessions consistent with the non-persistent sessions in regard to overlapping subscriptions
- List persistent subscriptions in the REST API