From 2a747c9d538cd682c6228a6a799470089b9ed90d Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 12 Dec 2018 13:34:13 +0800 Subject: [PATCH] Improve the subscription sharding. --- src/emqx.erl | 6 +- src/emqx_broker.erl | 217 +++++++++++++++++++---------------- src/emqx_broker_helper.erl | 80 ++++++++----- src/emqx_router_helper.erl | 2 +- src/emqx_sequence.erl | 1 + src/emqx_sm.erl | 2 +- src/emqx_tables.erl | 14 +++ test/emqx_sequence_SUITE.erl | 3 +- test/emqx_tables_SUITE.erl | 8 +- 9 files changed, 197 insertions(+), 136 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 3792cc4f8..76e966a59 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -22,7 +22,7 @@ %% PubSub API -export([subscribe/1, subscribe/2, subscribe/3]). -export([publish/1]). --export([unsubscribe/1, unsubscribe/2]). +-export([unsubscribe/1]). %% PubSub management API -export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). @@ -88,10 +88,6 @@ publish(Msg) -> unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid()) -> ok). -unsubscribe(Topic, SubId) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId). - %%------------------------------------------------------------------------------ %% PubSub management API %%------------------------------------------------------------------------------ diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index a00dc17b8..9ed4aad06 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -20,7 +20,7 @@ -export([start_link/2]). -export([subscribe/1, subscribe/2, subscribe/3]). --export([unsubscribe/1, unsubscribe/2]). +-export([unsubscribe/1]). -export([subscriber_down/1]). -export([publish/1, safe_publish/1]). -export([dispatch/2]). @@ -35,6 +35,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-import(emqx_tables, [lookup_value/2, lookup_value/3]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). @@ -42,8 +44,7 @@ -define(BROKER, ?MODULE). -%% ETS tables --define(SUBID, emqx_subid). +%% ETS tables for PubSub -define(SUBOPTION, emqx_suboption). -define(SUBSCRIBER, emqx_subscriber). -define(SUBSCRIPTION, emqx_subscription). @@ -65,9 +66,6 @@ start_link(Pool, Id) -> create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubId: SubId -> SubPid - ok = emqx_tables:new(?SUBID, [set | TabOpts]), - %% SubOption: {SubPid, Topic} -> SubOption ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]), @@ -76,7 +74,7 @@ create_tabs() -> ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... - %% bag: o(n) insert + %% bag: o(n) insert:( ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ @@ -98,28 +96,37 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map SubPid = self(), case ets:member(?SUBOPTION, {SubPid, Topic}) of false -> - ok = emqx_broker_helper:monitor(SubPid, SubId), - %% true = ets:insert(?SUBID, {SubId, SubPid}), - true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - case maps:get(share, SubOpts, undefined) of - undefined -> - Shard = emqx_broker_helper:get_shard(SubPid, Topic), - case Shard of - 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); - I -> - true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) - end, - SubOpts1 = maps:put(shard, Shard, SubOpts), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shard}), {subscribe, Topic}); - Group -> %% Shard subscription - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid) - end; + ok = emqx_broker_helper:monitor_sub(SubPid, SubId), + do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); true -> ok end. +with_subid(undefined, SubOpts) -> + SubOpts; +with_subid(SubId, SubOpts) -> + maps:put(subid, SubId, SubOpts). + +%% @private +do_subscribe(Topic, SubPid, SubOpts) -> + true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), + Group = maps:get(share, SubOpts, undefined), + do_subscribe(Group, Topic, SubPid, SubOpts). + +do_subscribe(undefined, Topic, SubPid, SubOpts) -> + case emqx_broker_helper:get_sub_shard(SubPid, Topic) of + 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + call(pick(Topic), {subscribe, Topic}); + I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, maps:put(shard, I, SubOpts)}), + call(pick({Topic, I}), {subscribe, Topic, I}) + end; + +%% Shared subscription +do_subscribe(Group, Topic, SubPid, SubOpts) -> + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + emqx_shared_sub:subscribe(Group, Topic, SubPid). + %%------------------------------------------------------------------------------ %% Unsubscribe API %%------------------------------------------------------------------------------ @@ -130,33 +137,26 @@ unsubscribe(Topic) when is_binary(Topic) -> case ets:lookup(?SUBOPTION, {SubPid, Topic}) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(share, SubOpts, undefined) of - undefined -> - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shard, Topic, I}) of - true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) - end, - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; - Group -> - ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid) - end, - true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), - %%true = ets:delete_object(?SUBID, {SubId, SubPid}), - true = ets:delete(?SUBOPTION, {SubPid, Topic}), - ok; + do_unsubscribe(Topic, SubPid, SubOpts); [] -> ok end. --spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok). -unsubscribe(Topic, _SubId) when is_binary(Topic) -> - unsubscribe(Topic). +do_unsubscribe(Topic, SubPid, SubOpts) -> + true = ets:delete(?SUBOPTION, {SubPid, Topic}), + true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), + Group = maps:get(share, SubOpts, undefined), + do_unsubscribe(Group, Topic, SubPid, SubOpts). + +do_unsubscribe(undefined, Topic, SubPid, SubOpts) -> + case maps:get(shard, SubOpts, 0) of + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; + +do_unsubscribe(Group, Topic, SubPid, _SubOpts) -> + emqx_shared_sub:unsubscribe(Group, Topic, SubPid). %%------------------------------------------------------------------------------ %% Publish @@ -241,23 +241,28 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> inc_dropped_cnt(Topic), Delivery; [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), - Delivery#delivery{results = [{dispatch, Topic, 1}|Results]}; + Cnt = dispatch(Sub, Topic, Msg), + Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]}; Subs -> - Count = lists:foldl( - fun(Sub, Acc) -> - dispatch(Sub, Topic, Msg), Acc + 1 - end, 0, Subs), - Delivery#delivery{results = [{dispatch, Topic, Count}|Results]} + Cnt = lists:foldl( + fun(Sub, Acc) -> + dispatch(Sub, Topic, Msg) + Acc + end, 0, Subs), + Delivery#delivery{results = [{dispatch, Topic, Cnt}|Results]} end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}; + case erlang:is_process_alive(SubPid) of + true -> + SubPid ! {dispatch, Topic, Msg}, + 1; + false -> 0 + end; dispatch({shard, I}, Topic, Msg) -> - - lists:foreach(fun(SubPid) -> - SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])). + lists:foldl( + fun(SubPid, Cnt) -> + dispatch(SubPid, Topic, Msg) + Cnt + end, 0, subscribers({shard, Topic, I})). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -265,8 +270,10 @@ inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). -spec(subscribers(emqx_topic:topic()) -> [pid()]). -subscribers(Topic) -> - safe_lookup_element(?SUBSCRIBER, Topic, []). +subscribers(Topic) when is_binary(Topic) -> + lookup_value(?SUBSCRIBER, Topic, []); +subscribers(Shard = {shard, _Topic, _I}) -> + lookup_value(?SUBSCRIBER, Shard, []). %%------------------------------------------------------------------------------ %% Subscriber is down @@ -275,27 +282,21 @@ subscribers(Topic) -> -spec(subscriber_down(pid()) -> true). subscriber_down(SubPid) -> lists:foreach( - fun(Sub = {_Pid, Topic}) -> - case ets:lookup(?SUBOPTION, Sub) of - [{_, SubOpts}] -> + fun(Topic) -> + case lookup_value(?SUBOPTION, {SubPid, Topic}) of + SubOpts when is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), + true = ets:delete(?SUBOPTION, {SubPid, Topic}), case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shard, Topic, I}) of - true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) - end, - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end, - ets:delete(?SUBOPTION, Sub); - [] -> ok + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; + undefined -> ok end - end, ets:lookup(?SUBSCRIPTION, SubPid)), - true = ets:delete(?SUBSCRIPTION, SubPid). + end, lookup_value(?SUBSCRIPTION, SubPid, [])), + ets:delete(?SUBSCRIPTION, SubPid). %%------------------------------------------------------------------------------ %% Management APIs @@ -303,20 +304,32 @@ subscriber_down(SubPid) -> -spec(subscriptions(pid() | emqx_types:subid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). -subscriptions(SubPid) -> - [{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})} - || Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])]. +subscriptions(SubPid) when is_pid(SubPid) -> + [{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} + || Topic <- lookup_value(?SUBSCRIPTION, SubPid, [])]; +subscriptions(SubId) -> + case emqx_broker_helper:lookup_subpid(SubId) of + SubPid when is_pid(SubPid) -> + subscriptions(SubPid); + undefined -> [] + end. -spec(subscribed(pid(), emqx_topic:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); subscribed(SubId, Topic) when ?is_subid(SubId) -> - %%FIXME:... SubId -> SubPid - ets:member(?SUBOPTION, {SubId, Topic}). + SubPid = emqx_broker_helper:lookup_subpid(SubId), + ets:member(?SUBOPTION, {SubPid, Topic}). --spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()). +-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts() | undefined). get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> - safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}). + lookup_value(?SUBOPTION, {SubPid, Topic}); +get_subopts(SubId, Topic) when ?is_subid(SubId) -> + case emqx_broker_helper:lookup_subpid(SubId) of + SubPid when is_pid(SubPid) -> + get_subopts(SubPid, Topic); + undefined -> undefined + end. -spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()). set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> @@ -331,9 +344,6 @@ set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> topics() -> emqx_router:topics(). -safe_lookup_element(Tab, Key, Def) -> - try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end. - %%------------------------------------------------------------------------------ %% Stats fun %%------------------------------------------------------------------------------ @@ -372,10 +382,15 @@ init([Pool, Id]) -> {ok, #{pool => Pool, id => Id}}. handle_call({subscribe, Topic}, _From, State) -> - Ok = case get(Topic) of + Ok = emqx_router:do_add_route(Topic), + {reply, Ok, State}; + +handle_call({subscribe, Topic, I}, _From, State) -> + Ok = case get(Shard = {Topic, I}) of undefined -> - _ = put(Topic, true), - emqx_router:do_add_route(Topic); + _ = put(Shard, true), + true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}), + cast(pick(Topic), {subscribe, Topic}); true -> ok end, {reply, Ok, State}; @@ -384,11 +399,18 @@ handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast({subscribe, Topic}, State) -> + case emqx_router:do_add_route(Topic) of + ok -> ok; + {error, Reason} -> + emqx_logger:error("[Broker] Failed to add route: ~p", [Reason]) + end, + {noreply, State}; + handle_cast({unsubscribed, Topic}, State) -> case ets:member(?SUBSCRIBER, Topic) of false -> - _ = erase(Topic), - emqx_router:do_delete_route(Topic); + _ = emqx_router:do_delete_route(Topic); true -> ok end, {noreply, State}; @@ -396,6 +418,7 @@ handle_cast({unsubscribed, Topic}, State) -> handle_cast({unsubscribed, Topic, I}, State) -> case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> + _ = erase({Topic, I}), true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index d3e7f9d37..7d514e31d 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -16,44 +16,56 @@ -behaviour(gen_server). --compile({no_auto_import, [monitor/2]}). - -export([start_link/0]). --export([monitor/2]). --export([get_shard/2]). +-export([register_sub/2]). +-export([lookup_subid/1, lookup_subpid/1]). +-export([get_sub_shard/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(HELPER, ?MODULE). +-define(SUBID, emqx_subid). -define(SUBMON, emqx_submon). -define(SUBSEQ, emqx_subseq). - --record(state, {pmon :: emqx_pmon:pmon()}). +-define(SHARD, 1024). -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?HELPER}, ?MODULE, [], []). --spec(monitor(pid(), emqx_types:subid()) -> ok). -monitor(SubPid, SubId) when is_pid(SubPid) -> +-spec(register_sub(pid(), emqx_types:subid()) -> ok). +register_sub(SubPid, SubId) when is_pid(SubPid) -> case ets:lookup(?SUBMON, SubPid) of [] -> - gen_server:cast(?HELPER, {monitor, SubPid, SubId}); + gen_server:cast(?HELPER, {register_sub, SubPid, SubId}); [{_, SubId}] -> ok; _Other -> error(subid_conflict) end. --spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shard(SubPid, Topic) -> +-spec(lookup_subid(pid()) -> emqx_types:subid() | undefined). +lookup_subid(SubPid) when is_pid(SubPid) -> + emqx_tables:lookup_value(?SUBMON, SubPid). + +-spec(lookup_subpid(emqx_types:subid()) -> pid()). +lookup_subpid(SubId) -> + emqx_tables:lookup_value(?SUBID, SubId). + +-spec(get_sub_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_sub_shard(SubPid, Topic) -> case create_seq(Topic) of - Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) + Seq when Seq =< ?SHARD -> 0; + _ -> erlang:phash2(SubPid, shards_num()) + 1 end. +-spec(shards_num() -> pos_integer()). +shards_num() -> + %% Dynamic sharding later... + ets:lookup_element(?HELPER, shards, 2). + -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). create_seq(Topic) -> emqx_sequence:nextval(?SUBSEQ, Topic). @@ -67,41 +79,55 @@ reclaim_seq(Topic) -> %%------------------------------------------------------------------------------ init([]) -> + %% Helper table + ok = emqx_tables:new(?HELPER, [{read_concurrency, true}]), + %% Shards: CPU * 32 + true = ets:insert(?HELPER, {shards, emqx_vm:schedulers() * 32}), %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shards: CPU * 32 - true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), + %% SubId: SubId -> SubPid + ok = emqx_tables:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]), %% SubMon: SubPid -> SubId - ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), + ok = emqx_tables:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer - emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), - {ok, #state{pmon = emqx_pmon:new()}, hibernate}. + ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + {ok, #{pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]), - {reply, ignored, State}. + {reply, ignored, State}. -handle_cast({monitor, SubPid, SubId}, State = #state{pmon = PMon}) -> +handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) -> + true = (SubId =:= undefined) orelse ets:insert(?SUBID, {SubId, SubPid}), true = ets:insert(?SUBMON, {SubPid, SubId}), - {noreply, State#state{pmon = emqx_pmon:monitor(SubPid, PMon)}}; + {noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}}; handle_cast(Msg, State) -> emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> - true = ets:delete(?SUBMON, SubPid), - ok = emqx_pool:async_submit(fun emqx_broker:subscriber_down/1, [SubPid]), - {noreply, State#state{pmon = emqx_pmon:erase(SubPid, PMon)}}; +handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #{pmon := PMon}) -> + case ets:lookup(?SUBMON, SubPid) of + [{_, SubId}] -> + ok = emqx_pool:async_submit(fun subscriber_down/2, [SubPid, SubId]); + [] -> + emqx_logger:error("[BrokerHelper] unexpected DOWN: ~p, reason: ~p", [SubPid, Reason]) + end, + {noreply, State#{pmon := emqx_pmon:erase(SubPid, PMon)}}; handle_info(Info, State) -> emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> - _ = emqx_sequence:delete(?SUBSEQ), +terminate(_Reason, _State) -> + true = emqx_sequence:delete(?SUBSEQ), emqx_stats:cancel_update(broker_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. +subscriber_down(SubPid, SubId) -> + true = ets:delete(?SUBMON, SubPid), + true = (SubId =:= undefined) orelse ets:delete_object(?SUBID, {SubId, SubPid}), + emqx_broker:subscriber_down(SubPid). + diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index efeaabc74..c32800a24 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -90,7 +90,7 @@ init([]) -> [Node | Acc] end end, [], mnesia:dirty_all_keys(?ROUTING_NODE)), - emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), + ok = emqx_stats:update_interval(route_stats, fun ?MODULE:stats_fun/0), {ok, #{nodes => Nodes}, hibernate}. handle_call(Req, _From, State) -> diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl index 022531df5..33bb5edda 100644 --- a/src/emqx_sequence.erl +++ b/src/emqx_sequence.erl @@ -51,6 +51,7 @@ reclaim(Name, Key) -> end. %% @doc Delete the sequence. +-spec(delete(name()) -> boolean()). delete(Name) -> case ets:info(Name, name) of Name -> ets:delete(Name); diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index d178a8ae7..637e44b0c 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -206,7 +206,7 @@ init([]) -> ok = emqx_tables:new(?SESSION_P_TAB, TabOpts), ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), - emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), + ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), {ok, #{session_pmon => emqx_pmon:new()}}. handle_call(Req, _From, State) -> diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 9b3ebfeae..fdb106a99 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -15,6 +15,7 @@ -module(emqx_tables). -export([new/2]). +-export([lookup_value/2, lookup_value/3]). %% Create a named_table ets. -spec(new(atom(), list()) -> ok). @@ -26,3 +27,16 @@ new(Tab, Opts) -> Tab -> ok end. +%% KV lookup +-spec(lookup_value(atom(), term()) -> any()). +lookup_value(Tab, Key) -> + lookup_value(Tab, Key, undefined). + +-spec(lookup_value(atom(), term(), any()) -> any()). +lookup_value(Tab, Key, Def) -> + try + ets:lookup_element(Tab, Key, 2) + catch + error:badarg -> Def + end. + diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl index f37b60d76..1ac0ea308 100644 --- a/test/emqx_sequence_SUITE.erl +++ b/test/emqx_sequence_SUITE.erl @@ -33,5 +33,6 @@ sequence_generate(_) -> ?assertEqual(1, reclaim(seqtab, key)), ?assertEqual(0, reclaim(seqtab, key)), ?assertEqual(false, ets:member(seqtab, key)), - ?assertEqual(1, nextval(seqtab, key)). + ?assertEqual(1, nextval(seqtab, key)), + ?assert(emqx_sequence:delete(seqtab). diff --git a/test/emqx_tables_SUITE.erl b/test/emqx_tables_SUITE.erl index 95590b0e9..1002c0a0b 100644 --- a/test/emqx_tables_SUITE.erl +++ b/test/emqx_tables_SUITE.erl @@ -20,7 +20,7 @@ all() -> [t_new]. t_new(_) -> - TId = emqx_tables:new(test_table, [{read_concurrency, true}]), - ets:insert(TId, {loss, 100}), - TId = emqx_tables:new(test_table, [{read_concurrency, true}]), - 100 = ets:lookup_element(TId, loss, 2). + ok = emqx_tables:new(test_table, [{read_concurrency, true}]), + ets:insert(test_table, {key, 100}), + ok = emqx_tables:new(test_table, [{read_concurrency, true}]), + 100 = ets:lookup_element(test_table, key, 2).