Add a sequence module to generate index for subscription sharding

This commit is contained in:
Feng Lee 2018-12-04 15:59:24 +08:00
parent 520a5e0225
commit b4d981daf2
3 changed files with 96 additions and 1 deletions

View File

@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch
emqx_hooks emqx_batch emqx_sequence
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

58
src/emqx_sequence.erl Normal file
View File

@ -0,0 +1,58 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sequence).
-export([create/0, create/1]).
-export([generate/1, generate/2]).
-export([reclaim/1, reclaim/2]).
-type(key() :: term()).
-type(seqid() :: non_neg_integer()).
-define(DEFAULT_TAB, ?MODULE).
%% @doc Create a sequence.
-spec(create() -> ok).
create() ->
create(?DEFAULT_TAB).
-spec(create(atom()) -> ok).
create(Tab) ->
_ = ets:new(Tab, [set, public, named_table, {write_concurrency, true}]),
ok.
%% @doc Generate a sequence id.
-spec(generate(key()) -> seqid()).
generate(Key) ->
generate(?DEFAULT_TAB, Key).
-spec(generate(atom(), key()) -> seqid()).
generate(Tab, Key) ->
ets:update_counter(Tab, Key, {2, 1}, {Key, 0}).
%% @doc Reclaim a sequence id.
-spec(reclaim(key()) -> seqid()).
reclaim(Key) ->
reclaim(?DEFAULT_TAB, Key).
-spec(reclaim(atom(), key()) -> seqid()).
reclaim(Tab, Key) ->
try ets:update_counter(Tab, Key, {2, -1, 0, 0}) of
0 -> ets:delete_object(Tab, {Key, 0}), 0;
I -> I
catch
error:badarg -> 0
end.

View File

@ -0,0 +1,37 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sequence_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-import(emqx_sequence, [generate/1, reclaim/1]).
all() ->
[sequence_generate].
sequence_generate(_) ->
ok = emqx_sequence:create(),
?assertEqual(1, generate(key)),
?assertEqual(2, generate(key)),
?assertEqual(3, generate(key)),
?assertEqual(2, reclaim(key)),
?assertEqual(1, reclaim(key)),
?assertEqual(0, reclaim(key)),
?assertEqual(false, ets:member(emqx_sequence, key)),
?assertEqual(1, generate(key)).