diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 0f1e77370..4517fa1b7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -184,7 +184,9 @@ seqno_q2_dup, seqno_q2_rec, seqno_q2_next, - n_streams + n_streams, + awaiting_rel_cnt, + awaiting_rel_max ]). %% @@ -206,7 +208,8 @@ open(#{clientid := ClientID} = ClientInfo, ConnInfo, MaybeWillMsg, Conf) -> ok = emqx_cm:takeover_kick(ClientID), case session_open(ClientID, ClientInfo, ConnInfo, MaybeWillMsg) of Session0 = #{} -> - Session = Session0#{props => Conf}, + Session1 = Session0#{props => Conf}, + Session = do_expire(ClientInfo, Session1), {true, ensure_timers(Session), []}; false -> false @@ -262,21 +265,21 @@ info(inflight_max, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:receive_maximum(Inflight); info(retry_interval, #{props := Conf}) -> maps:get(retry_interval, Conf); -% info(mqueue, #sessmem{mqueue = MQueue}) -> -% MQueue; info(mqueue_len, #{inflight := Inflight}) -> emqx_persistent_session_ds_inflight:n_buffered(all, Inflight); -% info(mqueue_max, #sessmem{mqueue = MQueue}) -> -% emqx_mqueue:max_len(MQueue); info(mqueue_dropped, _Session) -> 0; %% info(next_pkt_id, #{s := S}) -> %% {PacketId, _} = emqx_persistent_message_ds_replayer:next_packet_id(S), %% PacketId; -% info(awaiting_rel, #sessmem{awaiting_rel = AwaitingRel}) -> -% AwaitingRel; -%% info(awaiting_rel_cnt, #{s := S}) -> -%% seqno_diff(?QOS_2, ?rec, ?committed(?QOS_2), S); +info(awaiting_rel, #{s := S}) -> + emqx_persistent_session_ds_state:fold_awaiting_rel(fun maps:put/3, #{}, S); +info(awaiting_rel_max, #{props := Conf}) -> + maps:get(max_awaiting_rel, Conf); +info(awaiting_rel_cnt, #{s := S}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S); +info(await_rel_timeout, #{props := Conf}) -> + maps:get(await_rel_timeout, Conf); info(seqno_q1_comm, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?committed(?QOS_1), S); info(seqno_q1_dup, #{s := S}) -> @@ -292,17 +295,7 @@ info(seqno_q2_rec, #{s := S}) -> info(seqno_q2_next, #{s := S}) -> emqx_persistent_session_ds_state:get_seqno(?next(?QOS_2), S); info(n_streams, #{s := S}) -> - emqx_persistent_session_ds_state:fold_streams( - fun(_, _, Acc) -> Acc + 1 end, - 0, - S - ); -info(awaiting_rel_max, #{props := Conf}) -> - maps:get(max_awaiting_rel, Conf); -info(await_rel_timeout, #{props := _Conf}) -> - %% TODO: currently this setting is ignored: - %% maps:get(await_rel_timeout, Conf). - 0; + emqx_persistent_session_ds_state:n_streams(S); info({MsgsQ, _PagerParams}, _Session) when MsgsQ =:= mqueue_msgs; MsgsQ =:= inflight_msgs -> {error, not_implemented}. @@ -446,11 +439,72 @@ get_subscription(TopicFilter, #{s := S}) -> -spec publish(emqx_types:packet_id(), emqx_types:message(), session()) -> {ok, emqx_types:publish_result(), session()} | {error, emqx_types:reason_code()}. +publish( + PacketId, + Msg = #message{qos = ?QOS_2, timestamp = Ts}, + Session = #{s := S0} +) -> + case is_awaiting_full(Session) of + false -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + Results = emqx_broker:publish(Msg), + S = emqx_persistent_session_ds_state:put_awaiting_rel(PacketId, Ts, S0), + {ok, Results, Session#{s => S}}; + _Ts -> + {error, ?RC_PACKET_IDENTIFIER_IN_USE} + end; + true -> + {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} + end; publish(_PacketId, Msg, Session) -> - %% TODO: QoS2 Result = emqx_broker:publish(Msg), {ok, Result, Session}. +is_awaiting_full(#{s := S, props := Props}) -> + emqx_persistent_session_ds_state:n_awaiting_rel(S) >= + maps:get(max_awaiting_rel, Props, infinity). + +-spec expire(emqx_types:clientinfo(), session()) -> + {ok, [], timeout(), session()} | {ok, [], session()}. +expire(ClientInfo, Session0 = #{props := Props}) -> + Session = #{s := S} = do_expire(ClientInfo, Session0), + case emqx_persistent_session_ds_state:n_awaiting_rel(S) of + 0 -> + {ok, [], Session}; + _ -> + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + {ok, [], AwaitRelTimeout, Session} + end. + +do_expire(ClientInfo, Session = #{s := S0, props := Props}) -> + %% 1. Find expired packet IDs: + Now = erlang:system_time(millisecond), + AwaitRelTimeout = maps:get(await_rel_timeout, Props), + ExpiredPacketIds = + emqx_persistent_session_ds_state:fold_awaiting_rel( + fun(PacketId, Ts, Acc) -> + Age = Now - Ts, + case Age > AwaitRelTimeout of + true -> + [PacketId | Acc]; + false -> + Acc + end + end, + [], + S0 + ), + %% 2. Perform side effects: + _ = emqx_session_events:handle_event(ClientInfo, {expired_rel, length(ExpiredPacketIds)}), + %% 3. Update state: + S = lists:foldl( + fun emqx_persistent_session_ds_state:del_awaiting_rel/2, + S0, + ExpiredPacketIds + ), + Session#{s => S}. + %%-------------------------------------------------------------------- %% Client -> Broker: PUBACK %%-------------------------------------------------------------------- @@ -487,9 +541,14 @@ pubrec(PacketId, Session0) -> -spec pubrel(emqx_types:packet_id(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}. -pubrel(_PacketId, Session = #{}) -> - % TODO: stub - {ok, Session}. +pubrel(PacketId, Session = #{s := S0}) -> + case emqx_persistent_session_ds_state:get_awaiting_rel(PacketId, S0) of + undefined -> + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}; + _TS -> + S = emqx_persistent_session_ds_state:del_awaiting_rel(PacketId, S0), + {ok, Session#{s => S}} + end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBCOMP @@ -562,6 +621,8 @@ handle_timeout(_ClientInfo, #req_sync{from = From, ref = Ref}, Session = #{s := S = emqx_persistent_session_ds_state:commit(S0), From ! Ref, {ok, [], Session#{s => S}}; +handle_timeout(ClientInfo, expire_awaiting_rel, Session) -> + expire(ClientInfo, Session); handle_timeout(_ClientInfo, Timeout, Session) -> ?SLOG(warning, #{msg => "unknown_ds_timeout", timeout => Timeout}), {ok, [], Session}. diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index 28297964d..fc2da1317 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -34,10 +34,17 @@ -export([get_will_message/1, set_will_message/2, clear_will_message/1, clear_will_message_now/1]). -export([get_peername/1, set_peername/2]). -export([new_id/1]). --export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3]). +-export([get_stream/2, put_stream/3, del_stream/2, fold_streams/3, n_streams/1]). -export([get_seqno/2, put_seqno/3]). -export([get_rank/2, put_rank/3, del_rank/2, fold_ranks/3]). -export([get_subscriptions/1, put_subscription/4, del_subscription/3]). +-export([ + get_awaiting_rel/2, + put_awaiting_rel/3, + del_awaiting_rel/2, + fold_awaiting_rel/3, + n_awaiting_rel/1 +]). -export([make_session_iterator/0, session_iterator_next/2]). @@ -117,7 +124,8 @@ subscriptions := subscriptions(), seqnos := pmap(seqno_type(), emqx_persistent_session_ds:seqno()), streams := pmap(emqx_ds:stream(), emqx_persistent_session_ds:stream_state()), - ranks := pmap(term(), integer()) + ranks := pmap(term(), integer()), + awaiting_rel := pmap(emqx_types:packet_id(), _Timestamp :: integer()) }. -define(session_tab, emqx_ds_session_tab). @@ -125,7 +133,8 @@ -define(stream_tab, emqx_ds_session_streams). -define(seqno_tab, emqx_ds_session_seqnos). -define(rank_tab, emqx_ds_session_ranks). --define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab]). +-define(awaiting_rel_tab, emqx_ds_session_awaiting_rel). +-define(pmap_tables, [?stream_tab, ?seqno_tab, ?rank_tab, ?subscription_tab, ?awaiting_rel_tab]). %% Enable this flag if you suspect some code breaks the sequence: -ifndef(CHECK_SEQNO). @@ -167,6 +176,7 @@ open(SessionId) -> streams => pmap_open(?stream_tab, SessionId), seqnos => pmap_open(?seqno_tab, SessionId), ranks => pmap_open(?rank_tab, SessionId), + awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), ?unset_dirty }, {ok, Rec}; @@ -190,7 +200,8 @@ format(#{ subscriptions := SubsGBT, streams := Streams, seqnos := Seqnos, - ranks := Ranks + ranks := Ranks, + awaiting_rel := AwaitingRel }) -> Subs = emqx_topic_gbt:fold( fun(Key, Sub, Acc) -> @@ -204,7 +215,8 @@ format(#{ subscriptions => Subs, streams => pmap_format(Streams), seqnos => pmap_format(Seqnos), - ranks => pmap_format(Ranks) + ranks => pmap_format(Ranks), + awaiting_rel => pmap_format(AwaitingRel) }. -spec list_sessions() -> [emqx_persistent_session_ds:id()]. @@ -229,7 +241,8 @@ commit( metadata := Metadata, streams := Streams, seqnos := SeqNos, - ranks := Ranks + ranks := Ranks, + awaiting_rel := AwaitingRel } ) -> check_sequence(Rec), @@ -239,6 +252,7 @@ commit( streams => pmap_commit(SessionId, Streams), seqnos => pmap_commit(SessionId, SeqNos), ranks => pmap_commit(SessionId, Ranks), + awaiting_rel => pmap_commit(SessionId, AwaitingRel), ?unset_dirty } end). @@ -254,6 +268,7 @@ create_new(SessionId) -> streams => pmap_open(?stream_tab, SessionId), seqnos => pmap_open(?seqno_tab, SessionId), ranks => pmap_open(?rank_tab, SessionId), + awaiting_rel => pmap_open(?awaiting_rel_tab, SessionId), ?set_dirty } end). @@ -382,6 +397,10 @@ del_stream(Key, Rec) -> fold_streams(Fun, Acc, Rec) -> gen_fold(streams, Fun, Acc, Rec). +-spec n_streams(t()) -> non_neg_integer(). +n_streams(Rec) -> + gen_size(streams, Rec). + %% -spec get_seqno(seqno_type(), t()) -> emqx_persistent_session_ds:seqno() | undefined. @@ -412,6 +431,30 @@ del_rank(Key, Rec) -> fold_ranks(Fun, Acc, Rec) -> gen_fold(ranks, Fun, Acc, Rec). +%% + +-spec get_awaiting_rel(emqx_types:packet_id(), t()) -> integer() | undefined. +get_awaiting_rel(Key, Rec) -> + gen_get(awaiting_rel, Key, Rec). + +-spec put_awaiting_rel(emqx_types:packet_id(), _Timestamp :: integer(), t()) -> t(). +put_awaiting_rel(Key, Val, Rec) -> + gen_put(awaiting_rel, Key, Val, Rec). + +-spec del_awaiting_rel(emqx_types:packet_id(), t()) -> t(). +del_awaiting_rel(Key, Rec) -> + gen_del(awaiting_rel, Key, Rec). + +-spec fold_awaiting_rel(fun(), Acc, t()) -> Acc. +fold_awaiting_rel(Fun, Acc, Rec) -> + gen_fold(awaiting_rel, Fun, Acc, Rec). + +-spec n_awaiting_rel(t()) -> non_neg_integer(). +n_awaiting_rel(Rec) -> + gen_size(awaiting_rel, Rec). + +%% + -spec make_session_iterator() -> session_iterator(). make_session_iterator() -> mnesia:dirty_first(?session_tab). @@ -475,6 +518,10 @@ gen_del(Field, Key, Rec) -> Rec#{?set_dirty} ). +gen_size(Field, Rec) -> + check_sequence(Rec), + pmap_size(maps:get(Field, Rec)). + %% read_subscriptions(SessionId) -> @@ -547,6 +594,10 @@ pmap_commit( pmap_format(#pmap{cache = Cache}) -> Cache. +-spec pmap_size(pmap(_K, _V)) -> non_neg_integer(). +pmap_size(#pmap{cache = Cache}) -> + maps:size(Cache). + %% Functions dealing with set tables: kv_persist(Tab, SessionId, Val0) -> diff --git a/apps/emqx/src/emqx_persistent_session_ds_subs.erl b/apps/emqx/src/emqx_persistent_session_ds_subs.erl index 92f17b108..9071ad9d9 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_subs.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_subs.erl @@ -24,7 +24,15 @@ -module(emqx_persistent_session_ds_subs). %% API: --export([on_subscribe/3, on_unsubscribe/3, gc/1, lookup/2, to_map/1, fold/3, fold_all/3]). +-export([ + on_subscribe/3, + on_unsubscribe/3, + gc/1, + lookup/2, + to_map/1, + fold/3, + fold_all/3 +]). -export_type([]).