Merge pull request #8087 from terry-xiaoyu/resource_down_alarm

refactor(resource): improve health check and alarm it if resource down
This commit is contained in:
Xinyu Liu 2022-05-31 10:23:52 +08:00 committed by GitHub
commit a8c87e8ab7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 113 additions and 110 deletions

View File

@ -90,7 +90,7 @@ create(Type, Name, Conf) ->
<<"emqx_bridge">>,
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
#{}
#{auto_retry_interval => 60000}
),
maybe_disable_bridge(Type, Name, Conf).
@ -146,7 +146,7 @@ recreate(Type, Name, Conf) ->
resource_id(Type, Name),
bridge_to_resource_type(Type),
parse_confs(Type, Name, Conf),
#{}
#{auto_retry_interval => 60000}
).
create_dry_run(Type, Conf) ->

View File

@ -306,7 +306,7 @@ on_query(
end,
Result.
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}) ->
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
case do_get_status(Host, Port, Timeout) of
ok ->
connected;
@ -317,7 +317,7 @@ on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout}
host => Host,
port => Port
}),
disconnected
{disconnected, State, Reason}
end.
do_get_status(Host, Port, Timeout) ->

View File

@ -154,11 +154,11 @@ on_start(InstId, Conf) ->
},
case ?MODULE:create_bridge(BridgeConf) of
{ok, _Pid} ->
ensure_mqtt_worker_started(InstanceId);
ensure_mqtt_worker_started(InstanceId, BridgeConf);
{error, {already_started, _Pid}} ->
ok = ?MODULE:drop_bridge(InstanceId),
{ok, _} = ?MODULE:create_bridge(BridgeConf),
ensure_mqtt_worker_started(InstanceId);
ensure_mqtt_worker_started(InstanceId, BridgeConf);
{error, Reason} ->
{error, Reason}
end.
@ -188,15 +188,17 @@ on_query(_InstId, {send_message, Msg}, AfterQuery, #{name := InstanceId}) ->
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
emqx_resource:query_success(AfterQuery).
on_get_status(_InstId, #{name := InstanceId}) ->
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true),
case emqx_connector_mqtt_worker:status(InstanceId) of
connected -> connected;
_ -> disconnected
_ when AutoReconn == true -> connecting;
_ when AutoReconn == false -> disconnected
end.
ensure_mqtt_worker_started(InstanceId) ->
ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
case emqx_connector_mqtt_worker:ensure_started(InstanceId) of
ok -> {ok, #{name => InstanceId}};
ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}};
{error, Reason} -> {error, Reason}
end.
@ -240,6 +242,7 @@ basic_config(#{
server => Server,
%% 30s
connect_timeout => 30,
auto_reconnect => true,
reconnect_interval => ReconnIntv,
proto_ver => ProtoVer,
%% Opening bridge_mode will form a non-standard mqtt connection message.

View File

@ -83,7 +83,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
@ -107,7 +107,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch([], emqx_resource:query(PoolName, test_query_find())),
?assertMatch(undefined, emqx_resource:query(PoolName, test_query_find_one())),
% Stop and remove the resource in one go.

View File

@ -82,7 +82,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
@ -102,7 +102,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
@ -113,7 +113,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [[1]]}, emqx_resource:query(PoolName, test_query_with_params())),
?assertMatch(

View File

@ -83,7 +83,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),
@ -96,7 +96,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
@ -107,7 +107,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_no_params())),
?assertMatch({ok, _, [{1}]}, emqx_resource:query(PoolName, test_query_with_params())),
% Stop and remove the resource in one go.

View File

@ -97,7 +97,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% Perform query as further check that the resource is working as expected
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
?assertEqual(ok, emqx_resource:stop(PoolName)),
@ -109,7 +109,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(StoppedStatus, disconnected),
?assertEqual({error, stopped}, emqx_resource:health_check(PoolName)),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
@ -120,7 +120,7 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) ->
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual(ok, emqx_resource:health_check(PoolName)),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),

