From e411c5d5f8cb357e48ba9fa33ae713ec69f07019 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 18:28:28 +0300 Subject: [PATCH] refactor(resman): work with state cache atomically Also ensure that cache entries are always consistent with `Data`, so that most of the code could rely on reading the cached entry most of the time. --- .../src/emqx_resource_manager.erl | 162 +++--- .../test/emqx_connector_demo.erl | 12 +- .../test/emqx_resource_SUITE.erl | 528 +++++++++++------- 3 files changed, 423 insertions(+), 279 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 05d100913..0983dff8d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -18,6 +18,7 @@ -include("emqx_resource.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). % API -export([ @@ -303,26 +304,30 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> query_mode = maps:get(query_mode, Opts, sync), config = Config, opts = Opts, - status = connecting, state = undefined, error = undefined }, gen_statem:start_link(?MODULE, {Data, Opts}, []). -init({Data, Opts}) -> +init({DataIn, Opts}) -> process_flag(trap_exit, true), - %% init the cache so that lookup/1 will always return something - DataWithPid = Data#data{pid = self()}, - insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid), + Data = DataIn#data{pid = self()}, case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of - true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}}; - false -> {ok, stopped, DataWithPid} + true -> + %% init the cache so that lookup/1 will always return something + UpdatedData = update_state(Data#data{status = connecting}), + {ok, connecting, UpdatedData, {next_event, internal, start_resource}}; + false -> + %% init the cache so that lookup/1 will always return something + UpdatedData = update_state(Data#data{status = stopped}), + {ok, stopped, UpdatedData} end. +terminate({shutdown, removed}, _State, _Data) -> + ok; terminate(_Reason, _State, Data) -> - _ = stop_resource(Data), - _ = maybe_clear_alarm(Data#data.id), - delete_cache(Data#data.id, Data#data.manager_id), + _ = maybe_stop_resource(Data), + ok = delete_cache(Data#data.id, Data#data.manager_id), ok. %% Behavior callback @@ -333,11 +338,12 @@ callback_mode() -> [handle_event_function, state_enter]. % Called during testing to force a specific state handle_event({call, From}, set_resource_status_connecting, _State, Data) -> - {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; + UpdatedData = update_state(Data#data{status = connecting}, Data), + {next_state, connecting, UpdatedData, [{reply, From, ok}]}; % Called when the resource is to be restarted handle_event({call, From}, restart, _State, Data) -> - _ = stop_resource(Data), - start_resource(Data, From); + DataNext = stop_resource(Data), + start_resource(DataNext, From); % Called when the resource is to be started (also used for manual reconnect) handle_event({call, From}, start, State, Data) when State =:= stopped orelse @@ -347,16 +353,14 @@ handle_event({call, From}, start, State, Data) when handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource received a `quit` message -handle_event(info, quit, stopped, _Data) -> - {stop, {shutdown, quit}}; handle_event(info, quit, _State, _Data) -> {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> - Result = stop_resource(Data), - {next_state, stopped, Data, [{reply, From, Result}]}; + UpdatedData = stop_resource(Data), + {next_state, stopped, update_state(UpdatedData, Data), [{reply, From, ok}]}; % Called when a resource is to be stopped and removed. handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_remove_event(From, ClearMetrics, Data); @@ -371,11 +375,9 @@ handle_event({call, From}, health_check, stopped, _Data) -> handle_event({call, From}, health_check, _State, Data) -> handle_manually_health_check(From, Data); % State: CONNECTING -handle_event(enter, _OldState, connecting, Data) -> - UpdatedData = Data#data{status = connecting}, - insert_cache(Data#data.id, Data#data.group, Data), - Actions = [{state_timeout, 0, health_check}], - {keep_state, UpdatedData, Actions}; +handle_event(enter, _OldState, connecting = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, [{state_timeout, 0, health_check}]}; handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> @@ -383,27 +385,23 @@ handle_event(state_timeout, health_check, connecting, Data) -> %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks -handle_event(enter, _OldState, connected, Data) -> - UpdatedData = Data#data{status = connected}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), +handle_event(enter, _OldState, connected = State, Data) -> + ok = log_state_consistency(State, Data), _ = emqx_alarm:deactivate(Data#data.id), - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {next_state, connected, UpdatedData, Actions}; + {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); %% State: DISCONNECTED -handle_event(enter, _OldState, disconnected, Data) -> - UpdatedData = Data#data{status = disconnected}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), - handle_disconnected_state_enter(UpdatedData); +handle_event(enter, _OldState, disconnected = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, retry_actions(Data)}; handle_event(state_timeout, auto_retry, disconnected, Data) -> start_resource(Data, undefined); %% State: STOPPED %% The stopped state is entered after the resource has been explicitly stopped -handle_event(enter, _OldState, stopped, Data) -> - UpdatedData = Data#data{status = stopped}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), - {next_state, stopped, UpdatedData}; +handle_event(enter, _OldState, stopped = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, []}; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -418,6 +416,22 @@ handle_event(EventType, EventData, State, Data) -> ), keep_state_and_data. +log_state_consistency(State, #data{status = State} = Data) -> + log_cache_consistency(read_cache(Data#data.id), Data); +log_state_consistency(State, Data) -> + ?tp(warning, "inconsistent_state", #{ + state => State, + data => Data + }). + +log_cache_consistency({_, Data}, Data) -> + ok; +log_cache_consistency({_, DataCached}, Data) -> + ?tp(warning, "inconsistent_cache", #{ + cache => DataCached, + data => Data + }). + %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ @@ -451,10 +465,12 @@ delete_cache(ResId, MgrId) -> end. do_delete_cache(<> = ResId) -> - ets:delete(?ETS_TABLE, {owner, ResId}), - ets:delete(?ETS_TABLE, ResId); + true = ets:delete(?ETS_TABLE, {owner, ResId}), + true = ets:delete(?ETS_TABLE, ResId), + ok; do_delete_cache(ResId) -> - ets:delete(?ETS_TABLE, ResId). + true = ets:delete(?ETS_TABLE, ResId), + ok. set_new_owner(ResId) -> MgrId = make_manager_id(ResId), @@ -471,9 +487,6 @@ get_owner(ResId) -> [] -> not_found end. -handle_disconnected_state_enter(Data) -> - {next_state, disconnected, Data, retry_actions(Data)}. - retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> @@ -482,24 +495,27 @@ retry_actions(Data) -> [{state_timeout, RetryInterval, auto_retry}] end. +health_check_actions(Data) -> + [{state_timeout, health_check_interval(Data#data.opts), health_check}]. + handle_remove_event(From, ClearMetrics, Data) -> - stop_resource(Data), + _ = stop_resource(Data), + ok = delete_cache(Data#data.id, Data#data.manager_id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - {stop_and_reply, normal, [{reply, From, ok}]}. + {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - insert_cache(Data#data.id, Data#data.group, Data), case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connecting}, + UpdatedData = Data#data{status = connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), - {next_state, connecting, UpdatedData, Actions}; + {next_state, connecting, update_state(UpdatedData, Data), Actions}; {error, Reason} = Err -> ?SLOG(warning, #{ msg => start_resource_failed, @@ -509,34 +525,42 @@ start_resource(Data, From) -> _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{error = Reason}, + UpdatedData = Data#data{status = disconnected, error = Reason}, Actions = maybe_reply(retry_actions(UpdatedData), From, Err), - {next_state, disconnected, UpdatedData, Actions} + {next_state, disconnected, update_state(UpdatedData, Data), Actions} end. -stop_resource(#data{state = undefined, id = ResId} = _Data) -> - _ = maybe_clear_alarm(ResId), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), - ok; -stop_resource(Data) -> +maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped -> + stop_resource(Data); +maybe_stop_resource(#data{status = stopped} = Data) -> + Data. + +stop_resource(#data{state = ResState, id = ResId} = Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. - ResId = Data#data.id, - _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), + case ResState /= undefined of + true -> + emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); + false -> + ok + end, _ = maybe_clear_alarm(ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), - ok. + Data#data{status = stopped}. make_test_id() -> RandId = iolist_to_binary(emqx_misc:gen_id(16)), <>. handle_manually_health_check(From, Data) -> - with_health_check(Data, fun(Status, UpdatedData) -> - Actions = [{reply, From, {ok, Status}}], - {next_state, Status, UpdatedData, Actions} - end). + with_health_check( + Data, + fun(Status, UpdatedData) -> + Actions = [{reply, From, {ok, Status}}], + {next_state, Status, UpdatedData, Actions} + end + ). handle_connecting_health_check(Data) -> with_health_check( @@ -545,8 +569,7 @@ handle_connecting_health_check(Data) -> (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {keep_state, UpdatedData, Actions}; + {keep_state, UpdatedData, health_check_actions(UpdatedData)}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} end @@ -557,8 +580,7 @@ handle_connected_health_check(Data) -> Data, fun (connected, UpdatedData) -> - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {keep_state, UpdatedData, Actions}; + {keep_state, UpdatedData, health_check_actions(UpdatedData)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => health_check_failed, @@ -580,8 +602,16 @@ with_health_check(Data, Func) -> UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, - insert_cache(ResId, UpdatedData#data.group, UpdatedData), - Func(Status, UpdatedData). + Func(Status, update_state(UpdatedData, Data)). + +update_state(Data) -> + update_state(Data, undefined). + +update_state(DataWas, DataWas) -> + DataWas; +update_state(Data, _DataWas) -> + _ = insert_cache(Data#data.id, Data#data.group, Data), + Data. health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index f41087b20..a863dbb78 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -75,8 +75,7 @@ on_start(InstId, #{name := Name} = Opts) -> on_stop(_InstId, #{stop_error := true}) -> {error, stop_error}; on_stop(_InstId, #{pid := Pid}) -> - erlang:exit(Pid, shutdown), - ok. + stop_counter_process(Pid). on_query(_InstId, get_state, State) -> {ok, State}; @@ -247,6 +246,15 @@ spawn_counter_process(Name, Register) -> true = maybe_register(Name, Pid, Register), Pid. +stop_counter_process(Pid) -> + true = erlang:is_process_alive(Pid), + true = erlang:exit(Pid, shutdown), + receive + {'EXIT', Pid, shutdown} -> ok + after 5000 -> + {error, timeout} + end. + counter_loop() -> counter_loop(#{ counter => 0, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index af72e86f9..f6d2b7ab4 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -72,115 +72,156 @@ t_check_config(_) -> {error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}). t_create_remove(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), - {ok, _} = emqx_resource:recreate( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + ?assertMatch( + {ok, _}, + emqx_resource:recreate( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ) + ), - ?assert(is_process_alive(Pid)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - ok = emqx_resource:remove(?ID), - {error, _} = emqx_resource:remove(?ID), + ?assert(is_process_alive(Pid)), - ?assertNot(is_process_alive(Pid)). + ?assertEqual(ok, emqx_resource:remove(?ID)), + ?assertMatch({error, _}, emqx_resource:remove(?ID)), + + ?assertNot(is_process_alive(Pid)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_create_remove_local(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), - emqx_resource:recreate_local( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + emqx_resource:recreate_local( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ), - ?assert(is_process_alive(Pid)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - emqx_resource:set_resource_status_connecting(?ID), + ?assert(is_process_alive(Pid)), - emqx_resource:recreate_local( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), + emqx_resource:set_resource_status_connecting(?ID), - ok = emqx_resource:remove_local(?ID), - {error, _} = emqx_resource:remove_local(?ID), + emqx_resource:recreate_local( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ), - ?assertMatch( - ?RESOURCE_ERROR(not_found), - emqx_resource:query(?ID, get_state) - ), - ?assertNot(is_process_alive(Pid)). + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + ?assertMatch({error, _}, emqx_resource:remove_local(?ID)), + + ?assertMatch( + ?RESOURCE_ERROR(not_found), + emqx_resource:query(?ID, get_state) + ), + + ?assertNot(is_process_alive(Pid)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_do_not_start_after_created(_) -> - ct:pal("creating resource"), - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{start_after_created => false} - ), - %% the resource should remain `disconnected` after created - timer:sleep(200), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), - ?assertMatch( - {ok, _, #{status := stopped}}, - emqx_resource:get_instance(?ID) - ), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{start_after_created => false} + ) + ), + %% the resource should remain `disconnected` after created + timer:sleep(200), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), + ?assertMatch( + {ok, _, #{status := stopped}}, + emqx_resource:get_instance(?ID) + ), - %% start the resource manually.. - ct:pal("starting resource manually"), - ok = emqx_resource:start(?ID), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid)), + %% start the resource manually.. + ?assertEqual(ok, emqx_resource:start(?ID)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid)), - %% restart the resource - ct:pal("restarting resource"), - ok = emqx_resource:restart(?ID), - ?assertNot(is_process_alive(Pid)), - {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid2)), + %% restart the resource + ?assertEqual(ok, emqx_resource:restart(?ID)), + ?assertNot(is_process_alive(Pid)), + {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid2)), - ct:pal("removing resource"), - ok = emqx_resource:remove_local(?ID), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), - ?assertNot(is_process_alive(Pid2)). + ?assertNot(is_process_alive(Pid2)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_query(_) -> {ok, _} = emqx_resource:create_local( @@ -771,153 +812,210 @@ t_query_counter_async_inflight_batch(_) -> ok = emqx_resource:remove_local(?ID). t_healthy_timeout(_) -> - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => <<"bad_not_atom_name">>, register => true}, - %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. - #{health_check_interval => 200} - ), - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - emqx_resource:query(?ID, get_state, #{timeout => 1_000}) - ), - ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)), - ok = emqx_resource:remove_local(?ID). + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => <<"bad_not_atom_name">>, register => true}, + %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. + #{health_check_interval => 200} + ) + ), + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + emqx_resource:query(?ID, get_state, #{timeout => 1_000}) + ), + ?assertMatch( + {ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_healthy(_) -> - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - timer:sleep(300), - emqx_resource:set_resource_status_connecting(?ID), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + timer:sleep(300), + emqx_resource:set_resource_status_connecting(?ID), - {ok, connected} = emqx_resource:health_check(?ID), - ?assertMatch( - [#{status := connected}], - emqx_resource:list_instances_verbose() - ), + ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)), + ?assertMatch( + [#{status := connected}], + emqx_resource:list_instances_verbose() + ), - erlang:exit(Pid, shutdown), + erlang:exit(Pid, shutdown), - ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), + ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), - ?assertMatch( - [#{status := disconnected}], - emqx_resource:list_instances_verbose() - ), + ?assertMatch( + [#{status := disconnected}], + emqx_resource:list_instances_verbose() + ), - ok = emqx_resource:remove_local(?ID). + ?assertEqual(ok, emqx_resource:remove_local(?ID)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_stop_start(_) -> - {error, _} = emqx_resource:check_and_create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:check_and_create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>} + ) + ), - %% add some metrics to test their persistence - WorkerID0 = <<"worker:0">>, - WorkerID1 = <<"worker:1">>, - emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), - emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), - ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), + %% add some metrics to test their persistence + WorkerID0 = <<"worker:0">>, + WorkerID1 = <<"worker:1">>, + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), - {ok, _} = emqx_resource:check_and_recreate( - ?ID, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>}, - #{} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_recreate( + ?ID, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>}, + #{} + ) + ), - {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid0)), + ?assert(is_process_alive(Pid0)), - %% metrics are reset when recreating - %% depending on timing, might show the request we just did. - ct:sleep(500), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + %% metrics are reset when recreating + %% depending on timing, might show the request we just did. + ct:sleep(500), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), - ok = emqx_resource:stop(?ID), + ok = emqx_resource:stop(?ID), - ?assertNot(is_process_alive(Pid0)), + ?assertNot(is_process_alive(Pid0)), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), - ok = emqx_resource:restart(?ID), - timer:sleep(300), + ?assertEqual(ok, emqx_resource:restart(?ID)), + timer:sleep(300), - {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)), + ?assert(is_process_alive(Pid1)), - %% now stop while resetting the metrics - ct:sleep(500), - emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), - emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), - ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), - ok = emqx_resource:stop(?ID), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + %% now stop while resetting the metrics + ct:sleep(500), + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), + ?assertEqual(ok, emqx_resource:stop(?ID)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)) + end, - ok. + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_stop_start_local(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>} + ) + ), - {ok, _} = emqx_resource:check_and_recreate_local( - ?ID, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>}, - #{} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_recreate_local( + ?ID, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>}, + #{} + ) + ), - {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid0)), + ?assert(is_process_alive(Pid0)), - ok = emqx_resource:stop(?ID), + ?assertEqual(ok, emqx_resource:stop(?ID)), - ?assertNot(is_process_alive(Pid0)), + ?assertNot(is_process_alive(Pid0)), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), - ok = emqx_resource:restart(?ID), + ?assertEqual(ok, emqx_resource:restart(?ID)), - {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)). + ?assert(is_process_alive(Pid1)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_list_filter(_) -> {ok, _} = emqx_resource:create_local( @@ -1031,16 +1129,24 @@ t_auto_retry(_) -> ?assertEqual(ok, Res). t_health_check_disconnected(_) -> - _ = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} - ), - ?assertEqual( - {ok, disconnected}, - emqx_resource:health_check(?ID) + ?check_trace( + begin + _ = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, create_error => true}, + #{auto_retry_interval => 100} + ), + ?assertEqual( + {ok, disconnected}, + emqx_resource:health_check(?ID) + ) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end ). t_unblock_only_required_buffer_workers(_) ->