diff --git a/apps/emqx_sn/src/emqx_sn_registry.erl b/apps/emqx_sn/src/emqx_sn_registry.erl index 4a3b22585..4d2e656b3 100644 --- a/apps/emqx_sn/src/emqx_sn_registry.erl +++ b/apps/emqx_sn/src/emqx_sn_registry.erl @@ -44,6 +44,8 @@ , code_change/3 ]). +-define(SN_SHARD, emqx_sn_shard). + -define(TAB, ?MODULE). -record(state, {max_predef_topic_id = 0}). @@ -56,6 +58,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). +-rlog_shard({?SN_SHARD, ?TAB}). %% @doc Create or replicate tables. -spec(mnesia(boot | copy) -> ok). @@ -74,6 +77,7 @@ mnesia(copy) -> -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). start_link(PredefTopics) -> + ekka_mnesia:wait_for_shards([?SN_SHARD], infinity), gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). -spec(stop() -> ok). @@ -129,10 +133,10 @@ init([PredefTopics]) -> %% {ClientId, TopicName} -> TopicId MaxPredefId = lists:foldl( fun({TopicId, TopicName}, AccId) -> - mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, - value = TopicName}), - mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, - value = TopicId}), + ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, + value = TopicName}), + ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, + value = TopicId}), if TopicId > AccId -> TopicId; true -> AccId end end, 0, PredefTopics), {ok, #state{max_predef_topic_id = MaxPredefId}}. @@ -157,7 +161,7 @@ handle_call({register, ClientId, TopicName}, _From, mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, value = TopicName}) end, - case mnesia:transaction(Fun) of + case ekka_mnesia:transaction(?SN_SHARD, Fun) of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -168,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From, handle_call({unregister, ClientId}, _From, State) -> Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), - lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry), + lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(R) end, Registry), {reply, ok, State}; handle_call(Req, _From, State) -> diff --git a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl index 8d320d8ed..58a458ecc 100644 --- a/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_registry_SUITE.erl @@ -41,9 +41,10 @@ end_per_suite(_Config) -> ok. init_per_testcase(_TestCase, Config) -> + application:set_env(ekka, strict_mode, true), ekka_mnesia:start(), emqx_sn_registry:mnesia(boot), - mnesia:clear_table(emqx_sn_registry), + ekka_mnesia:clear_table(emqx_sn_registry), PredefTopics = application:get_env(emqx_sn, predefined, []), {ok, _Pid} = ?REGISTRY:start_link(PredefTopics), Config. @@ -118,4 +119,3 @@ register_a_lot(N, Max) when N < Max -> Topic = iolist_to_binary(["Topic", integer_to_list(N)]), ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), register_a_lot(N+1, Max). -