Merge pull request #10123 from fix/EMQX-9136/lookup-busy-resources

perf(bridge-api): ask nodes' bridge listings in parallel
This commit is contained in:
Andrew Mayorov 2023-03-16 00:07:12 +03:00 committed by GitHub
commit 816667d85c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 616 additions and 332 deletions

View File

@ -4,6 +4,7 @@
{emqx_authz,1}. {emqx_authz,1}.
{emqx_bridge,1}. {emqx_bridge,1}.
{emqx_bridge,2}. {emqx_bridge,2}.
{emqx_bridge,3}.
{emqx_broker,1}. {emqx_broker,1}.
{emqx_cm,1}. {emqx_cm,1}.
{emqx_conf,1}. {emqx_conf,1}.

View File

@ -226,21 +226,21 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) ->
Result. Result.
list() -> list() ->
lists:foldl( maps:fold(
fun({Type, NameAndConf}, Bridges) -> fun(Type, NameAndConf, Bridges) ->
lists:foldl( maps:fold(
fun({Name, RawConf}, Acc) -> fun(Name, RawConf, Acc) ->
case lookup(Type, Name, RawConf) of case lookup(Type, Name, RawConf) of
{error, not_found} -> Acc; {error, not_found} -> Acc;
{ok, Res} -> [Res | Acc] {ok, Res} -> [Res | Acc]
end end
end, end,
Bridges, Bridges,
maps:to_list(NameAndConf) NameAndConf
) )
end, end,
[], [],
maps:to_list(emqx:get_raw_config([bridges], #{})) emqx:get_raw_config([bridges], #{})
). ).
lookup(Id) -> lookup(Id) ->

View File

@ -487,11 +487,18 @@ schema("/bridges_probe") ->
lookup_from_all_nodes(BridgeType, BridgeName, 201) lookup_from_all_nodes(BridgeType, BridgeName, 201)
end; end;
'/bridges'(get, _Params) -> '/bridges'(get, _Params) ->
{200, Nodes = mria:running_nodes(),
zip_bridges([ NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes),
[format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] case is_ok(NodeReplies) of
|| Node <- mria:running_nodes() {ok, NodeBridges} ->
])}. AllBridges = [
format_resource(Data, Node)
|| {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges
],
{200, zip_bridges([AllBridges])};
{error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)}
end.
'/bridges/:id'(get, #{bindings := #{id := Id}}) -> '/bridges/:id'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200));
@ -589,7 +596,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
{ok, [{ok, _} | _] = Results} -> {ok, [{ok, _} | _] = Results} ->
{SuccCode, FormatFun([R || {ok, R} <- Results])}; {SuccCode, FormatFun([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} -> {ok, [{error, not_found} | _]} ->
@ -600,7 +607,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
lookup_from_local_node(BridgeType, BridgeName) -> lookup_from_local_node(BridgeType, BridgeName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, Res} -> {ok, format_resp(Res)}; {ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error Error -> Error
end. end.
@ -809,10 +816,7 @@ aggregate_metrics(
aggregate_metrics(#{}, Metrics) -> aggregate_metrics(#{}, Metrics) ->
Metrics. Metrics.
format_resp(Data) -> format_resource(
format_resp(Data, node()).
format_resp(
#{ #{
type := Type, type := Type,
name := BridgeName, name := BridgeName,
@ -988,7 +992,7 @@ do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(SupportedVersion, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of case lists:member(SupportedVersion, supported_versions(Call)) of
true -> true ->
apply(emqx_bridge_proto_v2, Call, Args); apply(emqx_bridge_proto_v3, Call, Args);
false -> false ->
{error, not_implemented} {error, not_implemented}
end. end.
@ -998,9 +1002,9 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) -> maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult). emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(start_bridge_to_node) -> [2]; supported_versions(start_bridge_to_node) -> [2, 3];
supported_versions(start_bridges_to_all_nodes) -> [2]; supported_versions(start_bridges_to_all_nodes) -> [2, 3];
supported_versions(_Call) -> [1, 2]. supported_versions(_Call) -> [1, 2, 3].
to_hr_reason(nxdomain) -> to_hr_reason(nxdomain) ->
<<"Host not found">>; <<"Host not found">>;

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
list_bridges/1, list_bridges/1,
restart_bridge_to_node/3, restart_bridge_to_node/3,
@ -38,6 +39,9 @@
introduced_in() -> introduced_in() ->
"5.0.17". "5.0.17".
deprecated_since() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) -> list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

View File

@ -0,0 +1,128 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v3).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges/1,
list_bridges_on_nodes/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.0.21".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).
-spec list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -19,7 +19,6 @@
-compile(export_all). -compile(export_all).
-import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include("emqx/include/emqx.hrl"). -include("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
@ -164,9 +163,9 @@ init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
ok = snabbkaffe:start_trace(), ok = snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
clear_resources(), clear_resources(),
emqx_common_test_helpers:call_janitor(),
snabbkaffe:stop(), snabbkaffe:stop(),
ok. ok.
@ -710,13 +709,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) ->
} }
), ),
on_exit(fun() ->
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok
end),
%% we now test if the bridge works as expected %% we now test if the bridge works as expected
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>, LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>, RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,
@ -827,13 +819,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) ->
} }
), ),
on_exit(fun() ->
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok
end),
Self = self(), Self = self(),
LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>, LocalTopic = <<?EGRESS_LOCAL_TOPIC, "/1">>,
RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>, RemoteTopic = <<?EGRESS_REMOTE_TOPIC, "/", LocalTopic/binary>>,

