Merge pull request #6989 from EMQ-YangM/up_master

refactor(emqx_resource): Improve grouping strategy for emqx_resource_instance
This commit is contained in:
Yang Miao 2022-02-16 16:26:28 +08:00 committed by GitHub
commit 78cad0a528
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 142 additions and 118 deletions

View File

@ -35,3 +35,5 @@
-define(CONF_NS_BINARY, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
-endif.
-define(RESOURCE_GROUP, <<"emqx_authn">>).

View File

@ -17,6 +17,7 @@
-module(emqx_authn_utils).
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx_authn.hrl").
-export([ check_password_from_selected_map/3
, parse_deep/1
@ -32,8 +33,6 @@
, make_resource_id/1
]).
-define(RESOURCE_GROUP, <<"emqx_authn">>).
-define(AUTHN_PLACEHOLDERS, [?PH_USERNAME,
?PH_CLIENTID,
?PH_PASSWORD,
@ -120,7 +119,7 @@ cleanup_resources() ->
make_resource_id(Name) ->
NameBin = bin(Name),
emqx_resource:generate_id(?RESOURCE_GROUP, NameBin).
emqx_resource:generate_id(NameBin).
%%------------------------------------------------------------------------------
%% Internal functions

View File

@ -134,6 +134,7 @@ create(#{method := Method,
request_timeout => RequestTimeout,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId,
?RESOURCE_GROUP,
emqx_connector_http,
Config#{base_url => maps:remove(query, URIMap),
pool_type => random}) of

View File

@ -112,7 +112,7 @@ create(#{selector := Selector} = Config) ->
NState = State#{
selector_template => SelectorTemplate,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_mongo, Config) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mongo, Config) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -81,7 +81,7 @@ create(#{password_hash_algorithm := Algorithm,
placeholders => PlaceHolders,
query_timeout => QueryTimeout,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_mysql, Config) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, Config) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -79,7 +79,7 @@ create(#{query := Query0,
State = #{placeholders => PlaceHolders,
password_hash_algorithm => Algorithm,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of
{ok, already_created} ->
{ok, State};
{ok, _} ->

View File

@ -91,7 +91,7 @@ create(#{cmd := Cmd,
NState = State#{
cmd => NCmd,
resource_id => ResourceId},
case emqx_resource:create_local(ResourceId, emqx_connector_redis, Config) of
case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_redis, Config) of
{ok, already_created} ->
{ok, NState};
{ok, _} ->

View File

@ -59,6 +59,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config()),
Config;

View File

@ -60,6 +60,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config()),
Config;

View File

@ -59,6 +59,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?REDIS_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config()),
Config;

View File

@ -71,3 +71,5 @@
, limit => 100
, count => 1
}).
-define(RESOURCE_GROUP, <<"emqx_authz">>).

View File

@ -53,6 +53,7 @@ init(#{query := SQL0} = Source) ->
ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
case emqx_resource:create_local(
ResourceID,
?RESOURCE_GROUP,
emqx_connector_pgsql,
Source#{named_queries => #{ResourceID => SQL}}) of
{ok, _} ->

View File

@ -17,6 +17,7 @@
-module(emqx_authz_utils).
-include_lib("emqx/include/emqx_placeholder.hrl").
-include_lib("emqx_authz.hrl").
-export([ cleanup_resources/0
, make_resource_id/1
@ -28,15 +29,13 @@
, render_sql_params/2
]).
-define(RESOURCE_GROUP, <<"emqx_authz">>).
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
create_resource(Module, Config) ->
ResourceID = make_resource_id(Module),
case emqx_resource:create_local(ResourceID, Module, Config) of
case emqx_resource:create_local(ResourceID, ?RESOURCE_GROUP, Module, Config) of
{ok, already_created} -> {ok, ResourceID};
{ok, _} -> {ok, ResourceID};
{error, Reason} -> {error, Reason}
@ -49,7 +48,7 @@ cleanup_resources() ->
make_resource_id(Name) ->
NameBin = bin(Name),
emqx_resource:generate_id(?RESOURCE_GROUP, NameBin).
emqx_resource:generate_id(NameBin).
update_config(Path, ConfigRequest) ->
emqx_conf:update(Path, ConfigRequest, #{rawconf_with_defaults => true,

View File

@ -31,7 +31,7 @@ groups() ->
init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, remove_local, fun(_) -> ok end),
meck:expect(emqx_resource, create_dry_run_local, fun(_, _) -> ok end),

View File

@ -99,7 +99,7 @@ groups() ->
init_per_suite(Config) ->
meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end),
meck:expect(emqx_resource, create_dry_run_local,
fun(emqx_connector_mysql, _) -> ok;
(emqx_connector_mongo, _) -> ok;

View File

@ -42,6 +42,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?MYSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_mysql,
mysql_config()),
Config;

View File

@ -42,6 +42,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?PGSQL_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_pgsql,
pgsql_config()),
Config;

