diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 2b57bc580..d1d82a401 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -33,7 +33,7 @@ -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), - waiting_connect_complete => integer(), + wait_for_resource_ready => integer(), auto_retry_interval => integer() }. -type after_query() :: diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 8d0d984a2..bcc4422b2 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -267,13 +267,13 @@ handle_event({call, From}, restart, _State, Data) -> _ = stop_resource(Data), start_resource(Data, From); % Called when the resource is to be started -handle_event({call, From}, start, _State, #data{status = disconnected} = Data) -> +handle_event({call, From}, start, stopped, Data) -> start_resource(Data, From); handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource is to be stopped -handle_event({call, From}, stop, _State, #data{status = disconnected} = Data) -> - {next_state, stopped, Data, [{reply, From, ok}]}; +handle_event({call, From}, stop, stopped, _Data) -> + {keep_state_and_data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> Result = stop_resource(Data), UpdatedData = Data#data{status = disconnected}, @@ -281,21 +281,27 @@ handle_event({call, From}, stop, _State, Data) -> % Called when a resource is to be stopped and removed. handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_remove_event(From, ClearMetrics, Data); -% Called when the state of the resource is being looked up. +% Called when the state-data of the resource is being looked up. handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, {keep_state_and_data, [{reply, From, Reply}]}; -% Connecting state enter -handle_event(internal, try_connect, connecting, Data) -> - start_resource(Data, undefined); +% Called when doing a manually health check. +handle_event({call, From}, health_check, stopped, _Data) -> + Actions = [{reply, From, {error, resource_is_stopped}}], + {keep_state_and_data, Actions}; +handle_event({call, From}, health_check, _State, Data) -> + handle_manually_health_check(From, Data); +% State: CONNECTING handle_event(enter, _OldState, connecting, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), Actions = [{state_timeout, 0, health_check}], - {next_state, connecting, Data, Actions}; -% Connecting state health_check timeouts. + {keep_state_and_data, Actions}; +handle_event(internal, try_connect, connecting, Data) -> + start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> - connecting_health_check(Data); -%% The connected state is entered after a successful start of the callback mod + handle_connecting_health_check(Data); +%% State: CONNECTED +%% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks handle_event(enter, _OldState, connected, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), @@ -303,22 +309,19 @@ handle_event(enter, _OldState, connected, Data) -> Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], {next_state, connected, Data, Actions}; handle_event(state_timeout, health_check, connected, Data) -> - perform_connected_health_check(Data); + handle_connected_health_check(Data); +%% State: DISCONNECTED handle_event(enter, _OldState, disconnected, Data) -> ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), handle_disconnected_state_enter(Data); handle_event(state_timeout, auto_retry, disconnected, Data) -> start_resource(Data, undefined); +%% State: STOPPED +%% The stopped state is entered after the resource has been explicitly stopped handle_event(enter, _OldState, stopped, Data) -> UpdatedData = Data#data{status = disconnected}, ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, UpdatedData}), {next_state, stopped, UpdatedData}; -% Resource has been explicitly stopped, so return that as the error reason. -handle_event({call, From}, health_check, stopped, _Data) -> - Actions = [{reply, From, {error, resource_is_stopped}}], - {keep_state_and_data, Actions}; -handle_event({call, From}, health_check, _State, Data) -> - handle_health_check_request(From, Data); % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -331,7 +334,7 @@ handle_event(EventType, EventData, State, Data) -> data => Data } ), - {next_state, State, Data}. + keep_state_and_data. %%------------------------------------------------------------------------------ %% internal functions @@ -364,6 +367,7 @@ start_resource(Data, From) -> Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), {next_state, connecting, UpdatedData, Actions}; {error, Reason} = Err -> + _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. UpdatedData = Data#data{status = disconnected, error = Reason}, @@ -371,25 +375,29 @@ start_resource(Data, From) -> {next_state, disconnected, UpdatedData, Actions} end. -stop_resource(#data{state = undefined} = _Data) -> +stop_resource(#data{state = undefined, id = ResId} = _Data) -> + _ = maybe_clear_alarm(ResId), ok; stop_resource(Data) -> - Result = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), + %% We don't care the return value of the Mod:on_stop/2. + %% The callback mod should make sure the resource is stopped after on_stop/2 + %% is returned. + _ = emqx_resource:call_stop(Data#data.id, Data#data.mod, Data#data.state), _ = maybe_clear_alarm(Data#data.id), - Result. + ok. proc_name(Id) -> Module = atom_to_binary(?MODULE), Connector = <<"_">>, binary_to_atom(<>). -handle_health_check_request(From, Data) -> +handle_manually_health_check(From, Data) -> with_health_check(Data, fun(Status, UpdatedData) -> Actions = [{reply, From, {ok, Status}}], {next_state, Status, UpdatedData, Actions} end). -connecting_health_check(Data) -> +handle_connecting_health_check(Data) -> with_health_check( Data, fun @@ -403,7 +411,7 @@ connecting_health_check(Data) -> end ). -perform_connected_health_check(Data) -> +handle_connected_health_check(Data) -> with_health_check( Data, fun @@ -420,18 +428,18 @@ with_health_check(Data, Func) -> ResId = Data#data.id, HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), - _ = maybe_alarm_resource_down(Status, ResId), + _ = maybe_alarm(Status, ResId), UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, ets:insert(?ETS_TABLE, {ResId, UpdatedData#data.group, UpdatedData}), Func(Status, UpdatedData). -maybe_alarm_resource_down(connected, _ResId) -> +maybe_alarm(connected, _ResId) -> ok; -maybe_alarm_resource_down(_Status, <>) -> +maybe_alarm(_Status, <>) -> ok; -maybe_alarm_resource_down(_Status, ResId) -> +maybe_alarm(_Status, ResId) -> emqx_alarm:activate( ResId, #{resource_id => ResId, reason => resource_down},