Merge pull request #8247 from terry-xiaoyu/mongo_auth_timeout

feat: add start_after_created option to resource:create/4
This commit is contained in:
Xinyu Liu 2022-06-17 07:51:52 +08:00 committed by GitHub
commit c47d28cdc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 411 additions and 270 deletions

View File

@ -31,7 +31,8 @@
deep_convert/3, deep_convert/3,
diff_maps/2, diff_maps/2,
merge_with/3, merge_with/3,
best_effort_recursive_sum/3 best_effort_recursive_sum/3,
if_only_to_toggle_enable/2
]). ]).
-export_type([config_key/0, config_key_path/0]). -export_type([config_key/0, config_key_path/0]).
@ -316,3 +317,17 @@ deep_filter(M, F) when is_map(M) ->
maps:to_list(M) maps:to_list(M)
) )
). ).
if_only_to_toggle_enable(OldConf, Conf) ->
#{added := Added, removed := Removed, changed := Updated} =
emqx_map_lib:diff_maps(OldConf, Conf),
case {Added, Removed, Updated} of
{Added, Removed, #{enable := _} = Updated} when
map_size(Added) =:= 0,
map_size(Removed) =:= 0,
map_size(Updated) =:= 1
->
true;
{_, _, _} ->
false
end.

View File

@ -59,16 +59,14 @@ create_resource(ResourceId, Module, Config) ->
). ).
update_resource(Module, Config, ResourceId) -> update_resource(Module, Config, ResourceId) ->
%% recreate before maybe stop Opts = #{start_after_created => false},
%% resource will auto start during recreate Result = emqx_resource:recreate_local(ResourceId, Module, Config, Opts),
Result = emqx_resource:recreate_local(ResourceId, Module, Config), _ =
case Config of case Config of
#{enable := true} -> #{enable := true} -> emqx_resource:start(ResourceId);
Result; #{enable := false} -> ok
#{enable := false} -> end,
ok = emqx_resource:stop(ResourceId), Result.
Result
end.
check_password_from_selected_map(_Algorithm, _Selected, undefined) -> check_password_from_selected_map(_Algorithm, _Selected, undefined) ->
{error, bad_username_or_password}; {error, bad_username_or_password};

View File

@ -111,7 +111,7 @@ update(Type, Name, {OldConf, Conf}) ->
%% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
%% without restarting the bridge. %% without restarting the bridge.
%% %%
case if_only_to_toggle_enable(OldConf, Conf) of case emqx_map_lib:if_only_to_toggle_enable(OldConf, Conf) of
false -> false ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "update bridge", msg => "update bridge",
@ -198,20 +198,6 @@ maybe_disable_bridge(Type, Name, Conf) ->
true -> ok true -> ok
end. end.
if_only_to_toggle_enable(OldConf, Conf) ->
#{added := Added, removed := Removed, changed := Updated} =
emqx_map_lib:diff_maps(OldConf, Conf),
case {Added, Removed, Updated} of
{Added, Removed, #{enable := _} = Updated} when
map_size(Added) =:= 0,
map_size(Removed) =:= 0,
map_size(Updated) =:= 1
->
true;
{_, _, _} ->
false
end.
fill_dry_run_conf(Conf) -> fill_dry_run_conf(Conf) ->
Conf#{ Conf#{
<<"egress">> => <<"egress">> =>

View File

@ -38,7 +38,7 @@
-export([mongo_query/5, check_worker_health/1]). -export([mongo_query/5, check_worker_health/1]).
-define(HEALTH_CHECK_TIMEOUT, 10000). -define(HEALTH_CHECK_TIMEOUT, 30000).
%% mongo servers don't need parse %% mongo servers don't need parse
-define(MONGO_HOST_OPTIONS, #{ -define(MONGO_HOST_OPTIONS, #{

View File

@ -14,7 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type resource_type() :: module(). -type resource_type() :: module().
-type instance_id() :: binary(). -type resource_id() :: binary().
-type manager_id() :: binary().
-type raw_resource_config() :: binary() | raw_term_resource_config(). -type raw_resource_config() :: binary() | raw_term_resource_config().
-type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()]. -type raw_term_resource_config() :: #{binary() => term()} | [raw_term_resource_config()].
-type resource_config() :: term(). -type resource_config() :: term().
@ -22,7 +23,7 @@
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting. -type resource_status() :: connected | disconnected | connecting.
-type resource_data() :: #{ -type resource_data() :: #{
id := instance_id(), id := resource_id(),
mod := module(), mod := module(),
config := resource_config(), config := resource_config(),
state := resource_state(), state := resource_state(),
@ -33,7 +34,15 @@
-type create_opts() :: #{ -type create_opts() :: #{
health_check_interval => integer(), health_check_interval => integer(),
health_check_timeout => integer(), health_check_timeout => integer(),
%% We can choose to block the return of emqx_resource:start until
%% the resource connected, wait max to `wait_for_resource_ready` ms.
wait_for_resource_ready => integer(), wait_for_resource_ready => integer(),
%% If `start_after_created` is set to true, the resource is started right
%% after it is created. But note that a `started` resource is not guaranteed
%% to be `connected`.
start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_retry_interval => integer() auto_retry_interval => integer()
}. }.
-type after_query() :: -type after_query() ::

View File

