diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 622dbf464..7ee384b84 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -647,8 +647,8 @@ reset_metrics(ConfRootKey, Type, Name) -> reset_metrics_helper(_ConfRootKey, _Type, _Name, #{enable := false}) -> ok; reset_metrics_helper(ConfRootKey, BridgeV2Type, BridgeName, #{connector := ConnectorName}) -> - BridgeV2Id = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName), - ok = emqx_metrics_worker:reset_metrics(?RES_METRICS, BridgeV2Id); + ResourceId = id_with_root_name(ConfRootKey, BridgeV2Type, BridgeName, ConnectorName), + emqx_resource:reset_metrics(ResourceId); reset_metrics_helper(_, _, _, _) -> {error, not_found}. diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index fc9c9573f..4f98baebf 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -286,6 +286,10 @@ init_mocks() -> ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId) end), + meck:expect(?CONNECTOR_IMPL, on_query_async, fun(_ResId, _Req, ReplyFunAndArgs, _ConnState) -> + emqx_resource:apply_reply_fun(ReplyFunAndArgs, ok), + {ok, self()} + end), ok. clear_resources() -> @@ -378,6 +382,9 @@ enable_path(Enable, BridgeID) -> publish_message(Topic, Body, Config) -> Node = ?config(node, Config), + publish_message(Topic, Body, Node, Config). + +publish_message(Topic, Body, Node, _Config) -> erpc:call(Node, emqx, publish, [emqx_message:make(Topic, Body)]). update_config(Path, Value, Config) -> @@ -524,6 +531,17 @@ get_common_values(Kind, FnName) -> } end. +maybe_get_other_node(Config) -> + %% In the single node test group, this simply returns the lone node. Otherwise, it'll + %% return a node that's not the primary one that receives API calls. + PrimaryNode = ?config(node, Config), + case proplists:get_value(cluster_nodes, Config, []) -- [PrimaryNode] of + [] -> + PrimaryNode; + [OtherNode | _] -> + OtherNode + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -1385,7 +1403,8 @@ t_reset_metrics(Config) -> ActionID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ActionName), Body = <<"my msg">>, - _ = publish_message(?MQTT_LOCAL_TOPIC, Body, Config), + OtherNode = maybe_get_other_node(Config), + _ = publish_message(?MQTT_LOCAL_TOPIC, Body, OtherNode, Config), ?retry( _Sleep0 = 200, _Retries0 = 20, @@ -1400,16 +1419,30 @@ t_reset_metrics(Config) -> {ok, 204, <<>>} = request(put, uri([?ACTIONS_ROOT, ActionID, "metrics", "reset"]), Config), - ?retry( + Res = ?retry( _Sleep0 = 200, _Retries0 = 20, - ?assertMatch( - {ok, 200, #{ - <<"metrics">> := #{<<"matched">> := 0}, - <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] - }}, - request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config) - ) + begin + Res0 = request_json(get, uri([?ACTIONS_ROOT, ActionID, "metrics"]), Config), + ?assertMatch( + {ok, 200, #{ + <<"metrics">> := #{<<"matched">> := 0}, + <<"node_metrics">> := [#{<<"metrics">> := #{}} | _] + }}, + Res0 + ), + Res0 + end + ), + {ok, 200, #{<<"node_metrics">> := NodeMetrics}} = Res, + ?assert( + lists:all( + fun(#{<<"metrics">> := #{<<"matched">> := Matched}}) -> + Matched == 0 + end, + NodeMetrics + ), + #{node_metrics => NodeMetrics} ), ok.