View File

@ -43,6 +43,7 @@ init_per_suite(Config) ->
ok = start_apps([emqx_resource, emqx_connector]),
{ok, _} = emqx_resource:create_local(
?REDIS_RESOURCE,
?RESOURCE_GROUP,
emqx_connector_redis,
redis_config()),
Config;

View File

@ -202,7 +202,7 @@ lookup(Type, Name) ->
lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(resource_id(Type, Name)) of
{error, not_found} -> {error, not_found};
{ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
{ok, _, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data,
raw_config => RawConf}}
end.
@ -222,7 +222,7 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) ->
?SLOG(info, #{msg => "create bridge", type => Type, name => Name,
config => Conf}),
case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type),
case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type),
parse_confs(Type, Name, Conf), #{async_create => true}) of
{ok, already_created} -> maybe_disable_bridge(Type, Name, Conf);
{ok, _} -> maybe_disable_bridge(Type, Name, Conf);

View File

@ -32,10 +32,10 @@
%% APIs for instances
-export([ check_config/2
, check_and_create/3
, check_and_create/4
, check_and_create_local/3
, check_and_create/5
, check_and_create_local/4
, check_and_create_local/5
, check_and_recreate/4
, check_and_recreate_local/4
]).
@ -43,10 +43,10 @@
%% Sync resource instances and files
%% provisional solution: rpc:multical to all the nodes for creating/updating/removing
%% todo: replicate operations
-export([ create/3 %% store the config and start the instance
, create/4
, create_local/3
-export([ create/4 %% store the config and start the instance
, create/5
, create_local/4
, create_local/5
, create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially
, create_dry_run_local/2
, recreate/4 %% this will do create_dry_run, stop the old instance and start a new one
@ -77,12 +77,9 @@
, get_instance/1 %% return the data of the instance
, list_instances_by_type/1 %% return all the instances of the same resource type
, generate_id/1
, generate_id/2
, list_group_instances/1
]).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
-optional_callbacks([ on_query/4
, on_health_check/2
]).
@ -134,25 +131,26 @@ apply_query_after_calls(Funcs) ->
%% =================================================================================
%% APIs for resource instances
%% =================================================================================
-spec create(instance_id(), resource_type(), resource_config()) ->
-spec create(instance_id(), resource_group(), resource_type(), resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, ResourceType, Config) ->
create(InstId, ResourceType, Config, #{}).
create(InstId, Group, ResourceType, Config) ->
create(InstId, Group, ResourceType, Config, #{}).
-spec create(instance_id(), resource_type(), resource_config(), create_opts()) ->
-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:create(InstId, ResourceType, Config, Opts)).
create(InstId, Group, ResourceType, Config, Opts) ->
wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)).
% --------------------------------------------
-spec create_local(instance_id(), resource_type(), resource_config()) ->
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(InstId, ResourceType, Config) ->
create_local(InstId, ResourceType, Config, #{}).
create_local(InstId, Group, ResourceType, Config) ->
create_local(InstId, Group, ResourceType, Config, #{}).
-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(InstId, ResourceType, Config, Opts) ->
call_instance(InstId, {create, InstId, ResourceType, Config, Opts}).
create_local(InstId, Group, ResourceType, Config, Opts) ->
call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
-spec create_dry_run(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}.
@ -192,13 +190,13 @@ query(InstId, Request) ->
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) ->
case get_instance(InstId) of
{ok, #{status := starting}} ->
{ok, _Group, #{status := starting}} ->
query_error(starting, <<"cannot serve query when the resource "
"instance is still starting">>);
{ok, #{status := stopped}} ->
{ok, _Group, #{status := stopped}} ->
query_error(stopped, <<"cannot serve query when the resource "
"instance is stopped">>);
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
{ok, _Group, #{mod := Mod, state := ResourceState, status := started}} ->
%% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe
ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
@ -230,7 +228,7 @@ health_check(InstId) ->
set_resource_status_stoped(InstId) ->
call_instance(InstId, {set_resource_status_stoped, InstId}).
-spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) ->
emqx_resource_instance:lookup(InstId).
@ -250,21 +248,11 @@ list_instances_by_type(ResourceType) ->
-spec generate_id(term()) -> instance_id().
generate_id(Name) when is_binary(Name) ->
generate_id(?DEFAULT_RESOURCE_GROUP, Name).
-spec generate_id(resource_group(), binary()) -> instance_id().
generate_id(Group, Name) when is_binary(Group) and is_binary(Name) ->
Id = integer_to_binary(erlang:unique_integer([positive])),
<<Group/binary, "/", Name/binary, ":", Id/binary>>.
<<Name/binary, ":", Id/binary>>.
-spec list_group_instances(resource_group()) -> [instance_id()].
list_group_instances(Group) ->
filter_instances(fun(Id, _) ->
case binary:split(Id, <<"/">>) of
[Group | _] -> true;
_ -> false
end
end).
list_group_instances(Group) -> emqx_resource_instance:list_group(Group).
-spec call_start(instance_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}.
@ -285,27 +273,27 @@ call_stop(InstId, Mod, ResourceState) ->
check_config(ResourceType, Conf) ->
emqx_hocon:check(ResourceType, Conf).
-spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, ResourceType, RawConfig) ->
check_and_create(InstId, ResourceType, RawConfig, #{}).
check_and_create(InstId, Group, ResourceType, RawConfig) ->
check_and_create(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, ResourceType, RawConfig, Opts) ->
check_and_create(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end).
fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) ->
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
{ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, ResourceType, RawConfig) ->
check_and_create_local(InstId, ResourceType, RawConfig, #{}).
check_and_create_local(InstId, Group, ResourceType, RawConfig) ->
check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(),
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(),
create_opts()) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) ->
check_and_do(ResourceType, RawConfig,
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end).
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
{ok, resource_data()} | {error, term()}.

View File

@ -26,6 +26,7 @@
-export([ lookup/1
, get_metrics/1
, list_all/0
, list_group/1
]).
-export([ hash_call/2
@ -61,12 +62,12 @@ hash_call(InstId, Request) ->
hash_call(InstId, Request, Timeout) ->
gen_server:call(pick(InstId), Request, Timeout).
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup(InstId) ->
case ets:lookup(emqx_resource_instance, InstId) of
[] -> {error, not_found};
[{_, Data}] ->
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
[{_, Group, Data}] ->
{ok, Group, Data#{id => InstId, metrics => get_metrics(InstId)}}
end.
make_test_id() ->
@ -77,17 +78,22 @@ get_metrics(InstId) ->
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
force_lookup(InstId) ->
{ok, Data} = lookup(InstId),
{ok, _Group, Data} = lookup(InstId),
Data.
-spec list_all() -> [resource_data()].
list_all() ->
try
[Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)]
[Data#{id => Id} || {Id, _Group, Data} <- ets:tab2list(emqx_resource_instance)]
catch
error:badarg -> []
end.
-spec list_group(resource_group()) -> [instance_id()].
list_group(Group) ->
List = ets:match(emqx_resource_instance, {'$1', Group, '_'}),
lists:map(fun([A|_]) -> A end, List).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
@ -99,8 +105,8 @@ init({Pool, Id}) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{worker_pool = Pool, worker_id = Id}}.
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
{reply, do_create(InstId, ResourceType, Config, Opts), State};
handle_call({create, InstId, Group, ResourceType, Config, Opts}, _From, State) ->
{reply, do_create(InstId, Group, ResourceType, Config, Opts), State};
handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
{reply, do_create_dry_run(ResourceType, Config), State};
@ -143,41 +149,41 @@ code_change(_OldVsn, State, _Extra) ->
%% suppress the race condition check, as these functions are protected in gproc workers
-dialyzer({nowarn_function, [ do_recreate/4
, do_create/4
, do_create/5
, do_restart/2
, do_start/4
, do_start/5
, do_stop/1
, do_health_check/1
, start_and_check/5
, start_and_check/6
]}).
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
case lookup(InstId) of
{ok, #{mod := ResourceType, status := started} = Data} ->
{ok, Group, #{mod := ResourceType, status := started} = Data} ->
%% If this resource is in use (status='started'), we should make sure
%% the new config is OK before removing the old one.
case do_create_dry_run(ResourceType, NewConfig) of
ok ->
do_remove(Data, false),
do_create(InstId, ResourceType, NewConfig, Opts);
do_remove(Group, Data, false),
do_create(InstId, Group, ResourceType, NewConfig, Opts);
Error ->
Error
end;
{ok, #{mod := ResourceType, status := _} = Data} ->
do_remove(Data, false),
do_create(InstId, ResourceType, NewConfig, Opts);
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
{ok, Group, #{mod := ResourceType, status := _} = Data} ->
do_remove(Group, Data, false),
do_create(InstId, Group, ResourceType, NewConfig, Opts);
{ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type};
{error, not_found} ->
{error, not_found}
end.
do_create(InstId, ResourceType, Config, Opts) ->
do_create(InstId, Group, ResourceType, Config, Opts) ->
case lookup(InstId) of
{ok, _} ->
{ok,_, _} ->
{ok, already_created};
{error, not_found} ->
case do_start(InstId, ResourceType, Config, Opts) of
case do_start(InstId, Group, ResourceType, Config, Opts) of
ok ->
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
[matched, success, failed, exception], [matched]),
@ -207,9 +213,10 @@ do_remove(Instance) ->
do_remove(Instance, true).
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]);
do_remove(#{id := InstId} = Data, ClearMetrics) ->
_ = do_stop(Data),
do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
_ = do_stop(Group, Data),
ets:delete(emqx_resource_instance, InstId),
case ClearMetrics of
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
@ -219,42 +226,42 @@ do_remove(#{id := InstId} = Data, ClearMetrics) ->
do_restart(InstId, Opts) ->
case lookup(InstId) of
{ok, #{mod := ResourceType, config := Config} = Data} ->
ok = do_stop(Data),
do_start(InstId, ResourceType, Config, Opts);
{ok, Group, #{mod := ResourceType, config := Config} = Data} ->
ok = do_stop(Group, Data),
do_start(InstId, Group, ResourceType, Config, Opts);
Error ->
Error
end.
do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
InitData = #{id => InstId, mod => ResourceType, config => Config,
status => starting, state => undefined},
%% The `emqx_resource:call_start/3` need the instance exist beforehand
ets:insert(emqx_resource_instance, {InstId, InitData}),
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
case maps:get(async_create, Opts, false) of
false ->
start_and_check(InstId, ResourceType, Config, Opts, InitData);
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData);
true ->
spawn(fun() ->
start_and_check(InstId, ResourceType, Config, Opts, InitData)
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
end),
ok
end.
start_and_check(InstId, ResourceType, Config, Opts, Data) ->
start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
case emqx_resource:call_start(InstId, ResourceType, Config) of
{ok, ResourceState} ->
Data2 = Data#{state => ResourceState},
ets:insert(emqx_resource_instance, {InstId, Data2}),
ets:insert(emqx_resource_instance, {InstId, Group, Data2}),
case maps:get(async_create, Opts, false) of
false -> case do_health_check(Data2) of
false -> case do_health_check(Group, Data2) of
ok -> create_default_checker(InstId, Opts);
{error, Reason} -> {error, Reason}
end;
true -> create_default_checker(InstId, Opts)
end;
{error, Reason} ->
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
{error, Reason}
end.
@ -264,37 +271,39 @@ create_default_checker(InstId, Opts) ->
maps:get(health_check_timeout, Opts, 10000)).
do_stop(InstId) when is_binary(InstId) ->
do_with_instance_data(InstId, fun do_stop/1, []);
do_stop(#{state := undefined}) ->
do_with_group_and_instance_data(InstId, fun do_stop/2, []).
do_stop(_Group, #{state := undefined}) ->
ok;
do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
_ = emqx_resource_health_check:delete_checker(InstId),
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
ok.
do_health_check(InstId) when is_binary(InstId) ->
do_with_instance_data(InstId, fun do_health_check/1, []);
do_health_check(#{state := undefined}) ->
do_with_group_and_instance_data(InstId, fun do_health_check/2, []).
do_health_check(_Group, #{state := undefined}) ->
{error, resource_not_initialized};
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
{ok, ResourceState1} ->
ets:insert(emqx_resource_instance,
{InstId, Data#{status => started, state => ResourceState1}}),
{InstId, Group, Data#{status => started, state => ResourceState1}}),
ok;
{error, Reason, ResourceState1} ->
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
ets:insert(emqx_resource_instance,
{InstId, Data#{status => stopped, state => ResourceState1}}),
{InstId, Group, Data#{status => stopped, state => ResourceState1}}),
{error, Reason}
end.
do_set_resource_status_stoped(InstId) ->
case emqx_resource_instance:lookup(InstId) of
{ok, #{id := InstId} = Data} ->
{ok, Group, #{id := InstId} = Data} ->
logger:error("health check for ~p failed: timeout", [InstId]),
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}});
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}});
Error -> {error, Error}
end.
@ -302,9 +311,9 @@ do_set_resource_status_stoped(InstId) ->
%% internal functions
%%------------------------------------------------------------------------------
do_with_instance_data(InstId, Do, Args) ->
do_with_group_and_instance_data(InstId, Do, Args) ->
case lookup(InstId) of
{ok, Data} -> erlang:apply(Do, [Data | Args]);
{ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]);
Error -> Error
end.

View File

@ -20,7 +20,7 @@
-export([ introduced_in/0
, create/4
, create/5
, create_dry_run/2
, recreate/4
, remove/1
@ -32,13 +32,14 @@ introduced_in() ->
"5.0.0".
-spec create( emqx_resource:instance_id()
, emqx_resource:resource_group()
, emqx_resource:resource_type()
, emqx_resource:resource_config()
, emqx_resource:create_opts()
) ->
emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
create(InstId, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, ResourceType, Config, Opts]).
create(InstId, Group, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, Group, ResourceType, Config, Opts]).
-spec create_dry_run( emqx_resource:resource_type()
, emqx_resource:resource_config()

View File

@ -20,9 +20,11 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_resource.hrl").
-define(TEST_RESOURCE, emqx_test_resource).
-define(ID, <<"id">>).
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -59,11 +61,13 @@ t_check_config(_) ->
t_create_remove(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}),
{ok, _} = emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}),
@ -84,11 +88,13 @@ t_create_remove(_) ->
t_create_remove_local(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}),
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}),
@ -117,6 +123,7 @@ t_create_remove_local(_) ->
t_query(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource}),
@ -143,6 +150,7 @@ t_query(_) ->
t_healthy_timeout(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true, health_check_timeout => 200}),
@ -153,6 +161,7 @@ t_healthy_timeout(_) ->
t_healthy(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => <<"test_resource">>},
#{async_create => true}),
@ -184,11 +193,13 @@ t_healthy(_) ->
t_stop_start(_) ->
{error, _} = emqx_resource:check_and_create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}),
{ok, _} = emqx_resource:check_and_create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}),
@ -218,11 +229,13 @@ t_stop_start(_) ->
t_stop_start_local(_) ->
{error, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}),
{ok, _} = emqx_resource:check_and_create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}),
@ -252,21 +265,23 @@ t_stop_start_local(_) ->
t_list_filter(_) ->
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"a">>),
<<"group1">>,
?TEST_RESOURCE,
#{name => a}),
{ok, _} = emqx_resource:create_local(
emqx_resource:generate_id(<<"group">>, <<"a">>),
emqx_resource:generate_id(<<"a">>),
<<"group2">>,
?TEST_RESOURCE,
#{name => grouped_a}),
[Id1] = emqx_resource:list_group_instances(<<"default">>),
[Id1] = emqx_resource:list_group_instances(<<"group1">>),
?assertMatch(
{ok, #{config := #{name := a}}},
{ok, <<"group1">>, #{config := #{name := a}}},
emqx_resource:get_instance(Id1)),
[Id2] = emqx_resource:list_group_instances(<<"group">>),
[Id2] = emqx_resource:list_group_instances(<<"group2">>),
?assertMatch(
{ok, #{config := #{name := grouped_a}}},
{ok, <<"group2">>, #{config := #{name := grouped_a}}},
emqx_resource:get_instance(Id2)).
t_create_dry_run_local(_) ->

View File

@ -480,6 +480,7 @@ create_resource(Context, #{type := DB} = Config) ->
ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]),
case emqx_resource:create(
ResourceID,
<<"emqx_retainer">>,
list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])),
Config) of
{ok, already_created} ->