Merge pull request #8959 from terry-xiaoyu/fix_mqtt_bridge_hangs_on_msg_sending

Fix mqtt bridge hangs on msg sending
This commit is contained in:
Xinyu Liu 2022-09-15 12:26:12 +08:00 committed by GitHub
commit 7242ffd713
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 143 additions and 19 deletions

View File

@ -208,8 +208,8 @@ info_example_basic(webhook) ->
auto_restart_interval => 15000,
query_mode => async,
async_inflight_window => 100,
enable_queue => true,
max_queue_bytes => 1024 * 1024 * 1024
enable_queue => false,
max_queue_bytes => 100 * 1024 * 1024
}
};
info_example_basic(mqtt) ->

View File

@ -481,5 +481,121 @@ t_egress_mqtt_bridge_with_rules(_) ->
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []).
%% Test case: an egress MQTT bridge with queueing enabled must buffer messages
%% while its remote listener is stopped and deliver them once the listener is
%% started again. The scenario is strictly sequential:
%%   1. create the bridge and check one message is forwarded,
%%   2. stop the 'tcp:default' listener and publish two more messages,
%%   3. check the two messages are reported as queued,
%%   4. restart the listener and check the queued messages were sent,
%%   5. delete the bridge.
t_mqtt_conn_bridge_egress_reconnect(_) ->
%% create an egress MQTT bridge via POST on the "bridges" endpoint
User1 = <<"user1">>,
{ok, 201, Bridge} = request(
post,
uri(["bridges"]),
?SERVER_CONF(User1)#{
<<"type">> => ?TYPE_MQTT,
<<"name">> => ?BRIDGE_NAME_EGRESS,
<<"egress">> => ?EGRESS_CONF,
%% short reconnect interval so the bridge reconnects quickly
<<"reconnect_interval">> => <<"1s">>,
<<"resource_opts">> => #{
<<"worker_pool_size">> => 2,
<<"enable_queue">> => true,
<<"query_mode">> => <<"sync">>,
%% short health-check interval so the bridge status is refreshed quickly
<<"health_check_interval">> => <<"0.5s">>
}
}
),
%% the response must echo back the bridge type and name we asked for
#{
<<"type">> := ?TYPE_MQTT,
<<"name">> := ?BRIDGE_NAME_EGRESS
} = jsx:decode(Bridge),
BridgeIDEgress = emqx_bridge_resource:bridge_id(?TYPE_MQTT, ?BRIDGE_NAME_EGRESS),
%% now test that the bridge works as expected
LocalTopic = <<"local_topic/1">>,
RemoteTopic = <<"remote_topic/", LocalTopic/binary>>,
Payload = <<"hello">>,
emqx:subscribe(RemoteTopic),
%% NOTE(review): presumably gives the subscription time to take effect - confirm
timer:sleep(100),
%% PUBLISH a message to the 'local' broker. As we have only one broker,
%% the remote broker is also the local one.
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% we should receive the message on the "remote" broker, on the remote topic
assert_mqtt_msg_received(RemoteTopic, Payload),
%% verify the bridge metrics: exactly one message matched and succeeded,
%% both in the aggregated metrics and in the single per-node entry
{ok, 200, BridgeStr} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"metrics">> := #{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0},
<<"node_metrics">> :=
[
#{
<<"node">> := _,
<<"metrics">> :=
#{<<"matched">> := 1, <<"success">> := 1, <<"failed">> := 0}
}
]
},
jsx:decode(BridgeStr)
),
%% stop the listener on port 1883 so that the bridge gets disconnected
ok = emqx_listeners:stop_listener('tcp:default'),
%% PUBLISH 2 messages to the 'local' broker; they should be queued by the
%% bridge, since the listener it connects to has just been stopped
emqx:publish(emqx_message:make(LocalTopic, Payload)),
emqx:publish(emqx_message:make(LocalTopic, Payload)),
%% verify the bridge metrics: the 2 new messages are matched but still queued.
%% The status may be either "connected" or "connecting" at this point.
{ok, 200, BridgeStr1} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"status">> := Status,
<<"metrics">> := #{
<<"matched">> := 3, <<"success">> := 1, <<"failed">> := 0, <<"queuing">> := 2
}
} when Status == <<"connected">> orelse Status == <<"connecting">>,
jsx:decode(BridgeStr1)
),
%% start the listener on port 1883 again so that the bridge can reconnect
ok = emqx_listeners:start_listener('tcp:default'),
%% wait longer than the 1s reconnect interval configured above
timer:sleep(1500),
%% verify the bridge metrics: the 2 queued messages should have been sent,
%% the queue should be empty, and at least one retry must have been counted
{ok, 200, BridgeStr2} = request(get, uri(["bridges", BridgeIDEgress]), []),
?assertMatch(
#{
<<"status">> := <<"connected">>,
<<"metrics">> := #{
<<"matched">> := 3,
<<"success">> := 3,
<<"failed">> := 0,
<<"queuing">> := 0,
<<"retried">> := R
}
} when R > 0,
jsx:decode(BridgeStr2)
),
%% also verify that the 2 messages have arrived at the remote broker
assert_mqtt_msg_received(RemoteTopic, Payload),
assert_mqtt_msg_received(RemoteTopic, Payload),
%% delete the bridge and check that no bridges are left
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeIDEgress]), []),
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
ok.
%% Asserts that the next message in the calling process' mailbox is a
%% `{deliver, Topic, #message{}}' tuple whose payload equals `Payload'.
%% `Topic' and `Payload' are already bound, so the first receive clause only
%% matches a delivery for exactly that topic and payload.
%% The assertion fails if any other message arrives first (that message is
%% logged and consumed) or if nothing arrives within 100 ms.
assert_mqtt_msg_received(Topic, Payload) ->
?assert(
receive
{deliver, Topic, #message{payload = Payload}} ->
ct:pal("Got mqtt message: ~p on topic ~p", [Payload, Topic]),
true;
%% anything else is unexpected: log it and fail the assertion
Msg ->
ct:pal("Unexpected Msg: ~p", [Msg]),
false
%% no message at all within 100 ms also fails the assertion
after 100 ->
false
end
).
%% Convenience wrapper around request/4 that fixes its first argument to
%% <<"connector_admin">> (presumably the API user name - confirm against
%% request/4) and passes the remaining arguments through unchanged.
request(HttpMethod, Endpoint, ReqBody) ->
    request(<<"connector_admin">>, HttpMethod, Endpoint, ReqBody).

View File

@ -187,8 +187,7 @@ on_stop(_InstId, #{name := InstanceId}) ->
on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg),
ok.
emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg).
on_query_async(
_InstId,
@ -197,8 +196,7 @@ on_query_async(
#{name := InstanceId}
) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}),
ok.
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}).
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true),

