From 1054c364adea90f863b0df7bf0929dae2fac9e54 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 31 May 2022 01:09:53 +0800 Subject: [PATCH] refactor(resource): improve health check and alarm it if resource down --- apps/emqx_bridge/src/emqx_bridge_resource.erl | 4 +- .../src/emqx_connector_http.erl | 4 +- .../src/emqx_connector_mqtt.erl | 15 +- apps/emqx_resource/include/emqx_resource.hrl | 7 +- apps/emqx_resource/src/emqx_resource.erl | 14 +- .../src/emqx_resource_manager.erl | 142 +++++++++--------- .../test/emqx_resource_SUITE.erl | 7 +- 7 files changed, 98 insertions(+), 95 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index b3f7ec978..5af164b33 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -90,7 +90,7 @@ create(Type, Name, Conf) -> <<"emqx_bridge">>, bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{} + #{auto_retry_interval => 60000} ), maybe_disable_bridge(Type, Name, Conf). @@ -146,7 +146,7 @@ recreate(Type, Name, Conf) -> resource_id(Type, Name), bridge_to_resource_type(Type), parse_confs(Type, Name, Conf), - #{} + #{auto_retry_interval => 60000} ). create_dry_run(Type, Conf) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 956cdb840..b06ee94a8 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -306,7 +306,7 @@ on_query( end, Result. -on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> +on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) -> case do_get_status(Host, Port, Timeout) of ok -> connected; @@ -317,7 +317,7 @@ on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} host => Host, port => Port }), - disconnected + {disconnected, State, Reason} end. do_get_status(Host, Port, Timeout) -> diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index e0e750c3d..21e201504 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -154,11 +154,11 @@ on_start(InstId, Conf) -> }, case ?MODULE:create_bridge(BridgeConf) of {ok, _Pid} -> - ensure_mqtt_worker_started(InstanceId); + ensure_mqtt_worker_started(InstanceId, BridgeConf); {error, {already_started, _Pid}} -> ok = ?MODULE:drop_bridge(InstanceId), {ok, _} = ?MODULE:create_bridge(BridgeConf), - ensure_mqtt_worker_started(InstanceId); + ensure_mqtt_worker_started(InstanceId, BridgeConf); {error, Reason} -> {error, Reason} end. @@ -188,15 +188,17 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) -> emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_resource:query_success(AfterQuery). -on_get_status(_InstId, #{name := InstanceId}) -> +on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> + AutoReconn = maps:get(auto_reconnect, Conf, true), case emqx_connector_mqtt_worker:status(InstanceId) of connected -> connected; - _ -> disconnected + _ when AutoReconn == true -> connecting; + _ when AutoReconn == false -> disconnected end. -ensure_mqtt_worker_started(InstanceId) -> +ensure_mqtt_worker_started(InstanceId, BridgeConf) -> case emqx_connector_mqtt_worker:ensure_started(InstanceId) of - ok -> {ok, #{name => InstanceId}}; + ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}}; {error, Reason} -> {error, Reason} end. @@ -240,6 +242,7 @@ basic_config(#{ server => Server, %% 30s connect_timeout => 30, + auto_reconnect => true, reconnect_interval => ReconnIntv, proto_ver => ProtoVer, %% Opening bridge_mode will form a non-standard mqtt connection message. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index ca8b661d4..2b57bc580 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -20,20 +20,21 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). --type resource_connection_status() :: connected | disconnected | connecting. +-type resource_status() :: connected | disconnected | connecting. -type resource_data() :: #{ id := instance_id(), mod := module(), config := resource_config(), state := resource_state(), - status := resource_connection_status(), + status := resource_status(), metrics := emqx_metrics_worker:metrics() }. -type resource_group() :: binary(). -type create_opts() :: #{ health_check_interval => integer(), health_check_timeout => integer(), - waiting_connect_complete => integer() + waiting_connect_complete => integer(), + auto_retry_interval => integer() }. -type after_query() :: {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 05ad3ce46..29309afcc 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -127,8 +127,9 @@ %% when calling emqx_resource:health_check/2 -callback on_get_status(instance_id(), resource_state()) -> - resource_connection_status() - | {resource_connection_status(), resource_state()}. + resource_status() + | {resource_status(), resource_state()} + | {resource_status(), resource_state(), term()}. -spec list_types() -> [module()]. list_types() -> @@ -260,7 +261,7 @@ query(InstId, Request, AfterQuery) -> erlang:raise(Err, Reason, ST) end; {error, not_found} -> - query_error(not_found, <<"the resource id not exists">>) + query_error(not_found, <<"resource not found or not connected">>) end. -spec restart(instance_id()) -> ok | {error, Reason :: term()}. @@ -275,7 +276,7 @@ restart(InstId, Opts) -> stop(InstId) -> emqx_resource_manager:stop(InstId). --spec health_check(instance_id()) -> ok | {error, Reason :: term()}. +-spec health_check(instance_id()) -> resource_status(). health_check(InstId) -> emqx_resource_manager:health_check(InstId). @@ -316,8 +317,9 @@ call_start(InstId, Mod, Config) -> ?SAFE_CALL(Mod:on_start(InstId, Config)). -spec call_health_check(instance_id(), module(), resource_state()) -> - resource_connection_status() - | {resource_connection_status(), resource_state()}. + resource_status() + | {resource_status(), resource_state()} + | {resource_status(), resource_state(), term()}. call_health_check(InstId, Mod, ResourceState) -> ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 41ff74f16..405b47097 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -52,6 +52,8 @@ -define(ETS_TABLE, emqx_resource_manager). -define(WAIT_FOR_RESOURCE_DELAY, 100). +-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -188,7 +190,7 @@ list_group(Group) -> List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), lists:flatten(List). --spec health_check(instance_id()) -> ok | {error, Reason :: term()}. +-spec health_check(instance_id()) -> resource_status(). health_check(InstId) -> safe_call(InstId, health_check). @@ -209,7 +211,7 @@ start_link(InstId, Group, ResourceType, Config, Opts) -> gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). init(Data) -> - {ok, connecting, Data}. + {ok, connecting, Data, {next_event, internal, try_connect}}. terminate(_Reason, _State, Data) -> ets:delete(?ETS_TABLE, Data#data.id), @@ -238,25 +240,21 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, {keep_state_and_data, [{reply, From, Reply}]}; -% An external health check call. Disconnected usually means an error has happened. -handle_event({call, From}, health_check, disconnected, Data) -> - Actions = [{reply, From, {error, Data#data.error}}], - {keep_state_and_data, Actions}; % Connecting state enter -handle_event(enter, connecting, connecting, Data) -> +handle_event(internal, try_connect, connecting, Data) -> handle_connection_attempt(Data); handle_event(enter, _OldState, connecting, Data) -> + ets:delete(?ETS_TABLE, Data#data.id), Actions = [{state_timeout, 0, health_check}], {next_state, connecting, Data, Actions}; % Connecting state health_check timeouts. -% First clause supports desired behavior on initial connection. -handle_event(state_timeout, health_check, connecting, #data{status = disconnected} = Data) -> - {next_state, disconnected, Data}; handle_event(state_timeout, health_check, connecting, Data) -> connecting_health_check(Data); %% The connected state is entered after a successful start of the callback mod %% and successful health_checks handle_event(enter, _OldState, connected, Data) -> + ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}), + _ = emqx_alarm:deactivate(Data#data.id), Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], {next_state, connected, Data, Actions}; handle_event(state_timeout, health_check, connected, Data) -> @@ -270,11 +268,11 @@ handle_event(enter, _OldState, stopped, Data) -> ets:delete(?ETS_TABLE, Data#data.id), {next_state, stopped, UpdatedData}; % Resource has been explicitly stopped, so return that as the error reason. -handle_event({call, From}, health_check, stopped, _Data) -> - Actions = [{reply, From, {error, stopped}}], +handle_event({call, From}, _, stopped, _Data) -> + Actions = [{reply, From, {error, resource_is_stopped}}], {keep_state_and_data, Actions}; handle_event({call, From}, health_check, _State, Data) -> - handle_health_check_event(From, Data); + handle_health_check_request(From, Data); % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -296,7 +294,7 @@ handle_event(EventType, EventData, State, Data) -> handle_disconnected_state_enter(Data) -> UpdatedData = Data#data{status = disconnected}, ets:delete(?ETS_TABLE, Data#data.id), - case maps:get(auto_retry_interval, Data#data.config, undefined) of + case maps:get(auto_retry_interval, Data#data.opts, undefined) of undefined -> {next_state, disconnected, UpdatedData}; RetryInterval -> @@ -315,8 +313,7 @@ handle_connection_attempt(Data) -> %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. UpdatedData = Data#data{status = disconnected, error = Reason}, - Actions = [{state_timeout, 0, health_check}], - {next_state, connecting, UpdatedData, Actions} + {next_state, disconnected, UpdatedData} end. handle_remove_event(From, ClearMetrics, Data) -> @@ -352,65 +349,65 @@ proc_name(Id) -> Connector = <<"_">>, binary_to_atom(<>). -handle_health_check_event(From, Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - connected -> - UpdatedData = Data#data{status = connected, error = undefined}, - update_resource(Data#data.id, Data#data.group, UpdatedData), - Actions = [{reply, From, ok}], - {next_state, connected, UpdatedData, Actions}; - {connected, NewResourceState} -> - UpdatedData = Data#data{ - state = NewResourceState, status = connected, error = undefined - }, - update_resource(Data#data.id, Data#data.group, UpdatedData), - Actions = [{reply, From, ok}], - {next_state, connected, UpdatedData, Actions}; - ConnectStatus -> - logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), - UpdatedData = Data#data{status = connecting, error = ConnectStatus}, - ets:delete(?ETS_TABLE, Data#data.id), - Actions = [{reply, From, {error, ConnectStatus}}], - {next_state, connecting, UpdatedData, Actions} - end. +handle_health_check_request(From, Data) -> + with_health_check(Data, fun(Status, UpdatedData) -> + Actions = [{reply, From, Status}], + {next_state, Status, UpdatedData, Actions} + end). connecting_health_check(Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - connected -> - UpdatedData = Data#data{status = connected, error = undefined}, - update_resource(Data#data.id, Data#data.group, UpdatedData), - {next_state, connected, UpdatedData}; - {connected, NewResourceState} -> - UpdatedData = Data#data{ - state = NewResourceState, status = connected, error = undefined - }, - update_resource(Data#data.id, Data#data.group, UpdatedData), - {next_state, connected, UpdatedData}; - ConnectStatus -> - logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), - UpdatedData = Data#data{status = connecting, error = ConnectStatus}, - Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}], - {keep_state, UpdatedData, Actions} - end. + with_health_check( + Data, + fun + (connected, UpdatedData) -> + {next_state, connected, UpdatedData}; + (connecting, UpdatedData) -> + Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}], + {keep_state, UpdatedData, Actions}; + (disconnected, UpdatedData) -> + {next_state, disconnected, UpdatedData} + end + ). perform_connected_health_check(Data) -> - case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of - connected -> - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], - {keep_state_and_data, Actions}; - {connected, NewResourceState} -> - UpdatedData = Data#data{ - state = NewResourceState, status = connected, error = undefined - }, - update_resource(Data#data.id, Data#data.group, UpdatedData), - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], - {keep_state, NewResourceState, Actions}; - ConnectStatus -> - logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), - UpdatedData = Data#data{error = ConnectStatus}, - ets:delete(?ETS_TABLE, Data#data.id), - {next_state, connecting, UpdatedData} - end. + with_health_check( + Data, + fun + (connected, UpdatedData) -> + Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + {keep_state, UpdatedData, Actions}; + (Status, UpdatedData) -> + logger:error("health check for ~p failed: ~p", [Data#data.id, Status]), + {next_state, Status, UpdatedData} + end + ). + +with_health_check(Data, Func) -> + ResId = Data#data.id, + HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state), + {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state), + _ = + case Status of + connected -> + ok; + _ -> + emqx_alarm:activate( + ResId, + #{resource_id => ResId, reason => resource_down}, + <<"resource down: ", ResId/binary>> + ) + end, + UpdatedData = Data#data{ + state = NewState, status = Status, error = Err + }, + Func(Status, UpdatedData). + +parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) -> + {Status, OldState, undefined}; +parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) -> + {Status, NewState, undefined}; +parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) -> + {Status, NewState, Error}. data_record_to_external_map_with_metrics(Data) -> #{ @@ -448,6 +445,3 @@ safe_call(InstId, Message) -> exit:_ -> {error, not_found} end. - -update_resource(InstId, Group, Data) -> - ets:insert(?ETS_TABLE, {InstId, Group, Data}). diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 0d5230dfa..77053054f 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -361,9 +361,12 @@ t_reset_metrics(_) -> ?assertNot(is_process_alive(Pid)). t_auto_retry(_) -> - {Res, _} = emqx_resource:create_dry_run_local( + {Res, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, - #{name => test_resource, create_error => true, auto_retry_interval => 1000} + #{name => test_resource, create_error => true}, + #{auto_retry_interval => 100} ), ?assertEqual(error, Res).