From 686bf8255b4a18f524d7a95983e27d094a42ad33 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 14:35:08 +0300 Subject: [PATCH 1/7] fix(bridge): reply `emqx_resource:get_instance/1` from cache The resource manager may be busy at times, so this change ensures that getting resource instance state will not block. Currently, no users of `emqx_resource:get_instance/1` do seem to be relying on state being "as-actual-as-possible" guarantee it was providing. --- apps/emqx_resource/src/emqx_resource.erl | 2 +- .../src/emqx_resource_manager.erl | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 1c5eecfbb..2c6865e04 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -309,7 +309,7 @@ set_resource_status_connecting(ResId) -> -spec get_instance(resource_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(ResId) -> - emqx_resource_manager:lookup(ResId). + emqx_resource_manager:ets_lookup(ResId, [metrics]). -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index ee9e218b2..05d100913 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -36,6 +36,7 @@ list_all/0, list_group/1, ets_lookup/1, + ets_lookup/2, get_metrics/1, reset_metrics/1 ]). @@ -229,14 +230,25 @@ set_resource_status_connecting(ResId) -> -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(ResId) -> case safe_call(ResId, lookup, ?T_LOOKUP) of - {error, timeout} -> ets_lookup(ResId); + {error, timeout} -> ets_lookup(ResId, [metrics]); Result -> Result end. -%% @doc Lookup the group and data of a resource +%% @doc Lookup the group and data of a resource from the cache -spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. ets_lookup(ResId) -> + ets_lookup(ResId, []). + +%% @doc Lookup the group and data of a resource from the cache +-spec ets_lookup(resource_id(), [Option]) -> + {ok, resource_group(), resource_data()} | {error, not_found} +when + Option :: metrics. +ets_lookup(ResId, Options) -> + NeedMetrics = lists:member(metrics, Options), case read_cache(ResId) of + {Group, Data} when NeedMetrics -> + {ok, Group, data_record_to_external_map_with_metrics(Data)}; {Group, Data} -> {ok, Group, data_record_to_external_map(Data)}; not_found -> @@ -253,7 +265,7 @@ reset_metrics(ResId) -> emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId). %% @doc Returns the data for all resources --spec list_all() -> [resource_data()] | []. +-spec list_all() -> [resource_data()]. list_all() -> try [ From 53bc27e0f43db534f9502c2304ac218feea09f59 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 14:49:38 +0300 Subject: [PATCH 2/7] refactor(bridge): avoid unnecessary `maps:to_list/1` when listing --- apps/emqx_bridge/src/emqx_bridge.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ddf24d380..292369d36 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -226,21 +226,21 @@ post_config_update(_, _Req, NewConf, OldConf, _AppEnv) -> Result. list() -> - lists:foldl( - fun({Type, NameAndConf}, Bridges) -> - lists:foldl( - fun({Name, RawConf}, Acc) -> + maps:fold( + fun(Type, NameAndConf, Bridges) -> + maps:fold( + fun(Name, RawConf, Acc) -> case lookup(Type, Name, RawConf) of {error, not_found} -> Acc; {ok, Res} -> [Res | Acc] end end, Bridges, - maps:to_list(NameAndConf) + NameAndConf ) end, [], - maps:to_list(emqx:get_raw_config([bridges], #{})) + emqx:get_raw_config([bridges], #{}) ). lookup(Id) -> From cad6492c990c4f694878fda8a1b64ed3bc776741 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 14:51:28 +0300 Subject: [PATCH 3/7] perf(bridge-api): ask bridge listings in parallel Also rename response formatting functions to better clarify their purpose. --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge_api.erl | 34 +++-- .../src/proto/emqx_bridge_proto_v2.erl | 4 + .../src/proto/emqx_bridge_proto_v3.erl | 128 ++++++++++++++++++ apps/emqx_resource/src/emqx_resource.erl | 2 + 5 files changed, 154 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 769145722..904611199 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -4,6 +4,7 @@ {emqx_authz,1}. {emqx_bridge,1}. {emqx_bridge,2}. +{emqx_bridge,3}. {emqx_broker,1}. {emqx_cm,1}. {emqx_conf,1}. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ff55976d0..e9f31d010 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -483,11 +483,18 @@ schema("/bridges_probe") -> end end; '/bridges'(get, _Params) -> - {200, - zip_bridges([ - [format_resp(Data, Node) || Data <- emqx_bridge_proto_v1:list_bridges(Node)] - || Node <- mria:running_nodes() - ])}. + Nodes = mria:running_nodes(), + NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes), + case is_ok(NodeReplies) of + {ok, NodeBridges} -> + AllBridges = [ + format_resource(Data, Node) + || {Node, Bridges} <- lists:zip(Nodes, NodeBridges), Data <- Bridges + ], + {200, zip_bridges([AllBridges])}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} + end. '/bridges/:id'(get, #{bindings := #{id := Id}}) -> ?TRY_PARSE_ID(Id, lookup_from_all_nodes(BridgeType, BridgeName, 200)); @@ -591,7 +598,7 @@ lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> Nodes = mria:running_nodes(), - case is_ok(emqx_bridge_proto_v1:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of + case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of {ok, [{ok, _} | _] = Results} -> {SuccCode, FormatFun([R || {ok, R} <- Results])}; {ok, [{error, not_found} | _]} -> @@ -602,7 +609,7 @@ do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) -> lookup_from_local_node(BridgeType, BridgeName) -> case emqx_bridge:lookup(BridgeType, BridgeName) of - {ok, Res} -> {ok, format_resp(Res)}; + {ok, Res} -> {ok, format_resource(Res, node())}; Error -> Error end. @@ -802,10 +809,7 @@ aggregate_metrics( aggregate_metrics(#{}, Metrics) -> Metrics. -format_resp(Data) -> - format_resp(Data, node()). - -format_resp( +format_resource( #{ type := Type, name := BridgeName, @@ -974,7 +978,7 @@ do_bpapi_call(Node, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) -> case lists:member(SupportedVersion, supported_versions(Call)) of true -> - apply(emqx_bridge_proto_v2, Call, Args); + apply(emqx_bridge_proto_v3, Call, Args); false -> {error, not_implemented} end. @@ -984,9 +988,9 @@ maybe_unwrap({error, not_implemented}) -> maybe_unwrap(RpcMulticallResult) -> emqx_rpc:unwrap_erpc(RpcMulticallResult). -supported_versions(start_bridge_to_node) -> [2]; -supported_versions(start_bridges_to_all_nodes) -> [2]; -supported_versions(_Call) -> [1, 2]. +supported_versions(start_bridge_to_node) -> [2, 3]; +supported_versions(start_bridges_to_all_nodes) -> [2, 3]; +supported_versions(_Call) -> [1, 2, 3]. to_hr_reason(nxdomain) -> <<"Host not found">>; diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl index 0fd733380..bcf6ca198 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v2.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, list_bridges/1, restart_bridge_to_node/3, @@ -38,6 +39,9 @@ introduced_in() -> "5.0.17". +deprecated_since() -> + "5.0.21". + -spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). list_bridges(Node) -> rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl new file mode 100644 index 000000000..a35db5d96 --- /dev/null +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v3.erl @@ -0,0 +1,128 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_proto_v3). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + list_bridges/1, + list_bridges_on_nodes/1, + restart_bridge_to_node/3, + start_bridge_to_node/3, + stop_bridge_to_node/3, + lookup_from_all_nodes/3, + restart_bridges_to_all_nodes/3, + start_bridges_to_all_nodes/3, + stop_bridges_to_all_nodes/3 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.0.21". + +-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). +list_bridges(Node) -> + rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). + +-spec list_bridges_on_nodes([node()]) -> + emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). +list_bridges_on_nodes(Nodes) -> + erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT). + +-type key() :: atom() | binary() | [byte()]. + +-spec restart_bridge_to_node(node(), key(), key()) -> + term(). +restart_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridge_to_node(node(), key(), key()) -> + term(). +start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridge_to_node(node(), key(), key()) -> + term(). +stop_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + restart, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec start_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_resource, + stop, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec lookup_from_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_api, + lookup_from_local_node, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 2c6865e04..57d56b339 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -112,6 +112,8 @@ -export([apply_reply_fun/2]). +-export_type([resource_data/0]). + -optional_callbacks([ on_query/3, on_batch_query/3, From b3e7e51094daf95d5ad3cee60db920300f793f11 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 18:12:41 +0300 Subject: [PATCH 4/7] test(bridge): drop unnecessary cleanup routines Since `end_per_testcase` cleans out all the resources anyway. --- .../emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index e2c9382db..bd6af9323 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -19,7 +19,6 @@ -compile(export_all). -import(emqx_dashboard_api_test_helpers, [request/4, uri/1]). --import(emqx_common_test_helpers, [on_exit/1]). -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -164,9 +163,9 @@ init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), ok = snabbkaffe:start_trace(), Config. + end_per_testcase(_, _Config) -> clear_resources(), - emqx_common_test_helpers:call_janitor(), snabbkaffe:stop(), ok. @@ -710,13 +709,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> } ), - on_exit(fun() -> - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - ok - end), - %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -827,13 +819,6 @@ t_mqtt_conn_bridge_egress_async_reconnect(_) -> } ), - on_exit(fun() -> - %% delete the bridge - {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), - ok - end), - Self = self(), LocalTopic = <>, RemoteTopic = <>, From e411c5d5f8cb357e48ba9fa33ae713ec69f07019 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 13 Mar 2023 18:28:28 +0300 Subject: [PATCH 5/7] refactor(resman): work with state cache atomically Also ensure that cache entries are always consistent with `Data`, so that most of the code could rely on reading the cached entry most of the time. --- .../src/emqx_resource_manager.erl | 162 +++--- .../test/emqx_connector_demo.erl | 12 +- .../test/emqx_resource_SUITE.erl | 528 +++++++++++------- 3 files changed, 423 insertions(+), 279 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 05d100913..0983dff8d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -18,6 +18,7 @@ -include("emqx_resource.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). % API -export([ @@ -303,26 +304,30 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> query_mode = maps:get(query_mode, Opts, sync), config = Config, opts = Opts, - status = connecting, state = undefined, error = undefined }, gen_statem:start_link(?MODULE, {Data, Opts}, []). -init({Data, Opts}) -> +init({DataIn, Opts}) -> process_flag(trap_exit, true), - %% init the cache so that lookup/1 will always return something - DataWithPid = Data#data{pid = self()}, - insert_cache(DataWithPid#data.id, DataWithPid#data.group, DataWithPid), + Data = DataIn#data{pid = self()}, case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of - true -> {ok, connecting, DataWithPid, {next_event, internal, start_resource}}; - false -> {ok, stopped, DataWithPid} + true -> + %% init the cache so that lookup/1 will always return something + UpdatedData = update_state(Data#data{status = connecting}), + {ok, connecting, UpdatedData, {next_event, internal, start_resource}}; + false -> + %% init the cache so that lookup/1 will always return something + UpdatedData = update_state(Data#data{status = stopped}), + {ok, stopped, UpdatedData} end. +terminate({shutdown, removed}, _State, _Data) -> + ok; terminate(_Reason, _State, Data) -> - _ = stop_resource(Data), - _ = maybe_clear_alarm(Data#data.id), - delete_cache(Data#data.id, Data#data.manager_id), + _ = maybe_stop_resource(Data), + ok = delete_cache(Data#data.id, Data#data.manager_id), ok. %% Behavior callback @@ -333,11 +338,12 @@ callback_mode() -> [handle_event_function, state_enter]. % Called during testing to force a specific state handle_event({call, From}, set_resource_status_connecting, _State, Data) -> - {next_state, connecting, Data#data{status = connecting}, [{reply, From, ok}]}; + UpdatedData = update_state(Data#data{status = connecting}, Data), + {next_state, connecting, UpdatedData, [{reply, From, ok}]}; % Called when the resource is to be restarted handle_event({call, From}, restart, _State, Data) -> - _ = stop_resource(Data), - start_resource(Data, From); + DataNext = stop_resource(Data), + start_resource(DataNext, From); % Called when the resource is to be started (also used for manual reconnect) handle_event({call, From}, start, State, Data) when State =:= stopped orelse @@ -347,16 +353,14 @@ handle_event({call, From}, start, State, Data) when handle_event({call, From}, start, _State, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; % Called when the resource received a `quit` message -handle_event(info, quit, stopped, _Data) -> - {stop, {shutdown, quit}}; handle_event(info, quit, _State, _Data) -> {stop, {shutdown, quit}}; % Called when the resource is to be stopped handle_event({call, From}, stop, stopped, _Data) -> {keep_state_and_data, [{reply, From, ok}]}; handle_event({call, From}, stop, _State, Data) -> - Result = stop_resource(Data), - {next_state, stopped, Data, [{reply, From, Result}]}; + UpdatedData = stop_resource(Data), + {next_state, stopped, update_state(UpdatedData, Data), [{reply, From, ok}]}; % Called when a resource is to be stopped and removed. handle_event({call, From}, {remove, ClearMetrics}, _State, Data) -> handle_remove_event(From, ClearMetrics, Data); @@ -371,11 +375,9 @@ handle_event({call, From}, health_check, stopped, _Data) -> handle_event({call, From}, health_check, _State, Data) -> handle_manually_health_check(From, Data); % State: CONNECTING -handle_event(enter, _OldState, connecting, Data) -> - UpdatedData = Data#data{status = connecting}, - insert_cache(Data#data.id, Data#data.group, Data), - Actions = [{state_timeout, 0, health_check}], - {keep_state, UpdatedData, Actions}; +handle_event(enter, _OldState, connecting = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, [{state_timeout, 0, health_check}]}; handle_event(internal, start_resource, connecting, Data) -> start_resource(Data, undefined); handle_event(state_timeout, health_check, connecting, Data) -> @@ -383,27 +385,23 @@ handle_event(state_timeout, health_check, connecting, Data) -> %% State: CONNECTED %% The connected state is entered after a successful on_start/2 of the callback mod %% and successful health_checks -handle_event(enter, _OldState, connected, Data) -> - UpdatedData = Data#data{status = connected}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), +handle_event(enter, _OldState, connected = State, Data) -> + ok = log_state_consistency(State, Data), _ = emqx_alarm:deactivate(Data#data.id), - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {next_state, connected, UpdatedData, Actions}; + {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); %% State: DISCONNECTED -handle_event(enter, _OldState, disconnected, Data) -> - UpdatedData = Data#data{status = disconnected}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), - handle_disconnected_state_enter(UpdatedData); +handle_event(enter, _OldState, disconnected = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, retry_actions(Data)}; handle_event(state_timeout, auto_retry, disconnected, Data) -> start_resource(Data, undefined); %% State: STOPPED %% The stopped state is entered after the resource has been explicitly stopped -handle_event(enter, _OldState, stopped, Data) -> - UpdatedData = Data#data{status = stopped}, - insert_cache(Data#data.id, Data#data.group, UpdatedData), - {next_state, stopped, UpdatedData}; +handle_event(enter, _OldState, stopped = State, Data) -> + ok = log_state_consistency(State, Data), + {keep_state_and_data, []}; % Ignore all other events handle_event(EventType, EventData, State, Data) -> ?SLOG( @@ -418,6 +416,22 @@ handle_event(EventType, EventData, State, Data) -> ), keep_state_and_data. +log_state_consistency(State, #data{status = State} = Data) -> + log_cache_consistency(read_cache(Data#data.id), Data); +log_state_consistency(State, Data) -> + ?tp(warning, "inconsistent_state", #{ + state => State, + data => Data + }). + +log_cache_consistency({_, Data}, Data) -> + ok; +log_cache_consistency({_, DataCached}, Data) -> + ?tp(warning, "inconsistent_cache", #{ + cache => DataCached, + data => Data + }). + %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ @@ -451,10 +465,12 @@ delete_cache(ResId, MgrId) -> end. do_delete_cache(<> = ResId) -> - ets:delete(?ETS_TABLE, {owner, ResId}), - ets:delete(?ETS_TABLE, ResId); + true = ets:delete(?ETS_TABLE, {owner, ResId}), + true = ets:delete(?ETS_TABLE, ResId), + ok; do_delete_cache(ResId) -> - ets:delete(?ETS_TABLE, ResId). + true = ets:delete(?ETS_TABLE, ResId), + ok. set_new_owner(ResId) -> MgrId = make_manager_id(ResId), @@ -471,9 +487,6 @@ get_owner(ResId) -> [] -> not_found end. -handle_disconnected_state_enter(Data) -> - {next_state, disconnected, Data, retry_actions(Data)}. - retry_actions(Data) -> case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of undefined -> @@ -482,24 +495,27 @@ retry_actions(Data) -> [{state_timeout, RetryInterval, auto_retry}] end. +health_check_actions(Data) -> + [{state_timeout, health_check_interval(Data#data.opts), health_check}]. + handle_remove_event(From, ClearMetrics, Data) -> - stop_resource(Data), + _ = stop_resource(Data), + ok = delete_cache(Data#data.id, Data#data.manager_id), ok = emqx_resource_buffer_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - {stop_and_reply, normal, [{reply, From, ok}]}. + {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> %% in case the emqx_resource:call_start/2 hangs, the lookup/1 can read status from the cache - insert_cache(Data#data.id, Data#data.group, Data), case emqx_resource:call_start(Data#data.manager_id, Data#data.mod, Data#data.config) of {ok, ResourceState} -> - UpdatedData = Data#data{state = ResourceState, status = connecting}, + UpdatedData = Data#data{status = connecting, state = ResourceState}, %% Perform an initial health_check immediately before transitioning into a connected state Actions = maybe_reply([{state_timeout, 0, health_check}], From, ok), - {next_state, connecting, UpdatedData, Actions}; + {next_state, connecting, update_state(UpdatedData, Data), Actions}; {error, Reason} = Err -> ?SLOG(warning, #{ msg => start_resource_failed, @@ -509,34 +525,42 @@ start_resource(Data, From) -> _ = maybe_alarm(disconnected, Data#data.id), %% Keep track of the error reason why the connection did not work %% so that the Reason can be returned when the verification call is made. - UpdatedData = Data#data{error = Reason}, + UpdatedData = Data#data{status = disconnected, error = Reason}, Actions = maybe_reply(retry_actions(UpdatedData), From, Err), - {next_state, disconnected, UpdatedData, Actions} + {next_state, disconnected, update_state(UpdatedData, Data), Actions} end. -stop_resource(#data{state = undefined, id = ResId} = _Data) -> - _ = maybe_clear_alarm(ResId), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), - ok; -stop_resource(Data) -> +maybe_stop_resource(#data{status = Status} = Data) when Status /= stopped -> + stop_resource(Data); +maybe_stop_resource(#data{status = stopped} = Data) -> + Data. + +stop_resource(#data{state = ResState, id = ResId} = Data) -> %% We don't care the return value of the Mod:on_stop/2. %% The callback mod should make sure the resource is stopped after on_stop/2 %% is returned. - ResId = Data#data.id, - _ = emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, Data#data.state), + case ResState /= undefined of + true -> + emqx_resource:call_stop(Data#data.manager_id, Data#data.mod, ResState); + false -> + ok + end, _ = maybe_clear_alarm(ResId), ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, ResId), - ok. + Data#data{status = stopped}. make_test_id() -> RandId = iolist_to_binary(emqx_misc:gen_id(16)), <>. handle_manually_health_check(From, Data) -> - with_health_check(Data, fun(Status, UpdatedData) -> - Actions = [{reply, From, {ok, Status}}], - {next_state, Status, UpdatedData, Actions} - end). + with_health_check( + Data, + fun(Status, UpdatedData) -> + Actions = [{reply, From, {ok, Status}}], + {next_state, Status, UpdatedData, Actions} + end + ). handle_connecting_health_check(Data) -> with_health_check( @@ -545,8 +569,7 @@ handle_connecting_health_check(Data) -> (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {keep_state, UpdatedData, Actions}; + {keep_state, UpdatedData, health_check_actions(UpdatedData)}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} end @@ -557,8 +580,7 @@ handle_connected_health_check(Data) -> Data, fun (connected, UpdatedData) -> - Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], - {keep_state, UpdatedData, Actions}; + {keep_state, UpdatedData, health_check_actions(UpdatedData)}; (Status, UpdatedData) -> ?SLOG(warning, #{ msg => health_check_failed, @@ -580,8 +602,16 @@ with_health_check(Data, Func) -> UpdatedData = Data#data{ state = NewState, status = Status, error = Err }, - insert_cache(ResId, UpdatedData#data.group, UpdatedData), - Func(Status, UpdatedData). + Func(Status, update_state(UpdatedData, Data)). + +update_state(Data) -> + update_state(Data, undefined). + +update_state(DataWas, DataWas) -> + DataWas; +update_state(Data, _DataWas) -> + _ = insert_cache(Data#data.id, Data#data.group, Data), + Data. health_check_interval(Opts) -> maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index f41087b20..a863dbb78 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -75,8 +75,7 @@ on_start(InstId, #{name := Name} = Opts) -> on_stop(_InstId, #{stop_error := true}) -> {error, stop_error}; on_stop(_InstId, #{pid := Pid}) -> - erlang:exit(Pid, shutdown), - ok. + stop_counter_process(Pid). on_query(_InstId, get_state, State) -> {ok, State}; @@ -247,6 +246,15 @@ spawn_counter_process(Name, Register) -> true = maybe_register(Name, Pid, Register), Pid. +stop_counter_process(Pid) -> + true = erlang:is_process_alive(Pid), + true = erlang:exit(Pid, shutdown), + receive + {'EXIT', Pid, shutdown} -> ok + after 5000 -> + {error, timeout} + end. + counter_loop() -> counter_loop(#{ counter => 0, diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index af72e86f9..f6d2b7ab4 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -72,115 +72,156 @@ t_check_config(_) -> {error, _} = emqx_resource:check_config(?TEST_RESOURCE, #{invalid => config}). t_create_remove(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), + ?assertMatch( + {ok, _}, + emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), - {ok, _} = emqx_resource:recreate( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + ?assertMatch( + {ok, _}, + emqx_resource:recreate( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ) + ), - ?assert(is_process_alive(Pid)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - ok = emqx_resource:remove(?ID), - {error, _} = emqx_resource:remove(?ID), + ?assert(is_process_alive(Pid)), - ?assertNot(is_process_alive(Pid)). + ?assertEqual(ok, emqx_resource:remove(?ID)), + ?assertMatch({error, _}, emqx_resource:remove(?ID)), + + ?assertNot(is_process_alive(Pid)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_create_remove_local(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), - emqx_resource:recreate_local( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + emqx_resource:recreate_local( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ), - ?assert(is_process_alive(Pid)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - emqx_resource:set_resource_status_connecting(?ID), + ?assert(is_process_alive(Pid)), - emqx_resource:recreate_local( - ?ID, - ?TEST_RESOURCE, - #{name => test_resource}, - #{} - ), + emqx_resource:set_resource_status_connecting(?ID), - ok = emqx_resource:remove_local(?ID), - {error, _} = emqx_resource:remove_local(?ID), + emqx_resource:recreate_local( + ?ID, + ?TEST_RESOURCE, + #{name => test_resource}, + #{} + ), - ?assertMatch( - ?RESOURCE_ERROR(not_found), - emqx_resource:query(?ID, get_state) - ), - ?assertNot(is_process_alive(Pid)). + ?assertEqual(ok, emqx_resource:remove_local(?ID)), + ?assertMatch({error, _}, emqx_resource:remove_local(?ID)), + + ?assertMatch( + ?RESOURCE_ERROR(not_found), + emqx_resource:query(?ID, get_state) + ), + + ?assertNot(is_process_alive(Pid)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_do_not_start_after_created(_) -> - ct:pal("creating resource"), - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{start_after_created => false} - ), - %% the resource should remain `disconnected` after created - timer:sleep(200), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), - ?assertMatch( - {ok, _, #{status := stopped}}, - emqx_resource:get_instance(?ID) - ), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{start_after_created => false} + ) + ), + %% the resource should remain `disconnected` after created + timer:sleep(200), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), + ?assertMatch( + {ok, _, #{status := stopped}}, + emqx_resource:get_instance(?ID) + ), - %% start the resource manually.. - ct:pal("starting resource manually"), - ok = emqx_resource:start(?ID), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid)), + %% start the resource manually.. + ?assertEqual(ok, emqx_resource:start(?ID)), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid)), - %% restart the resource - ct:pal("restarting resource"), - ok = emqx_resource:restart(?ID), - ?assertNot(is_process_alive(Pid)), - {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid2)), + %% restart the resource + ?assertEqual(ok, emqx_resource:restart(?ID)), + ?assertNot(is_process_alive(Pid)), + {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), + ?assert(is_process_alive(Pid2)), - ct:pal("removing resource"), - ok = emqx_resource:remove_local(?ID), + ?assertEqual(ok, emqx_resource:remove_local(?ID)), - ?assertNot(is_process_alive(Pid2)). + ?assertNot(is_process_alive(Pid2)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_query(_) -> {ok, _} = emqx_resource:create_local( @@ -771,153 +812,210 @@ t_query_counter_async_inflight_batch(_) -> ok = emqx_resource:remove_local(?ID). t_healthy_timeout(_) -> - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => <<"bad_not_atom_name">>, register => true}, - %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. - #{health_check_interval => 200} - ), - ?assertMatch( - {error, {resource_error, #{reason := timeout}}}, - emqx_resource:query(?ID, get_state, #{timeout => 1_000}) - ), - ?assertMatch({ok, _Group, #{status := disconnected}}, emqx_resource_manager:ets_lookup(?ID)), - ok = emqx_resource:remove_local(?ID). + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => <<"bad_not_atom_name">>, register => true}, + %% the ?TEST_RESOURCE always returns the `Mod:on_get_status/2` 300ms later. + #{health_check_interval => 200} + ) + ), + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + emqx_resource:query(?ID, get_state, #{timeout => 1_000}) + ), + ?assertMatch( + {ok, _Group, #{status := disconnected}}, emqx_resource_manager:lookup(?ID) + ), + ?assertEqual(ok, emqx_resource:remove_local(?ID)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_healthy(_) -> - {ok, _} = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource} - ), - {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), - timer:sleep(300), - emqx_resource:set_resource_status_connecting(?ID), + ?check_trace( + begin + ?assertMatch( + {ok, _}, + emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource} + ) + ), + {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), + timer:sleep(300), + emqx_resource:set_resource_status_connecting(?ID), - {ok, connected} = emqx_resource:health_check(?ID), - ?assertMatch( - [#{status := connected}], - emqx_resource:list_instances_verbose() - ), + ?assertEqual({ok, connected}, emqx_resource:health_check(?ID)), + ?assertMatch( + [#{status := connected}], + emqx_resource:list_instances_verbose() + ), - erlang:exit(Pid, shutdown), + erlang:exit(Pid, shutdown), - ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), + ?assertEqual({ok, disconnected}, emqx_resource:health_check(?ID)), - ?assertMatch( - [#{status := disconnected}], - emqx_resource:list_instances_verbose() - ), + ?assertMatch( + [#{status := disconnected}], + emqx_resource:list_instances_verbose() + ), - ok = emqx_resource:remove_local(?ID). + ?assertEqual(ok, emqx_resource:remove_local(?ID)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_stop_start(_) -> - {error, _} = emqx_resource:check_and_create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:check_and_create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>} + ) + ), - %% add some metrics to test their persistence - WorkerID0 = <<"worker:0">>, - WorkerID1 = <<"worker:1">>, - emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), - emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), - ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), + %% add some metrics to test their persistence + WorkerID0 = <<"worker:0">>, + WorkerID1 = <<"worker:1">>, + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), - {ok, _} = emqx_resource:check_and_recreate( - ?ID, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>}, - #{} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_recreate( + ?ID, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>}, + #{} + ) + ), - {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid0)), + ?assert(is_process_alive(Pid0)), - %% metrics are reset when recreating - %% depending on timing, might show the request we just did. - ct:sleep(500), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + %% metrics are reset when recreating + %% depending on timing, might show the request we just did. + ct:sleep(500), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), - ok = emqx_resource:stop(?ID), + ok = emqx_resource:stop(?ID), - ?assertNot(is_process_alive(Pid0)), + ?assertNot(is_process_alive(Pid0)), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), - ok = emqx_resource:restart(?ID), - timer:sleep(300), + ?assertEqual(ok, emqx_resource:restart(?ID)), + timer:sleep(300), - {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)), + ?assert(is_process_alive(Pid1)), - %% now stop while resetting the metrics - ct:sleep(500), - emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), - emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), - ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), - ok = emqx_resource:stop(?ID), - ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), + %% now stop while resetting the metrics + ct:sleep(500), + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), + ?assertEqual(ok, emqx_resource:stop(?ID)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)) + end, - ok. + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_stop_start_local(_) -> - {error, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{unknown => test_resource} - ), + ?check_trace( + begin + ?assertMatch( + {error, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{unknown => test_resource} + ) + ), - {ok, _} = emqx_resource:check_and_create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>} + ) + ), - {ok, _} = emqx_resource:check_and_recreate_local( - ?ID, - ?TEST_RESOURCE, - #{<<"name">> => <<"test_resource">>}, - #{} - ), + ?assertMatch( + {ok, _}, + emqx_resource:check_and_recreate_local( + ?ID, + ?TEST_RESOURCE, + #{<<"name">> => <<"test_resource">>}, + #{} + ) + ), - {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid0}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid0)), + ?assert(is_process_alive(Pid0)), - ok = emqx_resource:stop(?ID), + ?assertEqual(ok, emqx_resource:stop(?ID)), - ?assertNot(is_process_alive(Pid0)), + ?assertNot(is_process_alive(Pid0)), - ?assertMatch( - ?RESOURCE_ERROR(stopped), - emqx_resource:query(?ID, get_state) - ), + ?assertMatch( + ?RESOURCE_ERROR(stopped), + emqx_resource:query(?ID, get_state) + ), - ok = emqx_resource:restart(?ID), + ?assertEqual(ok, emqx_resource:restart(?ID)), - {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), + {ok, #{pid := Pid1}} = emqx_resource:query(?ID, get_state), - ?assert(is_process_alive(Pid1)). + ?assert(is_process_alive(Pid1)) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end + ). t_list_filter(_) -> {ok, _} = emqx_resource:create_local( @@ -1031,16 +1129,24 @@ t_auto_retry(_) -> ?assertEqual(ok, Res). t_health_check_disconnected(_) -> - _ = emqx_resource:create_local( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource, create_error => true}, - #{auto_retry_interval => 100} - ), - ?assertEqual( - {ok, disconnected}, - emqx_resource:health_check(?ID) + ?check_trace( + begin + _ = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, create_error => true}, + #{auto_retry_interval => 100} + ), + ?assertEqual( + {ok, disconnected}, + emqx_resource:health_check(?ID) + ) + end, + fun(Trace) -> + ?assertEqual([], ?of_kind("inconsistent_state", Trace)), + ?assertEqual([], ?of_kind("inconsistent_cache", Trace)) + end ). t_unblock_only_required_buffer_workers(_) -> From 29907875bf11602e5edfa282c7a8d4ed8ca73839 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 14 Mar 2023 13:30:24 +0300 Subject: [PATCH 6/7] test(bufworker): set `batch_time` for batch-related testcases By default it's `0` since e9d3fc51. This made a couple of tests prone to flapping. --- .../emqx_resource/test/emqx_resource_SUITE.erl | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index f6d2b7ab4..ff7e1d347 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -263,7 +263,11 @@ t_batch_query_counter(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{batch_size => BatchSize, query_mode => sync} + #{ + batch_size => BatchSize, + batch_time => 100, + query_mode => sync + } ), ?check_trace( @@ -622,6 +626,7 @@ t_query_counter_async_inflight_batch(_) -> #{ query_mode => async, batch_size => BatchSize, + batch_time => 100, async_inflight_window => WindowSize, worker_pool_size => 1, resume_interval => 300 @@ -1157,7 +1162,8 @@ t_unblock_only_required_buffer_workers(_) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 5 + batch_size => 5, + batch_time => 100 } ), lists:foreach( @@ -1171,7 +1177,8 @@ t_unblock_only_required_buffer_workers(_) -> #{name => test_resource}, #{ query_mode => async, - batch_size => 5 + batch_size => 5, + batch_time => 100 } ), %% creation of `?ID1` should not have unblocked `?ID`'s buffer workers @@ -1202,6 +1209,7 @@ t_retry_batch(_Config) -> #{ query_mode => async, batch_size => 5, + batch_time => 100, worker_pool_size => 1, resume_interval => 1_000 } @@ -1571,7 +1579,6 @@ t_retry_async_inflight_full(_Config) -> query_mode => async, async_inflight_window => AsyncInflightWindow, batch_size => 1, - batch_time => 20, worker_pool_size => 1, resume_interval => ResumeInterval } @@ -2086,7 +2093,6 @@ t_expiration_async_after_reply(_Config) -> #{ query_mode => async, batch_size => 1, - batch_time => 100, worker_pool_size => 1, resume_interval => 1_000 } @@ -2309,7 +2315,6 @@ t_expiration_retry(_Config) -> #{ query_mode => sync, batch_size => 1, - batch_time => 100, worker_pool_size => 1, resume_interval => 300 } @@ -2499,7 +2504,6 @@ t_recursive_flush(_Config) -> #{ query_mode => async, batch_size => 1, - batch_time => 10_000, worker_pool_size => 1 } ), From a9bc8a4464f42c092446a8c7d7a20cf5db0c5ab5 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 14 Mar 2023 14:01:16 +0300 Subject: [PATCH 7/7] =?UTF-8?q?refactor(resman):=20rename=20`ets=5Flookup`?= =?UTF-8?q?=20=E2=86=92=20`lookup=5Fcached`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit That way we hide the impementation details + the interface becomes cleaner and more obvious. --- apps/emqx_resource/src/emqx_resource.erl | 4 ++-- .../src/emqx_resource_buffer_worker.erl | 2 +- apps/emqx_resource/src/emqx_resource_manager.erl | 16 ++++++++-------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 57d56b339..1ccb5ca71 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -260,7 +260,7 @@ query(ResId, Request) -> -spec query(resource_id(), Request :: term(), query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> - case emqx_resource_manager:ets_lookup(ResId) of + case emqx_resource_manager:lookup_cached(ResId) of {ok, _Group, #{query_mode := QM, mod := Module}} -> IsBufferSupported = is_buffer_supported(Module), case {IsBufferSupported, QM} of @@ -311,7 +311,7 @@ set_resource_status_connecting(ResId) -> -spec get_instance(resource_id()) -> {ok, resource_group(), resource_data()} | {error, Reason :: term()}. get_instance(ResId) -> - emqx_resource_manager:ets_lookup(ResId, [metrics]). + emqx_resource_manager:lookup_cached(ResId, [metrics]). -spec fetch_creation_opts(map()) -> creation_opts(). fetch_creation_opts(Opts) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 711833963..8bfd77e61 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -885,7 +885,7 @@ handle_async_worker_down(Data0, Pid) -> call_query(QM0, Id, Index, Ref, Query, QueryOpts) -> ?tp(call_query_enter, #{id => Id, query => Query, query_mode => QM0}), - case emqx_resource_manager:ets_lookup(Id) of + case emqx_resource_manager:lookup_cached(Id) of {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, Resource} -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 0983dff8d..40f9fe1ab 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -36,8 +36,8 @@ lookup/1, list_all/0, list_group/1, - ets_lookup/1, - ets_lookup/2, + lookup_cached/1, + lookup_cached/2, get_metrics/1, reset_metrics/1 ]). @@ -231,21 +231,21 @@ set_resource_status_connecting(ResId) -> -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. lookup(ResId) -> case safe_call(ResId, lookup, ?T_LOOKUP) of - {error, timeout} -> ets_lookup(ResId, [metrics]); + {error, timeout} -> lookup_cached(ResId, [metrics]); Result -> Result end. %% @doc Lookup the group and data of a resource from the cache --spec ets_lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -ets_lookup(ResId) -> - ets_lookup(ResId, []). +-spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. +lookup_cached(ResId) -> + lookup_cached(ResId, []). %% @doc Lookup the group and data of a resource from the cache --spec ets_lookup(resource_id(), [Option]) -> +-spec lookup_cached(resource_id(), [Option]) -> {ok, resource_group(), resource_data()} | {error, not_found} when Option :: metrics. -ets_lookup(ResId, Options) -> +lookup_cached(ResId, Options) -> NeedMetrics = lists:member(metrics, Options), case read_cache(ResId) of {Group, Data} when NeedMetrics ->