From df57daaabbc4976d6da8fde17a91968145afc228 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 11 Feb 2022 15:06:39 +0800 Subject: [PATCH 1/8] refactor(emqx_resource): improve grouping strategy for emqx_resource_instance --- apps/emqx_resource/include/emqx_resource.hrl | 1 + apps/emqx_resource/src/emqx_resource.erl | 84 ++++++-------- .../src/emqx_resource_instance.erl | 103 ++++++++++-------- .../src/proto/emqx_resource_proto_v1.erl | 7 +- .../test/emqx_resource_SUITE.erl | 24 +++- 5 files changed, 116 insertions(+), 103 deletions(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 0ea92d993..b88b72d92 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -43,3 +43,4 @@ -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. -define(TEST_ID_PREFIX, "_test_:"). +-define(DEFAULT_RESOURCE_GROUP, <<"default">>). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 61405b5a9..e6c33718f 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -32,10 +32,10 @@ %% APIs for instances -export([ check_config/2 - , check_and_create/3 , check_and_create/4 - , check_and_create_local/3 + , check_and_create/5 , check_and_create_local/4 + , check_and_create_local/5 , check_and_recreate/4 , check_and_recreate_local/4 ]). @@ -43,10 +43,10 @@ %% Sync resource instances and files %% provisional solution: rpc:multical to all the nodes for creating/updating/removing %% todo: replicate operations --export([ create/3 %% store the config and start the instance - , create/4 - , create_local/3 +-export([ create/4 %% store the config and start the instance + , create/5 , create_local/4 + , create_local/5 , create_dry_run/2 %% run start/2, health_check/2 and stop/1 sequentially , create_dry_run_local/2 , recreate/4 %% this will do create_dry_run, stop the old instance and start a new one @@ -77,12 +77,9 @@ , get_instance/1 %% return the data of the instance , list_instances_by_type/1 %% return all the instances of the same resource type , generate_id/1 - , generate_id/2 , list_group_instances/1 ]). --define(DEFAULT_RESOURCE_GROUP, <<"default">>). - -optional_callbacks([ on_query/4 , on_health_check/2 ]). @@ -134,25 +131,26 @@ apply_query_after_calls(Funcs) -> %% ================================================================================= %% APIs for resource instances %% ================================================================================= --spec create(instance_id(), resource_type(), resource_config()) -> +-spec create(instance_id(), resource_group(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create(InstId, ResourceType, Config) -> - create(InstId, ResourceType, Config, #{}). +create(InstId, Group, ResourceType, Config) -> + create(InstId, Group, ResourceType, Config, #{}). --spec create(instance_id(), resource_type(), resource_config(), create_opts()) -> +-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create(InstId, ResourceType, Config, Opts) -> - wrap_rpc(emqx_resource_proto_v1:create(InstId, ResourceType, Config, Opts)). +create(InstId, Group, ResourceType, Config, Opts) -> + wrap_rpc(emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts)). +% -------------------------------------------- --spec create_local(instance_id(), resource_type(), resource_config()) -> +-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(InstId, ResourceType, Config) -> - create_local(InstId, ResourceType, Config, #{}). +create_local(InstId, Group, ResourceType, Config) -> + create_local(InstId, Group, ResourceType, Config, #{}). --spec create_local(instance_id(), resource_type(), resource_config(), create_opts()) -> +-spec create_local(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, Reason :: term()}. -create_local(InstId, ResourceType, Config, Opts) -> - call_instance(InstId, {create, InstId, ResourceType, Config, Opts}). +create_local(InstId, Group, ResourceType, Config, Opts) -> + call_instance(InstId, {create, InstId, Group, ResourceType, Config, Opts}). -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. @@ -192,13 +190,13 @@ query(InstId, Request) -> -spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). query(InstId, Request, AfterQuery) -> case get_instance(InstId) of - {ok, #{status := starting}} -> + {ok, _Group, #{status := starting}} -> query_error(starting, <<"cannot serve query when the resource " "instance is still starting">>); - {ok, #{status := stopped}} -> + {ok, _Group, #{status := stopped}} -> query_error(stopped, <<"cannot serve query when the resource " "instance is stopped">>); - {ok, #{mod := Mod, state := ResourceState, status := started}} -> + {ok, _Group, #{mod := Mod, state := ResourceState, status := started}} -> %% the resource state is readonly to Module:on_query/4 %% and the `after_query()` functions should be thread safe ok = emqx_plugin_libs_metrics:inc(resource_metrics, InstId, matched), @@ -230,7 +228,7 @@ health_check(InstId) -> set_resource_status_stoped(InstId) -> call_instance(InstId, {set_resource_status_stoped, InstId}). --spec get_instance(instance_id()) -> {ok, resource_data()} | {error, Reason :: term()}. +-spec get_instance(instance_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(InstId) -> emqx_resource_instance:lookup(InstId). @@ -250,21 +248,11 @@ list_instances_by_type(ResourceType) -> -spec generate_id(term()) -> instance_id(). generate_id(Name) when is_binary(Name) -> - generate_id(?DEFAULT_RESOURCE_GROUP, Name). - --spec generate_id(resource_group(), binary()) -> instance_id(). -generate_id(Group, Name) when is_binary(Group) and is_binary(Name) -> Id = integer_to_binary(erlang:unique_integer([positive])), - <>. + <>. -spec list_group_instances(resource_group()) -> [instance_id()]. -list_group_instances(Group) -> - filter_instances(fun(Id, _) -> - case binary:split(Id, <<"/">>) of - [Group | _] -> true; - _ -> false - end - end). +list_group_instances(Group) -> emqx_resource_instance:list_group(Group). -spec call_start(instance_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. @@ -285,27 +273,27 @@ call_stop(InstId, Mod, ResourceState) -> check_config(ResourceType, Conf) -> emqx_hocon:check(ResourceType, Conf). --spec check_and_create(instance_id(), resource_type(), raw_resource_config()) -> +-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> {ok, resource_data() | 'already_created'} | {error, term()}. -check_and_create(InstId, ResourceType, RawConfig) -> - check_and_create(InstId, ResourceType, RawConfig, #{}). +check_and_create(InstId, Group, ResourceType, RawConfig) -> + check_and_create(InstId, Group, ResourceType, RawConfig, #{}). --spec check_and_create(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> +-spec check_and_create(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data() | 'already_created'} | {error, term()}. -check_and_create(InstId, ResourceType, RawConfig, Opts) -> +check_and_create(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create(InstId, ResourceType, InstConf, Opts) end). + fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end). --spec check_and_create_local(instance_id(), resource_type(), raw_resource_config()) -> +-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config()) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(InstId, ResourceType, RawConfig) -> - check_and_create_local(InstId, ResourceType, RawConfig, #{}). +check_and_create_local(InstId, Group, ResourceType, RawConfig) -> + check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}). --spec check_and_create_local(instance_id(), resource_type(), raw_resource_config(), +-spec check_and_create_local(instance_id(), resource_group(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. -check_and_create_local(InstId, ResourceType, RawConfig, Opts) -> +check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_do(ResourceType, RawConfig, - fun(InstConf) -> create_local(InstId, ResourceType, InstConf, Opts) end). + fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end). -spec check_and_recreate(instance_id(), resource_type(), raw_resource_config(), create_opts()) -> {ok, resource_data()} | {error, term()}. diff --git a/apps/emqx_resource/src/emqx_resource_instance.erl b/apps/emqx_resource/src/emqx_resource_instance.erl index a07f3e78c..1b0effe64 100644 --- a/apps/emqx_resource/src/emqx_resource_instance.erl +++ b/apps/emqx_resource/src/emqx_resource_instance.erl @@ -26,6 +26,7 @@ -export([ lookup/1 , get_metrics/1 , list_all/0 + , list_group/1 ]). -export([ hash_call/2 @@ -61,12 +62,12 @@ hash_call(InstId, Request) -> hash_call(InstId, Request, Timeout) -> gen_server:call(pick(InstId), Request, Timeout). --spec lookup(instance_id()) -> {ok, resource_data()} | {error, not_found}. +-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(InstId) -> case ets:lookup(emqx_resource_instance, InstId) of [] -> {error, not_found}; - [{_, Data}] -> - {ok, Data#{id => InstId, metrics => get_metrics(InstId)}} + [{_, Group, Data}] -> + {ok, Group, Data#{id => InstId, metrics => get_metrics(InstId)}} end. make_test_id() -> @@ -77,17 +78,22 @@ get_metrics(InstId) -> emqx_plugin_libs_metrics:get_metrics(resource_metrics, InstId). force_lookup(InstId) -> - {ok, Data} = lookup(InstId), + {ok, _Group, Data} = lookup(InstId), Data. -spec list_all() -> [resource_data()]. list_all() -> try - [Data#{id => Id} || {Id, Data} <- ets:tab2list(emqx_resource_instance)] + [Data#{id => Id} || {Id, _Group, Data} <- ets:tab2list(emqx_resource_instance)] catch error:badarg -> [] end. +-spec list_group(resource_group()) -> [instance_id()]. +list_group(Group) -> + List = ets:match(emqx_resource_instance, {'$1', Group, '_'}), + lists:map(fun([A|_]) -> A end, List). + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -99,8 +105,8 @@ init({Pool, Id}) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{worker_pool = Pool, worker_id = Id}}. -handle_call({create, InstId, ResourceType, Config, Opts}, _From, State) -> - {reply, do_create(InstId, ResourceType, Config, Opts), State}; +handle_call({create, InstId, Group, ResourceType, Config, Opts}, _From, State) -> + {reply, do_create(InstId, Group, ResourceType, Config, Opts), State}; handle_call({create_dry_run, ResourceType, Config}, _From, State) -> {reply, do_create_dry_run(ResourceType, Config), State}; @@ -143,41 +149,41 @@ code_change(_OldVsn, State, _Extra) -> %% suppress the race condition check, as these functions are protected in gproc workers -dialyzer({nowarn_function, [ do_recreate/4 - , do_create/4 + , do_create/5 , do_restart/2 - , do_start/4 + , do_start/5 , do_stop/1 , do_health_check/1 - , start_and_check/5 + , start_and_check/6 ]}). do_recreate(InstId, ResourceType, NewConfig, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, status := started} = Data} -> + {ok, Group, #{mod := ResourceType, status := started} = Data} -> %% If this resource is in use (status='started'), we should make sure %% the new config is OK before removing the old one. case do_create_dry_run(ResourceType, NewConfig) of ok -> - do_remove(Data, false), - do_create(InstId, ResourceType, NewConfig, Opts); + do_remove(Group, Data, false), + do_create(InstId, Group, ResourceType, NewConfig, Opts); Error -> Error end; - {ok, #{mod := ResourceType, status := _} = Data} -> - do_remove(Data, false), - do_create(InstId, ResourceType, NewConfig, Opts); - {ok, #{mod := Mod}} when Mod =/= ResourceType -> + {ok, Group, #{mod := ResourceType, status := _} = Data} -> + do_remove(Group, Data, false), + do_create(InstId, Group, ResourceType, NewConfig, Opts); + {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. -do_create(InstId, ResourceType, Config, Opts) -> +do_create(InstId, Group, ResourceType, Config, Opts) -> case lookup(InstId) of - {ok, _} -> + {ok,_, _} -> {ok, already_created}; {error, not_found} -> - case do_start(InstId, ResourceType, Config, Opts) of + case do_start(InstId, Group, ResourceType, Config, Opts) of ok -> ok = emqx_plugin_libs_metrics:create_metrics(resource_metrics, InstId, [matched, success, failed, exception], [matched]), @@ -207,9 +213,10 @@ do_remove(Instance) -> do_remove(Instance, true). do_remove(InstId, ClearMetrics) when is_binary(InstId) -> - do_with_instance_data(InstId, fun do_remove/2, [ClearMetrics]); -do_remove(#{id := InstId} = Data, ClearMetrics) -> - _ = do_stop(Data), + do_with_group_and_instance_data(InstId, fun do_remove/3, [ClearMetrics]). + +do_remove(Group, #{id := InstId} = Data, ClearMetrics) -> + _ = do_stop(Group, Data), ets:delete(emqx_resource_instance, InstId), case ClearMetrics of true -> ok = emqx_plugin_libs_metrics:clear_metrics(resource_metrics, InstId); @@ -219,42 +226,42 @@ do_remove(#{id := InstId} = Data, ClearMetrics) -> do_restart(InstId, Opts) -> case lookup(InstId) of - {ok, #{mod := ResourceType, config := Config} = Data} -> - ok = do_stop(Data), - do_start(InstId, ResourceType, Config, Opts); + {ok, Group, #{mod := ResourceType, config := Config} = Data} -> + ok = do_stop(Group, Data), + do_start(InstId, Group, ResourceType, Config, Opts); Error -> Error end. -do_start(InstId, ResourceType, Config, Opts) when is_binary(InstId) -> +do_start(InstId, Group, ResourceType, Config, Opts) when is_binary(InstId) -> InitData = #{id => InstId, mod => ResourceType, config => Config, status => starting, state => undefined}, %% The `emqx_resource:call_start/3` need the instance exist beforehand - ets:insert(emqx_resource_instance, {InstId, InitData}), + ets:insert(emqx_resource_instance, {InstId, Group, InitData}), case maps:get(async_create, Opts, false) of false -> - start_and_check(InstId, ResourceType, Config, Opts, InitData); + start_and_check(InstId, Group, ResourceType, Config, Opts, InitData); true -> spawn(fun() -> - start_and_check(InstId, ResourceType, Config, Opts, InitData) + start_and_check(InstId, Group, ResourceType, Config, Opts, InitData) end), ok end. -start_and_check(InstId, ResourceType, Config, Opts, Data) -> +start_and_check(InstId, Group, ResourceType, Config, Opts, Data) -> case emqx_resource:call_start(InstId, ResourceType, Config) of {ok, ResourceState} -> Data2 = Data#{state => ResourceState}, - ets:insert(emqx_resource_instance, {InstId, Data2}), + ets:insert(emqx_resource_instance, {InstId, Group, Data2}), case maps:get(async_create, Opts, false) of - false -> case do_health_check(Data2) of + false -> case do_health_check(Group, Data2) of ok -> create_default_checker(InstId, Opts); {error, Reason} -> {error, Reason} end; true -> create_default_checker(InstId, Opts) end; {error, Reason} -> - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}), {error, Reason} end. @@ -264,37 +271,39 @@ create_default_checker(InstId, Opts) -> maps:get(health_check_timeout, Opts, 10000)). do_stop(InstId) when is_binary(InstId) -> - do_with_instance_data(InstId, fun do_stop/1, []); -do_stop(#{state := undefined}) -> + do_with_group_and_instance_data(InstId, fun do_stop/2, []). + +do_stop(_Group, #{state := undefined}) -> ok; -do_stop(#{id := InstId, mod := Mod, state := ResourceState} = Data) -> +do_stop(Group, #{id := InstId, mod := Mod, state := ResourceState} = Data) -> _ = emqx_resource:call_stop(InstId, Mod, ResourceState), _ = emqx_resource_health_check:delete_checker(InstId), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}), + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}), ok. do_health_check(InstId) when is_binary(InstId) -> - do_with_instance_data(InstId, fun do_health_check/1, []); -do_health_check(#{state := undefined}) -> + do_with_group_and_instance_data(InstId, fun do_health_check/2, []). + +do_health_check(_Group, #{state := undefined}) -> {error, resource_not_initialized}; -do_health_check(#{id := InstId, mod := Mod, state := ResourceState0} = Data) -> +do_health_check(Group, #{id := InstId, mod := Mod, state := ResourceState0} = Data) -> case emqx_resource:call_health_check(InstId, Mod, ResourceState0) of {ok, ResourceState1} -> ets:insert(emqx_resource_instance, - {InstId, Data#{status => started, state => ResourceState1}}), + {InstId, Group, Data#{status => started, state => ResourceState1}}), ok; {error, Reason, ResourceState1} -> logger:error("health check for ~p failed: ~p", [InstId, Reason]), ets:insert(emqx_resource_instance, - {InstId, Data#{status => stopped, state => ResourceState1}}), + {InstId, Group, Data#{status => stopped, state => ResourceState1}}), {error, Reason} end. do_set_resource_status_stoped(InstId) -> case emqx_resource_instance:lookup(InstId) of - {ok, #{id := InstId} = Data} -> + {ok, Group, #{id := InstId} = Data} -> logger:error("health check for ~p failed: timeout", [InstId]), - ets:insert(emqx_resource_instance, {InstId, Data#{status => stopped}}); + ets:insert(emqx_resource_instance, {InstId, Group, Data#{status => stopped}}); Error -> {error, Error} end. @@ -302,9 +311,9 @@ do_set_resource_status_stoped(InstId) -> %% internal functions %%------------------------------------------------------------------------------ -do_with_instance_data(InstId, Do, Args) -> +do_with_group_and_instance_data(InstId, Do, Args) -> case lookup(InstId) of - {ok, Data} -> erlang:apply(Do, [Data | Args]); + {ok, Group, Data} -> erlang:apply(Do, [Group, Data | Args]); Error -> Error end. diff --git a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl index f32f7840a..3d62603fa 100644 --- a/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl +++ b/apps/emqx_resource/src/proto/emqx_resource_proto_v1.erl @@ -20,7 +20,7 @@ -export([ introduced_in/0 - , create/4 + , create/5 , create_dry_run/2 , recreate/4 , remove/1 @@ -32,13 +32,14 @@ introduced_in() -> "5.0.0". -spec create( emqx_resource:instance_id() + , emqx_resource:resource_group() , emqx_resource:resource_type() , emqx_resource:resource_config() , emqx_resource:create_opts() ) -> emqx_cluster_rpc:multicall_return(emqx_resource:resource_data()). -create(InstId, ResourceType, Config, Opts) -> - emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, ResourceType, Config, Opts]). +create(InstId, Group, ResourceType, Config, Opts) -> + emqx_cluster_rpc:multicall(emqx_resource, create_local, [InstId, Group, ResourceType, Config, Opts]). -spec create_dry_run( emqx_resource:resource_type() , emqx_resource:resource_config() diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 41945f295..e8275b2c0 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include("emqx_resource.hrl"). -define(TEST_RESOURCE, emqx_test_resource). -define(ID, <<"id">>). @@ -59,11 +60,13 @@ t_check_config(_) -> t_create_remove(_) -> {error, _} = emqx_resource:check_and_create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{unknown => test_resource}), {ok, _} = emqx_resource:create( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource}), @@ -84,11 +87,13 @@ t_create_remove(_) -> t_create_remove_local(_) -> {error, _} = emqx_resource:check_and_create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{unknown => test_resource}), {ok, _} = emqx_resource:create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource}), @@ -117,6 +122,7 @@ t_create_remove_local(_) -> t_query(_) -> {ok, _} = emqx_resource:create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource}), @@ -143,6 +149,7 @@ t_query(_) -> t_healthy_timeout(_) -> {ok, _} = emqx_resource:create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => <<"test_resource">>}, #{async_create => true, health_check_timeout => 200}), @@ -153,6 +160,7 @@ t_healthy_timeout(_) -> t_healthy(_) -> {ok, _} = emqx_resource:create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => <<"test_resource">>}, #{async_create => true}), @@ -184,11 +192,13 @@ t_healthy(_) -> t_stop_start(_) -> {error, _} = emqx_resource:check_and_create( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{unknown => test_resource}), {ok, _} = emqx_resource:check_and_create( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}), @@ -218,11 +228,13 @@ t_stop_start(_) -> t_stop_start_local(_) -> {error, _} = emqx_resource:check_and_create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{unknown => test_resource}), {ok, _} = emqx_resource:check_and_create_local( ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{<<"name">> => <<"test_resource">>}), @@ -252,21 +264,23 @@ t_stop_start_local(_) -> t_list_filter(_) -> {ok, _} = emqx_resource:create_local( emqx_resource:generate_id(<<"a">>), + <<"group1">>, ?TEST_RESOURCE, #{name => a}), {ok, _} = emqx_resource:create_local( - emqx_resource:generate_id(<<"group">>, <<"a">>), + emqx_resource:generate_id(<<"a">>), + <<"group2">>, ?TEST_RESOURCE, #{name => grouped_a}), - [Id1] = emqx_resource:list_group_instances(<<"default">>), + [Id1] = emqx_resource:list_group_instances(<<"group1">>), ?assertMatch( - {ok, #{config := #{name := a}}}, + {ok, <<"group1">>, #{config := #{name := a}}}, emqx_resource:get_instance(Id1)), - [Id2] = emqx_resource:list_group_instances(<<"group">>), + [Id2] = emqx_resource:list_group_instances(<<"group2">>), ?assertMatch( - {ok, #{config := #{name := grouped_a}}}, + {ok, <<"group2">>, #{config := #{name := grouped_a}}}, emqx_resource:get_instance(Id2)). t_create_dry_run_local(_) -> From d8819559f7121438ecd1d2cf673e5591205951d6 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 11 Feb 2022 17:13:14 +0800 Subject: [PATCH 2/8] fix(emqx_resource): fix emqx_bridge create --- apps/emqx_bridge/src/emqx_bridge.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index a4dc29987..beb585c0c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -202,7 +202,7 @@ lookup(Type, Name) -> lookup(Type, Name, RawConf) -> case emqx_resource:get_instance(resource_id(Type, Name)) of {error, not_found} -> {error, not_found}; - {ok, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data, + {ok, _, Data} -> {ok, #{id => bridge_id(Type, Name), resource_data => Data, raw_config => RawConf}} end. @@ -222,7 +222,8 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), - case emqx_resource:create_local(resource_id(Type, Name), emqx_bridge:resource_type(Type), + Group = maps:get(group, Conf, <<"default">>), + case emqx_resource:create_local(resource_id(Type, Name), Group, emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); From 580901b67800f6e83d68a6466ee7a39c4c708b49 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 11 Feb 2022 17:33:41 +0800 Subject: [PATCH 3/8] fix(emqx_authn): fix create api --- apps/emqx_authn/include/emqx_authn.hrl | 2 ++ apps/emqx_authn/src/emqx_authn_utils.erl | 5 ++--- apps/emqx_authn/src/simple_authn/emqx_authn_http.erl | 1 + apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl | 2 +- apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl | 2 +- apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl | 2 +- apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl | 2 +- 7 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/emqx_authn/include/emqx_authn.hrl b/apps/emqx_authn/include/emqx_authn.hrl index 225812857..a1be787c3 100644 --- a/apps/emqx_authn/include/emqx_authn.hrl +++ b/apps/emqx_authn/include/emqx_authn.hrl @@ -35,3 +35,5 @@ -define(CONF_NS_BINARY, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY). -endif. + +-define(RESOURCE_GROUP, <<"emqx_authn">>). \ No newline at end of file diff --git a/apps/emqx_authn/src/emqx_authn_utils.erl b/apps/emqx_authn/src/emqx_authn_utils.erl index 5c4547402..7f17215c1 100644 --- a/apps/emqx_authn/src/emqx_authn_utils.erl +++ b/apps/emqx_authn/src/emqx_authn_utils.erl @@ -17,6 +17,7 @@ -module(emqx_authn_utils). -include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("emqx_authn.hrl"). -export([ check_password_from_selected_map/3 , parse_deep/1 @@ -32,8 +33,6 @@ , make_resource_id/1 ]). --define(RESOURCE_GROUP, <<"emqx_authn">>). - -define(AUTHN_PLACEHOLDERS, [?PH_USERNAME, ?PH_CLIENTID, ?PH_PASSWORD, @@ -120,7 +119,7 @@ cleanup_resources() -> make_resource_id(Name) -> NameBin = bin(Name), - emqx_resource:generate_id(?RESOURCE_GROUP, NameBin). + emqx_resource:generate_id(NameBin). %%------------------------------------------------------------------------------ %% Internal functions diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl index 18501b31e..e04e938f6 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_http.erl @@ -134,6 +134,7 @@ create(#{method := Method, request_timeout => RequestTimeout, resource_id => ResourceId}, case emqx_resource:create_local(ResourceId, + ?RESOURCE_GROUP, emqx_connector_http, Config#{base_url => maps:remove(query, URIMap), pool_type => random}) of diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl index 410631ce8..aabb39e82 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mongodb.erl @@ -112,7 +112,7 @@ create(#{selector := Selector} = Config) -> NState = State#{ selector_template => SelectorTemplate, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_mongo, Config) of + case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mongo, Config) of {ok, already_created} -> {ok, NState}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl index 038175854..e306a9b77 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mysql.erl @@ -82,7 +82,7 @@ create(#{password_hash_algorithm := Algorithm, placeholders => PlaceHolders, query_timeout => QueryTimeout, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_mysql, Config) of + case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_mysql, Config) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl index cb0bc4df1..e6d12285a 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_pgsql.erl @@ -79,7 +79,7 @@ create(#{query := Query0, State = #{placeholders => PlaceHolders, password_hash_algorithm => Algorithm, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of + case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_pgsql, Config#{named_queries => #{ResourceId => Query}}) of {ok, already_created} -> {ok, State}; {ok, _} -> diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl index e81356883..11fcdac84 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_redis.erl @@ -91,7 +91,7 @@ create(#{cmd := Cmd, NState = State#{ cmd => NCmd, resource_id => ResourceId}, - case emqx_resource:create_local(ResourceId, emqx_connector_redis, Config) of + case emqx_resource:create_local(ResourceId, ?RESOURCE_GROUP, emqx_connector_redis, Config) of {ok, already_created} -> {ok, NState}; {ok, _} -> From fae91d72f294dd50f5294a88756ba8c3f7fca144 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 11 Feb 2022 17:50:56 +0800 Subject: [PATCH 4/8] fix(emqx_authz): fix function 'create_resource' --- apps/emqx_authz/include/emqx_authz.hrl | 2 ++ apps/emqx_authz/src/emqx_authz_postgresql.erl | 1 + apps/emqx_authz/src/emqx_authz_utils.erl | 7 +++---- apps/emqx_authz/test/emqx_authz_SUITE.erl | 2 +- apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl | 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index 3e1c119be..e71b67284 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -71,3 +71,5 @@ , limit => 100 , count => 1 }). + +-define(RESOURCE_GROUP, <<"emqx_authz">>). \ No newline at end of file diff --git a/apps/emqx_authz/src/emqx_authz_postgresql.erl b/apps/emqx_authz/src/emqx_authz_postgresql.erl index 0da8caaff..936cfc7e6 100644 --- a/apps/emqx_authz/src/emqx_authz_postgresql.erl +++ b/apps/emqx_authz/src/emqx_authz_postgresql.erl @@ -53,6 +53,7 @@ init(#{query := SQL0} = Source) -> ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql), case emqx_resource:create_local( ResourceID, + ?RESOURCE_GROUP, emqx_connector_pgsql, Source#{named_queries => #{ResourceID => SQL}}) of {ok, _} -> diff --git a/apps/emqx_authz/src/emqx_authz_utils.erl b/apps/emqx_authz/src/emqx_authz_utils.erl index 2a60f7659..c0a8206c6 100644 --- a/apps/emqx_authz/src/emqx_authz_utils.erl +++ b/apps/emqx_authz/src/emqx_authz_utils.erl @@ -17,6 +17,7 @@ -module(emqx_authz_utils). -include_lib("emqx/include/emqx_placeholder.hrl"). +-include_lib("emqx_authz.hrl"). -export([ cleanup_resources/0 , make_resource_id/1 @@ -28,15 +29,13 @@ , render_sql_params/2 ]). --define(RESOURCE_GROUP, <<"emqx_authz">>). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ create_resource(Module, Config) -> ResourceID = make_resource_id(Module), - case emqx_resource:create_local(ResourceID, Module, Config) of + case emqx_resource:create_local(ResourceID, ?RESOURCE_GROUP, Module, Config) of {ok, already_created} -> {ok, ResourceID}; {ok, _} -> {ok, ResourceID}; {error, Reason} -> {error, Reason} @@ -49,7 +48,7 @@ cleanup_resources() -> make_resource_id(Name) -> NameBin = bin(Name), - emqx_resource:generate_id(?RESOURCE_GROUP, NameBin). + emqx_resource:generate_id(NameBin). update_config(Path, ConfigRequest) -> emqx_conf:update(Path, ConfigRequest, #{rawconf_with_defaults => true, diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index 5a9f6d97e..638da1416 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -31,7 +31,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, remove_local, fun(_) -> ok end), meck:expect(emqx_resource, create_dry_run_local, fun(_, _) -> ok end), diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index 03df47eac..6c4e093aa 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -96,7 +96,7 @@ groups() -> init_per_suite(Config) -> meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_resource, create_local, fun(_, _, _) -> {ok, meck_data} end), + meck:expect(emqx_resource, create_local, fun(_, _, _, _) -> {ok, meck_data} end), meck:expect(emqx_resource, create_dry_run_local, fun(emqx_connector_mysql, _) -> ok; (emqx_connector_mongo, _) -> ok; From 79badcb403e2b59d224cf8ebe50e95797e53de78 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Fri, 11 Feb 2022 18:29:13 +0800 Subject: [PATCH 5/8] fix(emqx_retainer): fix function 'create_resource' --- apps/emqx_retainer/src/emqx_retainer.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index edc51d174..69a598370 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -480,6 +480,7 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]), case emqx_resource:create( ResourceID, + <<"default">>, list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), Config) of {ok, already_created} -> From 75b2963efcabcc74638edec04ca15367b6eba588 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Mon, 14 Feb 2022 14:10:10 +0800 Subject: [PATCH 6/8] test(authn,authz): fix create function --- apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl | 1 + apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl | 1 + apps/emqx_authn/test/emqx_authn_redis_SUITE.erl | 1 + apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl | 1 + apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl | 1 + apps/emqx_authz/test/emqx_authz_redis_SUITE.erl | 1 + 6 files changed, 6 insertions(+) diff --git a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl index 489a602c0..c2cddbece 100644 --- a/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_mysql_SUITE.erl @@ -59,6 +59,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?MYSQL_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_mysql, mysql_config()), Config; diff --git a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl index 6a5e07939..c71ee3685 100644 --- a/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_pgsql_SUITE.erl @@ -60,6 +60,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?PGSQL_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_pgsql, pgsql_config()), Config; diff --git a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl index 64805ecb7..9cd7d22cd 100644 --- a/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_redis_SUITE.erl @@ -59,6 +59,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?REDIS_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_redis, redis_config()), Config; diff --git a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl index 0cccd748e..6232280f2 100644 --- a/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl @@ -43,6 +43,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?MYSQL_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_mysql, mysql_config()), Config; diff --git a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl index fbc2cc922..0dc488d81 100644 --- a/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl @@ -43,6 +43,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?PGSQL_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_pgsql, pgsql_config()), Config; diff --git a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl index 83699f51c..690a7a574 100644 --- a/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_redis_SUITE.erl @@ -44,6 +44,7 @@ init_per_suite(Config) -> ok = start_apps([emqx_resource, emqx_connector]), {ok, _} = emqx_resource:create_local( ?REDIS_RESOURCE, + ?RESOURCE_GROUP, emqx_connector_redis, redis_config()), Config; From 48942f9c93c730c34fc5ba75bbe71a8e7ed5e101 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Mon, 14 Feb 2022 17:39:32 +0800 Subject: [PATCH 7/8] refactor(emqx_resource): move unused macro to test --- apps/emqx_resource/include/emqx_resource.hrl | 1 - apps/emqx_resource/test/emqx_resource_SUITE.erl | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index b88b72d92..0ea92d993 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -43,4 +43,3 @@ -type after_query_fun() :: {fun((...) -> ok), Args :: [term()]}. -define(TEST_ID_PREFIX, "_test_:"). --define(DEFAULT_RESOURCE_GROUP, <<"default">>). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index e8275b2c0..eeaa6a501 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -24,6 +24,7 @@ -define(TEST_RESOURCE, emqx_test_resource). -define(ID, <<"id">>). +-define(DEFAULT_RESOURCE_GROUP, <<"default">>). all() -> emqx_common_test_helpers:all(?MODULE). From f70fc1a3b027277caf5189d68aba6df5faa70cd3 Mon Sep 17 00:00:00 2001 From: EMQ-YangM Date: Wed, 16 Feb 2022 14:02:13 +0800 Subject: [PATCH 8/8] fix(emqx_bridge, emqx_retainer): hard code group name --- apps/emqx_bridge/src/emqx_bridge.erl | 3 +-- apps/emqx_retainer/src/emqx_retainer.erl | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index beb585c0c..446d87a6e 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -222,8 +222,7 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> ?SLOG(info, #{msg => "create bridge", type => Type, name => Name, config => Conf}), - Group = maps:get(group, Conf, <<"default">>), - case emqx_resource:create_local(resource_id(Type, Name), Group, emqx_bridge:resource_type(Type), + case emqx_resource:create_local(resource_id(Type, Name), <<"emqx_bridge">>, emqx_bridge:resource_type(Type), parse_confs(Type, Name, Conf), #{async_create => true}) of {ok, already_created} -> maybe_disable_bridge(Type, Name, Conf); {ok, _} -> maybe_disable_bridge(Type, Name, Conf); diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 69a598370..97333f022 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -480,7 +480,7 @@ create_resource(Context, #{type := DB} = Config) -> ResourceID = erlang:iolist_to_binary([io_lib:format("~ts_~ts", [?APP, DB])]), case emqx_resource:create( ResourceID, - <<"default">>, + <<"emqx_retainer">>, list_to_existing_atom(io_lib:format("~ts_~ts", [emqx_connector, DB])), Config) of {ok, already_created} ->