refactor(resman): work with state cache atomically
Also ensure that cache entries are always consistent with `Data`, so that most of the code could rely on reading the cached entry most of the time.
This commit is contained in:
parent
b3e7e51094
commit
e411c5d5f8
|
@ -18,6 +18,7 @@
|
|||
|
||||
-include("emqx_resource.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("snabbkaffe/include/trace.hrl").
|
||||
|
||||
% API
|
||||
-export([
|
||||
|
@ -303,26 +304,30 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
|
|||
query_mode = maps:get(query_mode, Opts, sync),
|
||||
config = Config,
|
||||
opts = Opts,
|
||||
status = connecting,
|
||||
state = undefined,
|
||||
error = undefined
|
||||
},
|
||||
gen_statem:start_link(?MODULE, {Data, Opts}, []).
|
||||
|
||||
init({Data, Opts}) ->
|
||||
init({DataIn, Opts}) ->
|
||||
process_flag(trap_exit, true),
|
||||
%% init the cache so that lookup/1 will always return something
|
||||
DataWithPid = Data#data{pid = self()},
|
||||
insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid),
|
||||
Data = DataIn#data{pid = self()},
|
||||
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
|
||||
true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}};
|
||||
false -> {ok, stopped, DataWithPid}
|
||||
true ->
|
||||
%% init the cache so that lookup/1 will always return something
|
||||
UpdatedData = update_state(Data#data{status = connecting}),
|
||||
{ok, connecting, UpdatedData, {next_event, internal, start_resource}};
|
||||
false ->
|
||||
%% init the cache so that lookup/1 will always return something
|
||||
UpdatedData = update_state(Data#data{status = stopped}),
|
||||
{ok, stopped, UpdatedData}
|
||||
end.
|
||||
|
||||
terminate({shutdown, removed}, _State, _Data) ->
|
||||
ok;
|
||||
terminate(_Reason, _State, Data) ->
|
||||
_ = stop_resource(Data),
|
||||
_ = maybe_clear_alarm(Data#data.id),
|
||||
delete_cache(Data#data.id, Data#data.manager_id),
|
||||
_ = maybe_stop_resource(Data),
|
||||
ok = delete_cache(Data#data.id, Data#data.manager_id),
|
||||
ok.
|
||||
|
||||
%% Behavior callback
|
||||
|
@ -333,11 +338,12 @@ callback_mode() -> [handle_event_function, state_enter].
|
|||
|
||||
% Called during testing to force a specific state
|
||||
handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
|
||||
{next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]};
|
||||
UpdatedData = update_state(Data#data{status = connecting}, Data),
|
||||
{next_state, connecting, UpdatedData, [{reply, From, ok}]};
|
||||
% Called when the resource is to be restarted
|
||||
handle_event({call, From}, restart, _State, Data) ->
|
||||
_ = stop_resource(Data),
|
||||
start_resource(Data, From);
|
||||
DataNext = stop_resource(Data),
|
||||
start_resource(DataNext, From);
|
||||
% Called when the resource is to be started (also used for manual reconnect)
|
||||
handle_event({call, From}, start, State, Data) when
|
||||
State =:= stopped orelse
|
||||
|
@ -347,16 +353,14 @@ handle_event({call, From}, start, State, Data) when
|
|||
handle_event({call, From}, start, _State, _Data) ->
|
||||
{keep_state_and_data, [{reply, From, ok}]};
|
||||
% Called when the resource received a `quit` message
|
||||
handle_event(info, quit, stopped, _Data) ->
|
||||
{stop, {shutdown, quit}};
|
||||
handle_event(info, quit, _State, _Data) ->
|
||||
{stop, {shutdown, quit}};
|
||||
% Called when the resource is to be stopped
|
||||
handle_event({call, From}, stop, stopped, _Data) ->
|
||||
{keep_state_and_data, [{reply, From, ok}]};
|
||||
handle_event({call, From}, stop, _State, Data) ->
|
||||
Result = stop_resource(Data),
|
||||
{next_state, stopped, Data, [{reply, From, Result}]};
|
||||
UpdatedData = stop_resource(Data),
|
||||
{next_state, stopped, update_state(UpdatedData, Data), [{reply, From, ok}]};
|
||||
% Called when a resource is to be stopped and removed.
|
||||
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
|
||||
handle_remove_event(From, ClearMetrics, Data);
|
||||
|
@ -371,11 +375,9 @@ handle_event({call, From}, health_check, stopped, _Data) ->
|
|||
handle_event({call, From}, health_check, _State, Data) ->
|
||||
handle_manually_health_check(From, Data);
|
||||
% State: CONNECTING
|
||||
handle_event(enter, _OldState, connecting, Data) ->
|
||||
UpdatedData = Data#data{status = connecting},
|
||||
insert_cache(Data#data.id, Data#data.group, Data),
|
||||
Actions = [{state_timeout, 0, health_check}],
|
||||
{keep_state, UpdatedData, Actions};
|
||||
handle_event(enter, _OldState, connecting = State, Data) ->
|
||||
ok = log_state_consistency(State, Data),
|
||||
{keep_state_and_data, [{state_timeout, 0, health_check}]};
|
||||
handle_event(internal, start_resource, connecting, Data) ->
|
||||
start_resource(Data, undefined);
|
||||
handle_event(state_timeout, health_check, connecting, Data) ->
|
||||
|
@ -383,27 +385,23 @@ handle_event(state_timeout, health_check, connecting, Data) ->
|
|||
%% State: CONNECTED
|
||||
%% The connected state is entered after a successful on_start/2 of the callback mod
|
||||
%% and successful health_checks
|
||||
handle_event(enter, _OldState, connected, Data) ->
|
||||
UpdatedData = Data#data{status = connected},
|
||||
insert_cache(Data#data.id, Data#data.group, UpdatedData),
|
||||
handle_event(enter, _OldState, connected = State, Data) ->
|
||||
ok = log_state_consistency(State, Data),
|
||||
_ = emqx_alarm:deactivate(Data#data.id),
|
||||
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
|
||||
{next_state, connected, UpdatedData, Actions};
|
||||
{keep_state_and_data, health_check_actions(Data)};
|
||||
handle_event(state_timeout, health_check, connected, Data) ->
|
||||
handle_connected_health_check(Data);
|
||||
%% State: DISCONNECTED
|
||||
handle_event(enter, _OldState, disconnected, Data) ->
|
||||
UpdatedData = Data#data{status = disconnected},
|
||||
insert_cache(Data#data.id, Data#data.group, UpdatedData),
|
||||
handle_disconnected_state_enter(UpdatedData);
|
||||
handle_event(enter, _OldState, disconnected = State, Data) ->
|
||||
ok = log_state_consistency(State, Data),
|
||||
{keep_state_and_data, retry_actions(Data)};
|
||||
handle_event(state_timeout, auto_retry, disconnected, Data) ->
|
||||
start_resource(Data, undefined);
|
||||
%% State: STOPPED
|
||||
%% The stopped state is entered after the resource has been explicitly stopped
|
||||
handle_event(enter, _OldState, stopped, Data) ->
|
||||
UpdatedData = Data#data{status = stopped},
|
||||
insert_cache(Data#data.id, Data#data.group, UpdatedData),
|
||||
{next_state, stopped, UpdatedData};
|
||||
handle_event(enter, _OldState, stopped = State, Data) ->
|
||||
ok = log_state_consistency(State, Data),
|
||||
{keep_state_and_data, []};
|
||||
% Ignore all other events
|
||||
handle_event(EventType, EventData, State, Data) ->
|
||||
?SLOG(
|
||||
|
@ -418,6 +416,22 @@ handle_event(EventType, EventData, State, Data) ->
|
|||
),
|
||||
keep_state_and_data.
|
||||
|
||||
log_state_consistency(State, #data{status = State} = Data) ->
|
||||
log_cache_consistency(read_cache(Data#data.id), Data);
|
||||
log_state_consistency(State, Data) ->
|
||||
?tp(warning, "inconsistent_state", #{
|
||||
state => State,
|
||||
data => Data
|
||||
}).
|
||||
|
||||
log_cache_consistency({_, Data}, Data) ->
|
||||
ok;
|
||||
log_cache_consistency({_, DataCached}, Data) ->
|
||||
?tp(warning, "inconsistent_cache", #{
|
||||
cache => DataCached,
|
||||
data => Data
|
||||
}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -451,10 +465,12 @@ delete_cache(ResId, MgrId) ->
|
|||
end.
|
||||
|
||||
do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
|
||||
ets:delete(?ETS_TABLE, {owner, ResId}),
|
||||
ets:delete(?ETS_TABLE, ResId);
|
||||
true = ets:delete(?ETS_TABLE, {owner, ResId}),
|
||||
true = ets:delete(?ETS_TABLE, ResId),
|
||||
ok;
|
||||
do_delete_cache(ResId) ->
|
||||
ets:delete(?ETS_TABLE, ResId).
|
||||
true = ets:delete(?ETS_TABLE, ResId),
|
||||
ok.
|
||||
|
||||
set_new_owner(ResId) ->
|
||||
MgrId = make_manager_id(ResId),
|
||||
|
@ -471,9 +487,6 @@ get_owner(ResId) ->
|
|||
[] -> not_found
|
||||
end.
|
||||
|
||||
handle_disconnected_state_enter(Data) ->
|
||||
{next_state, disconnected, Data, retry_actions(Data)}.
|
||||
|
||||
retry_actions(Data) ->
|
||||
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
|
||||
undefined ->
|
||||
|
@ -482,24 +495,27 @@ retry_actions(Data) ->
|
|||
[{state_timeout, RetryInterval, auto_retry}]
|
||||
end.
|
||||
|
||||
health_check_actions(Data) ->
|
||||
[{state_timeout, health_check_interval(Data#data.opts), health_check}].
|
||||
|
||||
handle_remove_event(From, ClearMetrics, Data) ->
|
||||
stop_resource(Data),
|
||||
_ = stop_resource(Data),
|
||||
ok = delete_cache(Data#data.id, Data#data.manager_id),
|
||||
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
|
||||
case ClearMetrics of
|
||||
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
|
||||
false -> ok
|
||||
end,
|
||||
{stop_and_reply, normal, [{reply, From, ok}]}.
|
||||
{stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}.
|
||||
|
||||
start_resource(Data, From) ->
|
||||
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
|
||||
insert_cache(Data#data.id, Data#data.group, Data),
|
||||
case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of
|
||||
{ok, ResourceState} ->
|
||||
UpdatedData = Data#data{state = ResourceState, status = connecting},
|
||||
UpdatedData = Data#data{status = connecting, state = ResourceState},
|
||||
%% Perform an initial health_check immediately before transitioning into a connected state
|
||||
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
|
||||
{next_state, connecting, UpdatedData, Actions};
|
||||
{next_state, connecting, update_state(UpdatedData, Data), Actions};
|
||||
{error, Reason} = Err ->
|
||||
?SLOG(warning, #{
|
||||
msg => start_resource_failed,
|
||||
|
@ -509,34 +525,42 @@ start_resource(Data, From) ->
|
|||
_ = maybe_alarm(disconnected, Data#data.id),
|
||||
%% Keep track of the error reason why the connection did not work
|
||||
%% so that the Reason can be returned when the verification call is made.
|
||||
UpdatedData = Data#data{error = Reason},
|
||||
UpdatedData = Data#data{status = disconnected, error = Reason},
|
||||
Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
|
||||
{next_state, disconnected, UpdatedData, Actions}
|
||||
{next_state, disconnected, update_state(UpdatedData, Data), Actions}
|
||||
end.
|
||||
|
||||
stop_resource(#data{state = undefined, id = ResId} = _Data) ->
|
||||
_ = maybe_clear_alarm(ResId),
|
||||
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
|
||||
ok;
|
||||
stop_resource(Data) ->
|
||||
maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped ->
|
||||
stop_resource(Data);
|
||||
maybe_stop_resource(#data{status = stopped} = Data) ->
|
||||
Data.
|
||||
|
||||
stop_resource(#data{state = ResState, id = ResId} = Data) ->
|
||||
%% We don't care the return value of the Mod:on_stop/2.
|
||||
%% The callback mod should make sure the resource is stopped after on_stop/2
|
||||
%% is returned.
|
||||
ResId = Data#data.id,
|
||||
_ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state),
|
||||
case ResState /= undefined of
|
||||
true ->
|
||||
emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
_ = maybe_clear_alarm(ResId),
|
||||
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
|
||||
ok.
|
||||
Data#data{status = stopped}.
|
||||
|
||||
make_test_id() ->
|
||||
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
|
||||
<<?TEST_ID_PREFIX, RandId/binary>>.
|
||||
|
||||
handle_manually_health_check(From, Data) ->
|
||||
with_health_check(Data, fun(Status, UpdatedData) ->
|
||||
with_health_check(
|
||||
Data,
|
||||
fun(Status, UpdatedData) ->
|
||||
Actions = [{reply, From, {ok, Status}}],
|
||||
{next_state, Status, UpdatedData, Actions}
|
||||
end).
|
||||
end
|
||||
).
|
||||
|
||||
handle_connecting_health_check(Data) ->
|
||||
with_health_check(
|
||||
|
@ -545,8 +569,7 @@ handle_connecting_health_check(Data) ->
|
|||
(connected, UpdatedData) ->
|
||||
{next_state, connected, UpdatedData};
|
||||
(connecting, UpdatedData) ->
|
||||
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
|
||||
{keep_state, UpdatedData, Actions};
|
||||
{keep_state, UpdatedData, health_check_actions(UpdatedData)};
|
||||
(disconnected, UpdatedData) ->
|
||||
{next_state, disconnected, UpdatedData}
|
||||
end
|
||||
|
@ -557,8 +580,7 @@ handle_connected_health_check(Data) ->
|
|||
Data,
|
||||
fun
|
||||
(connected, UpdatedData) ->
|
||||
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
|
||||
{keep_state, UpdatedData, Actions};
|
||||
{keep_state, UpdatedData, health_check_actions(UpdatedData)};
|
||||
(Status, UpdatedData) ->
|
||||
?SLOG(warning, #{
|
||||
msg => health_check_failed,
|
||||
|
@ -580,8 +602,16 @@ with_health_check(Data, Func) ->
|
|||
UpdatedData = Data#data{
|
||||
state = NewState, status = Status, error = Err
|
||||
},
|
||||
insert_cache(ResId, UpdatedData#data.group, UpdatedData),
|
||||
Func(Status, UpdatedData).
|
||||
Func(Status, update_state(UpdatedData, Data)).
|
||||
|
||||
update_state(Data) ->
|
||||
update_state(Data, undefined).
|
||||
|
||||
update_state(DataWas, DataWas) ->
|
||||
DataWas;
|
||||
update_state(Data, _DataWas) ->
|
||||
_ = insert_cache(Data#data.id, Data#data.group, Data),
|
||||
Data.
|
||||
|
||||
health_check_interval(Opts) ->
|
||||
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).
|
||||
|
|
|
@ -75,8 +75,7 @@ on_start(InstId, #{name := Name} = Opts) ->
|
|||
on_stop(_InstId, #{stop_error := true}) ->
|
||||
{error, stop_error};
|
||||
on_stop(_InstId, #{pid := Pid}) ->
|
||||
erlang:exit(Pid, shutdown),
|
||||
ok.
|
||||
stop_counter_process(Pid).
|
||||
|
||||
on_query(_InstId, get_state, State) ->
|
||||
{ok, State};
|
||||
|
@ -247,6 +246,15 @@ spawn_counter_process(Name, Register) ->
|
|||
true = maybe_register(Name, Pid, Register),
|
||||
Pid.
|
||||
|
||||
stop_counter_process(Pid) ->
|
||||
true = erlang:is_process_alive(Pid),
|
||||
true = erlang:exit(Pid, shutdown),
|
||||
receive
|
||||
{'EXIT', Pid, shutdown} -> ok
|
||||
after 5000 ->
|
||||
{error, timeout}
|
||||
end.
|
||||
|
||||
counter_loop() ->
|
||||
counter_loop(#{
|
||||
counter => 0,
|
||||
|
|
|
@ -72,48 +72,74 @@ t_check_config(_) ->
|
|||
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
|
||||
|
||||
t_create_remove(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{error, _},
|
||||
emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:create(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:recreate(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:recreate(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
?assert(is_process_alive(Pid)),
|
||||
|
||||
ok = emqx_resource:remove(?ID),
|
||||
{error, _} = emqx_resource:remove(?ID),
|
||||
?assertEqual(ok, emqx_resource:remove(?ID)),
|
||||
?assertMatch({error, _}, emqx_resource:remove(?ID)),
|
||||
|
||||
?assertNot(is_process_alive(Pid)).
|
||||
?assertNot(is_process_alive(Pid))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_create_remove_local(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{error, _},
|
||||
emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
emqx_resource:recreate_local(
|
||||
|
@ -122,6 +148,7 @@ t_create_remove_local(_) ->
|
|||
#{name => test_resource},
|
||||
#{}
|
||||
),
|
||||
|
||||
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
?assert(is_process_alive(Pid)),
|
||||
|
@ -135,23 +162,34 @@ t_create_remove_local(_) ->
|
|||
#{}
|
||||
),
|
||||
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
{error, _} = emqx_resource:remove_local(?ID),
|
||||
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||
?assertMatch({error, _}, emqx_resource:remove_local(?ID)),
|
||||
|
||||
?assertMatch(
|
||||
?RESOURCE_ERROR(not_found),
|
||||
emqx_resource:query(?ID, get_state)
|
||||
),
|
||||
?assertNot(is_process_alive(Pid)).
|
||||
|
||||
?assertNot(is_process_alive(Pid))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_do_not_start_after_created(_) ->
|
||||
ct:pal("creating resource"),
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{start_after_created => false}
|
||||
)
|
||||
),
|
||||
%% the resource should remain `disconnected` after created
|
||||
timer:sleep(200),
|
||||
|
@ -165,22 +203,25 @@ t_do_not_start_after_created(_) ->
|
|||
),
|
||||
|
||||
%% start the resource manually..
|
||||
ct:pal("starting resource manually"),
|
||||
ok = emqx_resource:start(?ID),
|
||||
?assertEqual(ok, emqx_resource:start(?ID)),
|
||||
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
||||
?assert(is_process_alive(Pid)),
|
||||
|
||||
%% restart the resource
|
||||
ct:pal("restarting resource"),
|
||||
ok = emqx_resource:restart(?ID),
|
||||
?assertEqual(ok, emqx_resource:restart(?ID)),
|
||||
?assertNot(is_process_alive(Pid)),
|
||||
{ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
|
||||
?assert(is_process_alive(Pid2)),
|
||||
|
||||
ct:pal("removing resource"),
|
||||
ok = emqx_resource:remove_local(?ID),
|
||||
?assertEqual(ok, emqx_resource:remove_local(?ID)),
|
||||
|
||||
?assertNot(is_process_alive(Pid2)).
|
||||
?assertNot(is_process_alive(Pid2))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_query(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
|
@ -771,33 +812,51 @@ t_query_counter_async_inflight_batch(_) ->
|
|||
ok = emqx_resource:remove_local(?ID).
|
||||
|
||||
t_healthy_timeout(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => <<"bad_not_atom_name">>, register => true},
|
||||
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
|
||||
#{health_check_interval => 200}
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {resource_error, #{reason := timeout}}},
|
||||
emqx_resource:query(?ID, get_state, #{timeout => 1_000})
|
||||
),
|
||||
?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)),
|
||||
ok = emqx_resource:remove_local(?ID).
|
||||
?assertMatch(
|
||||
{ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID)
|
||||
),
|
||||
?assertEqual(ok, emqx_resource:remove_local(?ID))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_healthy(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource}
|
||||
)
|
||||
),
|
||||
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
|
||||
timer:sleep(300),
|
||||
emqx_resource:set_resource_status_connecting(?ID),
|
||||
|
||||
{ok, connected} = emqx_resource:health_check(?ID),
|
||||
?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
|
||||
?assertMatch(
|
||||
[#{status := connected}],
|
||||
emqx_resource:list_instances_verbose()
|
||||
|
@ -812,21 +871,35 @@ t_healthy(_) ->
|
|||
emqx_resource:list_instances_verbose()
|
||||
),
|
||||
|
||||
ok = emqx_resource:remove_local(?ID).
|
||||
?assertEqual(ok, emqx_resource:remove_local(?ID))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_stop_start(_) ->
|
||||
{error, _} = emqx_resource:check_and_create(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{error, _},
|
||||
emqx_resource:check_and_create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_create(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:check_and_create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>}
|
||||
)
|
||||
),
|
||||
|
||||
%% add some metrics to test their persistence
|
||||
|
@ -836,11 +909,14 @@ t_stop_start(_) ->
|
|||
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
|
||||
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_recreate(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:check_and_recreate(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>},
|
||||
#{}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
|
||||
|
@ -861,7 +937,7 @@ t_stop_start(_) ->
|
|||
emqx_resource:query(?ID, get_state)
|
||||
),
|
||||
|
||||
ok = emqx_resource:restart(?ID),
|
||||
?assertEqual(ok, emqx_resource:restart(?ID)),
|
||||
timer:sleep(300),
|
||||
|
||||
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
|
||||
|
@ -873,38 +949,54 @@ t_stop_start(_) ->
|
|||
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
|
||||
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
|
||||
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
|
||||
ok = emqx_resource:stop(?ID),
|
||||
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
|
||||
?assertEqual(ok, emqx_resource:stop(?ID)),
|
||||
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
|
||||
end,
|
||||
|
||||
ok.
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_stop_start_local(_) ->
|
||||
{error, _} = emqx_resource:check_and_create_local(
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch(
|
||||
{error, _},
|
||||
emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{unknown => test_resource}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_create_local(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:check_and_create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, _} = emqx_resource:check_and_recreate_local(
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:check_and_recreate_local(
|
||||
?ID,
|
||||
?TEST_RESOURCE,
|
||||
#{<<"name">> => <<"test_resource">>},
|
||||
#{}
|
||||
)
|
||||
),
|
||||
|
||||
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
?assert(is_process_alive(Pid0)),
|
||||
|
||||
ok = emqx_resource:stop(?ID),
|
||||
?assertEqual(ok, emqx_resource:stop(?ID)),
|
||||
|
||||
?assertNot(is_process_alive(Pid0)),
|
||||
|
||||
|
@ -913,11 +1005,17 @@ t_stop_start_local(_) ->
|
|||
emqx_resource:query(?ID, get_state)
|
||||
),
|
||||
|
||||
ok = emqx_resource:restart(?ID),
|
||||
?assertEqual(ok, emqx_resource:restart(?ID)),
|
||||
|
||||
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
|
||||
|
||||
?assert(is_process_alive(Pid1)).
|
||||
?assert(is_process_alive(Pid1))
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_list_filter(_) ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
|
@ -1031,6 +1129,8 @@ t_auto_retry(_) ->
|
|||
?assertEqual(ok, Res).
|
||||
|
||||
t_health_check_disconnected(_) ->
|
||||
?check_trace(
|
||||
begin
|
||||
_ = emqx_resource:create_local(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
|
@ -1041,6 +1141,12 @@ t_health_check_disconnected(_) ->
|
|||
?assertEqual(
|
||||
{ok, disconnected},
|
||||
emqx_resource:health_check(?ID)
|
||||
)
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
|
||||
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
|
||||
end
|
||||
).
|
||||
|
||||
t_unblock_only_required_buffer_workers(_) ->
|
||||
|
|
Loading…
Reference in New Issue