diff --git a/apps/emqx_durable_storage/src/emqx_ds.erl b/apps/emqx_durable_storage/src/emqx_ds.erl index 7ec9f3801..889e7ea24 100644 --- a/apps/emqx_durable_storage/src/emqx_ds.erl +++ b/apps/emqx_durable_storage/src/emqx_ds.erl @@ -136,20 +136,29 @@ message_stats() -> %% the broker. -spec session_open(emqx_types:clientid()) -> {_New :: boolean(), session_id()}. session_open(ClientID) -> - case mnesia:dirty_read(?SESSION_TAB, ClientID) of - [#session{}] -> - {false, ClientID}; - [] -> - Session = #session{id = ClientID}, - mria:dirty_write(?SESSION_TAB, Session), - {true, ClientID} - end. + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?SESSION_TAB, ClientID, write) of + [#session{}] -> + {false, ClientID}; + [] -> + Session = #session{id = ClientID}, + mnesia:write(?SESSION_TAB, Session, write), + {true, ClientID} + end + end), + Res. %% @doc Called when a client reconnects with `clean session=true' or %% during session GC -spec session_drop(emqx_types:clientid()) -> ok. session_drop(ClientID) -> - ok = mria:dirty_delete({?SESSION_TAB, ClientID}), + {atomic, ok} = mria:transaction( + ?DS_SHARD, + fun() -> + mnesia:delete({?SESSION_TAB, ClientID}) + end + ), ok. %% @doc Called when a client disconnects. This function terminates all @@ -164,29 +173,33 @@ session_suspend(_SessionId) -> {ok, iterator_id(), time(), _IsNew :: boolean()}. session_add_iterator(DSSessionId, TopicFilter) -> IteratorRefId = {DSSessionId, TopicFilter}, - case mnesia:dirty_read(?ITERATOR_REF_TAB, IteratorRefId) of - [] -> - {IteratorId, StartMS} = new_iterator_id(DSSessionId), - IteratorRef = #iterator_ref{ - ref_id = IteratorRefId, - it_id = IteratorId, - start_time = StartMS - }, - ok = mria:dirty_write(?ITERATOR_REF_TAB, IteratorRef), - ?tp( - ds_session_subscription_added, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = true, - {ok, IteratorId, StartMS, IsNew}; - [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> - ?tp( - ds_session_subscription_present, - #{iterator_id => IteratorId, session_id => DSSessionId} - ), - IsNew = false, - {ok, IteratorId, StartMS, IsNew} - end. + {atomic, Res} = + mria:transaction(?DS_SHARD, fun() -> + case mnesia:read(?ITERATOR_REF_TAB, IteratorRefId, write) of + [] -> + {IteratorId, StartMS} = new_iterator_id(DSSessionId), + IteratorRef = #iterator_ref{ + ref_id = IteratorRefId, + it_id = IteratorId, + start_time = StartMS + }, + ok = mnesia:write(?ITERATOR_REF_TAB, IteratorRef, write), + ?tp( + ds_session_subscription_added, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = true, + {ok, IteratorId, StartMS, IsNew}; + [#iterator_ref{it_id = IteratorId, start_time = StartMS}] -> + ?tp( + ds_session_subscription_present, + #{iterator_id => IteratorId, session_id => DSSessionId} + ), + IsNew = false, + {ok, IteratorId, StartMS, IsNew} + end + end), + Res. %% @doc Called when a client unsubscribes from a topic. Returns `true' %% if the session contained the subscription or `false' if it wasn't