refactor: rename emqx_tables to emqx_utils_ets

This commit is contained in:
Stefan Strigler 2023-04-13 14:57:45 +02:00
parent d0df086c80
commit 6e8665365b
17 changed files with 45 additions and 43 deletions

View File

@ -71,7 +71,7 @@
code_change/3
]).
-import(emqx_tables, [lookup_value/2, lookup_value/3]).
-import(emqx_utils_ets, [lookup_value/2, lookup_value/3]).
-ifdef(TEST).
-compile(export_all).
@ -107,15 +107,15 @@ create_tabs() ->
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
%% SubOption: {Topic, SubPid} -> SubOption
ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]),
ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]),
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
%% duplicate_bag: o(1) insert
ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
%% bag: o(n) insert:(
ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]).
ok = emqx_utils_ets:new(?SUBSCRIBER, [bag | TabOpts]).
%%------------------------------------------------------------------------------
%% Subscribe API

View File

@ -73,11 +73,11 @@ register_sub(SubPid, SubId) when is_pid(SubPid) ->
-spec lookup_subid(pid()) -> maybe(emqx_types:subid()).
lookup_subid(SubPid) when is_pid(SubPid) ->
emqx_tables:lookup_value(?SUBMON, SubPid).
emqx_utils_ets:lookup_value(?SUBMON, SubPid).
-spec lookup_subpid(emqx_types:subid()) -> maybe(pid()).
lookup_subpid(SubId) ->
emqx_tables:lookup_value(?SUBID, SubId).
emqx_utils_ets:lookup_value(?SUBID, SubId).
-spec get_sub_shard(pid(), emqx_types:topic()) -> non_neg_integer().
get_sub_shard(SubPid, Topic) ->
@ -105,15 +105,15 @@ reclaim_seq(Topic) ->
init([]) ->
%% Helper table
ok = emqx_tables:new(?HELPER, [{read_concurrency, true}]),
ok = emqx_utils_ets:new(?HELPER, [{read_concurrency, true}]),
%% Shards: CPU * 32
true = ets:insert(?HELPER, {shards, emqx_vm:schedulers() * 32}),
%% SubSeq: Topic -> SeqId
ok = emqx_sequence:create(?SUBSEQ),
%% SubId: SubId -> SubPid
ok = emqx_tables:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]),
ok = emqx_utils_ets:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]),
%% SubMon: SubPid -> SubId
ok = emqx_tables:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]),
ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]),
%% Stats timer
ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
{ok, #{pmon => emqx_pmon:new()}}.

View File

@ -651,10 +651,10 @@ cast(Msg) -> gen_server:cast(?CM, Msg).
init([]) ->
TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]),
ok = emqx_utils_ets:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_utils_ets:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_utils_ets:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]),
ok = emqx_utils_ets:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
State = #{chan_pmon => emqx_pmon:new()},
{ok, State}.

View File