View File

@ -20,20 +20,21 @@
-type resource_config() :: term().
-type resource_spec() :: map().
-type resource_state() :: term().
-type resource_connection_status() :: connected | disconnected | connecting.
-type resource_status() :: connected | disconnected | connecting.
-type resource_data() :: #{
id := instance_id(),
mod := module(),
config := resource_config(),
state := resource_state(),
status := resource_connection_status(),
status := resource_status(),
metrics := emqx_metrics_worker:metrics()
}.
-type resource_group() :: binary().
-type create_opts() :: #{
health_check_interval => integer(),
health_check_timeout => integer(),
waiting_connect_complete => integer()
waiting_connect_complete => integer(),
auto_retry_interval => integer()
}.
-type after_query() ::
{[OnSuccess :: after_query_fun()], [OnFailed :: after_query_fun()]}

View File

@ -127,8 +127,9 @@
%% when calling emqx_resource:health_check/2
-callback on_get_status(instance_id(), resource_state()) ->
resource_connection_status()
| {resource_connection_status(), resource_state()}.
resource_status()
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
-spec list_types() -> [module()].
list_types() ->
@ -260,7 +261,7 @@ query(InstId, Request, AfterQuery) ->
erlang:raise(Err, Reason, ST)
end;
{error, not_found} ->
query_error(not_found, <<"the resource id not exists">>)
query_error(not_found, <<"resource not found or not connected">>)
end.
-spec restart(instance_id()) -> ok | {error, Reason :: term()}.
@ -275,7 +276,7 @@ restart(InstId, Opts) ->
stop(InstId) ->
emqx_resource_manager:stop(InstId).
-spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
health_check(InstId) ->
emqx_resource_manager:health_check(InstId).
@ -316,8 +317,9 @@ call_start(InstId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(InstId, Config)).
-spec call_health_check(instance_id(), module(), resource_state()) ->
resource_connection_status()
| {resource_connection_status(), resource_state()}.
resource_status()
| {resource_status(), resource_state()}
| {resource_status(), resource_state(), term()}.
call_health_check(InstId, Mod, ResourceState) ->
?SAFE_CALL(Mod:on_get_status(InstId, ResourceState)).

View File

