refactor(resource): improve health check and raise an alarm if the resource is down

This commit is contained in:
Shawn 2022-05-31 01:09:53 +08:00
parent a6f43e3fc8
commit 1054c364ad
7 changed files with 98 additions and 95 deletions

View File

@ -90,7 +90,7 @@ create(Type, Name, Conf) ->
<<"emqx_bridge">>, <<"emqx_bridge">>,
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf), parse_confs(Type, Name, Conf),
#{} #{auto_retry_interval => 60000}
), ),
maybe_disable_bridge(Type, Name, Conf). maybe_disable_bridge(Type, Name, Conf).
@ -146,7 +146,7 @@ recreate(Type, Name, Conf) ->
resource_id(Type, Name), resource_id(Type, Name),
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf), parse_confs(Type, Name, Conf),
#{} #{auto_retry_interval => 60000}
). ).
create_dry_run(Type, Conf) -> create_dry_run(Type, Conf) ->

View File

@ -306,7 +306,7 @@ on_query(
end, end,
Result. Result.
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) -> on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
case do_get_status(Host, Port, Timeout) of case do_get_status(Host, Port, Timeout) of
ok -> ok ->
connected; connected;
@ -317,7 +317,7 @@ on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}
host => Host, host => Host,
port => Port port => Port
}), }),
disconnected {disconnected, State, Reason}
end. end.
do_get_status(Host, Port, Timeout) -> do_get_status(Host, Port, Timeout) ->

View File

@ -154,11 +154,11 @@ on_start(InstId, Conf) ->
}, },
case ?MODULE:create_bridge(BridgeConf) of case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} -> {ok, _Pid} ->
ensure_mqtt_worker_started(InstanceId); ensure_mqtt_worker_started(InstanceId, BridgeConf);
{error, {already_started, _Pid}} -> {error, {already_started, _Pid}} ->
ok = ?MODULE:drop_bridge(InstanceId), ok = ?MODULE:drop_bridge(InstanceId),
{ok, _} = ?MODULE:create_bridge(BridgeConf), {ok, _} = ?MODULE:create_bridge(BridgeConf),
ensure_mqtt_worker_started(InstanceId); ensure_mqtt_worker_started(InstanceId, BridgeConf);
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.
@ -188,15 +188,17 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
emqx_resource:query_success(AfterQuery). emqx_resource:query_success(AfterQuery).
on_get_status(_InstId, #{name := InstanceId}) -> on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true),
case emqx_connector_mqtt_worker:status(InstanceId) of case emqx_connector_mqtt_worker:status(InstanceId) of
connected -> connected; connected -> connected;
_ -> disconnected _ when AutoReconn == true -> connecting;
_ when AutoReconn == false -> disconnected
end. end.
ensure_mqtt_worker_started(InstanceId) -> ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
ok -> {ok, #{name => InstanceId}}; ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end. end.
@ -240,6 +242,7 @@ basic_config(#{
server => Server, server => Server,
%% 30s %% 30s
connect_timeout => 30, connect_timeout => 30,
auto_reconnect => true,
reconnect_interval => ReconnIntv, reconnect_interval => ReconnIntv,
proto_ver => ProtoVer, proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message. %% Opening bridge_mode will form a non-standard mqtt connection message.

View File

@ -20,20 +20,21 @@
-type resource_config() :: term(). -type resource_config() :: term().
-type resource_spec() :: map(). -type resource_spec() :: map().
-type resource_state() :: term(). -type resource_state() :: term().
-type resource_connection_status() :: connected | disconnected | connecting. -type resource_status() :: connected | disconnected | connecting.
-type resource_data() :: #{ -type resource_data() :: #{
id := instance_id(), id := instance_id(),
mod := module(), mod := module(),
config := resource_config(), config := resource_config(),
state := resource_state(), state := resource_state(),
status := resource_connection_status(), status := resource_status(),
metrics := emqx_metrics_worker:metrics() metrics := emqx_metrics_worker:metrics()
}. }.
-type resource_group() :: binary(). -type resource_group() :: binary().
-type create_opts() :: #{ -type create_opts() :: #{
health_check_interval => integer(), health_check_interval => integer(),
health_check_timeout => integer(), health_check_timeout => integer(),
waiting_connect_complete => integer() waiting_connect_complete => integer(),
auto_retry_interval => integer()
}. }.
-type after_query() :: -type after_query() ::
{[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]} {[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]}

