diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 654451561..297c7d857 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -141,12 +141,7 @@ session_open(ClientID) -> %% during session GC -spec session_drop(emqx_types:clientid()) -> ok. session_drop(ClientID) -> - {atomic, ok} = mria:transaction( - ?DS_SHARD, - fun() -> - mnesia:delete({?SESSION_TAB, ClientID}) - end - ), + ok = mria:dirty_delete({?SESSION_TAB, ClientID}), ok. %% @doc Called when a client disconnects. This function terminates all @@ -158,38 +153,32 @@ session_suspend(_SessionId) -> %% @doc Called when a client subscribes to a topic. Idempotent. -spec session_add_iterator(session_id(), emqx_topic:words()) -> - {ok, iterator_id(), time(), _IsNew :: boolean()} | {error, session_not_found}. + {ok, iterator_id(), time(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter) -> - {atomic, Ret} = - mria:transaction( - ?DS_SHARD, - fun() -> - case mnesia:wread({?SESSION_TAB, DSSessionId}) of - [] -> - {error, session_not_found}; - [#session{iterators = #{TopicFilter := IteratorId}}] -> - StartMS = get_start_ms(IteratorId, DSSessionId), - ?tp( - ds_session_subscription_present, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = false, - {ok, IteratorId, StartMS, IsNew}; - [#session{iterators = Iterators0} = Session0] -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - Iterators = Iterators0#{TopicFilter => IteratorId}, - Session = Session0#session{iterators = Iterators}, - mnesia:write(?SESSION_TAB, Session, write), - ?tp( - ds_session_subscription_added, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = true, - {ok, IteratorId, StartMS, IsNew} - end - end - ), - Ret. + IteratorRefId = {DSSessionId, TopicFilter}, + case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of + [] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = IteratorRefId, + it_id = IteratorId, + start_time = StartMS + }, + ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew}; + [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew} + end. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't @@ -231,10 +220,5 @@ iterator_stats() -> -spec new_iterator_id(session_id()) -> {iterator_id(), time()}. new_iterator_id(DSSessionId) -> NowMS = erlang:system_time(microsecond), - NowMSBin = integer_to_binary(NowMS), - {<>, NowMS}. - --spec get_start_ms(iterator_id(), emqx_session:session_id()) -> time(). -get_start_ms(IteratorId, SessionId) -> - <> = IteratorId, - binary_to_integer(StartMSBin). + IteratorId = <>, + {IteratorId, NowMS}. diff --git a/apps/emqx_durable_storage/src/emqx_ds_app.erl b/apps/emqx_durable_storage/src/emqx_ds_app.erl index 216e979ee..cbcdb0b8c 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_app.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_app.erl @@ -25,7 +25,18 @@ init_mnesia() -> {record_name, session}, {attributes, record_info(fields, session)} ] - ). + ), + ok = mria:create_table( + ?ITERATOR_REF_TAB, + [ + {rlog_shard, ?DS_SHARD}, + {type, ordered_set}, + {storage, storage()}, + {record_name, iterator_ref}, + {attributes, record_info(fields, iterator_ref)} + ] + ), + ok. storage() -> case mria:rocksdb_backend_available() of diff --git a/apps/emqx_durable_storage/src/emqx_ds_int.hrl b/apps/emqx_durable_storage/src/emqx_ds_int.hrl index fa11a6600..55223068f 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_int.hrl +++ b/apps/emqx_durable_storage/src/emqx_ds_int.hrl @@ -17,6 +17,7 @@ -define(EMQX_DS_HRL, true). -define(SESSION_TAB, emqx_ds_session). +-define(ITERATOR_REF_TAB, emqx_ds_iterator_ref). -define(DS_SHARD, emqx_ds_shard). -record(session, { @@ -24,4 +25,10 @@ iterators :: #{emqx_topic:words() => emqx_ds:iterator_id()} }). +-record(iterator_ref, { + ref_id :: {emqx_ds:session_id(), emqx_topic:words()}, + it_id :: emqx_ds:iterator_id(), + start_time :: emqx_ds:time() +}). + -endif.