From bade09b56e35c80509ef04d605ac5cf8f0947566 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 28 Mar 2024 18:28:54 -0300 Subject: [PATCH] feat(resource manager): perform non-blocking resource health checks Fixes https://emqx.atlassian.net/browse/EMQX-12015 This introduces only _resource_ non-blocking health checks. _Channel_ non-blocking health checks may be introduced later. --- .../test/emqx_bridge_api_SUITE.erl | 43 +++- apps/emqx_resource/src/emqx_resource.app.src | 2 +- .../src/emqx_resource_manager.erl | 235 ++++++++++++------ .../test/emqx_connector_demo.erl | 16 +- .../test/emqx_resource_SUITE.erl | 108 +++++--- apps/emqx_utils/src/emqx_utils_redact.erl | 15 +- changes/ce/fix-12812.en.md | 1 + 7 files changed, 291 insertions(+), 129 deletions(-) create mode 100644 changes/ce/fix-12812.en.md diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 30b6c8b34..1971ad697 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -825,22 +825,47 @@ do_start_stop_bridges(Type, Config) -> %% Connecting to this endpoint should always timeout BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])), BadName = <<"bad_", (atom_to_binary(Type))/binary>>, + CreateRes0 = request_json( + post, + uri(["bridges"]), + ?MQTT_BRIDGE(BadServer, BadName), + Config + ), ?assertMatch( {ok, 201, #{ <<"type">> := ?BRIDGE_TYPE_MQTT, <<"name">> := BadName, <<"enable">> := true, - <<"server">> := BadServer, - <<"status">> := <<"connecting">>, - <<"node_status">> := [_ | _] + <<"server">> := BadServer }}, - request_json( - post, - uri(["bridges"]), - ?MQTT_BRIDGE(BadServer, BadName), - Config - ) + CreateRes0 ), + {ok, 201, CreateRes1} = CreateRes0, + case CreateRes1 of + #{ + <<"node_status">> := [ + #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"connack_timeout">> + }, + #{<<"status">> := <<"connecting">>} + | _ 
+ ], + %% `inconsistent': one node is `?status_disconnected' (because it has already + %% timed out), the other node is `?status_connecting' (started later and + %% haven't timed out yet) + <<"status">> := <<"inconsistent">>, + <<"status_reason">> := <<"connack_timeout">> + } -> + ok; + #{ + <<"node_status">> := [_], + <<"status">> := <<"connecting">> + } -> + ok; + _ -> + error({unexpected_result, CreateRes1}) + end, BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), ?assertMatch( %% request from product: return 400 on such errors diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 272dd4e08..913cc5e8c 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.28"}, + {vsn, "0.1.29"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 60bc3c7e9..06123a935 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -60,6 +60,9 @@ % Behaviour -export([init/1, callback_mode/0, handle_event/4, terminate/3]). +%% Internal exports. +-export([worker_resource_health_check/1]). + % State record -record(data, { id, @@ -73,7 +76,15 @@ state, error, pid, - added_channels, + added_channels = #{}, + %% Reference to process performing resource health check. + hc_workers = #{resource => #{}, channel => #{}} :: #{ + resource | channel := #{{pid(), reference()} => true} + }, + %% Callers waiting on health check + hc_pending_callers = #{resource => [], channel => []} :: #{ + resource | channel := [gen_server:from()] + }, extra }). -type data() :: #data{}. 
@@ -153,13 +164,13 @@ create(ResId, Group, ResourceType, Config, Opts) -> case SpawnBufferWorkers andalso lists:member(QueryMode, [sync, async]) of true -> %% start resource workers as the query type requires them - ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts), - case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of - true -> - wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); - false -> - ok - end; + ok = emqx_resource_buffer_worker_sup:start_workers(ResId, Opts); + false -> + ok + end, + case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of + true -> + wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); false -> ok end. @@ -455,7 +466,7 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> Reply = {ok, Group, data_record_to_external_map(Data)}, {keep_state_and_data, [{reply, From, Reply}]}; -% Called when doing a manually health check. +% Called when doing a manual health check. 
handle_event({call, From}, health_check, ?state_stopped, _Data) -> Actions = [{reply, From, {error, resource_is_stopped}}], {keep_state_and_data, Actions}; @@ -463,9 +474,9 @@ handle_event({call, From}, {channel_health_check, _}, ?state_stopped, _Data) -> Actions = [{reply, From, {error, resource_is_stopped}}], {keep_state_and_data, Actions}; handle_event({call, From}, health_check, _State, Data) -> - handle_manually_health_check(From, Data); + handle_manual_resource_health_check(From, Data); handle_event({call, From}, {channel_health_check, ChannelId}, _State, Data) -> - handle_manually_channel_health_check(From, Data, ChannelId); + handle_manual_channel_health_check(From, Data, ChannelId); % State: CONNECTING handle_event(enter, _OldState, ?state_connecting = State, Data) -> ok = log_status_consistency(State, Data), @@ -473,7 +484,7 @@ handle_event(enter, _OldState, ?state_connecting = State, Data) -> handle_event(internal, start_resource, ?state_connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, ?state_connecting, Data) -> - handle_connecting_health_check(Data); + start_resource_health_check(Data); handle_event( {call, From}, {remove_channel, ChannelId}, ?state_connecting = _State, Data ) -> @@ -487,7 +498,7 @@ handle_event(enter, _OldState, ?state_connected = State, Data) -> ?tp(resource_connected_enter, #{}), {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, ?state_connected, Data) -> - handle_connected_health_check(Data); + start_resource_health_check(Data); handle_event( {call, From}, {add_channel, ChannelId, Config}, ?state_connected = _State, Data ) -> @@ -523,6 +534,15 @@ handle_event( ) -> Channels = emqx_resource:call_get_channels(Data#data.id, Data#data.mod), {keep_state_and_data, {reply, From, {ok, Channels}}}; +handle_event( + info, + {'DOWN', Ref, process, Pid, Res}, + State0, + Data0 = #data{hc_workers = #{resource := HCWorkers}} +) when + is_map_key({Pid, Ref}, 
HCWorkers) +-> + handle_resource_health_check_worker_down(State0, Data0, {Pid, Ref}, Res); % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -835,18 +855,127 @@ handle_not_connected_and_not_connecting_remove_channel(From, ChannelId, Data) -> _ = maybe_clear_alarm(ChannelId), {keep_state, update_state(NewData, Data), [{reply, From, ok}]}. -handle_manually_health_check(From, Data) -> - with_health_check( - Data, - fun(Status, UpdatedData) -> - Actions = [{reply, From, {ok, Status}}], - {next_state, Status, channels_health_check(Status, UpdatedData), Actions} - end - ). +handle_manual_resource_health_check(From, Data0 = #data{hc_workers = #{resource := HCWorkers}}) when + map_size(HCWorkers) > 0 +-> + %% ongoing health check + #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0, + Pending = Pending0#{resource := [From | RPending0]}, + Data = Data0#data{hc_pending_callers = Pending}, + {keep_state, Data}; +handle_manual_resource_health_check(From, Data0) -> + #data{hc_pending_callers = Pending0 = #{resource := RPending0}} = Data0, + Pending = Pending0#{resource := [From | RPending0]}, + Data = Data0#data{hc_pending_callers = Pending}, + start_resource_health_check(Data). -handle_manually_channel_health_check(From, #data{state = undefined}, _ChannelId) -> +reply_pending_health_check_callers(Status, resource, Data0 = #data{hc_pending_callers = Pending0}) -> + #{resource := RPending} = Pending0, + Actions = [{reply, From, {ok, Status}} || From <- RPending], + Data = Data0#data{hc_pending_callers = Pending0#{resource := []}}, + {Actions, Data}. + +start_resource_health_check(#data{state = undefined} = Data) -> + %% No resource running, thus disconnected. + %% A health check spawn when state is undefined can only happen when someone manually + %% asks for a health check and the resource could not initialize or has not had enough + %% time to do so. Let's assume the continuation is as if we were `?status_connecting'. 
+ continue_resource_health_check_not_connected(?status_disconnected, Data); +start_resource_health_check(#data{hc_workers = #{resource := HCWorkers}}) when + map_size(HCWorkers) > 0 +-> + %% Already ongoing + keep_state_and_data; +start_resource_health_check(#data{} = Data0) -> + #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0, + WorkerRef = {_Pid, _Ref} = spawn_health_check_worker(Data0), + HCWorkers = HCWorkers0#{resource := RHCWorkers0#{WorkerRef => true}}, + Data = Data0#data{hc_workers = HCWorkers}, + {keep_state, Data}. + +-spec spawn_health_check_worker(data()) -> {pid(), reference()}. +spawn_health_check_worker(#data{} = Data) -> + spawn_monitor(?MODULE, worker_resource_health_check, [Data]). + +%% separated so it can be spec'ed and placate dialyzer tantrums... +-spec worker_resource_health_check(data()) -> no_return(). +worker_resource_health_check(Data) -> + HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state), + exit({ok, HCRes}). + +handle_resource_health_check_worker_down(CurrentState, Data0, WorkerRef, ExitResult) -> + #data{hc_workers = HCWorkers0 = #{resource := RHCWorkers0}} = Data0, + HCWorkers = HCWorkers0#{resource := maps:remove(WorkerRef, RHCWorkers0)}, + Data1 = Data0#data{hc_workers = HCWorkers}, + case ExitResult of + {ok, HCRes} -> + continue_with_health_check(Data1, CurrentState, HCRes); + _ -> + %% Unexpected: `emqx_resource:call_health_check' catches all exceptions. + continue_with_health_check(Data1, CurrentState, {error, ExitResult}) + end. 
+ +continue_with_health_check(#data{} = Data0, CurrentState, HCRes) -> + #data{ + id = ResId, + error = PrevError + } = Data0, + {NewStatus, NewState, Err} = parse_health_check_result(HCRes, Data0), + _ = maybe_alarm(NewStatus, ResId, Err, PrevError), + ok = maybe_resume_resource_workers(ResId, NewStatus), + Data1 = Data0#data{ + state = NewState, status = NewStatus, error = Err + }, + Data = update_state(Data1, Data0), + case CurrentState of + ?state_connected -> + continue_resource_health_check_connected(NewStatus, Data); + _ -> + %% `?state_connecting' | `?state_disconnected' | `?state_stopped' + continue_resource_health_check_not_connected(NewStatus, Data) + end. + +%% Continuation to be used when the current resource state is `?state_connected'. +continue_resource_health_check_connected(NewStatus, Data0) -> + case NewStatus of + ?status_connected -> + {Replies, Data1} = reply_pending_health_check_callers(NewStatus, resource, Data0), + Data2 = channels_health_check(?status_connected, Data1), + Data = update_state(Data2, Data0), + Actions = Replies ++ health_check_actions(Data), + {keep_state, Data, Actions}; + _ -> + ?SLOG(warning, #{ + msg => "health_check_failed", + id => Data0#data.id, + status => NewStatus + }), + %% Note: works because, coincidentally, channel/resource status is a + %% subset of resource manager state... But there should be a conversion + %% between the two here, as resource manager also has `stopped', which is + %% not a valid status at the time of writing. + {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0), + {next_state, NewStatus, channels_health_check(NewStatus, Data), Replies} + end. + +%% Continuation to be used when the current resource state is not `?state_connected'. 
+continue_resource_health_check_not_connected(NewStatus, Data0) -> + {Replies, Data} = reply_pending_health_check_callers(NewStatus, resource, Data0), + case NewStatus of + ?status_connected -> + {next_state, ?state_connected, channels_health_check(?status_connected, Data), Replies}; + ?status_connecting -> + Actions = Replies ++ health_check_actions(Data), + {next_state, ?status_connecting, channels_health_check(?status_connecting, Data), + Actions}; + ?status_disconnected -> + {next_state, ?state_disconnected, channels_health_check(?status_disconnected, Data), + Replies} + end. + +handle_manual_channel_health_check(From, #data{state = undefined}, _ChannelId) -> {keep_state_and_data, [{reply, From, channel_status({error, resource_disconnected})}]}; -handle_manually_channel_health_check( +handle_manual_channel_health_check( From, #data{added_channels = Channels} = _Data, ChannelId @@ -854,7 +983,7 @@ handle_manually_channel_health_check( is_map_key(ChannelId, Channels) -> {keep_state_and_data, [{reply, From, maps:get(ChannelId, Channels)}]}; -handle_manually_channel_health_check( +handle_manual_channel_health_check( From, _Data, _ChannelId @@ -865,56 +994,6 @@ get_channel_status_channel_added(#data{id = ResId, mod = Mod, state = State}, Ch RawStatus = emqx_resource:call_channel_health_check(ResId, ChannelId, Mod, State), channel_status(RawStatus). -handle_connecting_health_check(Data) -> - with_health_check( - Data, - fun - (?status_connected, UpdatedData) -> - {next_state, ?state_connected, - channels_health_check(?status_connected, UpdatedData)}; - (?status_connecting, UpdatedData) -> - {keep_state, channels_health_check(?status_connecting, UpdatedData), - health_check_actions(UpdatedData)}; - (?status_disconnected, UpdatedData) -> - {next_state, ?state_disconnected, - channels_health_check(?status_disconnected, UpdatedData)} - end - ). 
- -handle_connected_health_check(Data) -> - with_health_check( - Data, - fun - (?status_connected, UpdatedData0) -> - UpdatedData1 = channels_health_check(?status_connected, UpdatedData0), - {keep_state, UpdatedData1, health_check_actions(UpdatedData1)}; - (Status, UpdatedData) -> - ?SLOG(warning, #{ - msg => "health_check_failed", - id => Data#data.id, - status => Status - }), - %% Note: works because, coincidentally, channel/resource status is a - %% subset of resource manager state... But there should be a conversion - %% between the two here, as resource manager also has `stopped', which is - %% not a valid status at the time of writing. - {next_state, Status, channels_health_check(Status, UpdatedData)} - end - ). - -with_health_check(#data{state = undefined} = Data, Func) -> - Func(disconnected, Data); -with_health_check(#data{error = PrevError} = Data, Func) -> - ResId = Data#data.id, - HCRes = emqx_resource:call_health_check(Data#data.id, Data#data.mod, Data#data.state), - {Status, NewState, Err} = parse_health_check_result(HCRes, Data), - _ = maybe_alarm(Status, ResId, Err, PrevError), - ok = maybe_resume_resource_workers(ResId, Status), - UpdatedData = Data#data{ - state = NewState, status = Status, error = Err - }, - Func(Status, update_state(UpdatedData, Data)). - -spec channels_health_check(resource_status(), data()) -> data(). channels_health_check(?status_connected = _ConnectorStatus, Data0) -> Channels = maps:to_list(Data0#data.added_channels), @@ -1097,9 +1176,15 @@ update_state(Data) -> update_state(DataWas, DataWas) -> DataWas; update_state(Data, _DataWas) -> - _ = insert_cache(Data#data.id, Data), + _ = insert_cache(Data#data.id, remove_runtime_data(Data)), Data. +remove_runtime_data(#data{} = Data0) -> + Data0#data{ + hc_workers = #{resource => #{}, channel => #{}}, + hc_pending_callers = #{resource => [], channel => []} + }. + health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). 
diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 93f6b661b..d1ac5c2e6 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -18,6 +18,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). @@ -276,15 +277,22 @@ batch_individual_reply({async, ReplyFunAndArgs}, InstId, Batch, State) -> on_get_status(_InstId, #{health_check_error := true}) -> ?tp(connector_demo_health_check_error, #{}), - disconnected; + ?status_disconnected; on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) -> ?tp(connector_demo_health_check_error, #{}), - {disconnected, State, Message}; + {?status_disconnected, State, Message}; +on_get_status(_InstId, #{pid := Pid, health_check_error := {delay, Delay}}) -> + ?tp(connector_demo_health_check_delay, #{}), + timer:sleep(Delay), + case is_process_alive(Pid) of + true -> ?status_connected; + false -> ?status_disconnected + end; on_get_status(_InstId, #{pid := Pid}) -> timer:sleep(300), case is_process_alive(Pid) of - true -> connected; - false -> disconnected + true -> ?status_connected; + false -> ?status_disconnected end. spawn_counter_process(Name, Register) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index fa9f7e7c9..a6cdaedb2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -52,12 +52,20 @@ end_per_testcase(_, _Config) -> init_per_suite(Config) -> code:ensure_loaded(?TEST_RESOURCE), - ok = emqx_common_test_helpers:start_apps([emqx_conf]), - {ok, _} = application:ensure_all_started(emqx_resource), - Config. 
+ Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_resource + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_conf]). +end_per_suite(Config) -> + Apps = proplists:get_value(apps, Config), + emqx_cth_suite:stop(Apps), + ok. %%------------------------------------------------------------------------------ %% Tests @@ -115,10 +123,7 @@ t_create_remove(_) -> ?assertNot(is_process_alive(Pid)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_create_remove_local(_) -> @@ -174,10 +179,7 @@ t_create_remove_local(_) -> ?assertNot(is_process_alive(Pid)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_do_not_start_after_created(_) -> @@ -219,10 +221,7 @@ t_do_not_start_after_created(_) -> ?assertNot(is_process_alive(Pid2)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_query(_) -> @@ -855,14 +854,12 @@ t_healthy_timeout(_) -> ), ?assertEqual(ok, emqx_resource:remove_local(?ID)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). 
t_healthy(_) -> ?check_trace( + #{timetrap => 10_000}, begin ?assertMatch( {ok, _}, @@ -873,10 +870,13 @@ t_healthy(_) -> #{name => test_resource} ) ), + ct:pal("getting state"), {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), timer:sleep(300), + ct:pal("setting state as `connecting`"), emqx_resource:set_resource_status_connecting(?ID), + ct:pal("health check"), ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)), ?assertMatch( [#{status := connected}], @@ -894,10 +894,7 @@ t_healthy(_) -> ?assertEqual(ok, emqx_resource:remove_local(?ID)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_unhealthy_target(_) -> @@ -1005,11 +1002,7 @@ t_stop_start(_) -> ?assertEqual(ok, emqx_resource:stop(?ID)), ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)) end, - - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_stop_start_local(_) -> @@ -1064,10 +1057,7 @@ t_stop_start_local(_) -> ?assert(is_process_alive(Pid1)) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_list_filter(_) -> @@ -1269,10 +1259,7 @@ t_health_check_disconnected(_) -> emqx_resource:health_check(?ID) ) end, - fun(Trace) -> - ?assertEqual([], ?of_kind("inconsistent_status", Trace)), - ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) - end + [log_consistency_prop()] ). t_unblock_only_required_buffer_workers(_) -> @@ -3116,6 +3103,44 @@ t_telemetry_handler_crash(_Config) -> ), ok. 
+t_non_blocking_resource_health_check(_Config) -> + ?check_trace( + begin + {ok, _} = + create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, health_check_error => {delay, 1_000}}, + #{health_check_interval => 100} + ), + %% concurrently attempt to health check the resource; should do it only once + %% for all callers + NumCallers = 20, + Expected = lists:duplicate(NumCallers, {ok, connected}), + ?assertEqual( + Expected, + emqx_utils:pmap( + fun(_) -> emqx_resource:health_check(?ID) end, + lists:seq(1, NumCallers) + ) + ), + + NumCallers + end, + [ + log_consistency_prop(), + fun(NumCallers, Trace) -> + %% shouldn't have one health check per caller + SubTrace = ?of_kind(connector_demo_health_check_delay, Trace), + ?assertMatch([_ | _], SubTrace), + ?assert(length(SubTrace) < (NumCallers div 2), #{trace => Trace}), + ok + end + ] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -3373,3 +3398,10 @@ create(Id, Group, Type, Config) -> create(Id, Group, Type, Config, Opts) -> emqx_resource:create_local(Id, Group, Type, Config, Opts). + +log_consistency_prop() -> + {"check state and cache consistency", fun ?MODULE:log_consistency_prop/1}. +log_consistency_prop(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_status", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)), + ok. diff --git a/apps/emqx_utils/src/emqx_utils_redact.erl b/apps/emqx_utils/src/emqx_utils_redact.erl index 4d3cc7f7b..c830048a9 100644 --- a/apps/emqx_utils/src/emqx_utils_redact.erl +++ b/apps/emqx_utils/src/emqx_utils_redact.erl @@ -65,8 +65,11 @@ redact(Term, Checker) -> redact_headers(Term) -> do_redact_headers(Term). 
-do_redact(L, Checker) when is_list(L) -> - lists:map(fun(E) -> do_redact(E, Checker) end, L); +do_redact([], _Checker) -> + []; +do_redact([X | Xs], Checker) -> + %% Note: we could be dealing with an improper list + [do_redact(X, Checker) | do_redact(Xs, Checker)]; do_redact(M, Checker) when is_map(M) -> maps:map( fun(K, V) -> @@ -252,6 +255,14 @@ redact2_test_() -> Keys = [secret, passcode], [{case_name(atom, Key), fun() -> Case(Key, Checker) end} || Key <- Keys]. +redact_improper_list_test_() -> + %% improper lists: check that we don't crash + %% may arise when we redact process states with pending `gen' requests + [ + ?_assertEqual([alias | foo], redact([alias | foo])), + ?_assertEqual([1, 2 | foo], redact([1, 2 | foo])) + ]. + deobfuscate_test() -> NewConf0 = #{foo => <<"bar0">>, password => <<"123456">>}, ?assertEqual(NewConf0, deobfuscate(NewConf0, #{foo => <<"bar">>, password => <<"654321">>})), diff --git a/changes/ce/fix-12812.en.md b/changes/ce/fix-12812.en.md new file mode 100644 index 000000000..f530c2060 --- /dev/null +++ b/changes/ce/fix-12812.en.md @@ -0,0 +1 @@ +Made resource health checks non-blocking operations. This means that operations such as updating or removing a resource won't be blocked by a long-running health check.