diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 7e1d08497..841ed885f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -735,6 +735,89 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> assert_mqtt_msg_received(RemoteTopic, Payload2), ok. +t_mqtt_conn_bridge_egress_async_reconnect(_) -> + User1 = <<"user1">>, + BridgeIDEgress = create_bridge( + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF, + <<"resource_opts">> => #{ + <<"worker_pool_size">> => 2, + <<"query_mode">> => <<"async">>, + %% using a long time so we can test recovery + <<"request_timeout">> => <<"15s">>, + %% to make it check the healthy quickly + <<"health_check_interval">> => <<"0.5s">>, + %% to make it reconnect quickly + <<"auto_restart_interval">> => <<"1s">> + } + } + ), + + on_exit(fun() -> + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok + end), + + Self = self(), + LocalTopic = <>, + RemoteTopic = <>, + emqx:subscribe(RemoteTopic), + + Publisher = start_publisher(LocalTopic, 200, Self), + ct:sleep(1000), + + %% stop the listener 1883 to make the bridge disconnected + ok = emqx_listeners:stop_listener('tcp:default'), + ct:sleep(1500), + ?assertMatch( + #{<<"status">> := Status} when Status == <<"connecting">>; Status == <<"disconnected">>, + request_bridge(BridgeIDEgress) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + ?assertMatch( + #{<<"status">> := <<"connected">>}, + request_bridge(BridgeIDEgress) + ), + + N = stop_publisher(Publisher), + + %% all those messages should eventually be delivered + [ + assert_mqtt_msg_received(RemoteTopic, Payload) + || I <- lists:seq(1, N), + Payload <- [integer_to_binary(I)] + ], + + ok. + +start_publisher(Topic, Interval, CtrlPid) -> + spawn_link(fun() -> publisher(Topic, 1, Interval, CtrlPid) end). + +stop_publisher(Pid) -> + _ = Pid ! {self(), stop}, + receive + {Pid, N} -> N + after 1_000 -> ct:fail("publisher ~p did not stop", [Pid]) + end. + +publisher(Topic, N, Delay, CtrlPid) -> + _ = emqx:publish(emqx_message:make(Topic, integer_to_binary(N))), + receive + {CtrlPid, stop} -> + CtrlPid ! {self(), N} + after Delay -> + publisher(Topic, N + 1, Delay, CtrlPid) + end. + +%% + assert_mqtt_msg_received(Topic) -> assert_mqtt_msg_received(Topic, '_', 200).