From f0395be3830a0baeeba43c7674e17733f875b47b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 31 Jan 2023 23:18:46 +0300 Subject: [PATCH 1/5] refactor(mqtt-worker): avoid unnecessary abstraction So the code is easier to follow. --- .../src/emqx_connector_mqtt.erl | 5 +- .../src/mqtt/emqx_connector_mqtt_worker.erl | 78 ++++++++++--------- 2 files changed, 45 insertions(+), 38 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 462bac0b8..c1a051836 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -198,8 +198,9 @@ on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of ok -> - % TODO this is racy - {ok, emqx_connector_mqtt_worker:pid(InstanceId)}; + ok; + {ok, Pid} -> + {ok, Pid}; {error, Reason} -> classify_error(Reason) end. diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 85261a063..9fac20153 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -67,8 +67,7 @@ %% APIs -export([ start_link/2, - stop/1, - pid/1 + stop/1 ]). %% management APIs @@ -175,7 +174,7 @@ mk_client_event_handler(undefined, _Opts) -> connect(Name) -> #{subscriptions := Subscriptions} = get_config(Name), - case emqtt:connect(pid(Name)) of + case emqtt:connect(get_pid(Name)) of {ok, Properties} -> case subscribe_remote_topics(Name, Subscriptions) of ok -> @@ -206,37 +205,28 @@ subscribe_remote_topics(_Ref, undefined) -> stop(Ref) -> emqtt:stop(ref(Ref)). -pid(Name) -> - gproc:lookup_pid(?NAME(Name)). - status(Ref) -> - trycall( - fun() -> - Info = emqtt:info(ref(Ref)), - case proplists:get_value(socket, Info) of - Socket when Socket /= undefined -> - connected; - undefined -> - connecting - end - end, - #{noproc => disconnected} - ). + try + Info = emqtt:info(ref(Ref)), + case proplists:get_value(socket, Info) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + catch + exit:{noproc, _} -> + disconnected + end. ping(Ref) -> emqtt:ping(ref(Ref)). send_to_remote(Name, MsgIn) -> - trycall( - fun() -> do_send(Name, export_msg(Name, MsgIn)) end, - #{ - badarg => {error, disconnected}, - noproc => {error, disconnected} - } - ). + trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end). do_send(Name, {true, Msg}) -> - case emqtt:publish(pid(Name), Msg) of + case emqtt:publish(get_pid(Name), Msg) of ok -> ok; {ok, #{reason_code := RC}} when @@ -263,13 +253,16 @@ do_send(_Name, false) -> ok. send_to_remote_async(Name, MsgIn, Callback) -> - trycall( - fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end, - #{badarg => {error, disconnected}} - ). + trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end). do_send_async(Name, {true, Msg}, Callback) -> - emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback); + Pid = get_pid(Name), + case emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback) of + ok -> + {ok, Pid}; + {error, _} = Error -> + Error + end; do_send_async(_Name, false, _Callback) -> ok. @@ -278,14 +271,14 @@ ref(Pid) when is_pid(Pid) -> ref(Term) -> ?REF(Term). -trycall(Fun, Else) -> +trycall(Fun) -> try Fun() catch - error:badarg -> - maps:get(badarg, Else); + throw:noproc -> + {error, disconnected}; exit:{noproc, _} -> - maps:get(noproc, Else) + {error, disconnected} end. format_mountpoint(undefined) -> @@ -325,8 +318,21 @@ pre_process_conf(Key, Conf) -> Conf#{Key => Val} end. +get_pid(Name) -> + case gproc:where(?NAME(Name)) of + Pid when is_pid(Pid) -> + Pid; + undefined -> + throw(noproc) + end. + get_config(Name) -> - gproc:lookup_value(?NAME(Name)). + try + gproc:lookup_value(?NAME(Name)) + catch + error:badarg -> + throw(noproc) + end. export_msg(Name, Msg) -> case get_config(Name) of From ad88938d34f322eaa298b217587a0ed243cfe73b Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 1 Feb 2023 16:22:19 +0300 Subject: [PATCH 2/5] refactor: reuse some parts of test code for brewity --- .../test/emqx_bridge_mqtt_SUITE.erl | 456 +++++------------- 1 file changed, 131 insertions(+), 325 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 6e3bf77ee..52084196a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -32,7 +32,6 @@ -define(BRIDGE_CONF_DEFAULT, <<"bridges: {}">>). -define(TYPE_MQTT, <<"mqtt">>). --define(NAME_MQTT, <<"my_mqtt_bridge">>). -define(BRIDGE_NAME_INGRESS, <<"ingress_mqtt_bridge">>). -define(BRIDGE_NAME_EGRESS, <<"egress_mqtt_bridge">>). @@ -98,6 +97,24 @@ } }). +-define(assertMetrics(Pat, BridgeID), + ?assertMetrics(Pat, true, BridgeID) +). +-define(assertMetrics(Pat, Guard, BridgeID), + ?assertMatch( + #{ + <<"metrics">> := Pat, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := Pat + } + ] + } when Guard, + request_bridge_metrics(BridgeID) + ) +). + inspect(Selected, _Envs, _Args) -> persistent_term:put(?MODULE, #{inspect => Selected}). @@ -176,7 +193,7 @@ t_mqtt_conn_bridge_ingress(_) -> {ok, 201, Bridge} = request( post, uri(["bridges"]), - ?SERVER_CONF(User1)#{ + ServerConf = ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF @@ -186,6 +203,7 @@ t_mqtt_conn_bridge_ingress(_) -> <<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_INGRESS } = jsx:decode(Bridge), + BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), %% we now test if the bridge works as expected @@ -198,34 +216,12 @@ t_mqtt_conn_bridge_ingress(_) -> %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic - ?assert( - receive - {deliver, LocalTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + assert_mqtt_msg_received(LocalTopic, Payload), %% verify the metrics of the bridge - {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 0, <<"received">> := 1} - } - ] - }, - jsx:decode(BridgeMetricsStr) + ?assertMetrics( + #{<<"matched">> := 0, <<"received">> := 1}, + BridgeIDIngress ), %% delete the bridge @@ -236,21 +232,13 @@ t_mqtt_conn_bridge_ingress(_) -> t_mqtt_conn_bridge_ingress_no_payload_template(_) -> User1 = <<"user1">>, - %% create an MQTT bridge, using POST - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDIngress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF_NO_PAYLOAD_TEMPLATE } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_INGRESS - } = jsx:decode(Bridge), - BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), %% we now test if the bridge works as expected RemoteTopic = <>, @@ -262,40 +250,13 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) -> %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic - ?assert( - receive - {deliver, LocalTopic, #message{payload = MapMsg}} -> - ct:pal("local broker got message: ~p on topic ~p", [MapMsg, LocalTopic]), - %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. - case jsx:decode(MapMsg) of - #{<<"payload">> := Payload} -> - true; - _ -> - false - end; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + Msg = assert_mqtt_msg_received(LocalTopic), + ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 0, <<"received">> := 1} - } - ] - }, - jsx:decode(BridgeStr) + ?assertMetrics( + #{<<"matched">> := 0, <<"received">> := 1}, + BridgeIDIngress ), %% delete the bridge @@ -307,22 +268,15 @@ t_mqtt_conn_bridge_ingress_no_payload_template(_) -> t_mqtt_conn_bridge_egress(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, - - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_EGRESS - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -334,36 +288,14 @@ t_mqtt_conn_bridge_egress(_) -> emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = Payload, from = From}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), - Size = byte_size(ResourceID), - ?assertMatch(<>, From), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + Msg = assert_mqtt_msg_received(RemoteTopic, Payload), + Size = byte_size(ResourceID), + ?assertMatch(<>, Msg#message.from), %% verify the metrics of the bridge - {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} - } - ] - }, - jsx:decode(BridgeMetricsStr) + ?assertMetrics( + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + BridgeIDEgress ), %% delete the bridge @@ -375,21 +307,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF_NO_PAYLOAD_TEMPLATE } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_EGRESS - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -401,42 +327,15 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = MapMsg, from = From}} -> - ct:pal("local broker got message: ~p on topic ~p", [MapMsg, RemoteTopic]), - %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. - Size = byte_size(ResourceID), - ?assertMatch(<>, From), - case jsx:decode(MapMsg) of - #{<<"payload">> := Payload} -> - true; - _ -> - false - end; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + Msg = assert_mqtt_msg_received(RemoteTopic), + %% the MapMsg is all fields outputed by Rule-Engine. it's a binary coded json here. + ?assertMatch(<>, Msg#message.from), + ?assertMatch(#{<<"payload">> := Payload}, jsx:decode(Msg#message.payload)), %% verify the metrics of the bridge - {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} - } - ] - }, - jsx:decode(BridgeStr) + ?assertMetrics( + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + BridgeIDEgress ), %% delete the bridge @@ -447,9 +346,7 @@ t_mqtt_conn_bridge_egress_no_payload_template(_) -> t_egress_custom_clientid_prefix(_Config) -> User1 = <<"user1">>, - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(User1)#{ <<"clientid_prefix">> => <<"my-custom-prefix">>, <<"type">> => ?TYPE_MQTT, @@ -457,11 +354,6 @@ t_egress_custom_clientid_prefix(_Config) -> <<"egress">> => ?EGRESS_CONF } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_EGRESS - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), ResourceID = emqx_bridge_resource:resource_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), LocalTopic = <>, RemoteTopic = <>, @@ -470,58 +362,36 @@ t_egress_custom_clientid_prefix(_Config) -> timer:sleep(100), emqx:publish(emqx_message:make(LocalTopic, Payload)), - receive - {deliver, RemoteTopic, #message{from = From}} -> - Size = byte_size(ResourceID), - ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, From), - ok - after 1000 -> - ct:fail("should have published message") - end, + Msg = assert_mqtt_msg_received(RemoteTopic, Payload), + Size = byte_size(ResourceID), + ?assertMatch(<<"my-custom-prefix:", _ResouceID:Size/binary, _/binary>>, Msg#message.from), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), ok. t_mqtt_conn_bridge_ingress_and_egress(_) -> User1 = <<"user1">>, - %% create an MQTT bridge, using POST - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDIngress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF } ), - - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_INGRESS - } = jsx:decode(Bridge), - BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), - {ok, 201, Bridge2} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_EGRESS - } = jsx:decode(Bridge2), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, Payload = <<"hello">>, emqx:subscribe(RemoteTopic), - {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched1, <<"success">> := CntSuccess1, <<"failed">> := 0 @@ -538,29 +408,17 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeMetricsStr1), + } = request_bridge_metrics(BridgeIDEgress), timer:sleep(100), %% PUBLISH a message to the 'local' broker, as we have only one broker, %% the remote broker is also the local one. emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, RemoteTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + assert_mqtt_msg_received(RemoteTopic, Payload), %% verify the metrics of the bridge timer:sleep(1000), - {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), #{ <<"metrics">> := #{ <<"matched">> := CntMatched2, <<"success">> := CntSuccess2, <<"failed">> := 0 @@ -577,7 +435,7 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> } } ] - } = jsx:decode(BridgeMetricsStr2), + } = request_bridge_metrics(BridgeIDEgress), ?assertEqual(CntMatched2, CntMatched1 + 1), ?assertEqual(CntSuccess2, CntSuccess1 + 1), ?assertEqual(NodeCntMatched2, NodeCntMatched1 + 1), @@ -590,16 +448,13 @@ t_mqtt_conn_bridge_ingress_and_egress(_) -> ok. t_ingress_mqtt_bridge_with_rules(_) -> - {ok, 201, _} = request( - post, - uri(["bridges"]), + BridgeIDIngress = create_bridge( ?SERVER_CONF(<<"user1">>)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_INGRESS, <<"ingress">> => ?INGRESS_CONF } ), - BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), {ok, 201, Rule} = request( post, @@ -624,18 +479,7 @@ t_ingress_mqtt_bridge_with_rules(_) -> %% the remote broker is also the local one. emqx:publish(emqx_message:make(RemoteTopic, Payload)), %% we should receive a message on the local broker, with specified topic - ?assert( - receive - {deliver, LocalTopic, #message{payload = Payload}} -> - ct:pal("local broker got message: ~p on topic ~p", [Payload, LocalTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + assert_mqtt_msg_received(LocalTopic, Payload), %% and also the rule should be matched, with matched + 1: {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), {ok, 200, Metrics} = request(get, uri(["rules", RuleId, "metrics"]), []), @@ -680,37 +524,22 @@ t_ingress_mqtt_bridge_with_rules(_) -> ), %% verify the metrics of the bridge - {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDIngress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 0, <<"received">> := 1}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 0, <<"received">> := 1} - } - ] - }, - jsx:decode(BridgeMetricsStr) + ?assertMetrics( + #{<<"matched">> := 0, <<"received">> := 1}, + BridgeIDIngress ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDIngress]), []). t_egress_mqtt_bridge_with_rules(_) -> - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(<<"user1">>)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF } ), - #{<<"type">> := ?TYPE_MQTT, <<"name">> := ?BRIDGE_NAME_EGRESS} = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), {ok, 201, Rule} = request( post, @@ -734,18 +563,7 @@ t_egress_mqtt_bridge_with_rules(_) -> %% the remote broker is also the local one. emqx:publish(emqx_message:make(LocalTopic, Payload)), %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic, #message{payload = Payload}} -> - ct:pal("remote broker got message: ~p on topic ~p", [Payload, RemoteTopic]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + assert_mqtt_msg_received(RemoteTopic, Payload), emqx:unsubscribe(RemoteTopic), %% PUBLISH a message to the rule. @@ -780,35 +598,12 @@ t_egress_mqtt_bridge_with_rules(_) -> ), %% we should receive a message on the "remote" broker, with specified topic - ?assert( - receive - {deliver, RemoteTopic2, #message{payload = Payload2}} -> - ct:pal("remote broker got message: ~p on topic ~p", [Payload2, RemoteTopic2]), - true; - Msg -> - ct:pal("Msg: ~p", [Msg]), - false - after 100 -> - false - end - ), + assert_mqtt_msg_received(RemoteTopic2, Payload2), %% verify the metrics of the bridge - {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := #{ - <<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0 - } - } - ] - }, - jsx:decode(BridgeMetricsStr) + ?assertMetrics( + #{<<"matched">> := 2, <<"success">> := 2, <<"failed">> := 0}, + BridgeIDEgress ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), @@ -817,10 +612,7 @@ t_egress_mqtt_bridge_with_rules(_) -> t_mqtt_conn_bridge_egress_reconnect(_) -> %% then we add a mqtt connector, using POST User1 = <<"user1">>, - - {ok, 201, Bridge} = request( - post, - uri(["bridges"]), + BridgeIDEgress = create_bridge( ?SERVER_CONF(User1)#{ <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, @@ -837,17 +629,14 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> } } ), - #{ - <<"type">> := ?TYPE_MQTT, - <<"name">> := ?BRIDGE_NAME_EGRESS - } = jsx:decode(Bridge), - BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + on_exit(fun() -> %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok end), + %% we now test if the bridge works as expected LocalTopic = <>, RemoteTopic = <>, @@ -862,20 +651,9 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> assert_mqtt_msg_received(RemoteTopic, Payload0), %% verify the metrics of the bridge - {ok, 200, BridgeMetricsStr} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - ?assertMatch( - #{ - <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, - <<"node_metrics">> := - [ - #{ - <<"node">> := _, - <<"metrics">> := - #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} - } - ] - }, - jsx:decode(BridgeMetricsStr) + ?assertMetrics( + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + BridgeIDEgress ), %% stop the listener 1883 to make the bridge disconnected @@ -906,63 +684,91 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> {ok, _} = snabbkaffe:receive_events(SRef), %% verify the metrics of the bridge, the message should be queued - {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, BridgeMetricsStr1} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - Decoded1 = jsx:decode(BridgeStr1), - DecodedMetrics1 = jsx:decode(BridgeMetricsStr1), ?assertMatch( - Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>), - maps:get(<<"status">>, Decoded1) + #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) ), %% matched >= 3 because of possible retries. - ?assertMatch( + ?assertMetrics( #{ <<"matched">> := Matched, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := Queuing, <<"inflight">> := Inflight - } when Matched >= 3 andalso Inflight + Queuing == 2, - maps:get(<<"metrics">>, DecodedMetrics1) + }, + Matched >= 3 andalso Inflight + Queuing == 2, + BridgeIDEgress ), %% start the listener 1883 to make the bridge reconnected ok = emqx_listeners:start_listener('tcp:default'), timer:sleep(1500), %% verify the metrics of the bridge, the 2 queued messages should have been sent - {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), - {ok, 200, BridgeMetricsStr2} = request(get, uri(["bridges", BridgeIDEgress, "metrics"]), []), - Decoded2 = jsx:decode(BridgeStr2), - ?assertEqual(<<"connected">>, maps:get(<<"status">>, Decoded2)), + ?assertMatch(#{<<"status">> := <<"connected">>}, request_bridge(BridgeIDEgress)), %% matched >= 3 because of possible retries. - ?assertMatch( + ?assertMetrics( #{ - <<"metrics">> := #{ - <<"matched">> := Matched, - <<"success">> := 3, - <<"failed">> := 0, - <<"queuing">> := 0, - <<"retried">> := _ - } - } when Matched >= 3, - jsx:decode(BridgeMetricsStr2) + <<"matched">> := Matched, + <<"success">> := 3, + <<"failed">> := 0, + <<"queuing">> := 0, + <<"retried">> := _ + }, + Matched >= 3, + BridgeIDEgress ), %% also verify the 2 messages have been sent to the remote broker assert_mqtt_msg_received(RemoteTopic, Payload1), assert_mqtt_msg_received(RemoteTopic, Payload2), ok. +assert_mqtt_msg_received(Topic) -> + assert_mqtt_msg_received(Topic, '_', 200). + assert_mqtt_msg_received(Topic, Payload) -> - ct:pal("checking if ~p has been received on ~p", [Payload, Topic]), + assert_mqtt_msg_received(Topic, Payload, 200). + +assert_mqtt_msg_received(Topic, Payload, Timeout) -> receive - {deliver, Topic, #message{payload = Payload}} -> - ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), - ok - after 300 -> + {deliver, Topic, Msg = #message{}} when Payload == '_' -> + ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]), + Msg; + {deliver, Topic, Msg = #message{payload = Payload}} -> + ct:pal("received mqtt ~p on topic ~p", [Msg, Topic]), + Msg + after Timeout -> {messages, Messages} = process_info(self(), messages), - Msg = io_lib:format("timeout waiting for ~p on topic ~p", [Payload, Topic]), - error({Msg, #{messages => Messages}}) + ct:fail("timeout waiting ~p ms for ~p on topic '~s', messages = ~0p", [ + Timeout, + Payload, + Topic, + Messages + ]) end. +create_bridge(Config = #{<<"type">> := Type, <<"name">> := Name}) -> + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + Config + ), + ?assertMatch( + #{ + <<"type">> := Type, + <<"name">> := Name + }, + jsx:decode(Bridge) + ), + emqx_bridge_resource:bridge_id(Type, Name). + +request_bridge(BridgeID) -> + {ok, 200, Bridge} = request(get, uri(["bridges", BridgeID]), []), + jsx:decode(Bridge). + +request_bridge_metrics(BridgeID) -> + {ok, 200, BridgeMetrics} = request(get, uri(["bridges", BridgeID, "metrics"]), []), + jsx:decode(BridgeMetrics). + request(Method, Url, Body) -> request(<<"connector_admin">>, Method, Url, Body). From 5ebceb20d2b32ba92522ecf24c5734c4dbf3908e Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 1 Feb 2023 16:23:58 +0300 Subject: [PATCH 3/5] test(mqtt-bridge): also test reconfiguration --- apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 52084196a..7e1d08497 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -206,6 +206,18 @@ t_mqtt_conn_bridge_ingress(_) -> BridgeIDIngress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_INGRESS), + %% try to create the bridge again + ?assertMatch( + {ok, 400, _}, + request(post, uri(["bridges"]), ServerConf) + ), + + %% try to reconfigure the bridge + ?assertMatch( + {ok, 200, _}, + request(put, uri(["bridges", BridgeIDIngress]), ServerConf) + ), + %% we now test if the bridge works as expected RemoteTopic = <>, LocalTopic = <>, From 8a46cb974e9a45ca1f3e023c3f6e11edd8868b25 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 1 Feb 2023 16:24:40 +0300 Subject: [PATCH 4/5] test(mqtt-bridge): test async bridge reconnects seamlessly --- .../test/emqx_bridge_mqtt_SUITE.erl | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 7e1d08497..841ed885f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -735,6 +735,89 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> assert_mqtt_msg_received(RemoteTopic, Payload2), ok. +t_mqtt_conn_bridge_egress_async_reconnect(_) -> + User1 = <<"user1">>, + BridgeIDEgress = create_bridge( + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF, + <<"resource_opts">> => #{ + <<"worker_pool_size">> => 2, + <<"query_mode">> => <<"async">>, + %% using a long time so we can test recovery + <<"request_timeout">> => <<"15s">>, + %% to make it check the healthy quickly + <<"health_check_interval">> => <<"0.5s">>, + %% to make it reconnect quickly + <<"auto_restart_interval">> => <<"1s">> + } + } + ), + + on_exit(fun() -> + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok + end), + + Self = self(), + LocalTopic = <>, + RemoteTopic = <>, + emqx:subscribe(RemoteTopic), + + Publisher = start_publisher(LocalTopic, 200, Self), + ct:sleep(1000), + + %% stop the listener 1883 to make the bridge disconnected + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), + + N = stop_publisher(Publisher), + + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + + ok. + +start_publisher(Topic, Interval, CtrlPid) -> + spawn_link(fun() -> publisher(Topic, 1, Interval, CtrlPid) end). + +stop_publisher(Pid) -> + _ = Pid ! {self(), stop}, + receive + {Pid, N} -> N + after 1_000 -> ct:fail("publisher ~p did not stop", [Pid]) + end. + +publisher(Topic, N, Delay, CtrlPid) -> + _ = emqx:publish(emqx_message:make(Topic, integer_to_binary(N))), + receive + {CtrlPid, stop} -> + CtrlPid ! {self(), N} + after Delay -> + publisher(Topic, N + 1, Delay, CtrlPid) + end. + +%% + assert_mqtt_msg_received(Topic) -> assert_mqtt_msg_received(Topic, '_', 200). From 5fd7f65a1f09637f4f3c7080f1e9da34d71aebd7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 1 Feb 2023 16:52:47 +0300 Subject: [PATCH 5/5] test(bufworker): make testcase simpler to follow The confusion was due to the fact that subsequent query was missing `async_reply_fun` and thus, was not accumulating in the results. --- apps/emqx_resource/test/emqx_resource_SUITE.erl | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 620516a88..3ae69a47d 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -625,7 +625,7 @@ t_query_counter_async_inflight_batch(_) -> %% this will block the resource_worker as the inflight window is full now {ok, {ok, _}} = ?wait_async_action( - emqx_resource:query(?ID, {inc_counter, 2}), + emqx_resource:query(?ID, {inc_counter, 2}, ReqOpts()), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), @@ -635,11 +635,7 @@ t_query_counter_async_inflight_batch(_) -> [] ), - %% NOTE - %% The query above won't affect the size of the results table for some reason, - %% it's not clear if this is expected behaviour. Only the `async_reply_fun` - %% defined below will be called for the whole batch consisting of 2 increments. - Sent2 = Sent1 + 0, + Sent2 = Sent1 + 1, tap_metrics(?LINE), %% send query now will fail because the resource is blocked.