fix(bridge_mqtt): push to max-inflight limit

the old pop_and_send implementation may leave inflight queue starving
This commit is contained in:
Zaiming Shi 2021-04-08 18:53:25 +02:00 committed by Zaiming (Stone) Shi
parent ae688e2c90
commit ae9278a409
3 changed files with 178 additions and 16 deletions

View File

@ -62,6 +62,8 @@
-module(emqx_bridge_worker).
-behaviour(gen_statem).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
-export([ start_link/1
, start_link/2
@ -479,11 +481,16 @@ retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) ->
{error, State1} -> {error, State1}
end.
pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) when length(Inflight) >= Max ->
pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) ->
pop_and_send_loop(State, Max - length(Inflight)).
pop_and_send_loop(State, 0) ->
?tp(debug, inflight_full, #{}),
{ok, State};
pop_and_send(#{replayq := Q, connect_module := Module} = State) ->
pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) ->
case replayq:is_empty(Q) of
true ->
?tp(debug, replayq_drained, #{}),
{ok, State};
false ->
BatchSize = case Module of
@ -492,7 +499,10 @@ pop_and_send(#{replayq := Q, connect_module := Module} = State) ->
end,
Opts = #{count_limit => BatchSize, bytes_limit => 999999999},
{Q1, QAckRef, Batch} = replayq:pop(Q, Opts),
do_send(State#{replayq := Q1}, QAckRef, Batch)
case do_send(State#{replayq := Q1}, QAckRef, Batch) of
{ok, NewState} -> pop_and_send_loop(NewState, N - 1);
{error, NewState} -> {error, NewState}
end
end.
%% Assert non-empty batch because we have a is_empty check earlier.
@ -500,7 +510,7 @@ do_send(#{inflight := Inflight,
connect_module := Module,
connection := Connection,
mountpoint := Mountpoint,
if_record_metrics := IfRecordMetrics} = State, QAckRef, Batch) ->
if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) ->
ExportMsg = fun(Message) ->
bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'),
emqx_bridge_msg:to_export(Module, Mountpoint, Message)
@ -511,7 +521,7 @@ do_send(#{inflight := Inflight,
send_ack_ref => map_set(Refs),
batch => Batch}]}};
{error, Reason} ->
?LOG(info, "batch_produce_failed ~p", [Reason]),
?LOG(info, "mqtt_bridge_produce_failed ~p", [Reason]),
{error, State}
end.
@ -543,7 +553,9 @@ do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
end.
%% Drop the consecutive header of the inflight list having empty send_ack_ref
drop_acked_batches(_Q, []) -> [];
drop_acked_batches(_Q, []) ->
?tp(debug, inflight_drained, #{}),
[];
drop_acked_batches(Q, [#{send_ack_ref := Refs,
q_ack_ref := QAckRef} | Rest] = All) ->
case maps:size(Refs) of

View File

@ -0,0 +1,40 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_stub_conn).
-behaviour(emqx_bridge_connect).
%% behaviour callbacks
-export([ start/1
, send/2
, stop/1
]).
-type ack_ref() :: emqx_bridge_worker:ack_ref().
-type batch() :: emqx_bridge_worker:batch().
start(Cfg) ->
{ok, Cfg}.
stop(_) -> ok.
%% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}.
send(#{stub_pid := Pid}, Batch) ->
Ref = make_ref(),
Pid ! {stub_message, self(), Ref, Batch},
{ok, Ref}.

View File

@ -16,20 +16,19 @@
-module(emqx_bridge_worker_SUITE).
-export([ all/0
, init_per_suite/1
, end_per_suite/1]).
-export([ t_rpc/1
, t_mqtt/1
, t_mngr/1]).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
-define(SNK_WAIT(WHAT), ?assertMatch({ok, _}, ?block_until(#{?snk_kind := WHAT}, 2000, 1000))).
receive_messages(Count) ->
receive_messages(Count, []).
@ -45,10 +44,15 @@ receive_messages(Count, Msgs) ->
Msgs
end.
all() -> [ t_rpc
, t_mqtt
, t_mngr
].
all() ->
lists:filtermap(
fun({FunName, _Arity}) ->
case atom_to_list(FunName) of
"t_" ++ _ -> {true, FunName};
_ -> false
end
end,
?MODULE:module_info(exports)).
init_per_suite(Config) ->
case node() of
@ -174,3 +178,109 @@ t_mqtt(Config) when is_list(Config) ->
after
ok = emqx_bridge_worker:stop(Pid)
end.
t_stub_normal(Config) when is_list(Config) ->
Cfg = #{forwards => [<<"t_stub_normal/#">>],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self()
},
{ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
snabbkaffe:start_trace(),
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
{ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1),
receive
{stub_message, WorkerPid, BatchRef, _Batch} ->
WorkerPid ! {batch_ack, BatchRef},
ok
end,
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
snabbkaffe:stop(),
ok = emqx_bridge_worker:stop(Pid)
end.
t_stub_overflow(Config) when is_list(Config) ->
Topic = <<"t_stub_overflow/one">>,
MaxInflight = 20,
Cfg = #{forwards => [Topic],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
snabbkaffe:start_trace(),
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
lists:foreach(
fun(I) ->
Data = integer_to_binary(I),
_ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
end, lists:seq(1, MaxInflight * 2)),
?SNK_WAIT(inflight_full),
Acks = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks),
Acks2 = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2),
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
snabbkaffe:stop(),
ok = emqx_bridge_worker:stop(Worker)
end.
t_stub_random_order(Config) when is_list(Config) ->
Topic = <<"t_stub_random_order/a">>,
MaxInflight = 10,
Cfg = #{forwards => [Topic],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
snabbkaffe:start_trace(),
try
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
lists:foreach(
fun(I) ->
Data = integer_to_binary(I),
_ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
end, lists:seq(1, MaxInflight)),
Acks = stub_receive(MaxInflight),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
lists:reverse(Acks)),
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
snabbkaffe:stop(),
ok = emqx_bridge_worker:stop(Worker)
end.
stub_receive(N) ->
stub_receive(N, []).
stub_receive(0, Acc) -> lists:reverse(Acc);
stub_receive(N, Acc) ->
receive
{stub_message, WorkerPid, BatchRef, _Batch} ->
stub_receive(N - 1, [{WorkerPid, BatchRef} | Acc])
after
5000 ->
lists:reverse(Acc)
end.