From f41adb099794d85c917f5ad0496e164c0e5fb9ac Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 14 Sep 2022 15:18:07 +0800 Subject: [PATCH 1/4] refactor: change some default values of resource_opts --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 ++-- apps/emqx_resource/include/emqx_resource.hrl | 8 ++++---- apps/emqx_resource/src/schema/emqx_resource_schema.erl | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl | 4 ++-- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a353c9cf0..6d5ecf808 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -208,8 +208,8 @@ info_example_basic(webhook) -> auto_restart_interval => 15000, query_mode => async, async_inflight_window => 100, - enable_queue => true, - max_queue_bytes => 1024 * 1024 * 1024 + enable_queue => false, + max_queue_bytes => 100 * 1024 * 1024 } }; info_example_basic(mqtt) -> diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index aab0129d1..71300df72 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -84,15 +84,15 @@ -define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024). -define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>). --define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>). +-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). +-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). %% count -define(DEFAULT_BATCH_SIZE, 100). %% milliseconds --define(DEFAULT_BATCH_TIME, 10). --define(DEFAULT_BATCH_TIME_RAW, <<"10ms">>). +-define(DEFAULT_BATCH_TIME, 20). +-define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>). %% count -define(DEFAULT_INFLIGHT, 100). diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index df284bbe8..c666974b1 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -82,7 +82,7 @@ query_mode(_) -> undefined. enable_batch(type) -> boolean(); enable_batch(required) -> false; -enable_batch(default) -> false; +enable_batch(default) -> true; enable_batch(desc) -> ?DESC("enable_batch"); enable_batch(_) -> undefined. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 6ad804b2c..a2f125722 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -93,8 +93,8 @@ values(common, Protocol, SupportUint, TypeOpts) -> precision => ms, resource_opts => #{ enable_batch => false, - batch_size => 5, - batch_time => <<"1m">> + batch_size => 100, + batch_time => <<"20ms">> }, server => <<"127.0.0.1:8086">>, ssl => #{enable => false} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index 616842292..bdbf96424 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -58,7 +58,7 @@ values(post) -> worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, - enable_batch => false, + enable_batch => true, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, From 1c03c236f5cfcac14befb6c17b34a93ec5408f42 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 14 Sep 2022 15:19:30 +0800 Subject: [PATCH 2/4] fix(mqtt_bridge): handle send_to_remote in idle state --- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl | 6 +++++- apps/emqx_resource/src/emqx_resource_manager.erl | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 618361ad3..5f81e68e1 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -276,6 +276,8 @@ idle({call, From}, ensure_started, State) -> {error, Reason, _State} -> {keep_state_and_data, [{reply, From, {error, Reason}}]} end; +idle({call, From}, send_to_remote, _State) -> + {keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]}; %% @doc Standing by for manual start. idle(info, idle, #{start_type := manual}) -> keep_state_and_data; @@ -339,10 +341,12 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F {keep_state_and_data, [{reply, From, Forwards}]}; common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; +common(_StateName, {call, From}, Req, _State) -> + {keep_state_and_data, [{reply, From, {unsuppored_request, Req}}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(StateName, Type, Content, #{name := Name} = State) -> - ?SLOG(notice, #{ + ?SLOG(error, #{ msg => "bridge_discarded_event", name => Name, type => Type, diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index e4ba92b5c..a896e990e 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -467,11 +467,11 @@ retry_actions(Data) -> handle_remove_event(From, ClearMetrics, Data) -> stop_resource(Data), + ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), case ClearMetrics of true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts), {stop_and_reply, normal, [{reply, From, ok}]}. start_resource(Data, From) -> From 4e211c12d3db5e074313c6b6dcfc481f4964471d Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 14 Sep 2022 16:15:10 +0800 Subject: [PATCH 3/4] fix(mqtt_bridge): return value of sending messages was discarded --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 6 ++---- .../emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl | 2 +- apps/emqx_resource/src/emqx_resource_worker.erl | 7 ++++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index f52edab15..b063d7436 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -187,8 +187,7 @@ on_stop(_InstId, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg), - ok. + emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg). on_query_async( _InstId, @@ -197,8 +196,7 @@ on_query_async( #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}), - ok. + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 5f81e68e1..e1651f114 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -342,7 +342,7 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, {call, From}, Req, _State) -> - {keep_state_and_data, [{reply, From, {unsuppored_request, Req}}]}; + {keep_state_and_data, [{reply, From, {error, {unsuppored_request, Req}}}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(StateName, Type, Content, #{name := Name} = State) -> diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index f683b67ed..05292944b 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -585,7 +585,12 @@ assert_ok_result(ok) -> assert_ok_result({async_return, R}) -> assert_ok_result(R); assert_ok_result(R) when is_tuple(R) -> - ok = erlang:element(1, R); + try + ok = erlang:element(1, R) + catch + error:{badmatch, _} -> + error({not_ok_result, R}) + end; assert_ok_result(R) -> error({not_ok_result, R}). From d5d3972ff5c318d4f4604876e9e30b3ec8c854c2 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 15 Sep 2022 08:57:17 +0800 Subject: [PATCH 4/4] chore: add test cases for MQTT Bridge reconnecting --- .../test/emqx_bridge_mqtt_SUITE.erl | 116 ++++++++++++++++++ .../src/mqtt/emqx_connector_mqtt_worker.erl | 4 +- .../src/emqx_resource_manager.erl | 2 +- .../src/emqx_resource_worker.erl | 3 +- 4 files changed, 121 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index e35ad5fe5..c89621d59 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -481,5 +481,121 @@ t_egress_mqtt_bridge_with_rules(_) -> {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []). +t_mqtt_conn_bridge_egress_reconnect(_) -> + %% then we add a mqtt connector, using POST + User1 = <<"user1">>, + + {ok, 201, Bridge} = request( + post, + uri(["bridges"]), + ?SERVER_CONF(User1)#{ + <<"type">> => ?TYPE_MQTT, + <<"name">> => ?BRIDGE_NAME_EGRESS, + <<"egress">> => ?EGRESS_CONF, + %% to make it reconnect quickly + <<"reconnect_interval">> => <<"1s">>, + <<"resource_opts">> => #{ + <<"worker_pool_size">> => 2, + <<"enable_queue">> => true, + <<"query_mode">> => <<"sync">>, + %% to make it check the healthy quickly + <<"health_check_interval">> => <<"0.5s">> + } + } + ), + #{ + <<"type">> := ?TYPE_MQTT, + <<"name">> := ?BRIDGE_NAME_EGRESS + } = jsx:decode(Bridge), + BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS), + %% we now test if the bridge works as expected + LocalTopic = <<"local_topic/1">>, + RemoteTopic = <<"remote_topic/", LocalTopic/binary>>, + Payload = <<"hello">>, + emqx:subscribe(RemoteTopic), + timer:sleep(100), + %% PUBLISH a message to the 'local' broker, as we have only one broker, + %% the remote broker is also the local one. + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% we should receive a message on the "remote" broker, with specified topic + assert_mqtt_msg_received(RemoteTopic, Payload), + + %% verify the metrics of the bridge + {ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}, + <<"node_metrics">> := + [ + #{ + <<"node">> := _, + <<"metrics">> := + #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0} + } + ] + }, + jsx:decode(BridgeStr) + ), + + %% stop the listener 1883 to make the bridge disconnected + ok = emqx_listeners:stop_listener('tcp:default'), + + %% PUBLISH 2 messages to the 'local' broker, the message should + emqx:publish(emqx_message:make(LocalTopic, Payload)), + emqx:publish(emqx_message:make(LocalTopic, Payload)), + + %% verify the metrics of the bridge, the message should be queued + {ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"status">> := Status, + <<"metrics">> := #{ + <<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2 + } + } when Status == <<"connected">> orelse Status == <<"connecting">>, + jsx:decode(BridgeStr1) + ), + + %% start the listener 1883 to make the bridge reconnected + ok = emqx_listeners:start_listener('tcp:default'), + timer:sleep(1500), + %% verify the metrics of the bridge, the 2 queued messages should have been sent + {ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []), + ?assertMatch( + #{ + <<"status">> := <<"connected">>, + <<"metrics">> := #{ + <<"matched">> := 3, + <<"success">> := 3, + <<"failed">> := 0, + <<"queuing">> := 0, + <<"retried">> := R + } + } when R > 0, + jsx:decode(BridgeStr2) + ), + %% also verify the 2 messages have been sent to the remote broker + assert_mqtt_msg_received(RemoteTopic, Payload), + assert_mqtt_msg_received(RemoteTopic, Payload), + %% delete the bridge + {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. + +assert_mqtt_msg_received(Topic, Payload) -> + ?assert( + receive + {deliver, Topic, #message{payload = Payload}} -> + ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]), + true; + Msg -> + ct:pal("Unexpected Msg: ~p", [Msg]), + false + after 100 -> + false + end + ). + request(Method, Url, Body) -> request(<<"connector_admin">>, Method, Url, Body). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index e1651f114..fe359437c 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -276,7 +276,7 @@ idle({call, From}, ensure_started, State) -> {error, Reason, _State} -> {keep_state_and_data, [{reply, From, {error, Reason}}]} end; -idle({call, From}, send_to_remote, _State) -> +idle({call, From}, {send_to_remote, _}, _State) -> {keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]}; %% @doc Standing by for manual start. idle(info, idle, #{start_type := manual}) -> @@ -342,7 +342,7 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> {keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]}; common(_StateName, {call, From}, Req, _State) -> - {keep_state_and_data, [{reply, From, {error, {unsuppored_request, Req}}}]}; + {keep_state_and_data, [{reply, From, {error, {unsupported_request, Req}}}]}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; common(StateName, Type, Content, #{name := Name} = State) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index a896e990e..3430b3964 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -525,7 +525,7 @@ handle_connecting_health_check(Data) -> (connected, UpdatedData) -> {next_state, connected, UpdatedData}; (connecting, UpdatedData) -> - Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}], + Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}], {keep_state, UpdatedData, Actions}; (disconnected, UpdatedData) -> {next_state, disconnected, UpdatedData} diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 05292944b..1f6c4f599 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -133,6 +133,7 @@ init({Id, Index, Opts}) -> end, emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)), ok = inflight_new(Name), + HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL), St = #{ id => Id, index => Index, @@ -142,7 +143,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, - resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL), + resume_interval => maps:get(resume_interval, Opts, HCItvl), acc => [], acc_left => BatchSize, tref => undefined