From 2d01726b223dbcf8c33172bed3ae9fd873d77a10 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 13 Oct 2022 15:27:35 -0300 Subject: [PATCH] fix: account calls when resource is not connected as matched --- .../test/emqx_bridge_mqtt_SUITE.erl | 46 ++++++++++++++----- .../src/emqx_resource_worker.erl | 3 ++ 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 819556d81..84152efc6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -23,6 +23,7 @@ -include("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include("emqx_dashboard/include/emqx_dashboard.hrl"). %% output functions @@ -511,15 +512,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> %% we now test if the bridge works as expected LocalTopic = <<"local_topic/1">>, RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, - Payload = <<"hello">>, + Payload0 = <<"hello">>, emqx:subscribe(RemoteTopic), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. - emqx:publish(emqx_message:make(LocalTopic, Payload)), + emqx:publish(emqx_message:make(LocalTopic, Payload0)), %% we should receive a message on the "remote" broker, with specified topic - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), @@ -543,18 +544,40 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> ct:sleep(1500), %% PUBLISH 2 messages to the 'local' broker, the message should - emqx:publish(emqx_message:make(LocalTopic, Payload)), - emqx:publish(emqx_message:make(LocalTopic, Payload)), + ok = snabbkaffe:start_trace(), + {ok, SRef} = + snabbkaffe:subscribe( + fun + ( + #{ + ?snk_kind := call_query_enter, + query := {query, _From, {send_message, #{}}, _Sent} + } + ) -> + true; + (_) -> + false + end, + _NEvents = 2, + _Timeout = 1_000 + ), + Payload1 = <<"hello2">>, + Payload2 = <<"hello3">>, + emqx:publish(emqx_message:make(LocalTopic, Payload1)), + emqx:publish(emqx_message:make(LocalTopic, Payload2)), + {ok, _} = snabbkaffe:receive_events(SRef), + ok = snabbkaffe:stop(), %% verify the metrics of the bridge, the message should be queued {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. ?assertMatch( #{ <<"status">> := Status, <<"metrics">> := #{ - <<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 } - } when Status == <<"connected">> orelse Status == <<"connecting">>, + } when Matched >= 3 andalso (Status == <<"connected">> orelse Status == <<"connecting">>), jsx:decode(BridgeStr1) ), @@ -563,22 +586,23 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + %% matched >= 3 because of possible retries. ?assertMatch( #{ <<"status">> := <<"connected">>, <<"metrics">> := #{ - <<"matched">> := 3, + <<"matched">> := Matched, <<"success">> := 3, <<"failed">> := 0, <<"queuing">> := 0, <<"retried">> := _ } - }, + } when Matched >= 3, jsx:decode(BridgeStr2) ), %% also verify the 2 messages have been sent to the remote broker - assert_mqtt_msg_received(RemoteTopic, Payload), - assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload1), + assert_mqtt_msg_received(RemoteTopic, Payload2), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index b309299f6..a36cb15b7 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -410,6 +410,7 @@ handle_query_result(Id, Result, HasSent, BlockWorker) -> BlockWorker. call_query(QM0, Id, Query, QueryOpts) -> + ?tp(call_query_enter, #{id => Id, query => Query}), case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> QM = @@ -421,8 +422,10 @@ call_query(QM0, Id, Query, QueryOpts) -> emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> + emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found")