@ -52,6 +52,8 @@
-define(ETS_TABLE, emqx_resource_manager).
-define(WAIT_FOR_RESOURCE_DELAY, 100).
-define(IS_STATUS(ST), ST =:= connecting; ST =:= connected; ST =:= disconnected).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
@ -188,7 +190,7 @@ list_group(Group) ->
List = ets:match(?ETS_TABLE, {'$1', Group, '_'}),
lists:flatten(List).
-spec health_check(instance_id()) -> ok | {error, Reason :: term()}.
-spec health_check(instance_id()) -> {ok, resource_status()} | {error, term()}.
health_check(InstId) ->
safe_call(InstId, health_check).
@ -209,7 +211,7 @@ start_link(InstId, Group, ResourceType, Config, Opts) ->
gen_statem:start_link({local, proc_name(InstId)}, ?MODULE, Data, []).
init(Data) ->
{ok, connecting, Data}.
{ok, connecting, Data, {next_event, internal, try_connect}}.
terminate(_Reason, _State, Data) ->
ets:delete(?ETS_TABLE, Data#data.id),
@ -238,25 +240,21 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)},
{keep_state_and_data, [{reply, From, Reply}]};
% An external health check call. Disconnected usually means an error has happened.
handle_event({call, From}, health_check, disconnected, Data) ->
Actions = [{reply, From, {error, Data#data.error}}],
{keep_state_and_data, Actions};
% Connecting state enter
handle_event(enter, connecting, connecting, Data) ->
handle_event(internal, try_connect, connecting, Data) ->
handle_connection_attempt(Data);
handle_event(enter, _OldState, connecting, Data) ->
ets:delete(?ETS_TABLE, Data#data.id),
Actions = [{state_timeout, 0, health_check}],
{next_state, connecting, Data, Actions};
% Connecting state health_check timeouts.
% First clause supports desired behavior on initial connection.
handle_event(state_timeout, health_check, connecting, #data{status = disconnected} = Data) ->
{next_state, disconnected, Data};
handle_event(state_timeout, health_check, connecting, Data) ->
connecting_health_check(Data);
%% The connected state is entered after a successful start of the callback mod
%% and successful health_checks
handle_event(enter, _OldState, connected, Data) ->
ets:insert(?ETS_TABLE, {Data#data.id, Data#data.group, Data}),
_ = emqx_alarm:deactivate(Data#data.id),
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{next_state, connected, Data, Actions};
handle_event(state_timeout, health_check, connected, Data) ->
@ -270,11 +268,11 @@ handle_event(enter, _OldState, stopped, Data) ->
ets:delete(?ETS_TABLE, Data#data.id),
{next_state, stopped, UpdatedData};
% Resource has been explicitly stopped, so return that as the error reason.
handle_event({call, From}, health_check, stopped, _Data) ->
Actions = [{reply, From, {error, stopped}}],
handle_event({call, From}, _, stopped, _Data) ->
Actions = [{reply, From, {error, resource_is_stopped}}],
{keep_state_and_data, Actions};
handle_event({call, From}, health_check, _State, Data) ->
handle_health_check_event(From, Data);
handle_health_check_request(From, Data);
% Ignore all other events
handle_event(EventType, EventData, State, Data) ->
?SLOG(
@ -296,7 +294,7 @@ handle_event(EventType, EventData, State, Data) ->
handle_disconnected_state_enter(Data) ->
UpdatedData = Data#data{status = disconnected},
ets:delete(?ETS_TABLE, Data#data.id),
case maps:get(auto_retry_interval, Data#data.config, undefined) of
case maps:get(auto_retry_interval, Data#data.opts, undefined) of
undefined ->
{next_state, disconnected, UpdatedData};
RetryInterval ->
@ -315,8 +313,7 @@ handle_connection_attempt(Data) ->
%% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{status = disconnected, error = Reason},
Actions = [{state_timeout, 0, health_check}],
{next_state, connecting, UpdatedData, Actions}
{next_state, disconnected, UpdatedData}
end.
handle_remove_event(From, ClearMetrics, Data) ->
@ -352,65 +349,65 @@ proc_name(Id) ->
Connector = <<"_">>,
binary_to_atom(<<Module/binary, Connector/binary, Id/binary>>).
handle_health_check_event(From, Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
connected ->
UpdatedData = Data#data{status = connected, error = undefined},
update_resource(Data#data.id, Data#data.group, UpdatedData),
Actions = [{reply, From, ok}],
{next_state, connected, UpdatedData, Actions};
{connected, NewResourceState} ->
UpdatedData = Data#data{
state = NewResourceState, status = connected, error = undefined
},
update_resource(Data#data.id, Data#data.group, UpdatedData),
Actions = [{reply, From, ok}],
{next_state, connected, UpdatedData, Actions};
ConnectStatus ->
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
UpdatedData = Data#data{status = connecting, error = ConnectStatus},
ets:delete(?ETS_TABLE, Data#data.id),
Actions = [{reply, From, {error, ConnectStatus}}],
{next_state, connecting, UpdatedData, Actions}
end.
handle_health_check_request(From, Data) ->
with_health_check(Data, fun(Status, UpdatedData) ->
Actions = [{reply, From, {ok, Status}}],
{next_state, Status, UpdatedData, Actions}
end).
connecting_health_check(Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
connected ->
UpdatedData = Data#data{status = connected, error = undefined},
update_resource(Data#data.id, Data#data.group, UpdatedData),
{next_state, connected, UpdatedData};
{connected, NewResourceState} ->
UpdatedData = Data#data{
state = NewResourceState, status = connected, error = undefined
},
update_resource(Data#data.id, Data#data.group, UpdatedData),
{next_state, connected, UpdatedData};
ConnectStatus ->
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
UpdatedData = Data#data{status = connecting, error = ConnectStatus},
Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}],
{keep_state, UpdatedData, Actions}
end.
with_health_check(
Data,
fun
(connected, UpdatedData) ->
{next_state, connected, UpdatedData};
(connecting, UpdatedData) ->
Actions = [{state_timeout, ?SHORT_HEALTHCHECK_INTERVAL, health_check}],
{keep_state, UpdatedData, Actions};
(disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData}
end
).
perform_connected_health_check(Data) ->
case emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state) of
connected ->
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{keep_state_and_data, Actions};
{connected, NewResourceState} ->
UpdatedData = Data#data{
state = NewResourceState, status = connected, error = undefined
},
update_resource(Data#data.id, Data#data.group, UpdatedData),
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{keep_state, NewResourceState, Actions};
ConnectStatus ->
logger:error("health check for ~p failed: ~p", [Data#data.id, ConnectStatus]),
UpdatedData = Data#data{error = ConnectStatus},
ets:delete(?ETS_TABLE, Data#data.id),
{next_state, connecting, UpdatedData}
end.
with_health_check(
Data,
fun
(connected, UpdatedData) ->
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
{keep_state, UpdatedData, Actions};
(Status, UpdatedData) ->
logger:error("health check for ~p failed: ~p", [Data#data.id, Status]),
{next_state, Status, UpdatedData}
end
).
with_health_check(Data, Func) ->
ResId = Data#data.id,
HCRes = emqx_resource:call_health_check(ResId, Data#data.mod, Data#data.state),
{Status, NewState, Err} = parse_health_check_result(HCRes, Data#data.state),
_ =
case Status of
connected ->
ok;
_ ->
emqx_alarm:activate(
ResId,
#{resource_id => ResId, reason => resource_down},
<<"resource down: ", ResId/binary>>
)
end,
UpdatedData = Data#data{
state = NewState, status = Status, error = Err
},
Func(Status, UpdatedData).
parse_health_check_result(Status, OldState) when ?IS_STATUS(Status) ->
{Status, OldState, undefined};
parse_health_check_result({Status, NewState}, _OldState) when ?IS_STATUS(Status) ->
{Status, NewState, undefined};
parse_health_check_result({Status, NewState, Error}, _OldState) when ?IS_STATUS(Status) ->
{Status, NewState, Error}.
data_record_to_external_map_with_metrics(Data) ->
#{
@ -448,6 +445,3 @@ safe_call(InstId, Message) ->
exit:_ ->
{error, not_found}
end.
update_resource(InstId, Group, Data) ->
ets:insert(?ETS_TABLE, {InstId, Group, Data}).

View File

@ -182,7 +182,7 @@ t_healthy(_) ->
timer:sleep(300),
emqx_resource:set_resource_status_connecting(?ID),
ok = emqx_resource:health_check(?ID),
{ok, connected} = emqx_resource:health_check(?ID),
?assertMatch(
[#{status := connected}],
@ -191,7 +191,7 @@ t_healthy(_) ->
erlang:exit(Pid, shutdown),
?assertEqual({error, connecting}, emqx_resource:health_check(?ID)),
?assertEqual({ok, connecting}, emqx_resource:health_check(?ID)),
?assertMatch(
[],
@ -361,11 +361,14 @@ t_reset_metrics(_) ->
?assertNot(is_process_alive(Pid)).
t_auto_retry(_) ->
{Res, _} = emqx_resource:create_dry_run_local(
{Res, _} = emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, create_error => true, auto_retry_interval => 1000}
#{name => test_resource, create_error => true},
#{auto_retry_interval => 100}
),
?assertEqual(error, Res).
?assertEqual(ok, Res).
%%------------------------------------------------------------------------------
%% Helpers