chore(emqx_sn): Add SN shard

This commit is contained in:
k32 2021-06-28 12:59:35 +02:00
parent ca1b789ef6
commit 98739224f6
2 changed files with 12 additions and 8 deletions

View File

@ -44,6 +44,8 @@
, code_change/3 , code_change/3
]). ]).
-define(SN_SHARD, emqx_sn_shard).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-record(state, {max_predef_topic_id = 0}). -record(state, {max_predef_topic_id = 0}).
@ -56,6 +58,7 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
-rlog_shard({?SN_SHARD, ?TAB}).
%% @doc Create or replicate tables. %% @doc Create or replicate tables.
-spec(mnesia(boot | copy) -> ok). -spec(mnesia(boot | copy) -> ok).
@ -74,6 +77,7 @@ mnesia(copy) ->
-spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}). -spec(start_link(list()) -> {ok, pid()} | ignore | {error, Reason :: term()}).
start_link(PredefTopics) -> start_link(PredefTopics) ->
ekka_mnesia:wait_for_shards([?SN_SHARD], infinity),
gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [PredefTopics], []).
-spec(stop() -> ok). -spec(stop() -> ok).
@ -129,10 +133,10 @@ init([PredefTopics]) ->
%% {ClientId, TopicName} -> TopicId %% {ClientId, TopicName} -> TopicId
MaxPredefId = lists:foldl( MaxPredefId = lists:foldl(
fun({TopicId, TopicName}, AccId) -> fun({TopicId, TopicName}, AccId) ->
mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId}, ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicId},
value = TopicName}), value = TopicName}),
mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName}, ekka_mnesia:dirty_write(#emqx_sn_registry{key = {predef, TopicName},
value = TopicId}), value = TopicId}),
if TopicId > AccId -> TopicId; true -> AccId end if TopicId > AccId -> TopicId; true -> AccId end
end, 0, PredefTopics), end, 0, PredefTopics),
{ok, #state{max_predef_topic_id = MaxPredefId}}. {ok, #state{max_predef_topic_id = MaxPredefId}}.
@ -157,7 +161,7 @@ handle_call({register, ClientId, TopicName}, _From,
mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId}, mnesia:write(#emqx_sn_registry{key = {ClientId, TopicId},
value = TopicName}) value = TopicName})
end, end,
case mnesia:transaction(Fun) of case ekka_mnesia:transaction(?SN_SHARD, Fun) of
{atomic, ok} -> {atomic, ok} ->
{reply, TopicId, State}; {reply, TopicId, State};
{aborted, Error} -> {aborted, Error} ->
@ -168,7 +172,7 @@ handle_call({register, ClientId, TopicName}, _From,
handle_call({unregister, ClientId}, _From, State) -> handle_call({unregister, ClientId}, _From, State) ->
Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}), Registry = mnesia:dirty_match_object({?TAB, {ClientId, '_'}, '_'}),
lists:foreach(fun(R) -> mnesia:dirty_delete_object(R) end, Registry), lists:foreach(fun(R) -> ekka_mnesia:dirty_delete_object(R) end, Registry),
{reply, ok, State}; {reply, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->

View File

@ -41,9 +41,10 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
application:set_env(ekka, strict_mode, true),
ekka_mnesia:start(), ekka_mnesia:start(),
emqx_sn_registry:mnesia(boot), emqx_sn_registry:mnesia(boot),
mnesia:clear_table(emqx_sn_registry), ekka_mnesia:clear_table(emqx_sn_registry),
PredefTopics = application:get_env(emqx_sn, predefined, []), PredefTopics = application:get_env(emqx_sn, predefined, []),
{ok, _Pid} = ?REGISTRY:start_link(PredefTopics), {ok, _Pid} = ?REGISTRY:start_link(PredefTopics),
Config. Config.
@ -118,4 +119,3 @@ register_a_lot(N, Max) when N < Max ->
Topic = iolist_to_binary(["Topic", integer_to_list(N)]), Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)), ?assertEqual(N, ?REGISTRY:register_topic(<<"ClientId">>, Topic)),
register_a_lot(N+1, Max). register_a_lot(N+1, Max).