View File

@ -127,8 +127,9 @@
%% when calling emqx_resource:health_check/2 %% when calling emqx_resource:health_check/2
-callback on_get_status(instance_id(), resource_state()) -> -callback on_get_status(instance_id(), resource_state()) ->
resource_connection_status() resource_status()
| {resource_connection_status(), resource_state()}. | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
-spec list_types() -> [module()]. -spec list_types() -> [module()].
list_types() -> list_types() ->
@ -260,7 +261,7 @@ query(InstId, Request, AfterQuery) ->
erlang:raise(Err, Reason, ST) erlang:raise(Err, Reason, ST)
end; end;
{error, not_found} -> {error, not_found} ->
query_error(not_found, <<"the resource id not exists">>) query_error(not_found, <<"resource not found or not connected">>)
end. end.
-spec restart(instance_id()) -> ok | {error, Reason :: term()}. -spec restart(instance_id()) -> ok | {error, Reason :: term()}.
@ -275,7 +276,7 @@ restart(InstId, Opts) ->
stop(InstId) -> stop(InstId) ->
emqx_resource_manager:stop(InstId). emqx_resource_manager:stop(InstId).
-spec health_check(instance_id()) -> ok | {error, Reason :: term()}. -spec health_check(instance_id()) -> resource_status().
health_check(InstId) -> health_check(InstId) ->
emqx_resource_manager:health_check(InstId). emqx_resource_manager:health_check(InstId).
@ -316,8 +317,9 @@ call_start(InstId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(InstId, Config)). ?SAFE_CALL(Mod:on_start(InstId, Config)).
-spec call_health_check(instance_id(), module(), resource_state()) -> -spec call_health_check(instance_id(), module(), resource_state()) ->
resource_connection_status() resource_status()
| {resource_connection_status(), resource_state()}. | {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
call_health_check(InstId, Mod, ResourceState) -> call_health_check(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)). ?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)).

View File

