feat(resource): ensure uniqueness through `gproc`

Also use it instead of a custom ETS table for simplicity and
better consistency. This has drawbacks though: expect slightly
increased load on gproc gen_server due to how `gproc:set_value/2`
works.
This commit is contained in:
Andrew Mayorov 2023-04-19 14:32:03 +03:00
parent 4575167607
commit 670709f746
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 53 additions and 62 deletions

View File

@ -57,7 +57,9 @@
}).
-type data() :: #data{}.
-define(ETS_TABLE, ?MODULE).
-define(NAME(ResId), {n, l, {?MODULE, ResId}}).
-define(REF(ResId), {via, gproc, ?NAME(ResId)}).
-define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(T_OPERATION, 5000).
-define(T_LOOKUP, 1000).
@ -229,10 +231,11 @@ lookup(ResId) ->
%% @doc Lookup the group and data of a resource from the cache
-spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup_cached(ResId) ->
case read_cache(ResId) of
{Group, Data} ->
{ok, Group, data_record_to_external_map(Data)};
not_found ->
try read_cache(ResId) of
Data = #data{group = Group} ->
{ok, Group, data_record_to_external_map(Data)}
catch
error:badarg ->
{error, not_found}
end.
@ -248,20 +251,16 @@ reset_metrics(ResId) ->
%% @doc Returns the data for all resources
-spec list_all() -> [resource_data()].
list_all() ->
try
[
data_record_to_external_map(Data)
|| {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE)
]
catch
error:badarg -> []
end.
lists:map(
fun data_record_to_external_map/1,
gproc:select({local, names}, [{{?NAME('_'), '_', '$1'}, [], ['$1']}])
).
%% @doc Returns a list of ids for all the resources in a group
-spec list_group(resource_group()) -> [resource_id()].
list_group(Group) ->
List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
lists:flatten(List).
Guard = {'==', {element, #data.group, '$1'}, Group},
gproc:select({local, names}, [{{?NAME('$2'), '_', '$1'}, [Guard], ['$2']}]).
-spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
health_check(ResId) ->
@ -286,7 +285,7 @@ start_link(ResId, Group, ResourceType, Config, Opts) ->
state = undefined,
error = undefined
},
gen_statem:start_link(?MODULE, {Data, Opts}, []).
gen_statem:start_link(?REF(ResId), ?MODULE, {Data, Opts}, []).
init({DataIn, Opts}) ->
process_flag(trap_exit, true),
@ -306,7 +305,7 @@ terminate({shutdown, removed}, _State, _Data) ->
ok;
terminate(_Reason, _State, Data) ->
_ = maybe_stop_resource(Data),
ok = delete_cache(Data#data.id),
_ = erase_cache(Data),
ok.
%% Behavior callback
@ -401,9 +400,9 @@ log_state_consistency(State, Data) ->
data => Data
}).
log_cache_consistency({_, Data}, Data) ->
log_cache_consistency(Data, Data) ->
ok;
log_cache_consistency({_, DataCached}, Data) ->
log_cache_consistency(DataCached, Data) ->
?tp(warning, "inconsistent_cache", #{
cache => DataCached,
data => Data
@ -412,22 +411,21 @@ log_cache_consistency({_, DataCached}, Data) ->
%%------------------------------------------------------------------------------
%% internal functions
%%------------------------------------------------------------------------------
insert_cache(ResId, Group, Data = #data{}) ->
ets:insert(?ETS_TABLE, {ResId, Group, Data}).
insert_cache(ResId, Data = #data{}) ->
gproc:set_value(?NAME(ResId), Data).
read_cache(ResId) ->
case ets:lookup(?ETS_TABLE, ResId) of
[{_Id, Group, Data}] -> {Group, Data};
[] -> not_found
end.
gproc:lookup_value(?NAME(ResId)).
delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
true = ets:delete(?ETS_TABLE, {owner, ResId}),
true = ets:delete(?ETS_TABLE, ResId),
ok;
delete_cache(ResId) ->
true = ets:delete(?ETS_TABLE, ResId),
ok.
erase_cache(_Data = #data{id = ResId}) ->
gproc:unreg(?NAME(ResId)).
try_read_cache(ResId) ->
try
read_cache(ResId)
catch
error:badarg -> not_found
end.
retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
@ -442,12 +440,12 @@ health_check_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) ->
_ = stop_resource(Data),
ok = delete_cache(Data#data.id),
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok
end,
_ = erase_cache(Data),
{stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}.
start_resource(Data, From) ->
@ -552,7 +550,7 @@ update_state(Data) ->
update_state(DataWas, DataWas) ->
DataWas;
update_state(Data, _DataWas) ->
_ = insert_cache(Data#data.id, Data#data.group, Data),
_ = insert_cache(Data#data.id, Data),
Data.
health_check_interval(Opts) ->
@ -642,10 +640,10 @@ wait_for_ready(ResId, WaitTime) ->
do_wait_for_ready(_ResId, 0) ->
timeout;
do_wait_for_ready(ResId, Retry) ->
case read_cache(ResId) of
{_Group, #data{status = connected}} ->
case try_read_cache(ResId) of
#data{status = connected} ->
ok;
{_Group, #data{status = disconnected, error = Err}} ->
#data{status = disconnected, error = Err} ->
{error, external_error(Err)};
_ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
@ -654,12 +652,7 @@ do_wait_for_ready(ResId, Retry) ->
safe_call(ResId, Message, Timeout) ->
try
case read_cache(ResId) of
not_found ->
{error, not_found};
{_, #data{pid = ManagerPid}} ->
gen_statem:call(ManagerPid, Message, {clean_timeout, Timeout})
end
gen_statem:call(?REF(ResId), Message, {clean_timeout, Timeout})
catch
error:badarg ->
{error, not_found};

View File

@ -31,9 +31,6 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
TabOpts = [named_table, set, public, {read_concurrency, true}],
_ = ets:new(emqx_resource_manager, TabOpts),
ChildSpecs = [
#{
id => emqx_resource_manager,
@ -44,6 +41,5 @@ init([]) ->
modules => [emqx_resource_manager]
}
],
SupFlags = #{strategy => simple_one_for_one, intensity => 10, period => 10},
{ok, {SupFlags, ChildSpecs}}.

View File

@ -1055,28 +1055,22 @@ t_list_filter(_) ->
).
t_create_dry_run_local(_) ->
ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
lists:foreach(
fun(_) ->
create_dry_run_local_succ()
end,
lists:seq(1, 10)
),
case [] =:= ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}) of
false ->
%% Sleep to remove flakyness in test case. It take some time for
%% the ETS table to be cleared.
timer:sleep(2000),
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'});
true ->
ok
end.
?retry(
100,
5,
?assertEqual(
[],
emqx_resource:list_instances_verbose()
)
).
create_dry_run_local_succ() ->
case whereis(test_resource) of
undefined -> ok;
Pid -> exit(Pid, kill)
end,
?assertEqual(
ok,
emqx_resource:create_dry_run_local(
@ -1107,7 +1101,15 @@ t_create_dry_run_local_failed(_) ->
?TEST_RESOURCE,
#{name => test_resource, stop_error => true}
),
?assertEqual(ok, Res3).
?assertEqual(ok, Res3),
?retry(
100,
5,
?assertEqual(
[],
emqx_resource:list_instances_verbose()
)
).
t_test_func(_) ->
?assertEqual(ok, erlang:apply(emqx_resource_validator:not_empty("not_empty"), [<<"someval">>])),