refactor(emqx_resource): improve grouping strategy for emqx_resource_instance
This commit is contained in:
parent
372f628c9e
commit
df57daaabb
|
@ -43,3 +43,4 @@
|
|||
-type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}.
|
||||
|
||||
-define(TEST_ID_PREFIX, "_test_:").
|
||||
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
||||
|
|
|
@ -32,10 +32,10 @@
|
|||
%% APIs for instances
|
||||
|
||||
-export([ check_config/2
|
||||
, check_and_create/3
|
||||
, check_and_create/4
|
||||
, check_and_create_local/3
|
||||
, check_and_create/5
|
||||
, check_and_create_local/4
|
||||
, check_and_create_local/5
|
||||
, check_and_recreate/4
|
||||
, check_and_recreate_local/4
|
||||
]).
|
||||
|
@ -43,10 +43,10 @@
|
|||
%% Sync resource instances and files
|
||||
%% provisional solution: rpc:multical to all the nodes for creating/updating/removing
|
||||
%% todo: replicate operations
|
||||
-export([ create/3 %% store the config and start the instance
|
||||
, create/4
|
||||
, create_local/3
|
||||
-export([ create/4 %% store the config and start the instance
|
||||
, create/5
|
||||
, create_local/4
|
||||
, create_local/5
|
||||
, create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially
|
||||
, create_dry_run_local/2
|
||||
, recreate/4 %% this will do create_dry_run, stop the old instance and start a new one
|
||||
|
@ -77,12 +77,9 @@
|
|||
, get_instance/1 %% return the data of the instance
|
||||
, list_instances_by_type/1 %% return all the instances of the same resource type
|
||||
, generate_id/1
|
||||
, generate_id/2
|
||||
, list_group_instances/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_RESOURCE_GROUP, <<"default">>).
|
||||
|
||||
-optional_callbacks([ on_query/4
|
||||
, on_health_check/2
|
||||
]).
|
||||
|
@ -134,25 +131,26 @@ apply_query_after_calls(Funcs) ->
|
|||
%% =================================================================================
|
||||
%% APIs for resource instances
|
||||
%% =================================================================================
|
||||
-spec create(instance_id(), resource_type(), resource_config()) ->
|
||||
-spec create(instance_id(), resource_group(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
|
||||
create(InstId, ResourceType, Config) ->
|
||||
create(InstId, ResourceType, Config, #{}).
|
||||
create(InstId, Group, ResourceType, Config) ->
|
||||
create(InstId, Group, ResourceType, Config, #{}).
|
||||
|
||||
-spec create(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||
-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
|
||||
create(InstId, ResourceType, Config, Opts) ->
|
||||
wrap_rpc(emqx_resource_proto_v1:create(InstId, ResourceType, Config, Opts)).
|
||||
create(InstId, Group, ResourceType, Config, Opts) ->
|
||||
wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)).
|
||||
% --------------------------------------------
|
||||
|
||||
-spec create_local(instance_id(), resource_type(), resource_config()) ->
|
||||
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
|
||||
create_local(InstId, ResourceType, Config) ->
|
||||
create_local(InstId, ResourceType, Config, #{}).
|
||||
create_local(InstId, Group, ResourceType, Config) ->
|
||||
create_local(InstId, Group, ResourceType, Config, #{}).
|
||||
|
||||
-spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) ->
|
||||
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
|
||||
create_local(InstId, ResourceType, Config, Opts) ->
|
||||
call_instance(InstId, {create, InstId, ResourceType, Config, Opts}).
|
||||
create_local(InstId, Group, ResourceType, Config, Opts) ->
|
||||
call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}).
|
||||
|
||||
-spec create_dry_run(resource_type(), resource_config()) ->
|
||||
ok | {error, Reason :: term()}.
|
||||
|
@ -192,13 +190,13 @@ query(InstId, Request) ->
|
|||
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term().
|
||||
query(InstId, Request, AfterQuery) ->
|
||||
case get_instance(InstId) of
|
||||
{ok, #{status := starting}} ->
|
||||
{ok, _Group, #{status := starting}} ->
|
||||
query_error(starting, <<"cannot serve query when the resource "
|
||||
"instance is still starting">>);
|
||||
{ok, #{status := stopped}} ->
|
||||
{ok, _Group, #{status := stopped}} ->
|
||||
query_error(stopped, <<"cannot serve query when the resource "
|
||||
"instance is stopped">>);
|
||||
{ok, #{mod := Mod, state := ResourceState, status := started}} ->
|
||||
{ok, _Group, #{mod := Mod, state := ResourceState, status := started}} ->
|
||||
%% the resource state is readonly to Module:on_query/4
|
||||
%% and the `after_query()` functions should be thread safe
|
||||
ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched),
|
||||
|
@ -230,7 +228,7 @@ health_check(InstId) ->
|
|||
set_resource_status_stoped(InstId) ->
|
||||
call_instance(InstId, {set_resource_status_stoped, InstId}).
|
||||
|
||||
-spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}.
|
||||
-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
|
||||
get_instance(InstId) ->
|
||||
emqx_resource_instance:lookup(InstId).
|
||||
|
||||
|
@ -250,21 +248,11 @@ list_instances_by_type(ResourceType) ->
|
|||
|
||||
-spec generate_id(term()) -> instance_id().
|
||||
generate_id(Name) when is_binary(Name) ->
|
||||
generate_id(?DEFAULT_RESOURCE_GROUP, Name).
|
||||
|
||||
-spec generate_id(resource_group(), binary()) -> instance_id().
|
||||
generate_id(Group, Name) when is_binary(Group) and is_binary(Name) ->
|
||||
Id = integer_to_binary(erlang:unique_integer([positive])),
|
||||
<<Group/binary, "/", Name/binary, ":", Id/binary>>.
|
||||
<<Name/binary, ":", Id/binary>>.
|
||||
|
||||
-spec list_group_instances(resource_group()) -> [instance_id()].
|
||||
list_group_instances(Group) ->
|
||||
filter_instances(fun(Id, _) ->
|
||||
case binary:split(Id, <<"/">>) of
|
||||
[Group | _] -> true;
|
||||
_ -> false
|
||||
end
|
||||
end).
|
||||
list_group_instances(Group) -> emqx_resource_instance:list_group(Group).
|
||||
|
||||
-spec call_start(instance_id(), module(), resource_config()) ->
|
||||
{ok, resource_state()} | {error, Reason :: term()}.
|
||||
|
@ -285,27 +273,27 @@ call_stop(InstId, Mod, ResourceState) ->
|
|||
check_config(ResourceType, Conf) ->
|
||||
emqx_hocon:check(ResourceType, Conf).
|
||||
|
||||
-spec check_and_create(instance_id(), resource_type(), raw_resource_config()) ->
|
||||
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, term()}.
|
||||
check_and_create(InstId, ResourceType, RawConfig) ->
|
||||
check_and_create(InstId, ResourceType, RawConfig, #{}).
|
||||
check_and_create(InstId, Group, ResourceType, RawConfig) ->
|
||||
check_and_create(InstId, Group, ResourceType, RawConfig, #{}).
|
||||
|
||||
-spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||
-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||
{ok, resource_data() | 'already_created'} | {error, term()}.
|
||||
check_and_create(InstId, ResourceType, RawConfig, Opts) ->
|
||||
check_and_create(InstId, Group, ResourceType, RawConfig, Opts) ->
|
||||
check_and_do(ResourceType, RawConfig,
|
||||
fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end).
|
||||
fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end).
|
||||
|
||||
-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) ->
|
||||
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
check_and_create_local(InstId, ResourceType, RawConfig) ->
|
||||
check_and_create_local(InstId, ResourceType, RawConfig, #{}).
|
||||
check_and_create_local(InstId, Group, ResourceType, RawConfig) ->
|
||||
check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}).
|
||||
|
||||
-spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(),
|
||||
-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(),
|
||||
create_opts()) -> {ok, resource_data()} | {error, term()}.
|
||||
check_and_create_local(InstId, ResourceType, RawConfig, Opts) ->
|
||||
check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) ->
|
||||
check_and_do(ResourceType, RawConfig,
|
||||
fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end).
|
||||
fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end).
|
||||
|
||||
-spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) ->
|
||||
{ok, resource_data()} | {error, term()}.
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
-export([ lookup/1
|
||||
, get_metrics/1
|
||||
, list_all/0
|
||||
, list_group/1
|
||||
]).
|
||||
|
||||
-export([ hash_call/2
|
||||
|
@ -61,12 +62,12 @@ hash_call(InstId, Request) ->
|
|||
hash_call(InstId, Request, Timeout) ->
|
||||
gen_server:call(pick(InstId), Request, Timeout).
|
||||
|
||||
-spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}.
|
||||
-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
|
||||
lookup(InstId) ->
|
||||
case ets:lookup(emqx_resource_instance, InstId) of
|
||||
[] -> {error, not_found};
|
||||
[{_, Data}] ->
|
||||
{ok, Data#{id => InstId, metrics => get_metrics(InstId)}}
|
||||
[{_, Group, Data}] ->
|
||||
{ok, Group, Data#{id => InstId, metrics => get_metrics(InstId)}}
|
||||
end.
|
||||
|
||||
make_test_id() ->
|
||||
|
@ -77,17 +78,22 @@ get_metrics(InstId) ->
|
|||
emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId).
|
||||
|
||||
force_lookup(InstId) ->
|
||||
{ok, Data} = lookup(InstId),
|
||||
{ok, _Group, Data} = lookup(InstId),
|
||||
Data.
|
||||
|
||||
-spec list_all() -> [resource_data()].
|
||||
list_all() ->
|
||||
try
|
||||
[Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)]
|
||||
[Data#{id => Id} || {Id, _Group, Data} <- ets:tab2list(emqx_resource_instance)]
|
||||
catch
|
||||
error:badarg -> []
|
||||
end.
|
||||
|
||||
-spec list_group(resource_group()) -> [instance_id()].
|
||||
list_group(Group) ->
|
||||
List = ets:match(emqx_resource_instance, {'$1', Group, '_'}),
|
||||
lists:map(fun([A|_]) -> A end, List).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -99,8 +105,8 @@ init({Pool, Id}) ->
|
|||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||
{ok, #state{worker_pool = Pool, worker_id = Id}}.
|
||||
|
||||
handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) ->
|
||||
{reply, do_create(InstId, ResourceType, Config, Opts), State};
|
||||
handle_call({create, InstId, Group, ResourceType, Config, Opts}, _From, State) ->
|
||||
{reply, do_create(InstId, Group, ResourceType, Config, Opts), State};
|
||||
|
||||
handle_call({create_dry_run, ResourceType, Config}, _From, State) ->
|
||||
{reply, do_create_dry_run(ResourceType, Config), State};
|
||||
|
@ -143,41 +149,41 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
%% suppress the race condition check, as these functions are protected in gproc workers
|
||||
-dialyzer({nowarn_function, [ do_recreate/4
|
||||
, do_create/4
|
||||
, do_create/5
|
||||
, do_restart/2
|
||||
, do_start/4
|
||||
, do_start/5
|
||||
, do_stop/1
|
||||
, do_health_check/1
|
||||
, start_and_check/5
|
||||
, start_and_check/6
|
||||
]}).
|
||||
|
||||
do_recreate(InstId, ResourceType, NewConfig, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := ResourceType, status := started} = Data} ->
|
||||
{ok, Group, #{mod := ResourceType, status := started} = Data} ->
|
||||
%% If this resource is in use (status='started'), we should make sure
|
||||
%% the new config is OK before removing the old one.
|
||||
case do_create_dry_run(ResourceType, NewConfig) of
|
||||
ok ->
|
||||
do_remove(Data, false),
|
||||
do_create(InstId, ResourceType, NewConfig, Opts);
|
||||
do_remove(Group, Data, false),
|
||||
do_create(InstId, Group, ResourceType, NewConfig, Opts);
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
{ok, #{mod := ResourceType, status := _} = Data} ->
|
||||
do_remove(Data, false),
|
||||
do_create(InstId, ResourceType, NewConfig, Opts);
|
||||
{ok, #{mod := Mod}} when Mod =/= ResourceType ->
|
||||
{ok, Group, #{mod := ResourceType, status := _} = Data} ->
|
||||
do_remove(Group, Data, false),
|
||||
do_create(InstId, Group, ResourceType, NewConfig, Opts);
|
||||
{ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
|
||||
{error, updating_to_incorrect_resource_type};
|
||||
{error, not_found} ->
|
||||
{error, not_found}
|
||||
end.
|
||||
|
||||
do_create(InstId, ResourceType, Config, Opts) ->
|
||||
do_create(InstId, Group, ResourceType, Config, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, _} ->
|
||||
{ok,_, _} ->
|
||||
{ok, already_created};
|
||||
{error, not_found} ->
|
||||
case do_start(InstId, ResourceType, Config, Opts) of
|
||||
case do_start(InstId, Group, ResourceType, Config, Opts) of
|
||||
ok ->
|
||||
ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId,
|
||||
[matched, success, failed, exception], [matched]),
|
||||
|
@ -207,9 +213,10 @@ do_remove(Instance) ->
|
|||
do_remove(Instance, true).
|
||||
|
||||
do_remove(InstId, ClearMetrics) when is_binary(InstId) ->
|
||||
do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]);
|
||||
do_remove(#{id := InstId} = Data, ClearMetrics) ->
|
||||
_ = do_stop(Data),
|
||||
do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]).
|
||||
|
||||
do_remove(Group, #{id := InstId} = Data, ClearMetrics) ->
|
||||
_ = do_stop(Group, Data),
|
||||
ets:delete(emqx_resource_instance, InstId),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId);
|
||||
|
@ -219,42 +226,42 @@ do_remove(#{id := InstId} = Data, ClearMetrics) ->
|
|||
|
||||
do_restart(InstId, Opts) ->
|
||||
case lookup(InstId) of
|
||||
{ok, #{mod := ResourceType, config := Config} = Data} ->
|
||||
ok = do_stop(Data),
|
||||
do_start(InstId, ResourceType, Config, Opts);
|
||||
{ok, Group, #{mod := ResourceType, config := Config} = Data} ->
|
||||
ok = do_stop(Group, Data),
|
||||
do_start(InstId, Group, ResourceType, Config, Opts);
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) ->
|
||||
do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) ->
|
||||
InitData = #{id => InstId, mod => ResourceType, config => Config,
|
||||
status => starting, state => undefined},
|
||||
%% The `emqx_resource:call_start/3` need the instance exist beforehand
|
||||
ets:insert(emqx_resource_instance, {InstId, InitData}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, InitData}),
|
||||
case maps:get(async_create, Opts, false) of
|
||||
false ->
|
||||
start_and_check(InstId, ResourceType, Config, Opts, InitData);
|
||||
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData);
|
||||
true ->
|
||||
spawn(fun() ->
|
||||
start_and_check(InstId, ResourceType, Config, Opts, InitData)
|
||||
start_and_check(InstId, Group, ResourceType, Config, Opts, InitData)
|
||||
end),
|
||||
ok
|
||||
end.
|
||||
|
||||
start_and_check(InstId, ResourceType, Config, Opts, Data) ->
|
||||
start_and_check(InstId, Group, ResourceType, Config, Opts, Data) ->
|
||||
case emqx_resource:call_start(InstId, ResourceType, Config) of
|
||||
{ok, ResourceState} ->
|
||||
Data2 = Data#{state => ResourceState},
|
||||
ets:insert(emqx_resource_instance, {InstId, Data2}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data2}),
|
||||
case maps:get(async_create, Opts, false) of
|
||||
false -> case do_health_check(Data2) of
|
||||
false -> case do_health_check(Group, Data2) of
|
||||
ok -> create_default_checker(InstId, Opts);
|
||||
{error, Reason} -> {error, Reason}
|
||||
end;
|
||||
true -> create_default_checker(InstId, Opts)
|
||||
end;
|
||||
{error, Reason} ->
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
|
@ -264,37 +271,39 @@ create_default_checker(InstId, Opts) ->
|
|||
maps:get(health_check_timeout, Opts, 10000)).
|
||||
|
||||
do_stop(InstId) when is_binary(InstId) ->
|
||||
do_with_instance_data(InstId, fun do_stop/1, []);
|
||||
do_stop(#{state := undefined}) ->
|
||||
do_with_group_and_instance_data(InstId, fun do_stop/2, []).
|
||||
|
||||
do_stop(_Group, #{state := undefined}) ->
|
||||
ok;
|
||||
do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
||||
do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) ->
|
||||
_ = emqx_resource:call_stop(InstId, Mod, ResourceState),
|
||||
_ = emqx_resource_health_check:delete_checker(InstId),
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}),
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}),
|
||||
ok.
|
||||
|
||||
do_health_check(InstId) when is_binary(InstId) ->
|
||||
do_with_instance_data(InstId, fun do_health_check/1, []);
|
||||
do_health_check(#{state := undefined}) ->
|
||||
do_with_group_and_instance_data(InstId, fun do_health_check/2, []).
|
||||
|
||||
do_health_check(_Group, #{state := undefined}) ->
|
||||
{error, resource_not_initialized};
|
||||
do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
||||
do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Data) ->
|
||||
case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of
|
||||
{ok, ResourceState1} ->
|
||||
ets:insert(emqx_resource_instance,
|
||||
{InstId, Data#{status => started, state => ResourceState1}}),
|
||||
{InstId, Group, Data#{status => started, state => ResourceState1}}),
|
||||
ok;
|
||||
{error, Reason, ResourceState1} ->
|
||||
logger:error("health check for ~p failed: ~p", [InstId, Reason]),
|
||||
ets:insert(emqx_resource_instance,
|
||||
{InstId, Data#{status => stopped, state => ResourceState1}}),
|
||||
{InstId, Group, Data#{status => stopped, state => ResourceState1}}),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
do_set_resource_status_stoped(InstId) ->
|
||||
case emqx_resource_instance:lookup(InstId) of
|
||||
{ok, #{id := InstId} = Data} ->
|
||||
{ok, Group, #{id := InstId} = Data} ->
|
||||
logger:error("health check for ~p failed: timeout", [InstId]),
|
||||
ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}});
|
||||
ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}});
|
||||
Error -> {error, Error}
|
||||
end.
|
||||
|
||||
|
@ -302,9 +311,9 @@ do_set_resource_status_stoped(InstId) ->
|
|||
%% internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
do_with_instance_data(InstId, Do, Args) ->
|
||||
do_with_group_and_instance_data(InstId, Do, Args) ->
|
||||
case lookup(InstId) of
|
||||
{ok, Data} -> erlang:apply(Do, [Data | Args]);
|
||||
{ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]);
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
-export([ introduced_in/0
|
||||
|
||||
, create/4
|
||||
, create/5
|
||||
, create_dry_run/2
|
||||
, recreate/4
|
||||
, remove/1
|
||||
|
@ -32,13 +32,14 @@ introduced_in() ->
|
|||
"5.0.0".
|
||||
|
||||
-spec create( emqx_resource:instance_id()
|
||||
, emqx_resource:resource_group()
|
||||
, emqx_resource:resource_type()
|
||||
, emqx_resource:resource_config()
|
||||
, emqx_resource:create_opts()
|
||||
) ->
|
||||
emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()).
|
||||
create(InstId, ResourceType, Config, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, ResourceType, Config, Opts]).
|
||||
create(InstId, Group, ResourceType, Config, Opts) ->
|
||||
emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, Group, ResourceType, Config, Opts]).
|
||||
|
||||
-spec create_dry_run( emqx_resource:resource_type()
|
||||
, emqx_resource:resource_config()
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include("emqx_resource.hrl").
|
||||
|
||||
-define(TEST_RESOURCE, emqx_test_resource).
|
||||
-define(ID, <<"id">>).
|
||||
|
@ -59,11 +60,13 @@ t_check_config(_) ->
|
|||
t_create_remove(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}),
|
||||
|
||||
|
@ -84,11 +87,13 @@ t_create_remove(_) ->
|
|||
t_create_remove_local(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}),
|
||||
|
||||
|
@ -117,6 +122,7 @@ t_create_remove_local(_) ->
|
|||
t_query(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}),
|
||||
|
||||
|
@ -143,6 +149,7 @@ t_query(_) ->
|
|||
t_healthy_timeout(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>},
|
||||
#{async_create => true, health_check_timeout => 200}),
|
||||
|
@ -153,6 +160,7 @@ t_healthy_timeout(_) ->
|
|||
t_healthy(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"test_resource">>},
|
||||
#{async_create => true}),
|
||||
|
@ -184,11 +192,13 @@ t_healthy(_) ->
|
|||
t_stop_start(_) ->
|
||||
{error, _} = emqx_resource:check_and_create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>}),
|
||||
|
||||
|
@ -218,11 +228,13 @@ t_stop_start(_) ->
|
|||
t_stop_start_local(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>}),
|
||||
|
||||
|
@ -252,21 +264,23 @@ t_stop_start_local(_) ->
|
|||
t_list_filter(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
emqx_resource:generate_id(<<"a">>),
|
||||
<<"group1">>,
|
||||
?TEST_RESOURCE,
|
||||
#{name => a}),
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
emqx_resource:generate_id(<<"group">>, <<"a">>),
|
||||
emqx_resource:generate_id(<<"a">>),
|
||||
<<"group2">>,
|
||||
?TEST_RESOURCE,
|
||||
#{name => grouped_a}),
|
||||
|
||||
[Id1] = emqx_resource:list_group_instances(<<"default">>),
|
||||
[Id1] = emqx_resource:list_group_instances(<<"group1">>),
|
||||
?assertMatch(
|
||||
{ok, #{config := #{name := a}}},
|
||||
{ok, <<"group1">>, #{config := #{name := a}}},
|
||||
emqx_resource:get_instance(Id1)),
|
||||
|
||||
[Id2] = emqx_resource:list_group_instances(<<"group">>),
|
||||
[Id2] = emqx_resource:list_group_instances(<<"group2">>),
|
||||
?assertMatch(
|
||||
{ok, #{config := #{name := grouped_a}}},
|
||||
{ok, <<"group2">>, #{config := #{name := grouped_a}}},
|
||||
emqx_resource:get_instance(Id2)).
|
||||
|
||||
t_create_dry_run_local(_) ->
|
||||
|
|
Loading…
Reference in New Issue