From 670709f746a7820dd0d981dc170c5ae337a6777d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 19 Apr 2023 14:32:03 +0300 Subject: [PATCH] feat(resource): ensure uniqueness through `gproc` Also use it instead of a custom ETS table for simplicity and better consistency. This has drawbacks though: expect slightly increased load on gproc gen_server due to how `gproc:set_value/2` works. --- .../src/emqx_resource_manager.erl | 79 +++++++++---------- .../src/emqx_resource_manager_sup.erl | 4 - .../test/emqx_resource_SUITE.erl | 32 ++++---- 3 files changed, 53 insertions(+), 62 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index c8be34f87..f42d3c1b5 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -57,7 +57,9 @@ }). -type data() :: #data{}. --define(ETS_TABLE, ?MODULE). +-define(NAME(ResId), {n, l, {?MODULE, ResId}}). +-define(REF(ResId), {via, gproc, ?NAME(ResId)}). + -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). -define(T_LOOKUP, 1000). @@ -229,10 +231,11 @@ lookup(ResId) -> %% @doc Lookup the group and data of a resource from the cache -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup_cached(ResId) -> - case read_cache(ResId) of - {Group, Data} -> - {ok, Group, data_record_to_external_map(Data)}; - not_found -> + try read_cache(ResId) of + Data = #data{group = Group} -> + {ok, Group, data_record_to_external_map(Data)} + catch + error:badarg -> {error, not_found} end. @@ -248,20 +251,16 @@ reset_metrics(ResId) -> %% @doc Returns the data for all resources -spec list_all() -> [resource_data()]. list_all() -> - try - [ - data_record_to_external_map(Data) - || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) - ] - catch - error:badarg -> [] - end. + lists:map( + fun data_record_to_external_map/1, + gproc:select({local, names}, [{{?NAME('_'), '_', '$1'}, [], ['$1']}]) + ). %% @doc Returns a list of ids for all the resources in a group -spec list_group(resource_group()) -> [resource_id()]. list_group(Group) -> - List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), - lists:flatten(List). + Guard = {'==', {element, #data.group, '$1'}, Group}, + gproc:select({local, names}, [{{?NAME('$2'), '_', '$1'}, [Guard], ['$2']}]). -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. health_check(ResId) -> @@ -286,7 +285,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) -> state = undefined, error = undefined }, - gen_statem:start_link(?MODULE, {Data, Opts}, []). + gen_statem:start_link(?REF(ResId), ?MODULE, {Data, Opts}, []). init({DataIn, Opts}) -> process_flag(trap_exit, true), @@ -306,7 +305,7 @@ terminate({shutdown, removed}, _State, _Data) -> ok; terminate(_Reason, _State, Data) -> _ = maybe_stop_resource(Data), - ok = delete_cache(Data#data.id), + _ = erase_cache(Data), ok. %% Behavior callback @@ -401,9 +400,9 @@ log_state_consistency(State, Data) -> data => Data }). -log_cache_consistency({_, Data}, Data) -> +log_cache_consistency(Data, Data) -> ok; -log_cache_consistency({_, DataCached}, Data) -> +log_cache_consistency(DataCached, Data) -> ?tp(warning, "inconsistent_cache", #{ cache => DataCached, data => Data @@ -412,22 +411,21 @@ log_cache_consistency({_, DataCached}, Data) -> %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ -insert_cache(ResId, Group, Data = #data{}) -> - ets:insert(?ETS_TABLE, {ResId, Group, Data}). +insert_cache(ResId, Data = #data{}) -> + gproc:set_value(?NAME(ResId), Data). read_cache(ResId) -> - case ets:lookup(?ETS_TABLE, ResId) of - [{_Id, Group, Data}] -> {Group, Data}; - [] -> not_found - end. + gproc:lookup_value(?NAME(ResId)). -delete_cache(<> = ResId) -> - true = ets:delete(?ETS_TABLE, {owner, ResId}), - true = ets:delete(?ETS_TABLE, ResId), - ok; -delete_cache(ResId) -> - true = ets:delete(?ETS_TABLE, ResId), - ok. +erase_cache(_Data = #data{id = ResId}) -> + gproc:unreg(?NAME(ResId)). + +try_read_cache(ResId) -> + try + read_cache(ResId) + catch + error:badarg -> not_found + end. retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of @@ -442,12 +440,12 @@ health_check_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> _ = stop_resource(Data), - ok = delete_cache(Data#data.id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, + _ = erase_cache(Data), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> @@ -552,7 +550,7 @@ update_state(Data) -> update_state(DataWas, DataWas) -> DataWas; update_state(Data, _DataWas) -> - _ = insert_cache(Data#data.id, Data#data.group, Data), + _ = insert_cache(Data#data.id, Data), Data. health_check_interval(Opts) -> @@ -642,10 +640,10 @@ wait_for_ready(ResId, WaitTime) -> do_wait_for_ready(_ResId, 0) -> timeout; do_wait_for_ready(ResId, Retry) -> - case read_cache(ResId) of - {_Group, #data{status = connected}} -> + case try_read_cache(ResId) of + #data{status = connected} -> ok; - {_Group, #data{status = disconnected, error = Err}} -> + #data{status = disconnected, error = Err} -> {error, external_error(Err)}; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), @@ -654,12 +652,7 @@ do_wait_for_ready(ResId, Retry) -> safe_call(ResId, Message, Timeout) -> try - case read_cache(ResId) of - not_found -> - {error, not_found}; - {_, #data{pid = ManagerPid}} -> - gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout}) - end + gen_statem:call(?REF(ResId), Message, {clean_timeout, Timeout}) catch error:badarg -> {error, not_found}; diff --git a/apps/emqx_resource/src/emqx_resource_manager_sup.erl b/apps/emqx_resource/src/emqx_resource_manager_sup.erl index b27c46739..2f442cd56 100644 --- a/apps/emqx_resource/src/emqx_resource_manager_sup.erl +++ b/apps/emqx_resource/src/emqx_resource_manager_sup.erl @@ -31,9 +31,6 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - TabOpts = [named_table, set, public, {read_concurrency, true}], - _ = ets:new(emqx_resource_manager, TabOpts), - ChildSpecs = [ #{ id => emqx_resource_manager, @@ -44,6 +41,5 @@ init([]) -> modules => [emqx_resource_manager] } ], - SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10}, {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 34781df6c..6fd5a552e 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -1055,28 +1055,22 @@ t_list_filter(_) -> ). t_create_dry_run_local(_) -> - ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}), lists:foreach( fun(_) -> create_dry_run_local_succ() end, lists:seq(1, 10) ), - case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of - false -> - %% Sleep to remove flakyness in test case. It take some time for - %% the ETS table to be cleared. - timer:sleep(2000), - [] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}); - true -> - ok - end. + ?retry( + 100, + 5, + ?assertEqual( + [], + emqx_resource:list_instances_verbose() + ) + ). create_dry_run_local_succ() -> - case whereis(test_resource) of - undefined -> ok; - Pid -> exit(Pid, kill) - end, ?assertEqual( ok, emqx_resource:create_dry_run_local( @@ -1107,7 +1101,15 @@ t_create_dry_run_local_failed(_) -> ?TEST_RESOURCE, #{name => test_resource, stop_error => true} ), - ?assertEqual(ok, Res3). + ?assertEqual(ok, Res3), + ?retry( + 100, + 5, + ?assertEqual( + [], + emqx_resource:list_instances_verbose() + ) + ). t_test_func(_) -> ?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),