diff --git a/apps/emqx/src/emqx_persistent_session_ds.erl b/apps/emqx/src/emqx_persistent_session_ds.erl index 7d4ab71d6..43a0f1bec 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.erl +++ b/apps/emqx/src/emqx_persistent_session_ds.erl @@ -373,7 +373,7 @@ publish(_PacketId, Msg, Session) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. puback(_ClientInfo, PacketId, Session0) -> - case commit_seqno(puback, PacketId, Session0) of + case update_seqno(puback, PacketId, Session0) of {ok, Msg, Session} -> {ok, Msg, [], inc_send_quota(Session)}; Error -> @@ -388,7 +388,7 @@ puback(_ClientInfo, PacketId, Session0) -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}. pubrec(PacketId, Session0) -> - case commit_seqno(pubrec, PacketId, Session0) of + case update_seqno(pubrec, PacketId, Session0) of {ok, Msg, Session} -> {ok, Msg, Session}; Error = {error, _} -> @@ -413,7 +413,7 @@ pubrel(_PacketId, Session = #{}) -> {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}. pubcomp(_ClientInfo, PacketId, Session0) -> - case commit_seqno(pubcomp, PacketId, Session0) of + case update_seqno(pubcomp, PacketId, Session0) of {ok, Msg, Session} -> {ok, Msg, [], inc_send_quota(Session)}; Error = {error, _} -> @@ -540,6 +540,7 @@ sync(ClientId) -> {'DOWN', Ref, process, _Pid, Reason} -> {error, Reason}; Ref -> + demonitor(Ref, [flush]), ok end; [] -> @@ -767,7 +768,7 @@ process_batch( SeqNoQos2 = inc_seqno(?QOS_2, SeqNoQos20) end, { - case Msg#message.qos of + case Qos of ?QOS_0 when IsReplay -> %% We ignore QoS 0 messages during replay: Acc; @@ -895,9 +896,9 @@ bump_interval() -> %% SeqNo tracking %% -------------------------------------------------------------------- --spec commit_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) -> +-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} | {error, _}. 
-commit_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> +update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) -> SeqNo = packet_id_to_seqno(PacketId, S), case Track of puback -> diff --git a/apps/emqx/src/emqx_persistent_session_ds.hrl b/apps/emqx/src/emqx_persistent_session_ds.hrl index 6ab2d4c1f..8286a4e41 100644 --- a/apps/emqx/src/emqx_persistent_session_ds.hrl +++ b/apps/emqx/src/emqx_persistent_session_ds.hrl @@ -31,7 +31,7 @@ %% -----|----------|-----|-----|------> seqno %% | | | | %% committed dup rec next -% (Qos2) +%% (Qos2) %% Seqno becomes committed after receiving PUBACK for QoS1 or PUBCOMP %% for QoS2. @@ -41,23 +41,26 @@ %% committed..dup range are retransmitted with DUP flag. %% -define(dup(QOS), (10 + QOS)). +%% Rec flag is specific to QoS2. It contains seqno of the last +%% PUBREC received from the client. When the session reconnects, +%% PUBREL packets for the dup..rec range are retransmitted. -define(rec, 22). -%% Last seqno assigned to a message. +%% Last seqno assigned to a message (it may not be sent yet). -define(next(QOS), (30 + QOS)). 
%%%%% State of the stream: -record(ifs, { rank_x :: emqx_ds:rank_x(), rank_y :: emqx_ds:rank_y(), - %% Iterator at the beginning and end of the last batch: + %% Iterators at the beginning and the end of the last batch: it_begin :: emqx_ds:iterator() | undefined, it_end :: emqx_ds:iterator() | end_of_stream, - %% Key that points at the beginning of the batch: + %% Size of the last batch: batch_size = 0 :: non_neg_integer(), - %% Session sequence number at the time when the batch was fetched: + %% Session sequence numbers at the time when the batch was fetched: first_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), first_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno(), - %% Number of messages collected in the last batch: + %% Sequence numbers that have to be committed for the batch: last_seqno_qos1 = 0 :: emqx_persistent_session_ds:seqno(), last_seqno_qos2 = 0 :: emqx_persistent_session_ds:seqno() }). diff --git a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl index 4ff420eb8..66f5c9b4e 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_gc_worker.erl @@ -126,7 +126,7 @@ gc_loop(MinLastAlive, It0) -> do_gc(SessionId, MinLastAlive, LastAliveAt, EI) when LastAliveAt + EI < MinLastAlive -> emqx_persistent_session_ds:destroy_session(SessionId), - ?tp(error, ds_session_gc_cleaned, #{ + ?tp(debug, ds_session_gc_cleaned, #{ session_id => SessionId, last_alive_at => LastAliveAt, expiry_interval => EI, diff --git a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl index 2938222e9..a769fce64 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_inflight.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_inflight.erl @@ -18,9 +18,6 @@ %% API: -export([new/1, push/2, pop/1, n_buffered/2, n_inflight/1, inc_send_quota/1, receive_maximum/1]). -%% behavior callbacks: --export([]). 
- %% internal exports: -export([]). diff --git a/apps/emqx/src/emqx_persistent_session_ds_state.erl b/apps/emqx/src/emqx_persistent_session_ds_state.erl index fbd4fcc22..57f2316b7 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_state.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_state.erl @@ -68,9 +68,10 @@ %% It should be possible to make frequent changes to the pmap without %% stressing Mria. %% -%% It's implemented as three maps: `clean', `dirty' and `tombstones'. -%% Updates are made to the `dirty' area. `pmap_commit' function saves -%% the updated entries to Mnesia and moves them to the `clean' area. +%% It's implemented as two maps: `cache', and `dirty'. `cache' stores +%% the data, and `dirty' contains information about dirty and deleted +%% keys. When `commit/1' is called, dirty keys are dumped to the +%% tables, and deleted keys are removed from the tables. -record(pmap, {table, cache, dirty}). -type pmap(K, V) :: @@ -530,9 +531,9 @@ kv_pmap_persist(Tab, SessionId, Key, Val0) -> mnesia:write(Tab, #kv{k = {SessionId, Key}, v = Val}, write). kv_pmap_restore(Table, SessionId) -> - MS = [{#kv{k = {SessionId, '_'}, _ = '_'}, [], ['$_']}], + MS = [{#kv{k = {SessionId, '$1'}, v = '$2'}, [], [{{'$1', '$2'}}]}], Objs = mnesia:select(Table, MS, read), - [{K, encoder(decode, Table, V)} || #kv{k = {_, K}, v = V} <- Objs]. + [{K, encoder(decode, Table, V)} || {K, V} <- Objs]. 
kv_pmap_delete(Table, SessionId) -> MS = [{#kv{k = {SessionId, '$1'}, _ = '_'}, [], ['$1']}], diff --git a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl index e0de96454..621355005 100644 --- a/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl +++ b/apps/emqx/src/emqx_persistent_session_ds_stream_scheduler.erl @@ -142,7 +142,7 @@ ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) -> case emqx_persistent_session_ds_state:get_stream(Key, S) of undefined -> ?SLOG(debug, #{ - '$msg' => new_stream, key => Key, stream => Stream + msg => new_stream, key => Key, stream => Stream }), {ok, Iterator} = emqx_ds:make_iterator( ?PERSISTENT_MESSAGE_DB, Stream, TopicFilter, StartTime diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl index 008fc177c..3fc76d4b5 100644 --- a/apps/emqx/test/emqx_persistent_session_SUITE.erl +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -554,7 +554,7 @@ t_process_dies_session_expires(Config) -> ok = publish(Topic, Payload), - timer:sleep(1100), + timer:sleep(2000), {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, diff --git a/changes/ce/feat-12251.en.md b/changes/ce/feat-12251.en.md new file mode 100644 index 000000000..a206288b5 --- /dev/null +++ b/changes/ce/feat-12251.en.md @@ -0,0 +1,7 @@ +Optimize performance of the RocksDB-based persistent session. +Reduce RAM usage and frequency of database requests. + +- Introduce dirty session state to avoid frequent Mria transactions +- Introduce an intermediate buffer for the persistent messages +- Use separate tracks of PacketIds for QoS1 and QoS2 messages +- Limit the number of continuous ranges of inflight messages to one per stream