diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index e35ad5fe5..c89621d59 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -481,5 +481,121 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []). +t_mqtt_conn_bridge_egress_reconnect(_) -> + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF, + %% to make it reconnect quickly + <<"reconnect_interval">> => <<"1s">>, + <<"resource_opts">> => #{ + <<"worker_pool_size">> => 2, + <<"enable_queue">> => true, + <<"query_mode">> => <<"sync">>, + %% to make it check the healthy quickly + <<"health_check_interval">> => <<"0.5s">> + } + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + assert_mqtt_msg_received(RemoteTopic, Payload), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% stop the listener 1883 to make the bridge disconnected + ok = emqx_listeners:stop_listener('tcp:default'), + + %% PUBLISH 2 messages to the 'local' broker, the message should + emqx:publish(emqx_message:make(LocalTopic, Payload)), + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% verify the metrics of the bridge, the message should be queued + {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"status">> := Status, + <<"metrics">> := #{ + <<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + } + } when Status == <<"connected">> orelse Status == <<"connecting">>, + jsx:decode(BridgeStr1) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + %% verify the metrics of the bridge, the 2 queued messages should have been sent + {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"status">> := <<"connected">>, + <<"metrics">> := #{ + <<"matched">> := 3, + <<"success">> := 3, + <<"failed">> := 0, + <<"queuing">> := 0, + <<"retried">> := R + } + } when R > 0, + jsx:decode(BridgeStr2) + ), + %% also verify the 2 messages have been sent to the remote broker + assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. + +assert_mqtt_msg_received(Topic, Payload) -> + ?assert( + receive + {deliver, Topic, #message{payload = Payload}} -> + ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), + true; + Msg -> + ct:pal("Unexpected Msg: ~p", [Msg]), + false + after 100 -> + false + end + ). + request(Method, Url, Body) -> request(<<"connector_admin">>, Method, Url, Body). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index e1651f114..fe359437c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -276,7 +276,7 @@ idle({call, From}, ensure_started, State) -> {error, Reason, _State} -> {keep_state_and_data, [{reply, From, {error, Reason}}]} end; -idle({call, From}, send_to_remote, _State) -> +idle({call, From}, {send_to_remote, _}, _State) -> {keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]}; %% @doc Standing by for manual start. idle(info, idle, #{start_type := manual}) -> @@ -342,7 +342,7 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, {call, From}, Req, _State) -> - {keep_state_and_data, [{reply, From, {error, {unsuppored_request, Req}}}]}; + {keep_state_and_data, [{reply, From, {error, {unsupported_request, Req}}}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(StateName, Type, Content, #{name := Name} = State) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a896e990e..3430b3964 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -525,7 +525,7 @@ handle_connecting_health_check(Data) -> (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, Actions}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 05292944b..1f6c4f599 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -133,6 +133,7 @@ init({Id, Index, Opts}) -> end, emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), ok = inflight_new(Name), + HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, index => Index, @@ -142,7 +143,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, - resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL), + resume_interval => maps:get(resume_interval, Opts, HCItvl), acc => [], acc_left => BatchSize, tref => undefined