View File

@ -276,6 +276,8 @@ idle({call, From}, ensure_started, State) ->
{error, Reason, _State} ->
{keep_state_and_data, [{reply, From, {error, Reason}}]}
end;
idle({call, From}, {send_to_remote, _}, _State) ->
{keep_state_and_data, [{reply, From, {error, {recoverable_error, not_connected}}}]};
%% @doc Standing by for manual start.
idle(info, idle, #{start_type := manual}) ->
keep_state_and_data;
@ -339,10 +341,12 @@ common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := F
{keep_state_and_data, [{reply, From, Forwards}]};
common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) ->
{keep_state_and_data, [{reply, From, maps:get(subscriptions, Connection, #{})}]};
common(_StateName, {call, From}, Req, _State) ->
{keep_state_and_data, [{reply, From, {error, {unsupported_request, Req}}}]};
common(_StateName, info, {'EXIT', _, _}, State) ->
{keep_state, State};
common(StateName, Type, Content, #{name := Name} = State) ->
?SLOG(notice, #{
?SLOG(error, #{
msg => "bridge_discarded_event",
name => Name,
type => Type,

View File

@ -84,15 +84,15 @@
-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024).
-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>).
-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>).
-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024).
-define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>).
%% count
-define(DEFAULT_BATCH_SIZE, 100).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 10).
-define(DEFAULT_BATCH_TIME_RAW, <<"10ms">>).
-define(DEFAULT_BATCH_TIME, 20).
-define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>).
%% count
-define(DEFAULT_INFLIGHT, 100).

View File

@ -467,11 +467,11 @@ retry_actions(Data) ->
handle_remove_event(From, ClearMetrics, Data) ->
stop_resource(Data),
ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts),
case ClearMetrics of
true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id);
false -> ok
end,
ok = emqx_resource_worker_sup:stop_workers(Data#data.id, Data#data.opts),
{stop_and_reply, normal, [{reply, From, ok}]}.
start_resource(Data, From) ->
@ -525,7 +525,7 @@ handle_connecting_health_check(Data) ->
(connected, UpdatedData) ->
{next_state, connected, UpdatedData};
(connecting, UpdatedData) ->
Actions = [{state_timeout, ?HEALTHCHECK_INTERVAL, health_check}],
Actions = [{state_timeout, health_check_interval(Data#data.opts), health_check}],
{keep_state, UpdatedData, Actions};
(disconnected, UpdatedData) ->
{next_state, disconnected, UpdatedData}

View File

@ -133,6 +133,7 @@ init({Id, Index, Opts}) ->
end,
emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', queue_count(Queue)),
ok = inflight_new(Name),
HCItvl = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
St = #{
id => Id,
index => Index,
@ -142,7 +143,7 @@ init({Id, Index, Opts}) ->
batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
queue => Queue,
resume_interval => maps:get(resume_interval, Opts, ?HEALTHCHECK_INTERVAL),
resume_interval => maps:get(resume_interval, Opts, HCItvl),
acc => [],
acc_left => BatchSize,
tref => undefined
@ -585,7 +586,12 @@ assert_ok_result(ok) ->
assert_ok_result({async_return, R}) ->
assert_ok_result(R);
assert_ok_result(R) when is_tuple(R) ->
ok = erlang:element(1, R);
try
ok = erlang:element(1, R)
catch
error:{badmatch, _} ->
error({not_ok_result, R})
end;
assert_ok_result(R) ->
error({not_ok_result, R}).

View File

@ -82,7 +82,7 @@ query_mode(_) -> undefined.
enable_batch(type) -> boolean();
enable_batch(required) -> false;
enable_batch(default) -> false;
enable_batch(default) -> true;
enable_batch(desc) -> ?DESC("enable_batch");
enable_batch(_) -> undefined.

View File

@ -93,8 +93,8 @@ values(common, Protocol, SupportUint, TypeOpts) ->
precision => ms,
resource_opts => #{
enable_batch => false,
batch_size => 5,
batch_time => <<"1m">>
batch_size => 100,
batch_time => <<"20ms">>
},
server => <<"127.0.0.1:8086">>,
ssl => #{enable => false}

View File

@ -58,7 +58,7 @@ values(post) ->
worker_pool_size => 1,
health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
enable_batch => false,
enable_batch => true,
batch_size => ?DEFAULT_BATCH_SIZE,
batch_time => ?DEFAULT_BATCH_TIME,
query_mode => async,