@ -70,8 +70,9 @@
%% Calls to the callback module with current resource state %% Calls to the callback module with current resource state
%% They also save the state after the call finished (except query/2,3). %% They also save the state after the call finished (except query/2,3).
%% restart the instance.
-export([ -export([
start/1,
start/2,
restart/1, restart/1,
restart/2, restart/2,
%% verify if the resource is working normally %% verify if the resource is working normally
@ -116,17 +117,17 @@
]). ]).
%% when calling emqx_resource:start/1 %% when calling emqx_resource:start/1
-callback on_start(instance_id(), resource_config()) -> -callback on_start(resource_id(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
%% when calling emqx_resource:stop/1 %% when calling emqx_resource:stop/1
-callback on_stop(instance_id(), resource_state()) -> term(). -callback on_stop(resource_id(), resource_state()) -> term().
%% when calling emqx_resource:query/3 %% when calling emqx_resource:query/3
-callback on_query(instance_id(), Request :: term(), after_query(), resource_state()) -> term(). -callback on_query(resource_id(), Request :: term(), after_query(), resource_state()) -> term().
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(instance_id(), resource_state()) -> -callback on_get_status(resource_id(), resource_state()) ->
resource_status() resource_status()
| {resource_status(), resource_state()} | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}. | {resource_status(), resource_state(), term()}.
@ -166,32 +167,32 @@ apply_query_after_calls(Funcs) ->
%% ================================================================================= %% =================================================================================
%% APIs for resource instances %% APIs for resource instances
%% ================================================================================= %% =================================================================================
-spec create(instance_id(), resource_group(), resource_type(), resource_config()) -> -spec create(resource_id(), resource_group(), resource_type(), resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config) -> create(ResId, Group, ResourceType, Config) ->
create(InstId, Group, ResourceType, Config, #{}). create(ResId, Group, ResourceType, Config, #{}).
-spec create(instance_id(), resource_group(), resource_type(), resource_config(), create_opts()) -> -spec create(resource_id(), resource_group(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
emqx_resource_proto_v1:create(InstId, Group, ResourceType, Config, Opts). emqx_resource_proto_v1:create(ResId, Group, ResourceType, Config, Opts).
% -------------------------------------------- % --------------------------------------------
-spec create_local(instance_id(), resource_group(), resource_type(), resource_config()) -> -spec create_local(resource_id(), resource_group(), resource_type(), resource_config()) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create_local(InstId, Group, ResourceType, Config) -> create_local(ResId, Group, ResourceType, Config) ->
create_local(InstId, Group, ResourceType, Config, #{}). create_local(ResId, Group, ResourceType, Config, #{}).
-spec create_local( -spec create_local(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data()}. {ok, resource_data()}.
create_local(InstId, Group, ResourceType, Config, Opts) -> create_local(ResId, Group, ResourceType, Config, Opts) ->
emqx_resource_manager:ensure_resource(InstId, Group, ResourceType, Config, Opts). emqx_resource_manager:ensure_resource(ResId, Group, ResourceType, Config, Opts).
-spec create_dry_run(resource_type(), resource_config()) -> -spec create_dry_run(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
@ -203,94 +204,102 @@ create_dry_run(ResourceType, Config) ->
create_dry_run_local(ResourceType, Config) -> create_dry_run_local(ResourceType, Config) ->
emqx_resource_manager:create_dry_run(ResourceType, Config). emqx_resource_manager:create_dry_run(ResourceType, Config).
-spec recreate(instance_id(), resource_type(), resource_config()) -> -spec recreate(resource_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config) -> recreate(ResId, ResourceType, Config) ->
recreate(InstId, ResourceType, Config, #{}). recreate(ResId, ResourceType, Config, #{}).
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) -> -spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) -> recreate(ResId, ResourceType, Config, Opts) ->
emqx_resource_proto_v1:recreate(InstId, ResourceType, Config, Opts). emqx_resource_proto_v1:recreate(ResId, ResourceType, Config, Opts).
-spec recreate_local(instance_id(), resource_type(), resource_config()) -> -spec recreate_local(resource_id(), resource_type(), resource_config()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(InstId, ResourceType, Config) -> recreate_local(ResId, ResourceType, Config) ->
recreate_local(InstId, ResourceType, Config, #{}). recreate_local(ResId, ResourceType, Config, #{}).
-spec recreate_local(instance_id(), resource_type(), resource_config(), create_opts()) -> -spec recreate_local(resource_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate_local(InstId, ResourceType, Config, Opts) -> recreate_local(ResId, ResourceType, Config, Opts) ->
emqx_resource_manager:recreate(InstId, ResourceType, Config, Opts). emqx_resource_manager:recreate(ResId, ResourceType, Config, Opts).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}. -spec remove(resource_id()) -> ok | {error, Reason :: term()}.
remove(InstId) -> remove(ResId) ->
emqx_resource_proto_v1:remove(InstId). emqx_resource_proto_v1:remove(ResId).
-spec remove_local(instance_id()) -> ok | {error, Reason :: term()}. -spec remove_local(resource_id()) -> ok | {error, Reason :: term()}.
remove_local(InstId) -> remove_local(ResId) ->
emqx_resource_manager:remove(InstId). emqx_resource_manager:remove(ResId).
-spec reset_metrics_local(instance_id()) -> ok. -spec reset_metrics_local(resource_id()) -> ok.
reset_metrics_local(InstId) -> reset_metrics_local(ResId) ->
emqx_resource_manager:reset_metrics(InstId). emqx_resource_manager:reset_metrics(ResId).
-spec reset_metrics(instance_id()) -> ok | {error, Reason :: term()}. -spec reset_metrics(resource_id()) -> ok | {error, Reason :: term()}.
reset_metrics(InstId) -> reset_metrics(ResId) ->
emqx_resource_proto_v1:reset_metrics(InstId). emqx_resource_proto_v1:reset_metrics(ResId).
%% ================================================================================= %% =================================================================================
-spec query(instance_id(), Request :: term()) -> Result :: term(). -spec query(resource_id(), Request :: term()) -> Result :: term().
query(InstId, Request) -> query(ResId, Request) ->
query(InstId, Request, inc_metrics_funcs(InstId)). query(ResId, Request, inc_metrics_funcs(ResId)).
%% same to above, also defines what to do when the Module:on_query success or failed %% same to above, also defines what to do when the Module:on_query success or failed
%% it is the duty of the Module to apply the `after_query()` functions. %% it is the duty of the Module to apply the `after_query()` functions.
-spec query(instance_id(), Request :: term(), after_query()) -> Result :: term(). -spec query(resource_id(), Request :: term(), after_query()) -> Result :: term().
query(InstId, Request, AfterQuery) -> query(ResId, Request, AfterQuery) ->
case emqx_resource_manager:ets_lookup(InstId) of case emqx_resource_manager:ets_lookup(ResId) of
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
%% the resource state is readonly to Module:on_query/4 %% the resource state is readonly to Module:on_query/4
%% and the `after_query()` functions should be thread safe %% and the `after_query()` functions should be thread safe
ok = emqx_metrics_worker:inc(resource_metrics, InstId, matched), ok = emqx_metrics_worker:inc(resource_metrics, ResId, matched),
try try
Mod:on_query(InstId, Request, AfterQuery, ResourceState) Mod:on_query(ResId, Request, AfterQuery, ResourceState)
catch catch
Err:Reason:ST -> Err:Reason:ST ->
emqx_metrics_worker:inc(resource_metrics, InstId, exception), emqx_metrics_worker:inc(resource_metrics, ResId, exception),
erlang:raise(Err, Reason, ST) erlang:raise(Err, Reason, ST)
end; end;
{ok, _Group, _Data} -> {ok, _Group, _Data} ->
query_error(not_found, <<"resource not connected">>); query_error(not_connected, <<"resource not connected">>);
{error, not_found} -> {error, not_found} ->
query_error(not_found, <<"resource not found">>) query_error(not_found, <<"resource not found">>)
end. end.
-spec restart(instance_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id()) -> ok | {error, Reason :: term()}.
restart(InstId) -> start(ResId) ->
restart(InstId, #{}). start(ResId, #{}).
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
restart(InstId, Opts) -> start(ResId, Opts) ->
emqx_resource_manager:restart(InstId, Opts). emqx_resource_manager:start(ResId, Opts).
-spec stop(instance_id()) -> ok | {error, Reason :: term()}. -spec restart(resource_id()) -> ok | {error, Reason :: term()}.
stop(InstId) -> restart(ResId) ->
emqx_resource_manager:stop(InstId). restart(ResId, #{}).
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. -spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
health_check(InstId) -> restart(ResId, Opts) ->
emqx_resource_manager:health_check(InstId). emqx_resource_manager:restart(ResId, Opts).
set_resource_status_connecting(InstId) -> -spec stop(resource_id()) -> ok | {error, Reason :: term()}.
emqx_resource_manager:set_resource_status_connecting(InstId). stop(ResId) ->
emqx_resource_manager:stop(ResId).
-spec get_instance(instance_id()) -> -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
health_check(ResId) ->
emqx_resource_manager:health_check(ResId).
set_resource_status_connecting(ResId) ->
emqx_resource_manager:set_resource_status_connecting(ResId).
-spec get_instance(resource_id()) ->
{ok, resource_group(), resource_data()} | {error, Reason :: term()}. {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(InstId) -> get_instance(ResId) ->
emqx_resource_manager:lookup(InstId). emqx_resource_manager:lookup(ResId).
-spec list_instances() -> [instance_id()]. -spec list_instances() -> [resource_id()].
list_instances() -> list_instances() ->
[Id || #{id := Id} <- list_instances_verbose()]. [Id || #{id := Id} <- list_instances_verbose()].
@ -298,36 +307,37 @@ list_instances() ->
list_instances_verbose() -> list_instances_verbose() ->
emqx_resource_manager:list_all(). emqx_resource_manager:list_all().
-spec list_instances_by_type(module()) -> [instance_id()]. -spec list_instances_by_type(module()) -> [resource_id()].
list_instances_by_type(ResourceType) -> list_instances_by_type(ResourceType) ->
filter_instances(fun filter_instances(fun
(_, RT) when RT =:= ResourceType -> true; (_, RT) when RT =:= ResourceType -> true;
(_, _) -> false (_, _) -> false
end). end).
-spec generate_id(term()) -> instance_id(). -spec generate_id(term()) -> resource_id().
generate_id(Name) when is_binary(Name) -> generate_id(Name) when is_binary(Name) ->
Id = integer_to_binary(erlang:unique_integer([positive])), Id = integer_to_binary(erlang:unique_integer([monotonic, positive])),
<<Name/binary, ":", Id/binary>>. <<Name/binary, ":", Id/binary>>.
-spec list_group_instances(resource_group()) -> [instance_id()]. -spec list_group_instances(resource_group()) -> [resource_id()].
list_group_instances(Group) -> emqx_resource_manager:list_group(Group). list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
-spec call_start(instance_id(), module(), resource_config()) -> -spec call_start(manager_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(InstId, Mod, Config) -> call_start(MgrId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(InstId, Config)). ?SAFE_CALL(Mod:on_start(MgrId, Config)).
-spec call_health_check(instance_id(), module(), resource_state()) -> -spec call_health_check(manager_id(), module(), resource_state()) ->
resource_status() resource_status()
| {resource_status(), resource_state()} | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}. | {resource_status(), resource_state(), term()}
call_health_check(InstId, Mod, ResourceState) -> | {error, term()}.
?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). call_health_check(MgrId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(MgrId, ResourceState)).
-spec call_stop(instance_id(), module(), resource_state()) -> term(). -spec call_stop(manager_id(), module(), resource_state()) -> term().
call_stop(InstId, Mod, ResourceState) -> call_stop(MgrId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_stop(InstId, ResourceState)). ?SAFE_CALL(Mod:on_stop(MgrId, ResourceState)).
-spec check_config(resource_type(), raw_resource_config()) -> -spec check_config(resource_type(), raw_resource_config()) ->
{ok, resource_config()} | {error, term()}. {ok, resource_config()} | {error, term()}.
@ -335,85 +345,85 @@ check_config(ResourceType, Conf) ->
emqx_hocon:check(ResourceType, Conf). emqx_hocon:check(ResourceType, Conf).
-spec check_and_create( -spec check_and_create(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config() raw_resource_config()
) -> ) ->
{ok, resource_data() | 'already_created'} | {error, term()}. {ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig) -> check_and_create(ResId, Group, ResourceType, RawConfig) ->
check_and_create(InstId, Group, ResourceType, RawConfig, #{}). check_and_create(ResId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create( -spec check_and_create(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data() | 'already_created'} | {error, term()}. {ok, resource_data() | 'already_created'} | {error, term()}.
check_and_create(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_create(ResId, Group, ResourceType, RawConfig, Opts) ->
check_and_do( check_and_do(
ResourceType, ResourceType,
RawConfig, RawConfig,
fun(InstConf) -> create(InstId, Group, ResourceType, InstConf, Opts) end fun(ResConf) -> create(ResId, Group, ResourceType, ResConf, Opts) end
). ).
-spec check_and_create_local( -spec check_and_create_local(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config() raw_resource_config()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig) -> check_and_create_local(ResId, Group, ResourceType, RawConfig) ->
check_and_create_local(InstId, Group, ResourceType, RawConfig, #{}). check_and_create_local(ResId, Group, ResourceType, RawConfig, #{}).
-spec check_and_create_local( -spec check_and_create_local(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() create_opts()
) -> {ok, resource_data()} | {error, term()}. ) -> {ok, resource_data()} | {error, term()}.
check_and_create_local(InstId, Group, ResourceType, RawConfig, Opts) -> check_and_create_local(ResId, Group, ResourceType, RawConfig, Opts) ->
check_and_do( check_and_do(
ResourceType, ResourceType,
RawConfig, RawConfig,
fun(InstConf) -> create_local(InstId, Group, ResourceType, InstConf, Opts) end fun(ResConf) -> create_local(ResId, Group, ResourceType, ResConf, Opts) end
). ).
-spec check_and_recreate( -spec check_and_recreate(
instance_id(), resource_id(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate(InstId, ResourceType, RawConfig, Opts) -> check_and_recreate(ResId, ResourceType, RawConfig, Opts) ->
check_and_do( check_and_do(
ResourceType, ResourceType,
RawConfig, RawConfig,
fun(InstConf) -> recreate(InstId, ResourceType, InstConf, Opts) end fun(ResConf) -> recreate(ResId, ResourceType, ResConf, Opts) end
). ).
-spec check_and_recreate_local( -spec check_and_recreate_local(
instance_id(), resource_id(),
resource_type(), resource_type(),
raw_resource_config(), raw_resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data()} | {error, term()}. {ok, resource_data()} | {error, term()}.
check_and_recreate_local(InstId, ResourceType, RawConfig, Opts) -> check_and_recreate_local(ResId, ResourceType, RawConfig, Opts) ->
check_and_do( check_and_do(
ResourceType, ResourceType,
RawConfig, RawConfig,
fun(InstConf) -> recreate_local(InstId, ResourceType, InstConf, Opts) end fun(ResConf) -> recreate_local(ResId, ResourceType, ResConf, Opts) end
). ).
check_and_do(ResourceType, RawConfig, Do) when is_function(Do) -> check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
case check_config(ResourceType, RawConfig) of case check_config(ResourceType, RawConfig) of
{ok, InstConf} -> Do(InstConf); {ok, ResConf} -> Do(ResConf);
Error -> Error Error -> Error
end. end.
@ -422,9 +432,9 @@ check_and_do(ResourceType, RawConfig, Do) when is_function(Do) ->
filter_instances(Filter) -> filter_instances(Filter) ->
[Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)]. [Id || #{id := Id, mod := Mod} <- list_instances_verbose(), Filter(Id, Mod)].
inc_metrics_funcs(InstId) -> inc_metrics_funcs(ResId) ->
OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, failed]}], OnFailed = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, failed]}],
OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, InstId, success]}], OnSucc = [{fun emqx_metrics_worker:inc/3, [resource_metrics, ResId, success]}],
{OnSucc, OnFailed}. {OnSucc, OnFailed}.
safe_apply(Func, Args) -> safe_apply(Func, Args) ->

View File

@ -23,7 +23,6 @@
% API % API
-export([ -export([
ensure_resource/5, ensure_resource/5,
create/5,
recreate/4, recreate/4,
remove/1, remove/1,
create_dry_run/2, create_dry_run/2,
@ -44,13 +43,13 @@
]). ]).
% Server % Server
-export([start_link/5]). -export([start_link/6]).
% Behaviour % Behaviour
-export([init/1, callback_mode/0, handle_event/4, terminate/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
% State record % State record
-record(data, {id, group, mod, config, opts, status, state, error}). -record(data, {id, manager_id, group, mod, config, opts, status, state, error}).
-define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
-define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL, 15000).
@ -70,33 +69,55 @@
%% Triggers the emqx_resource_manager_sup supervisor to actually create %% Triggers the emqx_resource_manager_sup supervisor to actually create
%% and link the process itself if not already started. %% and link the process itself if not already started.
-spec ensure_resource( -spec ensure_resource(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() create_opts()
) -> {ok, resource_data()}. ) -> {ok, resource_data()}.
ensure_resource(InstId, Group, ResourceType, Config, Opts) -> ensure_resource(ResId, Group, ResourceType, Config, Opts) ->
case lookup(InstId) of case lookup(ResId) of
{ok, _Group, Data} -> {ok, _Group, Data} ->
{ok, Data}; {ok, Data};
{error, not_found} -> {error, not_found} ->
create(InstId, Group, ResourceType, Config, Opts), MgrId = set_new_owner(ResId),
{ok, _Group, Data} = lookup(InstId), create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts)
{ok, Data}
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist
-spec recreate(resource_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
recreate(ResId, ResourceType, NewConfig, Opts) ->
case lookup(ResId) of
{ok, Group, #{mod := ResourceType, status := _} = _Data} ->
_ = remove(ResId, false),
MgrId = set_new_owner(ResId),
create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts);
{ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type};
{error, not_found} ->
{error, not_found}
end.
create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) ->
create(MgrId, ResId, Group, ResourceType, Config, Opts),
{ok, _Group, Data} = lookup(ResId),
{ok, Data}.
%% @doc Create a resource_manager and wait until it is running %% @doc Create a resource_manager and wait until it is running
create(InstId, Group, ResourceType, Config, Opts) -> create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
% The state machine will make the actual call to the callback/resource module after init % The state machine will make the actual call to the callback/resource module after init
ok = emqx_resource_manager_sup:ensure_child(InstId, Group, ResourceType, Config, Opts), ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts),
ok = emqx_metrics_worker:create_metrics( ok = emqx_metrics_worker:create_metrics(
resource_metrics, resource_metrics,
InstId, ResId,
[matched, success, failed, exception], [matched, success, failed, exception],
[matched] [matched]
), ),
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), case maps:get(start_after_created, Opts, true) of
true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
false -> ok
end,
ok. ok.
%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
@ -106,66 +127,55 @@ create(InstId, Group, ResourceType, Config, Opts) ->
-spec create_dry_run(resource_type(), resource_config()) -> -spec create_dry_run(resource_type(), resource_config()) ->
ok | {error, Reason :: term()}. ok | {error, Reason :: term()}.
create_dry_run(ResourceType, Config) -> create_dry_run(ResourceType, Config) ->
InstId = make_test_id(), ResId = make_test_id(),
ok = emqx_resource_manager_sup:ensure_child(InstId, <<"dry_run">>, ResourceType, Config, #{}), MgrId = set_new_owner(ResId),
case wait_for_resource_ready(InstId, 5000) of ok = emqx_resource_manager_sup:ensure_child(
MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{}
),
case wait_for_resource_ready(ResId, 15000) of
ok -> ok ->
_ = remove(InstId); remove(ResId);
timeout -> timeout ->
_ = remove(InstId), _ = remove(ResId),
{error, timeout} {error, timeout}
end. end.
%% @doc Called from emqx_resource when recreating a resource which may or may not exist
-spec recreate(instance_id(), resource_type(), resource_config(), create_opts()) ->
{ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}.
recreate(InstId, ResourceType, NewConfig, Opts) ->
case lookup(InstId) of
{ok, Group, #{mod := ResourceType, status := _} = _Data} ->
_ = remove(InstId, false),
ensure_resource(InstId, Group, ResourceType, NewConfig, Opts);
{ok, _, #{mod := Mod}} when Mod =/= ResourceType ->
{error, updating_to_incorrect_resource_type};
{error, not_found} ->
{error, not_found}
end.
%% @doc Stops a running resource_manager and clears the metrics for the resource %% @doc Stops a running resource_manager and clears the metrics for the resource
-spec remove(instance_id()) -> ok | {error, Reason :: term()}. -spec remove(resource_id()) -> ok | {error, Reason :: term()}.
remove(InstId) when is_binary(InstId) -> remove(ResId) when is_binary(ResId) ->
remove(InstId, true). remove(ResId, true).
%% @doc Stops a running resource_manager and optionally clears the metrics for the resource %% @doc Stops a running resource_manager and optionally clears the metrics for the resource
-spec remove(instance_id(), boolean()) -> ok | {error, Reason :: term()}. -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}.
remove(InstId, ClearMetrics) when is_binary(InstId) -> remove(ResId, ClearMetrics) when is_binary(ResId) ->
safe_call(InstId, {remove, ClearMetrics}, ?T_OPERATION). safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION).
%% @doc Stops and then starts an instance that was already running %% @doc Stops and then starts an instance that was already running
-spec restart(instance_id(), create_opts()) -> ok | {error, Reason :: term()}. -spec restart(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
restart(InstId, Opts) when is_binary(InstId) -> restart(ResId, Opts) when is_binary(ResId) ->
case safe_call(InstId, restart, ?T_OPERATION) of case safe_call(ResId, restart, ?T_OPERATION) of
ok -> ok ->
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)), wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
ok;
{error, _Reason} = Error ->
Error
end.
%% @doc Stop the resource
-spec start(instance_id(), create_opts()) -> ok | {error, Reason :: term()}.
start(InstId, Opts) ->
case safe_call(InstId, start, ?T_OPERATION) of
ok ->
wait_for_resource_ready(InstId, maps:get(wait_for_resource_ready, Opts, 5000)),
ok; ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
end. end.
%% @doc Start the resource %% @doc Start the resource
-spec stop(instance_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id(), create_opts()) -> ok | {error, Reason :: term()}.
stop(InstId) -> start(ResId, Opts) ->
case safe_call(InstId, stop, ?T_OPERATION) of case safe_call(ResId, start, ?T_OPERATION) of
ok ->
wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000)),
ok;
{error, _Reason} = Error ->
Error
end.
%% @doc Stop the resource
-spec stop(resource_id()) -> ok | {error, Reason :: term()}.
stop(ResId) ->
case safe_call(ResId, stop, ?T_OPERATION) of
ok -> ok ->
ok; ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
@ -173,36 +183,36 @@ stop(InstId) ->
end. end.
%% @doc Test helper %% @doc Test helper
-spec set_resource_status_connecting(instance_id()) -> ok. -spec set_resource_status_connecting(resource_id()) -> ok.
set_resource_status_connecting(InstId) -> set_resource_status_connecting(ResId) ->
safe_call(InstId, set_resource_status_connecting, infinity). safe_call(ResId, set_resource_status_connecting, infinity).
%% @doc Lookup the group and data of a resource %% @doc Lookup the group and data of a resource
-spec lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup(InstId) -> lookup(ResId) ->
case safe_call(InstId, lookup, ?T_LOOKUP) of case safe_call(ResId, lookup, ?T_LOOKUP) of
{error, timeout} -> ets_lookup(InstId); {error, timeout} -> ets_lookup(ResId);
Result -> Result Result -> Result
end. end.
%% @doc Lookup the group and data of a resource %% @doc Lookup the group and data of a resource
-spec ets_lookup(instance_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
ets_lookup(InstId) -> ets_lookup(ResId) ->
case ets:lookup(?ETS_TABLE, InstId) of case read_cache(ResId) of
[{_Id, Group, Data}] -> {Group, Data} ->
{ok, Group, data_record_to_external_map_with_metrics(Data)}; {ok, Group, data_record_to_external_map_with_metrics(Data)};
[] -> not_found ->
{error, not_found} {error, not_found}
end. end.
%% @doc Get the metrics for the specified resource %% @doc Get the metrics for the specified resource
get_metrics(InstId) -> get_metrics(ResId) ->
emqx_metrics_worker:get_metrics(resource_metrics, InstId). emqx_metrics_worker:get_metrics(resource_metrics, ResId).
%% @doc Reset the metrics for the specified resource %% @doc Reset the metrics for the specified resource
-spec reset_metrics(instance_id()) -> ok. -spec reset_metrics(resource_id()) -> ok.
reset_metrics(InstId) -> reset_metrics(ResId) ->
emqx_metrics_worker:reset_metrics(resource_metrics, InstId). emqx_metrics_worker:reset_metrics(resource_metrics, ResId).
%% @doc Returns the data for all resorces %% @doc Returns the data for all resorces
-spec list_all() -> [resource_data()] | []. -spec list_all() -> [resource_data()] | [].
@ -217,21 +227,22 @@ list_all() ->
end. end.
%% @doc Returns a list of ids for all the resources in a group %% @doc Returns a list of ids for all the resources in a group
-spec list_group(resource_group()) -> [instance_id()]. -spec list_group(resource_group()) -> [resource_id()].
list_group(Group) -> list_group(Group) ->
List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
lists:flatten(List). lists:flatten(List).
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}. -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}.
health_check(InstId) -> health_check(ResId) ->
safe_call(InstId, health_check, ?T_OPERATION). safe_call(ResId, health_check, ?T_OPERATION).
%% Server start/stop callbacks %% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server %% @doc Function called from the supervisor to actually start the server
start_link(InstId, Group, ResourceType, Config, Opts) -> start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
Data = #data{ Data = #data{
id = InstId, id = ResId,
manager_id = MgrId,
group = Group, group = Group,
mod = ResourceType, mod = ResourceType,
config = Config, config = Config,
@ -240,17 +251,22 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
state = undefined, state = undefined,
error = undefined error = undefined
}, },
gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). Module = atom_to_binary(?MODULE),
ProcName = binary_to_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []).
init(Data) -> init({Data, Opts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%% init the cache so that lookup/1 will always return something %% init the cache so that lookup/1 will always return something
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), insert_cache(Data#data.id, Data#data.group, Data),
{ok, connecting, Data, {next_event, internal, try_connect}}. case maps:get(start_after_created, Opts, true) of
true -> {ok, connecting, Data, {next_event, internal, start_resource}};
false -> {ok, stopped, Data}
end.
terminate(_Reason, _State, Data) -> terminate(_Reason, _State, Data) ->
_ = maybe_clear_alarm(Data#data.id), _ = maybe_clear_alarm(Data#data.id),
ets:delete(?ETS_TABLE, Data#data.id), delete_cache(Data#data.id, Data#data.manager_id),
ok. ok.
%% Behavior callback %% Behavior callback
@ -271,6 +287,12 @@ handle_event({call, From}, start, stopped, Data) ->
start_resource(Data, From); start_resource(Data, From);
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
% Called when the resource received a `quit` message
handle_event(info, quit, stopped, _Data) ->
{stop, {shutdown, quit}};
handle_event(info, quit, _State, Data) ->
_ = stop_resource(Data),
{stop, {shutdown, quit}};
% Called when the resource is to be stopped % Called when the resource is to be stopped
handle_event({call, From}, stop, stopped, _Data) -> handle_event({call, From}, stop, stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
@ -293,10 +315,10 @@ handle_event({call, From}, health_check, _State, Data) ->
handle_manually_health_check(From, Data); handle_manually_health_check(From, Data);
% State: CONNECTING % State: CONNECTING
handle_event(enter, _OldState, connecting, Data) -> handle_event(enter, _OldState, connecting, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), insert_cache(Data#data.id, Data#data.group, Data),
Actions = [{state_timeout, 0, health_check}], Actions = [{state_timeout, 0, health_check}],
{keep_state_and_data, Actions}; {keep_state_and_data, Actions};
handle_event(internal, try_connect, connecting, Data) -> handle_event(internal, start_resource, connecting, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) -> handle_event(state_timeout, health_check, connecting, Data) ->
handle_connecting_health_check(Data); handle_connecting_health_check(Data);
@ -304,7 +326,7 @@ handle_event(state_timeout, health_check, connecting, Data) ->
%% The connected state is entered after a successful on_start/2 of the callback mod %% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks %% and successful health_checks
handle_event(enter, _OldState, connected, Data) -> handle_event(enter, _OldState, connected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), insert_cache(Data#data.id, Data#data.group, Data),
_ = emqx_alarm:deactivate(Data#data.id), _ = emqx_alarm:deactivate(Data#data.id),
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{next_state, connected, Data, Actions}; {next_state, connected, Data, Actions};
@ -312,7 +334,7 @@ handle_event(state_timeout, health_check, connected, Data) ->
handle_connected_health_check(Data); handle_connected_health_check(Data);
%% State: DISCONNECTED %% State: DISCONNECTED
handle_event(enter, _OldState, disconnected, Data) -> handle_event(enter, _OldState, disconnected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), insert_cache(Data#data.id, Data#data.group, Data),
handle_disconnected_state_enter(Data); handle_disconnected_state_enter(Data);
handle_event(state_timeout, auto_retry, disconnected, Data) -> handle_event(state_timeout, auto_retry, disconnected, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
@ -320,14 +342,14 @@ handle_event(state_timeout, auto_retry, disconnected, Data) ->
%% The stopped state is entered after the resource has been explicitly stopped %% The stopped state is entered after the resource has been explicitly stopped
handle_event(enter, _OldState, stopped, Data) -> handle_event(enter, _OldState, stopped, Data) ->
UpdatedData = Data#data{status = disconnected}, UpdatedData = Data#data{status = disconnected},
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), insert_cache(Data#data.id, Data#data.group, UpdatedData),
{next_state, stopped, UpdatedData}; {next_state, stopped, UpdatedData};
% Ignore all other events % Ignore all other events
handle_event(EventType, EventData, State, Data) -> handle_event(EventType, EventData, State, Data) ->
?SLOG( ?SLOG(
error, error,
#{ #{
msg => "ignore all other events", msg => ignore_all_other_events,
event_type => EventType, event_type => EventType,
event_data => EventData, event_data => EventData,
state => State, state => State,
@ -339,6 +361,47 @@ handle_event(EventType, EventData, State, Data) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) ->
case get_owner(ResId) of
not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data});
_ -> self() ! quit
end.
read_cache(ResId) ->
case ets:lookup(?ETS_TABLE, ResId) of
[{_Id, Group, Data}] -> {Group, Data};
[] -> not_found
end.
delete_cache(ResId, MgrId) ->
case get_owner(ResId) of
MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId ->
do_delete_cache(ResId);
_ ->
ok
end.
do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
ets:delete(?ETS_TABLE, {owner, ResId}),
ets:delete(?ETS_TABLE, ResId);
do_delete_cache(ResId) ->
ets:delete(?ETS_TABLE, ResId).
set_new_owner(ResId) ->
MgrId = make_manager_id(ResId),
ok = set_owner(ResId, MgrId),
MgrId.
set_owner(ResId, MgrId) ->
ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}),
ok.
get_owner(ResId) ->
case ets:lookup(?ETS_TABLE, {owner, ResId}) of
[{_, MgrId}] -> MgrId;
[] -> not_found
end.
handle_disconnected_state_enter(Data) -> handle_disconnected_state_enter(Data) ->
case maps:get(auto_retry_interval, Data#data.opts, undefined) of case maps:get(auto_retry_interval, Data#data.opts, undefined) of
@ -359,8 +422,8 @@ handle_remove_event(From, ClearMetrics, Data) ->
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), insert_cache(Data#data.id, Data#data.group, Data),
case emqx_resource:call_start(Data#data.id, Data#data.mod, Data#data.config) of case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData = Data#data{state = ResourceState, status = connecting}, UpdatedData = Data#data{state = ResourceState, status = connecting},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
@ -382,14 +445,16 @@ stop_resource(Data) ->
%% We don't care the return value of the Mod:on_stop/2. %% We don't care the return value of the Mod:on_stop/2.
%% The callback mod should make sure the resource is stopped after on_stop/2 %% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned. %% is returned.
_ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state),
_ = maybe_clear_alarm(Data#data.id), _ = maybe_clear_alarm(Data#data.id),
ok. ok.
proc_name(Id) -> make_manager_id(ResId) ->
Module = atom_to_binary(?MODULE), emqx_resource:generate_id(ResId).
Connector = <<"_">>,
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>). make_test_id() ->
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>.
handle_manually_health_check(From, Data) -> handle_manually_health_check(From, Data) ->
with_health_check(Data, fun(Status, UpdatedData) -> with_health_check(Data, fun(Status, UpdatedData) ->
@ -426,13 +491,13 @@ handle_connected_health_check(Data) ->
with_health_check(Data, Func) -> with_health_check(Data, Func) ->
ResId = Data#data.id, ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
_ = maybe_alarm(Status, ResId), _ = maybe_alarm(Status, ResId),
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, status = Status, error = Err state = NewState, status = Status, error = Err
}, },
ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}), insert_cache(ResId, UpdatedData#data.group, UpdatedData),
Func(Status, UpdatedData). Func(Status, UpdatedData).
maybe_alarm(connected, _ResId) -> maybe_alarm(connected, _ResId) ->
@ -451,12 +516,22 @@ maybe_clear_alarm(<<?TEST_ID_PREFIX, _/binary>>) ->
maybe_clear_alarm(ResId) -> maybe_clear_alarm(ResId) ->
emqx_alarm:deactivate(ResId). emqx_alarm:deactivate(ResId).
parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) -> parse_health_check_result(Status, Data) when ?IS_STATUS(Status) ->
{Status, OldState, undefined}; {Status, Data#data.state, undefined};
parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) -> parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) ->
{Status, NewState, undefined}; {Status, NewState, undefined};
parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) -> parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) ->
{Status, NewState, Error}. {Status, NewState, Error};
parse_health_check_result({error, Error}, Data) ->
?SLOG(
error,
#{
msg => health_check_exception,
resource_id => Data#data.id,
reason => Error
}
),
{disconnected, Data#data.state, Error}.
maybe_reply(Actions, undefined, _Reply) -> maybe_reply(Actions, undefined, _Reply) ->
Actions; Actions;
@ -473,29 +548,30 @@ data_record_to_external_map_with_metrics(Data) ->
metrics => get_metrics(Data#data.id) metrics => get_metrics(Data#data.id)
}. }.
make_test_id() -> -spec wait_for_resource_ready(resource_id(), integer()) -> ok | timeout.
RandId = iolist_to_binary(emqx_misc:gen_id(16)), wait_for_resource_ready(ResId, WaitTime) ->
<<?TEST_ID_PREFIX, RandId/binary>>. do_wait_for_resource_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
-spec wait_for_resource_ready(instance_id(), integer()) -> ok | timeout. do_wait_for_resource_ready(_ResId, 0) ->
wait_for_resource_ready(InstId, WaitTime) ->
do_wait_for_resource_ready(InstId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY).
do_wait_for_resource_ready(_InstId, 0) ->
timeout; timeout;
do_wait_for_resource_ready(InstId, Retry) -> do_wait_for_resource_ready(ResId, Retry) ->
case ets_lookup(InstId) of case ets_lookup(ResId) of
{ok, _Group, #{status := connected}} -> {ok, _Group, #{status := connected}} ->
ok; ok;
_ -> _ ->
timer:sleep(?WAIT_FOR_RESOURCE_DELAY), timer:sleep(?WAIT_FOR_RESOURCE_DELAY),
do_wait_for_resource_ready(InstId, Retry - 1) do_wait_for_resource_ready(ResId, Retry - 1)
end. end.
safe_call(InstId, Message, Timeout) -> safe_call(ResId, Message, Timeout) ->
try try
gen_statem:call(proc_name(InstId), Message, {clean_timeout, Timeout}) Module = atom_to_binary(?MODULE),
MgrId = get_owner(ResId),
ProcName = binary_to_existing_atom(<<Module/binary, "_", MgrId/binary>>, utf8),
gen_statem:call(ProcName, Message, {clean_timeout, Timeout})
catch catch
error:badarg ->
{error, not_found};
exit:{R, _} when R == noproc; R == normal; R == shutdown -> exit:{R, _} when R == noproc; R == normal; R == shutdown ->
{error, not_found}; {error, not_found};
exit:{timeout, _} -> exit:{timeout, _} ->

View File

@ -17,14 +17,14 @@
-behaviour(supervisor). -behaviour(supervisor).
-export([ensure_child/5]). -export([ensure_child/6]).
-export([start_link/0]). -export([start_link/0]).
-export([init/1]). -export([init/1]).
ensure_child(InstId, Group, ResourceType, Config, Opts) -> ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts) ->
_ = supervisor:start_child(?MODULE, [InstId, Group, ResourceType, Config, Opts]), _ = supervisor:start_child(?MODULE, [MgrId, ResId, Group, ResourceType, Config, Opts]),
ok. ok.
start_link() -> start_link() ->

View File

@ -34,16 +34,16 @@ introduced_in() ->
"5.0.0". "5.0.0".
-spec create( -spec create(
instance_id(), resource_id(),
resource_group(), resource_group(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data() | 'already_created'} | {error, Reason :: term()}. {ok, resource_data() | 'already_created'} | {error, Reason :: term()}.
create(InstId, Group, ResourceType, Config, Opts) -> create(ResId, Group, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, create_local, [ emqx_cluster_rpc:multicall(emqx_resource, create_local, [
InstId, Group, ResourceType, Config, Opts ResId, Group, ResourceType, Config, Opts
]). ]).
-spec create_dry_run( -spec create_dry_run(
@ -55,19 +55,19 @@ create_dry_run(ResourceType, Config) ->
emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]). emqx_cluster_rpc:multicall(emqx_resource, create_dry_run_local, [ResourceType, Config]).
-spec recreate( -spec recreate(
instance_id(), resource_id(),
resource_type(), resource_type(),
resource_config(), resource_config(),
create_opts() create_opts()
) -> ) ->
{ok, resource_data()} | {error, Reason :: term()}. {ok, resource_data()} | {error, Reason :: term()}.
recreate(InstId, ResourceType, Config, Opts) -> recreate(ResId, ResourceType, Config, Opts) ->
emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [InstId, ResourceType, Config, Opts]). emqx_cluster_rpc:multicall(emqx_resource, recreate_local, [ResId, ResourceType, Config, Opts]).
-spec remove(instance_id()) -> ok | {error, Reason :: term()}. -spec remove(resource_id()) -> ok | {error, Reason :: term()}.
remove(InstId) -> remove(ResId) ->
emqx_cluster_rpc:multicall(emqx_resource, remove_local, [InstId]). emqx_cluster_rpc:multicall(emqx_resource, remove_local, [ResId]).
-spec reset_metrics(instance_id()) -> ok | {error, any()}. -spec reset_metrics(resource_id()) -> ok | {error, any()}.
reset_metrics(InstId) -> reset_metrics(ResId) ->
emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [InstId]). emqx_cluster_rpc:multicall(emqx_resource, reset_metrics_local, [ResId]).

View File

@ -126,8 +126,46 @@ t_create_remove_local(_) ->
ok = emqx_resource:remove_local(?ID), ok = emqx_resource:remove_local(?ID),
{error, _} = emqx_resource:remove_local(?ID), {error, _} = emqx_resource:remove_local(?ID),
?assertMatch(
{error, {emqx_resource, #{reason := not_found}}},
emqx_resource:query(?ID, get_state)
),
?assertNot(is_process_alive(Pid)). ?assertNot(is_process_alive(Pid)).
t_do_not_start_after_created(_) ->
{ok, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{start_after_created => false}
),
%% the resource should remain `disconnected` after created
timer:sleep(200),
?assertMatch(
{error, {emqx_resource, #{reason := not_connected}}},
emqx_resource:query(?ID, get_state)
),
?assertMatch(
{ok, _, #{status := disconnected}},
emqx_resource:get_instance(?ID)
),
%% start the resource manually..
ok = emqx_resource:start(?ID),
#{pid := Pid} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)),
%% restart the resource
ok = emqx_resource:restart(?ID),
?assertNot(is_process_alive(Pid)),
#{pid := Pid2} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid2)),
ok = emqx_resource:remove_local(?ID),
?assertNot(is_process_alive(Pid2)).
t_query(_) -> t_query(_) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
?ID, ?ID,
@ -231,7 +269,7 @@ t_stop_start(_) ->
?assertNot(is_process_alive(Pid0)), ?assertNot(is_process_alive(Pid0)),
?assertMatch( ?assertMatch(
{error, {emqx_resource, #{reason := not_found}}}, {error, {emqx_resource, #{reason := not_connected}}},
emqx_resource:query(?ID, get_state) emqx_resource:query(?ID, get_state)
), ),
@ -273,7 +311,7 @@ t_stop_start_local(_) ->
?assertNot(is_process_alive(Pid0)), ?assertNot(is_process_alive(Pid0)),
?assertMatch( ?assertMatch(
{error, {emqx_resource, #{reason := not_found}}}, {error, {emqx_resource, #{reason := not_connected}}},
emqx_resource:query(?ID, get_state) emqx_resource:query(?ID, get_state)
), ),
@ -310,6 +348,16 @@ t_list_filter(_) ->
). ).
t_create_dry_run_local(_) -> t_create_dry_run_local(_) ->
ets:match_delete(emqx_resource_manager, {{owner, '$1'}, '_'}),
lists:foreach(
fun(_) ->
create_dry_run_local_succ()
end,
lists:seq(1, 10)
),
[] = ets:match(emqx_resource_manager, {{owner, '$1'}, '_'}).
create_dry_run_local_succ() ->
?assertEqual( ?assertEqual(
ok, ok,
emqx_resource:create_dry_run_local( emqx_resource:create_dry_run_local(
@ -317,7 +365,6 @@ t_create_dry_run_local(_) ->
#{name => test_resource, register => true} #{name => test_resource, register => true}
) )
), ),
timer:sleep(100),
?assertEqual(undefined, whereis(test_resource)). ?assertEqual(undefined, whereis(test_resource)).
t_create_dry_run_local_failed(_) -> t_create_dry_run_local_failed(_) ->