feat(resource manager): perform non-blocking resource health checks

Fixes https://emqx.atlassian.net/browse/EMQX-12015

This introduces non-blocking health checks only for _resources_. Non-blocking _channel_
health checks may be introduced later.
Author: Thales Macedo Garitezi, 2024-03-28 18:28:54 -03:00
Parent: 908e7b42f3
Commit: bade09b56e
7 changed files with 291 additions and 129 deletions
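
The gist of the change, before the diffs: instead of running the connector's `on_get_status` callback inside the resource manager process (which blocks every other call for its duration), the manager now spawns a monitored worker per resource health check, queues callers that ask for a health check while one is in flight, and replies to all of them when the worker's `'DOWN'` message arrives. Below is a minimal, self-contained `gen_server` sketch of that pattern; module and function names are illustrative only, the real implementation is the `gen_statem` in `emqx_resource_manager` shown in the diffs.

%% Simplified sketch of the non-blocking health check pattern (illustrative;
%% not the actual emqx_resource_manager code, which uses gen_statem).
-module(nonblocking_hc_sketch).
-behaviour(gen_server).

-export([start_link/1, health_check/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
%% Internal export so the worker can be spawned by MFA.
-export([worker_health_check/1]).

-record(st, {
    check_fun :: fun(() -> connected | connecting | disconnected),
    hc_worker = undefined :: undefined | {pid(), reference()},
    hc_pending = [] :: [gen_server:from()]
}).

start_link(CheckFun) ->
    gen_server:start_link(?MODULE, CheckFun, []).

health_check(Pid) ->
    gen_server:call(Pid, health_check, infinity).

init(CheckFun) ->
    {ok, #st{check_fun = CheckFun}}.

handle_call(health_check, From, St0 = #st{hc_worker = undefined, check_fun = CheckFun}) ->
    %% No check in flight: spawn a monitored worker and queue the caller.
    WorkerRef = spawn_monitor(?MODULE, worker_health_check, [CheckFun]),
    {noreply, St0#st{hc_worker = WorkerRef, hc_pending = [From]}};
handle_call(health_check, From, St0 = #st{hc_pending = Pending}) ->
    %% A check is already running: just queue the caller; the server stays responsive.
    {noreply, St0#st{hc_pending = [From | Pending]}}.

handle_cast(_Msg, St) ->
    {noreply, St}.

handle_info({'DOWN', Ref, process, Pid, Reason}, St = #st{hc_worker = {Pid, Ref}}) ->
    %% The worker finished (it exits with {ok, Status}); reply to every waiting caller.
    Reply =
        case Reason of
            {ok, Status} -> {ok, Status};
            Other -> {error, Other}
        end,
    lists:foreach(fun(From) -> gen_server:reply(From, Reply) end, St#st.hc_pending),
    {noreply, St#st{hc_worker = undefined, hc_pending = []}};
handle_info(_Info, St) ->
    {noreply, St}.

%% Runs outside the server process, so a slow check never blocks other calls.
worker_health_check(CheckFun) ->
    exit({ok, CheckFun()}).

In the actual commit, the same bookkeeping lives in the `hc_workers` and `hc_pending_callers` fields added to the resource manager's `#data{}` record.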


@@ -825,22 +825,47 @@ do_start_stop_bridges(Type, Config) ->
     %% Connecting to this endpoint should always timeout
     BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
     BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
+    CreateRes0 = request_json(
+        post,
+        uri(["bridges"]),
+        ?MQTT_BRIDGE(BadServer, BadName),
+        Config
+    ),
     ?assertMatch(
         {ok, 201, #{
             <<"type">> := ?BRIDGE_TYPE_MQTT,
             <<"name">> := BadName,
             <<"enable">> := true,
-            <<"server">> := BadServer,
-            <<"status">> := <<"connecting">>,
-            <<"node_status">> := [_ | _]
+            <<"server">> := BadServer
         }},
-        request_json(
-            post,
-            uri(["bridges"]),
-            ?MQTT_BRIDGE(BadServer, BadName),
-            Config
-        )
+        CreateRes0
     ),
+    {ok, 201, CreateRes1} = CreateRes0,
+    case CreateRes1 of
+        #{
+            <<"node_status">> := [
+                #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"status_reason">> := <<"connack_timeout">>
+                },
+                #{<<"status">> := <<"connecting">>}
+                | _
+            ],
+            %% `inconsistent': one node is `?status_disconnected' (because it has already
+            %% timed out), the other node is `?status_connecting' (started later and
+            %% hasn't timed out yet)
+            <<"status">> := <<"inconsistent">>,
+            <<"status_reason">> := <<"connack_timeout">>
+        } ->
+            ok;
+        #{
+            <<"node_status">> := [_],
+            <<"status">> := <<"connecting">>
+        } ->
+            ok;
+        _ ->
+            error({unexpected_result, CreateRes1})
+    end,
     BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
     ?assertMatch(
         %% request from product: return 400 on such errors


@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_resource, [
     {description, "Manager for all external resources"},
-    {vsn, "0.1.28"},
+    {vsn, "0.1.29"},
     {registered, []},
     {mod, {emqx_resource_app, []}},
     {applications, [


@@ -60,6 +60,9 @@
 % Behaviour
 -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
 
+%% Internal exports.
+-export([worker_resource_health_check/1]).
+
 % State record
 -record(data, {
     id,
@@ -73,7 +76,15 @@
     state,
     error,
     pid,
-    added_channels,
+    added_channels = #{},
+    %% Reference to process performing resource health check.
+    hc_workers = #{resource => #{}, channel => #{}} :: #{
+        resource | channel := #{{pid(), reference()} => true}
+    },
+    %% Callers waiting on health check
+    hc_pending_callers = #{resource => [], channel => []} :: #{
+        resource | channel := [gen_server:from()]
+    },
     extra
 }).
 -type data() :: #data{}.
@@ -153,13 +164,13 @@ create(ResId, Group, ResourceType, Config, Opts) ->
     case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of
         true ->
             %% start resource workers as the query type requires them
-            ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts),
-            case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
-                true ->
-                    wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
-                false ->
-                    ok
-            end;
+            ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts);
+        false ->
+            ok
+    end,
+    case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
+        true ->
+            wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
         false ->
             ok
     end.
@@ -455,7 +466,7 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
 handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
     Reply = {ok, Group, data_record_to_external_map(Data)},
     {keep_state_and_data, [{reply, From, Reply}]};
-% Called when doing a manually health check.
+% Called when doing a manual health check.
 handle_event({call, From}, health_check, ?state_stopped, _Data) ->
     Actions = [{reply, From, {error, resource_is_stopped}}],
     {keep_state_and_data, Actions};
@@ -463,9 +474,9 @@ handle_event({call, From}, {channel_health_check, _}, ?state_stopped, _Data) ->
     Actions = [{reply, From, {error, resource_is_stopped}}],
     {keep_state_and_data, Actions};
 handle_event({call, From}, health_check, _State, Data) ->
-    handle_manually_health_check(From, Data);
+    handle_manual_resource_health_check(From, Data);
 handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) ->
-    handle_manually_channel_health_check(From, Data, ChannelId);
+    handle_manual_channel_health_check(From, Data, ChannelId);
 % State: CONNECTING
 handle_event(enter, _OldState, ?state_connecting = State, Data) ->
     ok = log_status_consistency(State, Data),
@@ -473,7 +484,7 @@ handle_event(enter, _OldState, ?state_connecting = State, Data) ->
 handle_event(internal, start_resource, ?state_connecting, Data) ->
     start_resource(Data, undefined);
 handle_event(state_timeout, health_check, ?state_connecting, Data) ->
-    handle_connecting_health_check(Data);
+    start_resource_health_check(Data);
 handle_event(
     {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data
 ) ->
@@ -487,7 +498,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) ->
     ?tp(resource_connected_enter, #{}),
     {keep_state_and_data, health_check_actions(Data)};
 handle_event(state_timeout, health_check, ?state_connected, Data) ->
-    handle_connected_health_check(Data);
+    start_resource_health_check(Data);
 handle_event(
     {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data
 ) ->
@@ -523,6 +534,15 @@ handle_event(
 ) ->
     Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod),
     {keep_state_and_data, {reply, From, {ok, Channels}}};
+handle_event(
+    info,
+    {'DOWN', Ref, process, Pid, Res},
+    State0,
+    Data0 = #data{hc_workers = #{resource := HCWorkers}}
+) when
+    is_map_key({Pid, Ref}, HCWorkers)
+->
+    handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res);
 % Ignore all other events
 handle_event(EventType, EventData, State, Data) ->
     ?SLOG(
@@ -835,18 +855,127 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) ->
     _ = maybe_clear_alarm(ChannelId),
     {keep_state, update_state(NewData, Data), [{reply, From, ok}]}.
 
-handle_manually_health_check(From, Data) ->
-    with_health_check(
-        Data,
-        fun(Status, UpdatedData) ->
-            Actions = [{reply, From, {ok, Status}}],
-            {next_state, Status, channels_health_check(Status, UpdatedData), Actions}
-        end
-    ).
-
-handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
+handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when
+    map_size(HCWorkers) > 0
+->
+    %% ongoing health check
+    #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0,
+    Pending = Pending0#{resource := [From | RPending0]},
+    Data = Data0#data{hc_pending_callers = Pending},
+    {keep_state, Data};
+handle_manual_resource_health_check(From, Data0) ->
+    #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0,
+    Pending = Pending0#{resource := [From | RPending0]},
+    Data = Data0#data{hc_pending_callers = Pending},
+    start_resource_health_check(Data).
+
+reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) ->
+    #{resource := RPending} = Pending0,
+    Actions = [{reply, From, {ok, Status}} || From <- RPending],
+    Data = Data0#data{hc_pending_callers = Pending0#{resource := []}},
+    {Actions, Data}.
+
+start_resource_health_check(#data{state = undefined} = Data) ->
+    %% No resource running, thus disconnected.
+    %% A health check spawn when state is undefined can only happen when someone manually
+    %% asks for a health check and the resource could not initialize or has not had enough
+    %% time to do so. Let's assume the continuation is as if we were `?status_connecting'.
+    continue_resource_health_check_not_connected(?status_disconnected, Data);
+start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when
+    map_size(HCWorkers) > 0
+->
+    %% Already ongoing
+    keep_state_and_data;
+start_resource_health_check(#data{} = Data0) ->
+    #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
+    WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0),
+    HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}},
+    Data = Data0#data{hc_workers = HCWorkers},
+    {keep_state, Data}.
+
+-spec spawn_health_check_worker(data()) -> {pid(), reference()}.
+spawn_health_check_worker(#data{} = Data) ->
+    spawn_monitor(?MODULE, worker_resource_health_check, [Data]).
+
+%% separated so it can be spec'ed and placate dialyzer tantrums...
+-spec worker_resource_health_check(data()) -> no_return().
+worker_resource_health_check(Data) ->
+    HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
+    exit({ok, HCRes}).
+
+handle_resource_health_check_worker_down(CurrentState, Data0, WorkerRef, ExitResult) ->
+    #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0,
+    HCWorkers = HCWorkers0#{resource := maps:remove(WorkerRef, RHCWorkers0)},
+    Data1 = Data0#data{hc_workers = HCWorkers},
+    case ExitResult of
+        {ok, HCRes} ->
+            continue_with_health_check(Data1, CurrentState, HCRes);
+        _ ->
+            %% Unexpected: `emqx_resource:call_health_check' catches all exceptions.
+            continue_with_health_check(Data1, CurrentState, {error, ExitResult})
+    end.
+
+continue_with_health_check(#data{} = Data0, CurrentState, HCRes) ->
+    #data{
+        id = ResId,
+        error = PrevError
+    } = Data0,
+    {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0),
+    _ = maybe_alarm(NewStatus, ResId, Err, PrevError),
+    ok = maybe_resume_resource_workers(ResId, NewStatus),
+    Data1 = Data0#data{
+        state = NewState, status = NewStatus, error = Err
+    },
+    Data = update_state(Data1, Data0),
+    case CurrentState of
+        ?state_connected ->
+            continue_resource_health_check_connected(NewStatus, Data);
+        _ ->
+            %% `?state_connecting' | `?state_disconnected' | `?state_stopped'
+            continue_resource_health_check_not_connected(NewStatus, Data)
+    end.
+
+%% Continuation to be used when the current resource state is `?state_connected'.
+continue_resource_health_check_connected(NewStatus, Data0) ->
+    case NewStatus of
+        ?status_connected ->
+            {Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            Data2 = channels_health_check(?status_connected, Data1),
+            Data = update_state(Data2, Data0),
+            Actions = Replies ++ health_check_actions(Data),
+            {keep_state, Data, Actions};
+        _ ->
+            ?SLOG(warning, #{
+                msg => "health_check_failed",
+                id => Data0#data.id,
+                status => NewStatus
+            }),
+            %% Note: works because, coincidentally, channel/resource status is a
+            %% subset of resource manager state... But there should be a conversion
+            %% between the two here, as resource manager also has `stopped', which is
+            %% not a valid status at the time of writing.
+            {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+            {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies}
+    end.
+
+%% Continuation to be used when the current resource state is not `?state_connected'.
+continue_resource_health_check_not_connected(NewStatus, Data0) ->
+    {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0),
+    case NewStatus of
+        ?status_connected ->
+            {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies};
+        ?status_connecting ->
+            Actions = Replies ++ health_check_actions(Data),
+            {next_state, ?status_connecting, channels_health_check(?status_connecting, Data),
+                Actions};
+        ?status_disconnected ->
+            {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data),
+                Replies}
+    end.
+
+handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) ->
     {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]};
-handle_manually_channel_health_check(
+handle_manual_channel_health_check(
     From,
     #data{added_channels = Channels} = _Data,
     ChannelId
@@ -854,7 +983,7 @@ handle_manually_channel_health_check(
     is_map_key(ChannelId, Channels)
 ->
     {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]};
-handle_manually_channel_health_check(
+handle_manual_channel_health_check(
     From,
     _Data,
     _ChannelId
@@ -865,56 +994,6 @@ get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, Ch
     RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State),
     channel_status(RawStatus).
 
-handle_connecting_health_check(Data) ->
-    with_health_check(
-        Data,
-        fun
-            (?status_connected, UpdatedData) ->
-                {next_state, ?state_connected,
-                    channels_health_check(?status_connected, UpdatedData)};
-            (?status_connecting, UpdatedData) ->
-                {keep_state, channels_health_check(?status_connecting, UpdatedData),
-                    health_check_actions(UpdatedData)};
-            (?status_disconnected, UpdatedData) ->
-                {next_state, ?state_disconnected,
-                    channels_health_check(?status_disconnected, UpdatedData)}
-        end
-    ).
-
-handle_connected_health_check(Data) ->
-    with_health_check(
-        Data,
-        fun
-            (?status_connected, UpdatedData0) ->
-                UpdatedData1 = channels_health_check(?status_connected, UpdatedData0),
-                {keep_state, UpdatedData1, health_check_actions(UpdatedData1)};
-            (Status, UpdatedData) ->
-                ?SLOG(warning, #{
-                    msg => "health_check_failed",
-                    id => Data#data.id,
-                    status => Status
-                }),
-                %% Note: works because, coincidentally, channel/resource status is a
-                %% subset of resource manager state... But there should be a conversion
-                %% between the two here, as resource manager also has `stopped', which is
-                %% not a valid status at the time of writing.
-                {next_state, Status, channels_health_check(Status, UpdatedData)}
-        end
-    ).
-
-with_health_check(#data{state = undefined} = Data, Func) ->
-    Func(disconnected, Data);
-with_health_check(#data{error = PrevError} = Data, Func) ->
-    ResId = Data#data.id,
-    HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state),
-    {Status, NewState, Err} = parse_health_check_result(HCRes, Data),
-    _ = maybe_alarm(Status, ResId, Err, PrevError),
-    ok = maybe_resume_resource_workers(ResId, Status),
-    UpdatedData = Data#data{
-        state = NewState, status = Status, error = Err
-    },
-    Func(Status, update_state(UpdatedData, Data)).
-
 -spec channels_health_check(resource_status(), data()) -> data().
 channels_health_check(?status_connected = _ConnectorStatus, Data0) ->
     Channels = maps:to_list(Data0#data.added_channels),
@@ -1097,9 +1176,15 @@ update_state(Data) ->
 update_state(DataWas, DataWas) ->
     DataWas;
 update_state(Data, _DataWas) ->
-    _ = insert_cache(Data#data.id, Data),
+    _ = insert_cache(Data#data.id, remove_runtime_data(Data)),
     Data.
 
+remove_runtime_data(#data{} = Data0) ->
+    Data0#data{
+        hc_workers = #{resource => #{}, channel => #{}},
+        hc_pending_callers = #{resource => [], channel => []}
+    }.
+
 health_check_interval(Opts) ->
     maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).


@@ -18,6 +18,7 @@
 -include_lib("typerefl/include/types.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
 
 -behaviour(emqx_resource).
@@ -276,15 +277,22 @@ batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) ->
 
 on_get_status(_InstId, #{health_check_error := true}) ->
     ?tp(connector_demo_health_check_error, #{}),
-    disconnected;
+    ?status_disconnected;
 on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
     ?tp(connector_demo_health_check_error, #{}),
-    {disconnected, State, Message};
+    {?status_disconnected, State, Message};
+on_get_status(_InstId, #{pid := Pid, health_check_error := {delay, Delay}}) ->
+    ?tp(connector_demo_health_check_delay, #{}),
+    timer:sleep(Delay),
+    case is_process_alive(Pid) of
+        true -> ?status_connected;
+        false -> ?status_disconnected
+    end;
 on_get_status(_InstId, #{pid := Pid}) ->
     timer:sleep(300),
     case is_process_alive(Pid) of
-        true -> connected;
-        false -> disconnected
+        true -> ?status_connected;
+        false -> ?status_disconnected
     end.
 
 spawn_counter_process(Name, Register) ->


@@ -52,12 +52,20 @@ end_per_testcase(_, _Config) ->
 
 init_per_suite(Config) ->
     code:ensure_loaded(?TEST_RESOURCE),
-    ok = emqx_common_test_helpers:start_apps([emqx_conf]),
-    {ok, _} = application:ensure_all_started(emqx_resource),
-    Config.
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_resource
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]).
+end_per_suite(Config) ->
+    Apps = proplists:get_value(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Tests
@@ -115,10 +123,7 @@ t_create_remove(_) ->
             ?assertNot(is_process_alive(Pid))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_create_remove_local(_) ->
@@ -174,10 +179,7 @@ t_create_remove_local(_) ->
             ?assertNot(is_process_alive(Pid))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_do_not_start_after_created(_) ->
@@ -219,10 +221,7 @@ t_do_not_start_after_created(_) ->
             ?assertNot(is_process_alive(Pid2))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_query(_) ->
@@ -855,14 +854,12 @@ t_healthy_timeout(_) ->
             ),
             ?assertEqual(ok, emqx_resource:remove_local(?ID))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_healthy(_) ->
     ?check_trace(
+        #{timetrap => 10_000},
         begin
             ?assertMatch(
                 {ok, _},
@@ -873,10 +870,13 @@ t_healthy(_) ->
                     #{name => test_resource}
                 )
             ),
+            ct:pal("getting state"),
             {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
             timer:sleep(300),
+            ct:pal("setting state as `connecting`"),
             emqx_resource:set_resource_status_connecting(?ID),
+            ct:pal("health check"),
             ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
             ?assertMatch(
                 [#{status := connected}],
@@ -894,10 +894,7 @@ t_healthy(_) ->
             ?assertEqual(ok, emqx_resource:remove_local(?ID))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_unhealthy_target(_) ->
@@ -1005,11 +1002,7 @@ t_stop_start(_) ->
             ?assertEqual(ok, emqx_resource:stop(?ID)),
             ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_stop_start_local(_) ->
@@ -1064,10 +1057,7 @@ t_stop_start_local(_) ->
             ?assert(is_process_alive(Pid1))
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_list_filter(_) ->
@@ -1269,10 +1259,7 @@ t_health_check_disconnected(_) ->
                 emqx_resource:health_check(?ID)
             )
         end,
-        fun(Trace) ->
-            ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
-            ?assertEqual([], ?of_kind("inconsistent_cache", Trace))
-        end
+        [log_consistency_prop()]
     ).
 
 t_unblock_only_required_buffer_workers(_) ->
@@ -3116,6 +3103,44 @@ t_telemetry_handler_crash(_Config) ->
     ),
     ok.
 
+t_non_blocking_resource_health_check(_Config) ->
+    ?check_trace(
+        begin
+            {ok, _} =
+                create(
+                    ?ID,
+                    ?DEFAULT_RESOURCE_GROUP,
+                    ?TEST_RESOURCE,
+                    #{name => test_resource, health_check_error => {delay, 1_000}},
+                    #{health_check_interval => 100}
+                ),
+            %% concurrently attempt to health check the resource; should do it only once
+            %% for all callers
+            NumCallers = 20,
+            Expected = lists:duplicate(NumCallers, {ok, connected}),
+            ?assertEqual(
+                Expected,
+                emqx_utils:pmap(
+                    fun(_) -> emqx_resource:health_check(?ID) end,
+                    lists:seq(1, NumCallers)
+                )
+            ),
+            NumCallers
+        end,
+        [
+            log_consistency_prop(),
+            fun(NumCallers, Trace) ->
+                %% shouldn't have one health check per caller
+                SubTrace = ?of_kind(connector_demo_health_check_delay, Trace),
+                ?assertMatch([_ | _], SubTrace),
+                ?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}),
+                ok
+            end
+        ]
+    ),
+    ok.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------
@@ -3373,3 +3398,10 @@ create(Id, Group, Type, Config) ->
 
 create(Id, Group, Type, Config, Opts) ->
     emqx_resource:create_local(Id, Group, Type, Config, Opts).
+
+log_consistency_prop() ->
+    {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}.
+
+log_consistency_prop(Trace) ->
+    ?assertEqual([], ?of_kind("inconsistent_status", Trace)),
+    ?assertEqual([], ?of_kind("inconsistent_cache", Trace)),
+    ok.


@@ -65,8 +65,11 @@ redact(Term, Checker) ->
 redact_headers(Term) ->
     do_redact_headers(Term).
 
-do_redact(L, Checker) when is_list(L) ->
-    lists:map(fun(E) -> do_redact(E, Checker) end, L);
+do_redact([], _Checker) ->
+    [];
+do_redact([X | Xs], Checker) ->
+    %% Note: we could be dealing with an improper list
+    [do_redact(X, Checker) | do_redact(Xs, Checker)];
 do_redact(M, Checker) when is_map(M) ->
     maps:map(
         fun(K, V) ->
@@ -252,6 +255,14 @@
     Keys = [secret, passcode],
     [{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys].
 
+redact_improper_list_test_() ->
+    %% improper lists: check that we don't crash
+    %% may arise when we redact process states with pending `gen' requests
+    [
+        ?_assertEqual([alias | foo], redact([alias | foo])),
+        ?_assertEqual([1, 2 | foo], redact([1, 2 | foo]))
+    ].
+
 deobfuscate_test() ->
     NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>},
     ?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})),


@@ -0,0 +1 @@
+Made resource health checks non-blocking operations. This means that operations such as updating or removing a resource won't be blocked by a long-running health check.
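
For illustration, here is roughly what that buys callers, modeled on the new test case above. This is a sketch, not code from this commit; it assumes a resource already created with a slow health check, e.g. via the `health_check_error => {delay, 1_000}` option added to the test connector, and a hypothetical resource id.

%% Sketch only (not part of the commit): a slow manual health check no longer
%% holds up the resource manager, so removal is served without waiting for it.
t_remove_not_blocked_by_health_check_sketch(_Config) ->
    ResId = <<"sketch:slow_resource">>,
    %% Kick off a manual health check from another process; it takes ~1s.
    {Pid, Ref} = spawn_monitor(fun() -> exit(emqx_resource:health_check(ResId)) end),
    %% The manager handles the removal immediately instead of queueing behind the check.
    ok = emqx_resource:remove_local(ResId),
    %% The health check caller still gets an answer eventually.
    receive
        {'DOWN', Ref, process, Pid, _HealthCheckResult} -> ok
    end.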