diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 6086ec7d6..448aa8ad5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -46,6 +46,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + do_register/4 +]). + -export([lookup_name/1]). -define(SN_SHARD, emqx_sn_shard). @@ -173,33 +178,11 @@ handle_call( TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - Fun = fun() -> - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, next_topic_id}, - value = TopicId + 1 - }, - write - ), - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, TopicName}, - value = TopicId - }, - write - ), - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, TopicId}, - value = TopicName - }, - write - ) - end, - case mria:transaction(?SN_SHARD, Fun) of + case + mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [ + Tab, ClientId, TopicId, TopicName + ]) + of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -248,6 +231,32 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +do_register(Tab, ClientId, TopicId, TopicName) -> + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, next_topic_id}, + value = TopicId + 1 + }, + write + ), + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, TopicName}, + value = TopicId + }, + write + ), + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, TopicId}, + value = TopicName + }, + write + ). + %%----------------------------------------------------------------------------- next_topic_id(Tab, PredefId, ClientId) ->