View File

@ -112,6 +112,8 @@
-export([apply_reply_fun/2]). -export([apply_reply_fun/2]).
-export_type([resource_data/0]).
-optional_callbacks([ -optional_callbacks([
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
@ -258,7 +260,7 @@ query(ResId, Request) ->
-spec query(resource_id(), Request :: term(), query_opts()) -> -spec query(resource_id(), Request :: term(), query_opts()) ->
Result :: term(). Result :: term().
query(ResId, Request, Opts) -> query(ResId, Request, Opts) ->
case emqx_resource_manager:ets_lookup(ResId) of case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{query_mode := QM, mod := Module}} -> {ok, _Group, #{query_mode := QM, mod := Module}} ->
IsBufferSupported = is_buffer_supported(Module), IsBufferSupported = is_buffer_supported(Module),
case {IsBufferSupported, QM} of case {IsBufferSupported, QM} of
@ -309,7 +311,7 @@ set_resource_status_connecting(ResId) ->
-spec get_instance(resource_id()) -> -spec get_instance(resource_id()) ->
{ok, resource_group(), resource_data()} | {error, Reason :: term()}. {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(ResId) -> get_instance(ResId) ->
emqx_resource_manager:lookup(ResId). emqx_resource_manager:lookup_cached(ResId, [metrics]).
-spec fetch_creation_opts(map()) -> creation_opts(). -spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) -> fetch_creation_opts(Opts) ->

View File

@ -885,7 +885,7 @@ handle_async_worker_down(Data0, Pid) ->
call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) ->
?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}),
case emqx_resource_manager:ets_lookup(Id) of case emqx_resource_manager:lookup_cached(Id) of
{ok, _Group, #{status := stopped}} -> {ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled"); ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, Resource} -> {ok, _Group, Resource} ->

View File

@ -18,6 +18,7 @@
-include("emqx_resource.hrl"). -include("emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
% API % API
-export([ -export([
@ -35,7 +36,8 @@
lookup/1, lookup/1,
list_all/0, list_all/0,
list_group/1, list_group/1,
ets_lookup/1, lookup_cached/1,
lookup_cached/2,
get_metrics/1, get_metrics/1,
reset_metrics/1 reset_metrics/1
]). ]).
@ -229,14 +231,25 @@ set_resource_status_connecting(ResId) ->
-spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup(ResId) -> lookup(ResId) ->
case safe_call(ResId, lookup, ?T_LOOKUP) of case safe_call(ResId, lookup, ?T_LOOKUP) of
{error, timeout} -> ets_lookup(ResId); {error, timeout} -> lookup_cached(ResId, [metrics]);
Result -> Result Result -> Result
end. end.
%% @doc Lookup the group and data of a resource %% @doc Lookup the group and data of a resource from the cache
-spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
ets_lookup(ResId) -> lookup_cached(ResId) ->
lookup_cached(ResId, []).
%% @doc Lookup the group and data of a resource from the cache
-spec lookup_cached(resource_id(), [Option]) ->
{ok, resource_group(), resource_data()} | {error, not_found}
when
Option :: metrics.
lookup_cached(ResId, Options) ->
NeedMetrics = lists:member(metrics, Options),
case read_cache(ResId) of case read_cache(ResId) of
{Group, Data} when NeedMetrics ->
{ok, Group, data_record_to_external_map_with_metrics(Data)};
{Group, Data} -> {Group, Data} ->
{ok, Group, data_record_to_external_map(Data)}; {ok, Group, data_record_to_external_map(Data)};
not_found -> not_found ->
@ -253,7 +266,7 @@ reset_metrics(ResId) ->
emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId). emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId).
%% @doc Returns the data for all resources %% @doc Returns the data for all resources
-spec list_all() -> [resource_data()] | []. -spec list_all() -> [resource_data()].
list_all() -> list_all() ->
try try
[ [
@ -291,26 +304,30 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
query_mode = maps:get(query_mode, Opts, sync), query_mode = maps:get(query_mode, Opts, sync),
config = Config, config = Config,
opts = Opts, opts = Opts,
status = connecting,
state = undefined, state = undefined,
error = undefined error = undefined
}, },
gen_statem:start_link(?MODULE, {Data, Opts}, []). gen_statem:start_link(?MODULE, {Data, Opts}, []).
init({Data, Opts}) -> init({DataIn, Opts}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%% init the cache so that lookup/1 will always return something Data = DataIn#data{pid = self()},
DataWithPid = Data#data{pid = self()},
insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid),
case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}}; true ->
false -> {ok, stopped, DataWithPid} %% init the cache so that lookup/1 will always return something
UpdatedData = update_state(Data#data{status = connecting}),
{ok, connecting, UpdatedData, {next_event, internal, start_resource}};
false ->
%% init the cache so that lookup/1 will always return something
UpdatedData = update_state(Data#data{status = stopped}),
{ok, stopped, UpdatedData}
end. end.
terminate({shutdown, removed}, _State, _Data) ->
ok;
terminate(_Reason, _State, Data) -> terminate(_Reason, _State, Data) ->
_ = stop_resource(Data), _ = maybe_stop_resource(Data),
_ = maybe_clear_alarm(Data#data.id), ok = delete_cache(Data#data.id, Data#data.manager_id),
delete_cache(Data#data.id, Data#data.manager_id),
ok. ok.
%% Behavior callback %% Behavior callback
@ -321,11 +338,12 @@ callback_mode() -> [handle_event_function, state_enter].
% Called during testing to force a specific state % Called during testing to force a specific state
handle_event({call, From}, set_resource_status_connecting, _State, Data) -> handle_event({call, From}, set_resource_status_connecting, _State, Data) ->
{next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; UpdatedData = update_state(Data#data{status = connecting}, Data),
{next_state, connecting, UpdatedData, [{reply, From, ok}]};
% Called when the resource is to be restarted % Called when the resource is to be restarted
handle_event({call, From}, restart, _State, Data) -> handle_event({call, From}, restart, _State, Data) ->
_ = stop_resource(Data), DataNext = stop_resource(Data),
start_resource(Data, From); start_resource(DataNext, From);
% Called when the resource is to be started (also used for manual reconnect) % Called when the resource is to be started (also used for manual reconnect)
handle_event({call, From}, start, State, Data) when handle_event({call, From}, start, State, Data) when
State =:= stopped orelse State =:= stopped orelse
@ -335,16 +353,14 @@ handle_event({call, From}, start, State, Data) when
handle_event({call, From}, start, _State, _Data) -> handle_event({call, From}, start, _State, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
% Called when the resource received a `quit` message % Called when the resource received a `quit` message
handle_event(info, quit, stopped, _Data) ->
{stop, {shutdown, quit}};
handle_event(info, quit, _State, _Data) -> handle_event(info, quit, _State, _Data) ->
{stop, {shutdown, quit}}; {stop, {shutdown, quit}};
% Called when the resource is to be stopped % Called when the resource is to be stopped
handle_event({call, From}, stop, stopped, _Data) -> handle_event({call, From}, stop, stopped, _Data) ->
{keep_state_and_data, [{reply, From, ok}]}; {keep_state_and_data, [{reply, From, ok}]};
handle_event({call, From}, stop, _State, Data) -> handle_event({call, From}, stop, _State, Data) ->
Result = stop_resource(Data), UpdatedData = stop_resource(Data),
{next_state, stopped, Data, [{reply, From, Result}]}; {next_state, stopped, update_state(UpdatedData, Data), [{reply, From, ok}]};
% Called when a resource is to be stopped and removed. % Called when a resource is to be stopped and removed.
handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_remove_event(From, ClearMetrics, Data); handle_remove_event(From, ClearMetrics, Data);
@ -359,11 +375,9 @@ handle_event({call, From}, health_check, stopped, _Data) ->
handle_event({call, From}, health_check, _State, Data) -> handle_event({call, From}, health_check, _State, Data) ->
handle_manually_health_check(From, Data); handle_manually_health_check(From, Data);
% State: CONNECTING % State: CONNECTING
handle_event(enter, _OldState, connecting, Data) -> handle_event(enter, _OldState, connecting = State, Data) ->
UpdatedData = Data#data{status = connecting}, ok = log_state_consistency(State, Data),
insert_cache(Data#data.id, Data#data.group, Data), {keep_state_and_data, [{state_timeout, 0, health_check}]};
Actions = [{state_timeout, 0, health_check}],
{keep_state, UpdatedData, Actions};
handle_event(internal, start_resource, connecting, Data) -> handle_event(internal, start_resource, connecting, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
handle_event(state_timeout, health_check, connecting, Data) -> handle_event(state_timeout, health_check, connecting, Data) ->
@ -371,27 +385,23 @@ handle_event(state_timeout, health_check, connecting, Data) ->
%% State: CONNECTED %% State: CONNECTED
%% The connected state is entered after a successful on_start/2 of the callback mod %% The connected state is entered after a successful on_start/2 of the callback mod
%% and successful health_checks %% and successful health_checks
handle_event(enter, _OldState, connected, Data) -> handle_event(enter, _OldState, connected = State, Data) ->
UpdatedData = Data#data{status = connected}, ok = log_state_consistency(State, Data),
insert_cache(Data#data.id, Data#data.group, UpdatedData),
_ = emqx_alarm:deactivate(Data#data.id), _ = emqx_alarm:deactivate(Data#data.id),
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state_and_data, health_check_actions(Data)};
{next_state, connected, UpdatedData, Actions};
handle_event(state_timeout, health_check, connected, Data) -> handle_event(state_timeout, health_check, connected, Data) ->
handle_connected_health_check(Data); handle_connected_health_check(Data);
%% State: DISCONNECTED %% State: DISCONNECTED
handle_event(enter, _OldState, disconnected, Data) -> handle_event(enter, _OldState, disconnected = State, Data) ->
UpdatedData = Data#data{status = disconnected}, ok = log_state_consistency(State, Data),
insert_cache(Data#data.id, Data#data.group, UpdatedData), {keep_state_and_data, retry_actions(Data)};
handle_disconnected_state_enter(UpdatedData);
handle_event(state_timeout, auto_retry, disconnected, Data) -> handle_event(state_timeout, auto_retry, disconnected, Data) ->
start_resource(Data, undefined); start_resource(Data, undefined);
%% State: STOPPED %% State: STOPPED
%% The stopped state is entered after the resource has been explicitly stopped %% The stopped state is entered after the resource has been explicitly stopped
handle_event(enter, _OldState, stopped, Data) -> handle_event(enter, _OldState, stopped = State, Data) ->
UpdatedData = Data#data{status = stopped}, ok = log_state_consistency(State, Data),
insert_cache(Data#data.id, Data#data.group, UpdatedData), {keep_state_and_data, []};
{next_state, stopped, UpdatedData};
% Ignore all other events % Ignore all other events
handle_event(EventType, EventData, State, Data) -> handle_event(EventType, EventData, State, Data) ->
?SLOG( ?SLOG(
@ -406,6 +416,22 @@ handle_event(EventType, EventData, State, Data) ->
), ),
keep_state_and_data. keep_state_and_data.
log_state_consistency(State, #data{status = State} = Data) ->
log_cache_consistency(read_cache(Data#data.id), Data);
log_state_consistency(State, Data) ->
?tp(warning, "inconsistent_state", #{
state => State,
data => Data
}).
log_cache_consistency({_, Data}, Data) ->
ok;
log_cache_consistency({_, DataCached}, Data) ->
?tp(warning, "inconsistent_cache", #{
cache => DataCached,
data => Data
}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% internal functions %% internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -439,10 +465,12 @@ delete_cache(ResId, MgrId) ->
end. end.
do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) -> do_delete_cache(<<?TEST_ID_PREFIX, _/binary>> = ResId) ->
ets:delete(?ETS_TABLE, {owner, ResId}), true = ets:delete(?ETS_TABLE, {owner, ResId}),
ets:delete(?ETS_TABLE, ResId); true = ets:delete(?ETS_TABLE, ResId),
ok;
do_delete_cache(ResId) -> do_delete_cache(ResId) ->
ets:delete(?ETS_TABLE, ResId). true = ets:delete(?ETS_TABLE, ResId),
ok.
set_new_owner(ResId) -> set_new_owner(ResId) ->
MgrId = make_manager_id(ResId), MgrId = make_manager_id(ResId),
@ -459,9 +487,6 @@ get_owner(ResId) ->
[] -> not_found [] -> not_found
end. end.
handle_disconnected_state_enter(Data) ->
{next_state, disconnected, Data, retry_actions(Data)}.
retry_actions(Data) -> retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
undefined -> undefined ->
@ -470,24 +495,27 @@ retry_actions(Data) ->
[{state_timeout, RetryInterval, auto_retry}] [{state_timeout, RetryInterval, auto_retry}]
end. end.
health_check_actions(Data) ->
[{state_timeout, health_check_interval(Data#data.opts), health_check}].
handle_remove_event(From, ClearMetrics, Data) -> handle_remove_event(From, ClearMetrics, Data) ->
stop_resource(Data), _ = stop_resource(Data),
ok = delete_cache(Data#data.id, Data#data.manager_id),
ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok false -> ok
end, end,
{stop_and_reply, normal, [{reply, From, ok}]}. {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}.
start_resource(Data, From) -> start_resource(Data, From) ->
%% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache
insert_cache(Data#data.id, Data#data.group, Data),
case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of
{ok, ResourceState} -> {ok, ResourceState} ->
UpdatedData = Data#data{state = ResourceState, status = connecting}, UpdatedData = Data#data{status = connecting, state = ResourceState},
%% Perform an initial health_check immediately before transitioning into a connected state %% Perform an initial health_check immediately before transitioning into a connected state
Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok),
{next_state, connecting, UpdatedData, Actions}; {next_state, connecting, update_state(UpdatedData, Data), Actions};
{error, Reason} = Err -> {error, Reason} = Err ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => start_resource_failed, msg => start_resource_failed,
@ -497,34 +525,42 @@ start_resource(Data, From) ->
_ = maybe_alarm(disconnected, Data#data.id), _ = maybe_alarm(disconnected, Data#data.id),
%% Keep track of the error reason why the connection did not work %% Keep track of the error reason why the connection did not work
%% so that the Reason can be returned when the verification call is made. %% so that the Reason can be returned when the verification call is made.
UpdatedData = Data#data{error = Reason}, UpdatedData = Data#data{status = disconnected, error = Reason},
Actions = maybe_reply(retry_actions(UpdatedData), From, Err), Actions = maybe_reply(retry_actions(UpdatedData), From, Err),
{next_state, disconnected, UpdatedData, Actions} {next_state, disconnected, update_state(UpdatedData, Data), Actions}
end. end.
stop_resource(#data{state = undefined, id = ResId} = _Data) -> maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped ->
_ = maybe_clear_alarm(ResId), stop_resource(Data);
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), maybe_stop_resource(#data{status = stopped} = Data) ->
ok; Data.
stop_resource(Data) ->
stop_resource(#data{state = ResState, id = ResId} = Data) ->
%% We don't care the return value of the Mod:on_stop/2. %% We don't care the return value of the Mod:on_stop/2.
%% The callback mod should make sure the resource is stopped after on_stop/2 %% The callback mod should make sure the resource is stopped after on_stop/2
%% is returned. %% is returned.
ResId = Data#data.id, case ResState /= undefined of
_ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), true ->
emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState);
false ->
ok
end,
_ = maybe_clear_alarm(ResId), _ = maybe_clear_alarm(ResId),
ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId),
ok. Data#data{status = stopped}.
make_test_id() -> make_test_id() ->
RandId = iolist_to_binary(emqx_misc:gen_id(16)), RandId = iolist_to_binary(emqx_misc:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>. <<?TEST_ID_PREFIX, RandId/binary>>.
handle_manually_health_check(From, Data) -> handle_manually_health_check(From, Data) ->
with_health_check(Data, fun(Status, UpdatedData) -> with_health_check(
Actions = [{reply, From, {ok, Status}}], Data,
{next_state, Status, UpdatedData, Actions} fun(Status, UpdatedData) ->
end). Actions = [{reply, From, {ok, Status}}],
{next_state, Status, UpdatedData, Actions}
end
).
handle_connecting_health_check(Data) -> handle_connecting_health_check(Data) ->
with_health_check( with_health_check(
@ -533,8 +569,7 @@ handle_connecting_health_check(Data) ->
(connected, UpdatedData) -> (connected, UpdatedData) ->
{next_state, connected, UpdatedData}; {next_state, connected, UpdatedData};
(connecting, UpdatedData) -> (connecting, UpdatedData) ->
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, health_check_actions(UpdatedData)};
{keep_state, UpdatedData, Actions};
(disconnected, UpdatedData) -> (disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData} {next_state, disconnected, UpdatedData}
end end
@ -545,8 +580,7 @@ handle_connected_health_check(Data) ->
Data, Data,
fun fun
(connected, UpdatedData) -> (connected, UpdatedData) ->
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, health_check_actions(UpdatedData)};
{keep_state, UpdatedData, Actions};
(Status, UpdatedData) -> (Status, UpdatedData) ->
?SLOG(warning, #{ ?SLOG(warning, #{
msg => health_check_failed, msg => health_check_failed,
@ -568,8 +602,16 @@ with_health_check(Data, Func) ->
UpdatedData = Data#data{ UpdatedData = Data#data{
state = NewState, status = Status, error = Err state = NewState, status = Status, error = Err
}, },
insert_cache(ResId, UpdatedData#data.group, UpdatedData), Func(Status, update_state(UpdatedData, Data)).
Func(Status, UpdatedData).
update_state(Data) ->
update_state(Data, undefined).
update_state(DataWas, DataWas) ->
DataWas;
update_state(Data, _DataWas) ->
_ = insert_cache(Data#data.id, Data#data.group, Data),
Data.
health_check_interval(Opts) -> health_check_interval(Opts) ->
maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL).

View File

@ -75,8 +75,7 @@ on_start(InstId, #{name := Name} = Opts) ->
on_stop(_InstId, #{stop_error := true}) -> on_stop(_InstId, #{stop_error := true}) ->
{error, stop_error}; {error, stop_error};
on_stop(_InstId, #{pid := Pid}) -> on_stop(_InstId, #{pid := Pid}) ->
erlang:exit(Pid, shutdown), stop_counter_process(Pid).
ok.
on_query(_InstId, get_state, State) -> on_query(_InstId, get_state, State) ->
{ok, State}; {ok, State};
@ -247,6 +246,15 @@ spawn_counter_process(Name, Register) ->
true = maybe_register(Name, Pid, Register), true = maybe_register(Name, Pid, Register),
Pid. Pid.
stop_counter_process(Pid) ->
true = erlang:is_process_alive(Pid),
true = erlang:exit(Pid, shutdown),
receive
{'EXIT', Pid, shutdown} -> ok
after 5000 ->
{error, timeout}
end.
counter_loop() -> counter_loop() ->
counter_loop(#{ counter_loop(#{
counter => 0, counter => 0,

View File

@ -72,115 +72,156 @@ t_check_config(_) ->
{error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}). {error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}).
t_create_remove(_) -> t_create_remove(_) ->
{error, _} = emqx_resource:check_and_create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {error, _},
#{unknown => test_resource} emqx_resource:check_and_create_local(
), ?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
{ok, _} = emqx_resource:create( ?assertMatch(
?ID, {ok, _},
?DEFAULT_RESOURCE_GROUP, emqx_resource:create(
?TEST_RESOURCE, ?ID,
#{name => test_resource} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
#{name => test_resource}
)
),
{ok, _} = emqx_resource:recreate( ?assertMatch(
?ID, {ok, _},
?TEST_RESOURCE, emqx_resource:recreate(
#{name => test_resource}, ?ID,
#{} ?TEST_RESOURCE,
), #{name => test_resource},
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), #{}
)
),
?assert(is_process_alive(Pid)), {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
ok = emqx_resource:remove(?ID), ?assert(is_process_alive(Pid)),
{error, _} = emqx_resource:remove(?ID),
?assertNot(is_process_alive(Pid)). ?assertEqual(ok, emqx_resource:remove(?ID)),
?assertMatch({error, _}, emqx_resource:remove(?ID)),
?assertNot(is_process_alive(Pid))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_create_remove_local(_) -> t_create_remove_local(_) ->
{error, _} = emqx_resource:check_and_create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {error, _},
#{unknown => test_resource} emqx_resource:check_and_create_local(
), ?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
{ok, _} = emqx_resource:create_local( ?assertMatch(
?ID, {ok, _},
?DEFAULT_RESOURCE_GROUP, emqx_resource:create_local(
?TEST_RESOURCE, ?ID,
#{name => test_resource} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
#{name => test_resource}
)
),
emqx_resource:recreate_local( emqx_resource:recreate_local(
?ID, ?ID,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource}, #{name => test_resource},
#{} #{}
), ),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid)), {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
emqx_resource:set_resource_status_connecting(?ID), ?assert(is_process_alive(Pid)),
emqx_resource:recreate_local( emqx_resource:set_resource_status_connecting(?ID),
?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
ok = emqx_resource:remove_local(?ID), emqx_resource:recreate_local(
{error, _} = emqx_resource:remove_local(?ID), ?ID,
?TEST_RESOURCE,
#{name => test_resource},
#{}
),
?assertMatch( ?assertEqual(ok, emqx_resource:remove_local(?ID)),
?RESOURCE_ERROR(not_found), ?assertMatch({error, _}, emqx_resource:remove_local(?ID)),
emqx_resource:query(?ID, get_state)
), ?assertMatch(
?assertNot(is_process_alive(Pid)). ?RESOURCE_ERROR(not_found),
emqx_resource:query(?ID, get_state)
),
?assertNot(is_process_alive(Pid))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_do_not_start_after_created(_) -> t_do_not_start_after_created(_) ->
ct:pal("creating resource"), ?check_trace(
{ok, _} = emqx_resource:create_local( begin
?ID, ?assertMatch(
?DEFAULT_RESOURCE_GROUP, {ok, _},
?TEST_RESOURCE, emqx_resource:create_local(
#{name => test_resource}, ?ID,
#{start_after_created => false} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
%% the resource should remain `disconnected` after created #{name => test_resource},
timer:sleep(200), #{start_after_created => false}
?assertMatch( )
?RESOURCE_ERROR(stopped), ),
emqx_resource:query(?ID, get_state) %% the resource should remain `disconnected` after created
), timer:sleep(200),
?assertMatch( ?assertMatch(
{ok, _, #{status := stopped}}, ?RESOURCE_ERROR(stopped),
emqx_resource:get_instance(?ID) emqx_resource:query(?ID, get_state)
), ),
?assertMatch(
{ok, _, #{status := stopped}},
emqx_resource:get_instance(?ID)
),
%% start the resource manually.. %% start the resource manually..
ct:pal("starting resource manually"), ?assertEqual(ok, emqx_resource:start(?ID)),
ok = emqx_resource:start(?ID), {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid)),
?assert(is_process_alive(Pid)),
%% restart the resource %% restart the resource
ct:pal("restarting resource"), ?assertEqual(ok, emqx_resource:restart(?ID)),
ok = emqx_resource:restart(?ID), ?assertNot(is_process_alive(Pid)),
?assertNot(is_process_alive(Pid)), {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state),
{ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid2)),
?assert(is_process_alive(Pid2)),
ct:pal("removing resource"), ?assertEqual(ok, emqx_resource:remove_local(?ID)),
ok = emqx_resource:remove_local(?ID),
?assertNot(is_process_alive(Pid2)). ?assertNot(is_process_alive(Pid2))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_query(_) -> t_query(_) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
@ -222,7 +263,11 @@ t_batch_query_counter(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, register => true}, #{name => test_resource, register => true},
#{batch_size => BatchSize, query_mode => sync} #{
batch_size => BatchSize,
batch_time => 100,
query_mode => sync
}
), ),
?check_trace( ?check_trace(
@ -581,6 +626,7 @@ t_query_counter_async_inflight_batch(_) ->
#{ #{
query_mode => async, query_mode => async,
batch_size => BatchSize, batch_size => BatchSize,
batch_time => 100,
async_inflight_window => WindowSize, async_inflight_window => WindowSize,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => 300 resume_interval => 300
@ -771,153 +817,210 @@ t_query_counter_async_inflight_batch(_) ->
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
t_healthy_timeout(_) -> t_healthy_timeout(_) ->
{ok, _} = emqx_resource:create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {ok, _},
#{name => <<"bad_not_atom_name">>, register => true}, emqx_resource:create_local(
%% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. ?ID,
#{health_check_interval => 200} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
?assertMatch( #{name => <<"bad_not_atom_name">>, register => true},
{error, {resource_error, #{reason := timeout}}}, %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later.
emqx_resource:query(?ID, get_state, #{timeout => 1_000}) #{health_check_interval => 200}
), )
?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)), ),
ok = emqx_resource:remove_local(?ID). ?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(?ID, get_state, #{timeout => 1_000})
),
?assertMatch(
{ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID)
),
?assertEqual(ok, emqx_resource:remove_local(?ID))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_healthy(_) -> t_healthy(_) ->
{ok, _} = emqx_resource:create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {ok, _},
#{name => test_resource} emqx_resource:create_local(
), ?ID,
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?DEFAULT_RESOURCE_GROUP,
timer:sleep(300), ?TEST_RESOURCE,
emqx_resource:set_resource_status_connecting(?ID), #{name => test_resource}
)
),
{ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state),
timer:sleep(300),
emqx_resource:set_resource_status_connecting(?ID),
{ok, connected} = emqx_resource:health_check(?ID), ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)),
?assertMatch( ?assertMatch(
[#{status := connected}], [#{status := connected}],
emqx_resource:list_instances_verbose() emqx_resource:list_instances_verbose()
), ),
erlang:exit(Pid, shutdown), erlang:exit(Pid, shutdown),
?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)),
?assertMatch( ?assertMatch(
[#{status := disconnected}], [#{status := disconnected}],
emqx_resource:list_instances_verbose() emqx_resource:list_instances_verbose()
), ),
ok = emqx_resource:remove_local(?ID). ?assertEqual(ok, emqx_resource:remove_local(?ID))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_stop_start(_) -> t_stop_start(_) ->
{error, _} = emqx_resource:check_and_create( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {error, _},
#{unknown => test_resource} emqx_resource:check_and_create(
), ?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
{ok, _} = emqx_resource:check_and_create( ?assertMatch(
?ID, {ok, _},
?DEFAULT_RESOURCE_GROUP, emqx_resource:check_and_create(
?TEST_RESOURCE, ?ID,
#{<<"name">> => <<"test_resource">>} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
)
),
%% add some metrics to test their persistence %% add some metrics to test their persistence
WorkerID0 = <<"worker:0">>, WorkerID0 = <<"worker:0">>,
WorkerID1 = <<"worker:1">>, WorkerID1 = <<"worker:1">>,
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
{ok, _} = emqx_resource:check_and_recreate( ?assertMatch(
?ID, {ok, _},
?TEST_RESOURCE, emqx_resource:check_and_recreate(
#{<<"name">> => <<"test_resource">>}, ?ID,
#{} ?TEST_RESOURCE,
), #{<<"name">> => <<"test_resource">>},
#{}
)
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)), ?assert(is_process_alive(Pid0)),
%% metrics are reset when recreating %% metrics are reset when recreating
%% depending on timing, might show the request we just did. %% depending on timing, might show the request we just did.
ct:sleep(500), ct:sleep(500),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)),
ok = emqx_resource:stop(?ID), ok = emqx_resource:stop(?ID),
?assertNot(is_process_alive(Pid0)), ?assertNot(is_process_alive(Pid0)),
?assertMatch( ?assertMatch(
?RESOURCE_ERROR(stopped), ?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state) emqx_resource:query(?ID, get_state)
), ),
ok = emqx_resource:restart(?ID), ?assertEqual(ok, emqx_resource:restart(?ID)),
timer:sleep(300), timer:sleep(300),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)), ?assert(is_process_alive(Pid1)),
%% now stop while resetting the metrics %% now stop while resetting the metrics
ct:sleep(500), ct:sleep(500),
emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1),
emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4),
?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)),
ok = emqx_resource:stop(?ID), ?assertEqual(ok, emqx_resource:stop(?ID)),
?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID))
end,
ok. fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_stop_start_local(_) -> t_stop_start_local(_) ->
{error, _} = emqx_resource:check_and_create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, ?assertMatch(
?TEST_RESOURCE, {error, _},
#{unknown => test_resource} emqx_resource:check_and_create_local(
), ?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{unknown => test_resource}
)
),
{ok, _} = emqx_resource:check_and_create_local( ?assertMatch(
?ID, {ok, _},
?DEFAULT_RESOURCE_GROUP, emqx_resource:check_and_create_local(
?TEST_RESOURCE, ?ID,
#{<<"name">> => <<"test_resource">>} ?DEFAULT_RESOURCE_GROUP,
), ?TEST_RESOURCE,
#{<<"name">> => <<"test_resource">>}
)
),
{ok, _} = emqx_resource:check_and_recreate_local( ?assertMatch(
?ID, {ok, _},
?TEST_RESOURCE, emqx_resource:check_and_recreate_local(
#{<<"name">> => <<"test_resource">>}, ?ID,
#{} ?TEST_RESOURCE,
), #{<<"name">> => <<"test_resource">>},
#{}
)
),
{ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid0)), ?assert(is_process_alive(Pid0)),
ok = emqx_resource:stop(?ID), ?assertEqual(ok, emqx_resource:stop(?ID)),
?assertNot(is_process_alive(Pid0)), ?assertNot(is_process_alive(Pid0)),
?assertMatch( ?assertMatch(
?RESOURCE_ERROR(stopped), ?RESOURCE_ERROR(stopped),
emqx_resource:query(?ID, get_state) emqx_resource:query(?ID, get_state)
), ),
ok = emqx_resource:restart(?ID), ?assertEqual(ok, emqx_resource:restart(?ID)),
{ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state),
?assert(is_process_alive(Pid1)). ?assert(is_process_alive(Pid1))
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
).
t_list_filter(_) -> t_list_filter(_) ->
{ok, _} = emqx_resource:create_local( {ok, _} = emqx_resource:create_local(
@ -1031,16 +1134,24 @@ t_auto_retry(_) ->
?assertEqual(ok, Res). ?assertEqual(ok, Res).
t_health_check_disconnected(_) -> t_health_check_disconnected(_) ->
_ = emqx_resource:create_local( ?check_trace(
?ID, begin
?DEFAULT_RESOURCE_GROUP, _ = emqx_resource:create_local(
?TEST_RESOURCE, ?ID,
#{name => test_resource, create_error => true}, ?DEFAULT_RESOURCE_GROUP,
#{auto_retry_interval => 100} ?TEST_RESOURCE,
), #{name => test_resource, create_error => true},
?assertEqual( #{auto_retry_interval => 100}
{ok, disconnected}, ),
emqx_resource:health_check(?ID) ?assertEqual(
{ok, disconnected},
emqx_resource:health_check(?ID)
)
end,
fun(Trace) ->
?assertEqual([], ?of_kind("inconsistent_state", Trace)),
?assertEqual([], ?of_kind("inconsistent_cache", Trace))
end
). ).
t_unblock_only_required_buffer_workers(_) -> t_unblock_only_required_buffer_workers(_) ->
@ -1051,7 +1162,8 @@ t_unblock_only_required_buffer_workers(_) ->
#{name => test_resource}, #{name => test_resource},
#{ #{
query_mode => async, query_mode => async,
batch_size => 5 batch_size => 5,
batch_time => 100
} }
), ),
lists:foreach( lists:foreach(
@ -1065,7 +1177,8 @@ t_unblock_only_required_buffer_workers(_) ->
#{name => test_resource}, #{name => test_resource},
#{ #{
query_mode => async, query_mode => async,
batch_size => 5 batch_size => 5,
batch_time => 100
} }
), ),
%% creation of `?ID1` should not have unblocked `?ID`'s buffer workers %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers
@ -1096,6 +1209,7 @@ t_retry_batch(_Config) ->
#{ #{
query_mode => async, query_mode => async,
batch_size => 5, batch_size => 5,
batch_time => 100,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => 1_000 resume_interval => 1_000
} }
@ -1465,7 +1579,6 @@ t_retry_async_inflight_full(_Config) ->
query_mode => async, query_mode => async,
async_inflight_window => AsyncInflightWindow, async_inflight_window => AsyncInflightWindow,
batch_size => 1, batch_size => 1,
batch_time => 20,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => ResumeInterval resume_interval => ResumeInterval
} }
@ -1980,7 +2093,6 @@ t_expiration_async_after_reply(_Config) ->
#{ #{
query_mode => async, query_mode => async,
batch_size => 1, batch_size => 1,
batch_time => 100,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => 1_000 resume_interval => 1_000
} }
@ -2203,7 +2315,6 @@ t_expiration_retry(_Config) ->
#{ #{
query_mode => sync, query_mode => sync,
batch_size => 1, batch_size => 1,
batch_time => 100,
worker_pool_size => 1, worker_pool_size => 1,
resume_interval => 300 resume_interval => 300
} }
@ -2393,7 +2504,6 @@ t_recursive_flush(_Config) ->
#{ #{
query_mode => async, query_mode => async,
batch_size => 1, batch_size => 1,
batch_time => 10_000,
worker_pool_size => 1 worker_pool_size => 1
} }
), ),