@ -52,6 +52,8 @@
-define(ETS_TABLE, emqx_resource_manager). -define(ETS_TABLE, emqx_resource_manager).
-define(WAIT_FOR_RESOURCE_DELAY, 100). -define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -188,7 +190,7 @@ list_group(Group) ->
List = ets:match(?ETS_TABLE, {'$1', Group, '_'}), List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
lists:flatten(List). lists:flatten(List).
-spec health_check(instance_id()) -> ok | {error, Reason :: term()}. -spec health_check(instance_id()) -> resource_status().
health_check(InstId) -> health_check(InstId) ->
safe_call(InstId, health_check). safe_call(InstId, health_check).
@ -209,7 +211,7 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []). gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
init(Data) -> init(Data) ->
{ok, connecting, Data}. {ok, connecting, Data, {next_event, internal, try_connect}}.
terminate(_Reason, _State, Data) -> terminate(_Reason, _State, Data) ->
ets:delete(?ETS_TABLE, Data#data.id), ets:delete(?ETS_TABLE, Data#data.id),
@ -238,25 +240,21 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
{keep_state_and_data, [{reply, From, Reply}]}; {keep_state_and_data, [{reply, From, Reply}]};
% An external health check call. Disconnected usually means an error has happened.
handle_event({call, From}, health_check, disconnected, Data) ->
Actions = [{reply, From, {error, Data#data.error}}],
{keep_state_and_data, Actions};
% Connecting state enter % Connecting state enter
handle_event(enter, connecting, connecting, Data) -> handle_event(internal, try_connect, connecting, Data) ->
handle_connection_attempt(Data); handle_connection_attempt(Data);
handle_event(enter, _OldState, connecting, Data) -> handle_event(enter, _OldState, connecting, Data) ->
ets:delete(?ETS_TABLE, Data#data.id),
Actions = [{state_timeout, 0, health_check}], Actions = [{state_timeout, 0, health_check}],
{next_state, connecting, Data, Actions}; {next_state, connecting, Data, Actions};
% Connecting state health_check timeouts. % Connecting state health_check timeouts.
% First clause supports desired behavior on initial connection.
handle_event(state_timeout, health_check, connecting, #data{status = disconnected} = Data) ->
{next_state, disconnected, Data};
handle_event(state_timeout, health_check, connecting, Data) -> handle_event(state_timeout, health_check, connecting, Data) ->
connecting_health_check(Data); connecting_health_check(Data);
%% The connected state is entered after a successful start of the callback mod %% The connected state is entered after a successful start of the callback mod
%% and successful health_checks %% and successful health_checks
handle_event(enter, _OldState, connected, Data) -> handle_event(enter, _OldState, connected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
_ = emqx_alarm:deactivate(Data#data.id),
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{next_state, connected, Data, Actions}; {next_state, connected, Data, Actions};
handle_event(state_timeout, health_check, connected, Data) -> handle_event(state_timeout, health_check, connected, Data) ->
@ -270,11 +268,11 @@ handle_event(enter, _OldState, stopped, Data) ->
ets:delete(?ETS_TABLE, Data#data.id), ets:delete(?ETS_TABLE, Data#data.id),
{next_state, stopped, UpdatedData}; {next_state, stopped, UpdatedData};
% Resource has been explicitly stopped, so return that as the error reason. % Resource has been explicitly stopped, so return that as the error reason.
handle_event({call, From}, health_check, stopped, _Data) -> handle_event({call, From}, _, stopped, _Data) ->
Actions = [{reply, From, {error, stopped}}], Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions}; {keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) -> handle_event({call, From}, health_check, _State, Data) ->
handle_health_check_event(From, Data); handle_health_check_request(From, Data);
% Ignore all other events % Ignore all other events
handle_event(EventType, EventData, State, Data) -> handle_event(EventType, EventData, State, Data) ->
?SLOG( ?SLOG(
@ -296,7 +294,7 @@ handle_event(EventType, EventData, State, Data) ->
handle_disconnected_state_enter(Data) -> handle_disconnected_state_enter(Data) ->
UpdatedData = Data#data{status = disconnected}, UpdatedData = Data#data{status = disconnected},
ets:delete(?ETS_TABLE, Data#data.id), ets:delete(?ETS_TABLE, Data#data.id),
case maps:get(auto_retry_interval, Data#data.config, undefined) of case maps:get(auto_retry_interval, Data#data.opts, undefined) of
undefined -> undefined ->
{next_state, disconnected, UpdatedData}; {next_state, disconnected, UpdatedData};
RetryInterval -> RetryInterval ->
@ -315,8 +313,7 @@ handle_connection_attempt(Data) ->
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Reason}, UpdatedData = Data#data{status = disconnected, error = Reason},
Actions = [{state_timeout, 0, health_check}], {next_state, disconnected, UpdatedData}
{next_state, connecting, UpdatedData, Actions}
end. end.
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
@ -352,65 +349,65 @@ proc_name(Id) ->
Connector = <<"_">>, Connector = <<"_">>,
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>). binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
handle_health_check_event(From, Data) -> handle_health_check_request(From, Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of with_health_check(Data, fun(Status, UpdatedData) ->
connected -> Actions = [{reply, From, Status}],
UpdatedData = Data#data{status = connected, error = undefined}, {next_state, Status, UpdatedData, Actions}
update_resource(Data#data.id, Data#data.group, UpdatedData), end).
Actions = [{reply, From, ok}],
{next_state, connected, UpdatedData, Actions};
{connected, NewResourceState} ->
UpdatedData = Data#data{
state = NewResourceState, status = connected, error = undefined
},
update_resource(Data#data.id, Data#data.group, UpdatedData),
Actions = [{reply, From, ok}],
{next_state, connected, UpdatedData, Actions};
ConnectStatus ->
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
UpdatedData = Data#data{status = connecting, error = ConnectStatus},
ets:delete(?ETS_TABLE, Data#data.id),
Actions = [{reply, From, {error, ConnectStatus}}],
{next_state, connecting, UpdatedData, Actions}
end.
connecting_health_check(Data) -> connecting_health_check(Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of with_health_check(
connected -> Data,
UpdatedData = Data#data{status = connected, error = undefined}, fun
update_resource(Data#data.id, Data#data.group, UpdatedData), (connected, UpdatedData) ->
{next_state, connected, UpdatedData}; {next_state, connected, UpdatedData};
{connected, NewResourceState} -> (connecting, UpdatedData) ->
UpdatedData = Data#data{ Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}],
state = NewResourceState, status = connected, error = undefined {keep_state, UpdatedData, Actions};
}, (disconnected, UpdatedData) ->
update_resource(Data#data.id, Data#data.group, UpdatedData), {next_state, disconnected, UpdatedData}
{next_state, connected, UpdatedData}; end
ConnectStatus -> ).
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
UpdatedData = Data#data{status = connecting, error = ConnectStatus},
Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}],
{keep_state, UpdatedData, Actions}
end.
perform_connected_health_check(Data) -> perform_connected_health_check(Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of with_health_check(
connected -> Data,
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], fun
{keep_state_and_data, Actions}; (connected, UpdatedData) ->
{connected, NewResourceState} -> Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
UpdatedData = Data#data{ {keep_state, UpdatedData, Actions};
state = NewResourceState, status = connected, error = undefined (Status, UpdatedData) ->
}, logger:error("health check for ~p failed: ~p", [Data#data.id, Status]),
update_resource(Data#data.id, Data#data.group, UpdatedData), {next_state, Status, UpdatedData}
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], end
{keep_state, NewResourceState, Actions}; ).
ConnectStatus ->
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]), with_health_check(Data, Func) ->
UpdatedData = Data#data{error = ConnectStatus}, ResId = Data#data.id,
ets:delete(?ETS_TABLE, Data#data.id), HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
{next_state, connecting, UpdatedData} {Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
end. _ =
case Status of
connected ->
ok;
_ ->
emqx_alarm:activate(
ResId,
#{resource_id => ResId, reason => resource_down},
<<"resource down: ", ResId/binary>>
)
end,
UpdatedData = Data#data{
state = NewState, status = Status, error = Err
},
Func(Status, UpdatedData).
parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) ->
{Status, OldState, undefined};
parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) ->
{Status, NewState, undefined};
parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
{Status, NewState, Error}.
data_record_to_external_map_with_metrics(Data) -> data_record_to_external_map_with_metrics(Data) ->
#{ #{
@ -448,6 +445,3 @@ safe_call(InstId, Message) ->
exit:_ -> exit:_ ->
{error, not_found} {error, not_found}
end. end.
update_resource(InstId, Group, Data) ->
ets:insert(?ETS_TABLE, {InstId, Group, Data}).

View File

@ -361,9 +361,12 @@ t_reset_metrics(_) ->
?assertNot(is_process_alive(Pid)). ?assertNot(is_process_alive(Pid)).
t_auto_retry(_) -> t_auto_retry(_) ->
{Res, _} = emqx_resource:create_dry_run_local( {Res, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, create_error => true, auto_retry_interval => 1000} #{name => test_resource, create_error => true},
#{auto_retry_interval => 100}
), ),
?assertEqual(error, Res). ?assertEqual(error, Res).