%%-------------------------------------------------------------------- %% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -module(emqx_resource_manager). -behaviour(gen_statem). -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). -include_lib("emqx/include/logger.hrl"). % API -export([ ensure_resource/5, recreate/4, remove/1, create_dry_run/2, restart/2, start/2, stop/1, health_check/1 ]). -export([ lookup/1, list_all/0, list_group/1, ets_lookup/1, get_metrics/1, reset_metrics/1 ]). -export([ set_resource_status_connecting/1, manager_id_to_resource_id/1 ]). % Server -export([start_link/6]). % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record -record(data, { id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error }). -type data() :: #data{}. -define(ETS_TABLE, ?MODULE). -define(WAIT_FOR_RESOURCE_DELAY, 100). -define(T_OPERATION, 5000). -define(T_LOOKUP, 1000). -define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ make_manager_id(ResId) -> emqx_resource:generate_id(ResId). manager_id_to_resource_id(MgrId) -> [ResId, _Index] = string:split(MgrId, ":", trailing), ResId. %% @doc Called from emqx_resource when starting a resource instance. %% %% Triggers the emqx_resource_manager_sup supervisor to actually create %% and link the process itself if not already started. -spec ensure_resource( resource_id(), resource_group(), resource_type(), resource_config(), creation_opts() ) -> {ok, resource_data()}. ensure_resource(ResId, Group, ResourceType, Config, Opts) -> case lookup(ResId) of {ok, _Group, Data} -> {ok, Data}; {error, not_found} -> MgrId = set_new_owner(ResId), create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) end. %% @doc Called from emqx_resource when recreating a resource which may or may not exist -spec recreate(resource_id(), resource_type(), resource_config(), creation_opts()) -> {ok, resource_data()} | {error, not_found} | {error, updating_to_incorrect_resource_type}. recreate(ResId, ResourceType, NewConfig, Opts) -> case lookup(ResId) of {ok, Group, #{mod := ResourceType, status := _} = _Data} -> _ = remove(ResId, false), MgrId = set_new_owner(ResId), create_and_return_data(MgrId, ResId, Group, ResourceType, NewConfig, Opts); {ok, _, #{mod := Mod}} when Mod =/= ResourceType -> {error, updating_to_incorrect_resource_type}; {error, not_found} -> {error, not_found} end. create_and_return_data(MgrId, ResId, Group, ResourceType, Config, Opts) -> create(MgrId, ResId, Group, ResourceType, Config, Opts), {ok, _Group, Data} = lookup(ResId), {ok, Data}. %% internal configs -define(START_AFTER_CREATED, true). %% in milliseconds -define(START_TIMEOUT, 5000). %% @doc Create a resource_manager and wait until it is running create(MgrId, ResId, Group, ResourceType, Config, Opts) -> % The state machine will make the actual call to the callback/resource module after init ok = emqx_resource_manager_sup:ensure_child(MgrId, ResId, Group, ResourceType, Config, Opts), ok = emqx_metrics_worker:create_metrics( ?RES_METRICS, ResId, [ 'matched', 'retried', 'retried.success', 'retried.failed', 'success', 'failed', 'dropped', 'dropped.queue_not_enabled', 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', 'dropped.other', 'queuing', 'batching', 'inflight', 'received' ], [matched] ), case emqx_resource:is_buffer_supported(ResourceType) of true -> %% the resource it self supports %% buffer, so there is no need for resource workers ok; false -> ok = emqx_resource_worker_sup:start_workers(ResId, Opts), case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of true -> wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); false -> ok end end. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% %% Triggers the `emqx_resource_manager_sup` supervisor to actually create %% and link the process itself if not already started, and then immedately stops. -spec create_dry_run(resource_type(), resource_config()) -> ok | {error, Reason :: term()}. create_dry_run(ResourceType, Config) -> ResId = make_test_id(), MgrId = set_new_owner(ResId), ok = emqx_resource_manager_sup:ensure_child( MgrId, ResId, <<"dry_run">>, ResourceType, Config, #{} ), case wait_for_ready(ResId, 15000) of ok -> remove(ResId); timeout -> _ = remove(ResId), {error, timeout} end. %% @doc Stops a running resource_manager and clears the metrics for the resource -spec remove(resource_id()) -> ok | {error, Reason :: term()}. remove(ResId) when is_binary(ResId) -> remove(ResId, true). %% @doc Stops a running resource_manager and optionally clears the metrics for the resource -spec remove(resource_id(), boolean()) -> ok | {error, Reason :: term()}. remove(ResId, ClearMetrics) when is_binary(ResId) -> safe_call(ResId, {remove, ClearMetrics}, ?T_OPERATION). %% @doc Stops and then starts an instance that was already running -spec restart(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. restart(ResId, Opts) when is_binary(ResId) -> case safe_call(ResId, restart, ?T_OPERATION) of ok -> wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), ok; {error, _Reason} = Error -> Error end. %% @doc Start the resource -spec start(resource_id(), creation_opts()) -> ok | {error, Reason :: term()}. start(ResId, Opts) -> case safe_call(ResId, start, ?T_OPERATION) of ok -> wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), ok; {error, _Reason} = Error -> Error end. %% @doc Stop the resource -spec stop(resource_id()) -> ok | {error, Reason :: term()}. stop(ResId) -> case safe_call(ResId, stop, ?T_OPERATION) of ok -> ok; {error, _Reason} = Error -> Error end. %% @doc Test helper -spec set_resource_status_connecting(resource_id()) -> ok. set_resource_status_connecting(ResId) -> safe_call(ResId, set_resource_status_connecting, infinity). %% @doc Lookup the group and data of a resource -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(ResId) -> case safe_call(ResId, lookup, ?T_LOOKUP) of {error, timeout} -> ets_lookup(ResId); Result -> Result end. %% @doc Lookup the group and data of a resource -spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. ets_lookup(ResId) -> case read_cache(ResId) of {Group, Data} -> {ok, Group, data_record_to_external_map_with_metrics(Data)}; not_found -> {error, not_found} end. %% @doc Get the metrics for the specified resource get_metrics(ResId) -> emqx_metrics_worker:get_metrics(?RES_METRICS, ResId). %% @doc Reset the metrics for the specified resource -spec reset_metrics(resource_id()) -> ok. reset_metrics(ResId) -> emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId). %% @doc Returns the data for all resources -spec list_all() -> [resource_data()] | []. list_all() -> try [ data_record_to_external_map_with_metrics(Data) || {_Id, _Group, Data} <- ets:tab2list(?ETS_TABLE) ] catch error:badarg -> [] end. %% @doc Returns a list of ids for all the resources in a group -spec list_group(resource_group()) -> [resource_id()]. list_group(Group) -> List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), lists:flatten(List). -spec health_check(resource_id()) -> {ok, resource_status()} | {error, term()}. health_check(ResId) -> safe_call(ResId, health_check, ?T_OPERATION). %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> Data = #data{ id = ResId, manager_id = MgrId, group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), %% query_mode = dynamic | sync | async %% TODO: %% dynamic mode is async mode when things are going well, but becomes sync mode %% if the resource worker is overloaded query_mode = maps:get(query_mode, Opts, sync), config = Config, opts = Opts, status = connecting, state = undefined, error = undefined }, Module = atom_to_binary(?MODULE), ProcName = binary_to_atom(<>, utf8), gen_statem:start_link({local, ProcName}, ?MODULE, {Data, Opts}, []). init({Data, Opts}) -> process_flag(trap_exit, true), %% init the cache so that lookup/1 will always return something insert_cache(Data#data.id, Data#data.group, Data), case maps:get(start_after_created, Opts, true) of true -> {ok, connecting, Data, {next_event, internal, start_resource}}; false -> {ok, stopped, Data} end. terminate(_Reason, _State, Data) -> _ = maybe_clear_alarm(Data#data.id), delete_cache(Data#data.id, Data#data.manager_id), ok. %% Behavior callback callback_mode() -> [handle_event_function, state_enter]. %% Common event Function % Called during testing to force a specific state handle_event({call, From}, set_resource_status_connecting, _State, Data) -> {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; % Called when the resource is to be restarted handle_event({call, From}, restart, _State, Data) -> _ = stop_resource(Data), start_resource(Data, From); % Called when the resource is to be started handle_event({call, From}, start, stopped, Data) -> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource received a `quit` message handle_event(info, quit, stopped, _Data) -> {stop, {shutdown, quit}}; handle_event(info, quit, _State, Data) -> _ = stop_resource(Data), {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> Result = stop_resource(Data), {next_state, stopped, Data, [{reply, From, Result}]}; % Called when a resource is to be stopped and removed. handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_remove_event(From, ClearMetrics, Data); % Called when the state-data of the resource is being looked up. handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, {keep_state_and_data, [{reply, From, Reply}]}; % Called when doing a manually health check. handle_event({call, From}, health_check, stopped, _Data) -> Actions = [{reply, From, {error, resource_is_stopped}}], {keep_state_and_data, Actions}; handle_event({call, From}, health_check, _State, Data) -> handle_manually_health_check(From, Data); % State: CONNECTING handle_event(enter, _OldState, connecting, Data) -> UpdatedData = Data#data{status = connecting}, insert_cache(Data#data.id, Data#data.group, Data), Actions = [{state_timeout, 0, health_check}], {keep_state, UpdatedData, Actions}; handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> handle_connecting_health_check(Data); %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks handle_event(enter, _OldState, connected, Data) -> UpdatedData = Data#data{status = connected}, insert_cache(Data#data.id, Data#data.group, UpdatedData), _ = emqx_alarm:deactivate(Data#data.id), Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {next_state, connected, UpdatedData, Actions}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); %% State: DISCONNECTED handle_event(enter, _OldState, disconnected, Data) -> UpdatedData = Data#data{status = disconnected}, insert_cache(Data#data.id, Data#data.group, UpdatedData), handle_disconnected_state_enter(UpdatedData); handle_event(state_timeout, auto_retry, disconnected, Data) -> start_resource(Data, undefined); %% State: STOPPED %% The stopped state is entered after the resource has been explicitly stopped handle_event(enter, _OldState, stopped, Data) -> UpdatedData = Data#data{status = stopped}, insert_cache(Data#data.id, Data#data.group, UpdatedData), {next_state, stopped, UpdatedData}; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( error, #{ msg => ignore_all_other_events, event_type => EventType, event_data => EventData, state => State, data => Data } ), keep_state_and_data. %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ insert_cache(ResId, Group, Data = #data{manager_id = MgrId}) -> case get_owner(ResId) of not_found -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); MgrId -> ets:insert(?ETS_TABLE, {ResId, Group, Data}); _ -> ?SLOG(error, #{ msg => get_resource_owner_failed, resource_id => ResId, action => quit_resource }), self() ! quit end. read_cache(ResId) -> case ets:lookup(?ETS_TABLE, ResId) of [{_Id, Group, Data}] -> {Group, Data}; [] -> not_found end. delete_cache(ResId, MgrId) -> case get_owner(ResId) of MgrIdNow when MgrIdNow == not_found; MgrIdNow == MgrId -> do_delete_cache(ResId); _ -> ok end. do_delete_cache(<> = ResId) -> ets:delete(?ETS_TABLE, {owner, ResId}), ets:delete(?ETS_TABLE, ResId); do_delete_cache(ResId) -> ets:delete(?ETS_TABLE, ResId). set_new_owner(ResId) -> MgrId = make_manager_id(ResId), ok = set_owner(ResId, MgrId), MgrId. set_owner(ResId, MgrId) -> ets:insert(?ETS_TABLE, {{owner, ResId}, MgrId}), ok. get_owner(ResId) -> case ets:lookup(?ETS_TABLE, {owner, ResId}) of [{_, MgrId}] -> MgrId; [] -> not_found end. handle_disconnected_state_enter(Data) -> {next_state, disconnected, Data, retry_actions(Data)}. retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> []; RetryInterval -> [{state_timeout, RetryInterval, auto_retry}] end. handle_remove_event(From, ClearMetrics, Data) -> stop_resource(Data), ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, {stop_and_reply, normal, [{reply, From, ok}]}. start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache insert_cache(Data#data.id, Data#data.group, Data), case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> UpdatedData = Data#data{state = ResourceState, status = connecting}, %% Perform an initial health_check immediately before transitioning into a connected state Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, connecting, UpdatedData, Actions}; {error, Reason} = Err -> ?SLOG(error, #{ msg => start_resource_failed, id => Data#data.id, reason => Reason }), _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. UpdatedData = Data#data{error = Reason}, Actions = maybe_reply(retry_actions(UpdatedData), From, Err), {next_state, disconnected, UpdatedData, Actions} end. stop_resource(#data{state = undefined, id = ResId} = _Data) -> _ = maybe_clear_alarm(ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok; stop_resource(Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. ResId = Data#data.id, _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), _ = maybe_clear_alarm(ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok. make_test_id() -> RandId = iolist_to_binary(emqx_misc:gen_id(16)), <>. handle_manually_health_check(From, Data) -> with_health_check(Data, fun(Status, UpdatedData) -> Actions = [{reply, From, {ok, Status}}], {next_state, Status, UpdatedData, Actions} end). handle_connecting_health_check(Data) -> with_health_check( Data, fun (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, Actions}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} end ). handle_connected_health_check(Data) -> with_health_check( Data, fun (connected, UpdatedData) -> Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, Actions}; (Status, UpdatedData) -> ?SLOG(error, #{ msg => health_check_failed, id => Data#data.id, status => Status }), {next_state, Status, UpdatedData} end ). with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(Data#data.manager_id, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data), _ = maybe_alarm(Status, ResId), ok = maybe_resume_resource_workers(Status), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, insert_cache(ResId, UpdatedData#data.group, UpdatedData), Func(Status, UpdatedData). health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maybe_alarm(connected, _ResId) -> ok; maybe_alarm(_Status, <>) -> ok; maybe_alarm(_Status, ResId) -> emqx_alarm:activate( ResId, #{resource_id => ResId, reason => resource_down}, <<"resource down: ", ResId/binary>> ). maybe_resume_resource_workers(connected) -> lists:foreach( fun({_, Pid, _, _}) -> emqx_resource_worker:resume(Pid) end, supervisor:which_children(emqx_resource_worker_sup) ); maybe_resume_resource_workers(_) -> ok. maybe_clear_alarm(<>) -> ok; maybe_clear_alarm(ResId) -> emqx_alarm:deactivate(ResId). parse_health_check_result(Status, Data) when ?IS_STATUS(Status) -> {Status, Data#data.state, undefined}; parse_health_check_result({Status, NewState}, _Data) when ?IS_STATUS(Status) -> {Status, NewState, undefined}; parse_health_check_result({Status, NewState, Error}, _Data) when ?IS_STATUS(Status) -> {Status, NewState, Error}; parse_health_check_result({error, Error}, Data) -> ?SLOG( error, #{ msg => health_check_exception, resource_id => Data#data.id, reason => Error } ), {disconnected, Data#data.state, Error}. maybe_reply(Actions, undefined, _Reply) -> Actions; maybe_reply(Actions, From, Reply) -> [{reply, From, Reply} | Actions]. -spec data_record_to_external_map_with_metrics(data()) -> resource_data(). data_record_to_external_map_with_metrics(Data) -> #{ id => Data#data.id, mod => Data#data.mod, callback_mode => Data#data.callback_mode, query_mode => Data#data.query_mode, config => Data#data.config, status => Data#data.status, state => Data#data.state, metrics => get_metrics(Data#data.id) }. -spec wait_for_ready(resource_id(), integer()) -> ok | timeout. wait_for_ready(ResId, WaitTime) -> do_wait_for_ready(ResId, WaitTime div ?WAIT_FOR_RESOURCE_DELAY). do_wait_for_ready(_ResId, 0) -> timeout; do_wait_for_ready(ResId, Retry) -> case ets_lookup(ResId) of {ok, _Group, #{status := connected}} -> ok; _ -> timer:sleep(?WAIT_FOR_RESOURCE_DELAY), do_wait_for_ready(ResId, Retry - 1) end. safe_call(ResId, Message, Timeout) -> try Module = atom_to_binary(?MODULE), MgrId = get_owner(ResId), ProcName = binary_to_existing_atom(<>, utf8), gen_statem:call(ProcName, Message, {clean_timeout, Timeout}) catch error:badarg -> {error, not_found}; exit:{R, _} when R == noproc; R == normal; R == shutdown -> {error, not_found}; exit:{timeout, _} -> {error, timeout} end.