From 6e8665365baa952dff6161bd66290dd53c1bbaa8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 13 Apr 2023 14:57:45 +0200 Subject: [PATCH] refactor: rename emqx_tables to emqx_utils_ets --- apps/emqx/src/emqx_broker.erl | 8 ++++---- apps/emqx/src/emqx_broker_helper.erl | 10 +++++----- apps/emqx/src/emqx_cm.erl | 8 ++++---- apps/emqx/src/emqx_flapping.erl | 2 +- apps/emqx/src/emqx_hooks.erl | 2 +- apps/emqx/src/emqx_metrics.erl | 2 +- apps/emqx/src/emqx_ocsp_cache.erl | 2 +- apps/emqx/src/emqx_sequence.erl | 2 +- apps/emqx/src/emqx_session_router.erl | 10 +++++----- apps/emqx/src/emqx_shared_sub.erl | 8 +++++--- apps/emqx/src/emqx_stats.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_cm.erl | 6 +++--- apps/emqx_gateway/src/emqx_gateway_metrics.erl | 2 +- apps/emqx_modules/src/emqx_topic_metrics.erl | 2 +- .../src/emqx_utils_ets.erl} | 2 +- .../test/emqx_utils_ets_SUITE.erl} | 18 +++++++++--------- .../test/emqx_ee_schema_registry_SUITE.erl | 2 +- 17 files changed, 45 insertions(+), 43 deletions(-) rename apps/{emqx/src/emqx_tables.erl => emqx_utils/src/emqx_utils_ets.erl} (98%) rename apps/{emqx/test/emqx_tables_SUITE.erl => emqx_utils/test/emqx_utils_ets_SUITE.erl} (73%) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 7e00c050e..5f7c4aaf5 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -71,7 +71,7 @@ code_change/3 ]). --import(emqx_tables, [lookup_value/2, lookup_value/3]). +-import(emqx_utils_ets, [lookup_value/2, lookup_value/3]). -ifdef(TEST). -compile(export_all). @@ -107,15 +107,15 @@ create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], %% SubOption: {Topic, SubPid} -> SubOption - ok = emqx_tables:new(?SUBOPTION, [ordered_set | TabOpts]), + ok = emqx_utils_ets:new(?SUBOPTION, [ordered_set | TabOpts]), %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... %% duplicate_bag: o(1) insert - ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), + ok = emqx_utils_ets:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... %% bag: o(n) insert:( - ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). + ok = emqx_utils_ets:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ %% Subscribe API diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 87b291361..06f249678 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -73,11 +73,11 @@ register_sub(SubPid, SubId) when is_pid(SubPid) -> -spec lookup_subid(pid()) -> maybe(emqx_types:subid()). lookup_subid(SubPid) when is_pid(SubPid) -> - emqx_tables:lookup_value(?SUBMON, SubPid). + emqx_utils_ets:lookup_value(?SUBMON, SubPid). -spec lookup_subpid(emqx_types:subid()) -> maybe(pid()). lookup_subpid(SubId) -> - emqx_tables:lookup_value(?SUBID, SubId). + emqx_utils_ets:lookup_value(?SUBID, SubId). -spec get_sub_shard(pid(), emqx_types:topic()) -> non_neg_integer(). get_sub_shard(SubPid, Topic) -> @@ -105,15 +105,15 @@ reclaim_seq(Topic) -> init([]) -> %% Helper table - ok = emqx_tables:new(?HELPER, [{read_concurrency, true}]), + ok = emqx_utils_ets:new(?HELPER, [{read_concurrency, true}]), %% Shards: CPU * 32 true = ets:insert(?HELPER, {shards, emqx_vm:schedulers() * 32}), %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), %% SubId: SubId -> SubPid - ok = emqx_tables:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]), + ok = emqx_utils_ets:new(?SUBID, [public, {read_concurrency, true}, {write_concurrency, true}]), %% SubMon: SubPid -> SubId - ok = emqx_tables:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), + ok = emqx_utils_ets:new(?SUBMON, [public, {read_concurrency, true}, {write_concurrency, true}]), %% Stats timer ok = emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), {ok, #{pmon => emqx_pmon:new()}}. diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index 346e8dcb5..0290b57d3 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -651,10 +651,10 @@ cast(Msg) -> gen_server:cast(?CM, Msg). init([]) -> TabOpts = [public, {write_concurrency, true}], - ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), - ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]), - ok = emqx_tables:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]), - ok = emqx_tables:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]), + ok = emqx_utils_ets:new(?CHAN_TAB, [bag, {read_concurrency, true} | TabOpts]), + ok = emqx_utils_ets:new(?CHAN_CONN_TAB, [bag | TabOpts]), + ok = emqx_utils_ets:new(?CHAN_INFO_TAB, [ordered_set, compressed | TabOpts]), + ok = emqx_utils_ets:new(?CHAN_LIVE_TAB, [ordered_set, {write_concurrency, true} | TabOpts]), ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0), State = #{chan_pmon => emqx_pmon:new()}, {ok, State}. diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 2cfed0a8e..46193fb16 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -99,7 +99,7 @@ now_diff(TS) -> erlang:system_time(millisecond) - TS. %%-------------------------------------------------------------------- init([]) -> - ok = emqx_tables:new(?FLAPPING_TAB, [ + ok = emqx_utils_ets:new(?FLAPPING_TAB, [ public, set, {keypos, #flapping.clientid}, diff --git a/apps/emqx/src/emqx_hooks.erl b/apps/emqx/src/emqx_hooks.erl index 1784d8ea3..0b8dc0941 100644 --- a/apps/emqx/src/emqx_hooks.erl +++ b/apps/emqx/src/emqx_hooks.erl @@ -229,7 +229,7 @@ lookup(HookPoint) -> %%-------------------------------------------------------------------- init([]) -> - ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) -> diff --git a/apps/emqx/src/emqx_metrics.erl b/apps/emqx/src/emqx_metrics.erl index c2e297623..21a114c0f 100644 --- a/apps/emqx/src/emqx_metrics.erl +++ b/apps/emqx/src/emqx_metrics.erl @@ -541,7 +541,7 @@ init([]) -> CRef = counters:new(?MAX_SIZE, [write_concurrency]), ok = persistent_term:put(?MODULE, CRef), % Create index mapping table - ok = emqx_tables:new(?TAB, [{keypos, 2}, {read_concurrency, true}]), + ok = emqx_utils_ets:new(?TAB, [{keypos, 2}, {read_concurrency, true}]), Metrics = lists:append([ ?BYTES_METRICS, ?PACKET_METRICS, diff --git a/apps/emqx/src/emqx_ocsp_cache.erl b/apps/emqx/src/emqx_ocsp_cache.erl index 7044d992e..ccf3e0c40 100644 --- a/apps/emqx/src/emqx_ocsp_cache.erl +++ b/apps/emqx/src/emqx_ocsp_cache.erl @@ -120,7 +120,7 @@ inject_sni_fun(ListenerID, Conf0) -> init(_Args) -> logger:set_process_metadata(#{domain => [emqx, ocsp, cache]}), - emqx_tables:new(?CACHE_TAB, [ + emqx_utils_ets:new(?CACHE_TAB, [ named_table, public, {heir, whereis(emqx_kernel_sup), none}, diff --git a/apps/emqx/src/emqx_sequence.erl b/apps/emqx/src/emqx_sequence.erl index 60596324a..7acc87256 100644 --- a/apps/emqx/src/emqx_sequence.erl +++ b/apps/emqx/src/emqx_sequence.erl @@ -39,7 +39,7 @@ %% @doc Create a sequence. -spec create(name()) -> ok. create(Name) -> - emqx_tables:new(Name, [public, set, {write_concurrency, true}]). + emqx_utils_ets:new(Name, [public, set, {write_concurrency, true}]). %% @doc Next value of the sequence. -spec nextval(name(), key()) -> seqid(). diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 8a21d4d03..0435ddca3 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -95,7 +95,7 @@ create_table(Tab, Storage) -> %%-------------------------------------------------------------------- create_init_tab() -> - emqx_tables:new(?SESSION_INIT_TAB, [ + emqx_utils_ets:new(?SESSION_INIT_TAB, [ public, {read_concurrency, true}, {write_concurrency, true} @@ -182,7 +182,7 @@ pending(SessionID, MarkerIDs) -> call(pick(SessionID), {pending, SessionID, MarkerIDs}). buffer(SessionID, STopic, Msg) -> - case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of undefined -> ok; Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg) end. @@ -194,7 +194,7 @@ resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) -> -spec resume_end(pid(), binary()) -> {'ok', [emqx_types:message()]} | {'error', term()}. resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) -> - case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of undefined -> ?tp(ps_session_not_found, #{sid => SessionID}), {error, not_found}; @@ -249,7 +249,7 @@ handle_cast({delete_routes, SessionID, Subscriptions}, State) -> ok = lists:foreach(Fun, maps:to_list(Subscriptions)), {noreply, State}; handle_cast({resume_end, SessionID, Pid}, State) -> - case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of undefined -> skip; P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID); P when is_pid(P) -> skip @@ -283,7 +283,7 @@ init_resume_worker(RemotePid, SessionID, #{pmon := Pmon} = State) -> error; {ok, Pid} -> Pmon1 = emqx_pmon:monitor(Pid, Pmon), - case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + case emqx_utils_ets:lookup_value(?SESSION_INIT_TAB, SessionID) of undefined -> {ok, Pid, State#{pmon => Pmon1}}; {_, OldPid} -> diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index 061f2a42f..d7dc8c5a6 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -399,9 +399,11 @@ init([]) -> ok = mria:wait_for_tables([?TAB]), {ok, _} = mnesia:subscribe({table, ?TAB, simple}), {atomic, PMon} = mria:transaction(?SHARED_SUB_SHARD, fun ?MODULE:init_monitors/0), - ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), - ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), - ok = emqx_tables:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [public, set, {write_concurrency, true}]), + ok = emqx_utils_ets:new(?SHARED_SUBS, [protected, bag]), + ok = emqx_utils_ets:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), + ok = emqx_utils_ets:new(?SHARED_SUBS_ROUND_ROBIN_COUNTER, [ + public, set, {write_concurrency, true} + ]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index 017b83116..ef9109e33 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -201,7 +201,7 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg). %%-------------------------------------------------------------------- init(#{tick_ms := TickMs}) -> - ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]), + ok = emqx_utils_ets:new(?TAB, [public, set, {write_concurrency, true}]), Stats = lists:append([ ?CONNECTION_STATS, ?CHANNEL_STATS, diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 5de6da63f..837600811 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -766,9 +766,9 @@ init(Options) -> TabOpts = [public, {write_concurrency, true}], {ChanTab, ConnTab, InfoTab} = cmtabs(GwName), - ok = emqx_tables:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]), - ok = emqx_tables:new(ConnTab, [bag | TabOpts]), - ok = emqx_tables:new(InfoTab, [ordered_set, compressed | TabOpts]), + ok = emqx_utils_ets:new(ChanTab, [bag, {read_concurrency, true} | TabOpts]), + ok = emqx_utils_ets:new(ConnTab, [bag | TabOpts]), + ok = emqx_utils_ets:new(InfoTab, [ordered_set, compressed | TabOpts]), %% Start link cm-registry process %% XXX: Should I hang it under a higher level supervisor? diff --git a/apps/emqx_gateway/src/emqx_gateway_metrics.erl b/apps/emqx_gateway/src/emqx_gateway_metrics.erl index e94510387..0aa2ff210 100644 --- a/apps/emqx_gateway/src/emqx_gateway_metrics.erl +++ b/apps/emqx_gateway/src/emqx_gateway_metrics.erl @@ -89,7 +89,7 @@ tabname(GwName) -> init([GwName]) -> TabOpts = [public, {write_concurrency, true}], - ok = emqx_tables:new(tabname(GwName), [set | TabOpts]), + ok = emqx_utils_ets:new(tabname(GwName), [set | TabOpts]), {ok, #state{}}. handle_call(_Request, _From, State) -> diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index dfc6b07ab..de09e568f 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -201,7 +201,7 @@ reset() -> init([Opts]) -> erlang:process_flag(trap_exit, true), - ok = emqx_tables:new(?TAB, [{read_concurrency, true}]), + ok = emqx_utils_ets:new(?TAB, [{read_concurrency, true}]), erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), Fun = fun(#{topic := Topic}, CurrentSpeeds) -> diff --git a/apps/emqx/src/emqx_tables.erl b/apps/emqx_utils/src/emqx_utils_ets.erl similarity index 98% rename from apps/emqx/src/emqx_tables.erl rename to apps/emqx_utils/src/emqx_utils_ets.erl index ffdf7d891..099152675 100644 --- a/apps/emqx/src/emqx_tables.erl +++ b/apps/emqx_utils/src/emqx_utils_ets.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_tables). +-module(emqx_utils_ets). -export([ new/1, diff --git a/apps/emqx/test/emqx_tables_SUITE.erl b/apps/emqx_utils/test/emqx_utils_ets_SUITE.erl similarity index 73% rename from apps/emqx/test/emqx_tables_SUITE.erl rename to apps/emqx_utils/test/emqx_utils_ets_SUITE.erl index ad53e7139..13bf427fd 100644 --- a/apps/emqx/test/emqx_tables_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_ets_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_tables_SUITE). +-module(emqx_utils_ets_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -26,19 +26,19 @@ all() -> emqx_common_test_helpers:all(?MODULE). t_new(_) -> - ok = emqx_tables:new(?TAB), - ok = emqx_tables:new(?TAB, [{read_concurrency, true}]), + ok = emqx_utils_ets:new(?TAB), + ok = emqx_utils_ets:new(?TAB, [{read_concurrency, true}]), ?assertEqual(?TAB, ets:info(?TAB, name)). t_lookup_value(_) -> - ok = emqx_tables:new(?TAB, []), + ok = emqx_utils_ets:new(?TAB, []), true = ets:insert(?TAB, {key, val}), - ?assertEqual(val, emqx_tables:lookup_value(?TAB, key)), - ?assertEqual(undefined, emqx_tables:lookup_value(?TAB, badkey)). + ?assertEqual(val, emqx_utils_ets:lookup_value(?TAB, key)), + ?assertEqual(undefined, emqx_utils_ets:lookup_value(?TAB, badkey)). t_delete(_) -> - ok = emqx_tables:new(?TAB, []), + ok = emqx_utils_ets:new(?TAB, []), ?assertEqual(?TAB, ets:info(?TAB, name)), - ok = emqx_tables:delete(?TAB), - ok = emqx_tables:delete(?TAB), + ok = emqx_utils_ets:delete(?TAB), + ok = emqx_utils_ets:delete(?TAB), ?assertEqual(undefined, ets:info(?TAB, name)). diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 43ccd9fc1..325d3d499 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -68,7 +68,7 @@ trace_rule(Data, Envs, _Args) -> make_trace_fn_action() -> persistent_term:put({?MODULE, test_pid}, self()), Fn = <<(atom_to_binary(?MODULE))/binary, ":trace_rule">>, - emqx_tables:new(recorded_actions, [named_table, public, ordered_set]), + emqx_utils_ets:new(recorded_actions, [named_table, public, ordered_set]), #{function => Fn, args => #{}}. create_rule_http(RuleParams) ->