From b4d981daf2a549554e8e76f88b9b3c0676779932 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Dec 2018 15:59:24 +0800 Subject: [PATCH] Add a sequence module to generate index for subscription sharding --- Makefile | 2 +- src/emqx_sequence.erl | 58 ++++++++++++++++++++++++++++++++++++ test/emqx_sequence_SUITE.erl | 37 +++++++++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/emqx_sequence.erl create mode 100644 test/emqx_sequence_SUITE.erl diff --git a/Makefile b/Makefile index 26bcf22ce..2c1693813 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch + emqx_hooks emqx_batch emqx_sequence CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl new file mode 100644 index 000000000..62a882294 --- /dev/null +++ b/src/emqx_sequence.erl @@ -0,0 +1,58 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_sequence). + +-export([create/0, create/1]). +-export([generate/1, generate/2]). +-export([reclaim/1, reclaim/2]). + +-type(key() :: term()). +-type(seqid() :: non_neg_integer()). + +-define(DEFAULT_TAB, ?MODULE). + +%% @doc Create a sequence. +-spec(create() -> ok). +create() -> + create(?DEFAULT_TAB). + +-spec(create(atom()) -> ok). +create(Tab) -> + _ = ets:new(Tab, [set, public, named_table, {write_concurrency, true}]), + ok. + +%% @doc Generate a sequence id. +-spec(generate(key()) -> seqid()). +generate(Key) -> + generate(?DEFAULT_TAB, Key). + +-spec(generate(atom(), key()) -> seqid()). +generate(Tab, Key) -> + ets:update_counter(Tab, Key, {2, 1}, {Key, 0}). + +%% @doc Reclaim a sequence id. +-spec(reclaim(key()) -> seqid()). +reclaim(Key) -> + reclaim(?DEFAULT_TAB, Key). + +-spec(reclaim(atom(), key()) -> seqid()). +reclaim(Tab, Key) -> + try ets:update_counter(Tab, Key, {2, -1, 0, 0}) of + 0 -> ets:delete_object(Tab, {Key, 0}), 0; + I -> I + catch + error:badarg -> 0 + end. + diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl new file mode 100644 index 000000000..999a95723 --- /dev/null +++ b/test/emqx_sequence_SUITE.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_sequence_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_sequence, [generate/1, reclaim/1]). + +all() -> + [sequence_generate]. + +sequence_generate(_) -> + ok = emqx_sequence:create(), + ?assertEqual(1, generate(key)), + ?assertEqual(2, generate(key)), + ?assertEqual(3, generate(key)), + ?assertEqual(2, reclaim(key)), + ?assertEqual(1, reclaim(key)), + ?assertEqual(0, reclaim(key)), + ?assertEqual(false, ets:member(emqx_sequence, key)), + ?assertEqual(1, generate(key)). +