@ -99,7 +99,7 @@ now_diff(TS) -> erlang:system_time(millisecond) - TS.
%%--------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?FLAPPING_TAB, [
ok = emqx_utils_ets:new(?FLAPPING_TAB, [
public,
set,
{keypos, #flapping.clientid},

View File

@ -229,7 +229,7 @@ lookup(HookPoint) ->
%%--------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->

View File

@ -541,7 +541,7 @@ init([]) ->
CRef = counters:new(?MAX_SIZE, [write_concurrency]),
ok = persistent_term:put(?MODULE, CRef),
% Create index mapping table
ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]),
ok = emqx_utils_ets:new(?TAB, [{keypos, 2}, {read_concurrency, true}]),
Metrics = lists:append([
?BYTES_METRICS,
?PACKET_METRICS,

View File

@ -120,7 +120,7 @@ inject_sni_fun(ListenerID, Conf0) ->
init(_Args) ->
logger:set_process_metadata(#{domain => [emqx, ocsp, cache]}),
emqx_tables:new(?CACHE_TAB, [
emqx_utils_ets:new(?CACHE_TAB, [
named_table,
public,
{heir, whereis(emqx_kernel_sup), none},

View File

@ -39,7 +39,7 @@
%% @doc Create a sequence.
-spec create(name()) -> ok.
create(Name) ->
emqx_tables:new(Name, [public, set, {write_concurrency, true}]).
emqx_utils_ets:new(Name, [public, set, {write_concurrency, true}]).
%% @doc Next value of the sequence.
-spec nextval(name(), key()) -> seqid().

View File

@ -95,7 +95,7 @@ create_table(Tab, Storage) ->
%%--------------------------------------------------------------------
create_init_tab() ->
emqx_tables:new(?SESSION_INIT_TAB, [
emqx_utils_ets:new(?SESSION_INIT_TAB, [
public,
{read_concurrency, true},
{write_concurrency, true}
@ -182,7 +182,7 @@ pending(SessionID, MarkerIDs) ->
call(pick(SessionID), {pending, SessionID, MarkerIDs}).
buffer(SessionID, STopic, Msg) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined -> ok;
Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg)
end.
@ -194,7 +194,7 @@ resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) ->
-spec resume_end(pid(), binary()) ->
{'ok', [emqx_types:message()]} | {'error', term()}.
resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined ->
?tp(ps_session_not_found, #{sid => SessionID}),
{error, not_found};
@ -249,7 +249,7 @@ handle_cast({delete_routes, SessionID, Subscriptions}, State) ->
ok = lists:foreach(Fun, maps:to_list(Subscriptions)),
{noreply, State};
handle_cast({resume_end, SessionID, Pid}, State) ->
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined -> skip;
P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID);
P when is_pid(P) -> skip
@ -283,7 +283,7 @@ init_resume_worker(RemotePid, SessionID, #{pmon := Pmon} = State) ->
error;
{ok, Pid} ->
Pmon1 = emqx_pmon:monitor(Pid, Pmon),
case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of
case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of
undefined ->
{ok, Pid, State#{pmon => Pmon1}};
{_, OldPid} ->

View File

@ -399,9 +399,11 @@ init([]) ->
ok = mria:wait_for_tables([?TAB]),
{ok, _} = mnesia:subscribe({table, ?TAB, simple}),
{atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0),
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
ok = emqx_tables:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [public, set, {write_concurrency, true}]),
ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]),
ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [
public, set, {write_concurrency, true}
]),
{ok, update_stats(#state{pmon = PMon})}.
init_monitors() ->

View File

@ -201,7 +201,7 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg).
%%--------------------------------------------------------------------
init(#{tick_ms := TickMs}) ->
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
ok = emqx_utils_ets:new(?TAB, [public, set, {write_concurrency, true}]),
Stats = lists:append([
?CONNECTION_STATS,
?CHANNEL_STATS,

View File

@ -766,9 +766,9 @@ init(Options) ->
TabOpts = [public, {write_concurrency, true}],
{ChanTab, ConnTab, InfoTab} = cmtabs(GwName),
ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(ConnTab, [bag | TabOpts]),
ok = emqx_tables:new(InfoTab, [ordered_set, compressed | TabOpts]),
ok = emqx_utils_ets:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]),
ok = emqx_utils_ets:new(ConnTab, [bag | TabOpts]),
ok = emqx_utils_ets:new(InfoTab, [ordered_set, compressed | TabOpts]),
%% Start link cm-registry process
%% XXX: Should I hang it under a higher level supervisor?

View File

@ -89,7 +89,7 @@ tabname(GwName) ->
init([GwName]) ->
TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(tabname(GwName), [set | TabOpts]),
ok = emqx_utils_ets:new(tabname(GwName), [set | TabOpts]),
{ok, #state{}}.
handle_call(_Request, _From, State) ->

View File

@ -201,7 +201,7 @@ reset() ->
init([Opts]) ->
erlang:process_flag(trap_exit, true),
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
ok = emqx_utils_ets:new(?TAB, [{read_concurrency, true}]),
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
Fun =
fun(#{topic := Topic}, CurrentSpeeds) ->

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_tables).
-module(emqx_utils_ets).
-export([
new/1,

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_tables_SUITE).
-module(emqx_utils_ets_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -26,19 +26,19 @@
all() -> emqx_common_test_helpers:all(?MODULE).
t_new(_) ->
ok = emqx_tables:new(?TAB),
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
ok = emqx_utils_ets:new(?TAB),
ok = emqx_utils_ets:new(?TAB, [{read_concurrency, true}]),
?assertEqual(?TAB, ets:info(?TAB, name)).
t_lookup_value(_) ->
ok = emqx_tables:new(?TAB, []),
ok = emqx_utils_ets:new(?TAB, []),
true = ets:insert(?TAB, {key, val}),
?assertEqual(val, emqx_tables:lookup_value(?TAB, key)),
?assertEqual(undefined, emqx_tables:lookup_value(?TAB, badkey)).
?assertEqual(val, emqx_utils_ets:lookup_value(?TAB, key)),
?assertEqual(undefined, emqx_utils_ets:lookup_value(?TAB, badkey)).
t_delete(_) ->
ok = emqx_tables:new(?TAB, []),
ok = emqx_utils_ets:new(?TAB, []),
?assertEqual(?TAB, ets:info(?TAB, name)),
ok = emqx_tables:delete(?TAB),
ok = emqx_tables:delete(?TAB),
ok = emqx_utils_ets:delete(?TAB),
ok = emqx_utils_ets:delete(?TAB),
?assertEqual(undefined, ets:info(?TAB, name)).

View File

@ -68,7 +68,7 @@ trace_rule(Data, Envs, _Args) ->
make_trace_fn_action() ->
persistent_term:put({?MODULE, test_pid}, self()),
Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>,
emqx_tables:new(recorded_actions, [named_table, public, ordered_set]),
emqx_utils_ets:new(recorded_actions, [named_table, public, ordered_set]),
#{function => Fn, args => #{}}.
create_rule_http(RuleParams) ->