diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 157431817..38454581d 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -62,6 +62,8 @@ -module(emqx_bridge_worker). -behaviour(gen_statem). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + %% APIs -export([ start_link/1 , start_link/2 @@ -479,11 +481,16 @@ retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) -> {error, State1} -> {error, State1} end. -pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) when length(Inflight) >= Max -> +pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) -> + pop_and_send_loop(State, Max - length(Inflight)). + +pop_and_send_loop(State, 0) -> + ?tp(debug, inflight_full, #{}), {ok, State}; -pop_and_send(#{replayq := Q, connect_module := Module} = State) -> +pop_and_send_loop(#{replayq := Q, connect_module := Module} = State, N) -> case replayq:is_empty(Q) of true -> + ?tp(debug, replayq_drained, #{}), {ok, State}; false -> BatchSize = case Module of @@ -492,7 +499,10 @@ pop_and_send(#{replayq := Q, connect_module := Module} = State) -> end, Opts = #{count_limit => BatchSize, bytes_limit => 999999999}, {Q1, QAckRef, Batch} = replayq:pop(Q, Opts), - do_send(State#{replayq := Q1}, QAckRef, Batch) + case do_send(State#{replayq := Q1}, QAckRef, Batch) of + {ok, NewState} -> pop_and_send_loop(NewState, N - 1); + {error, NewState} -> {error, NewState} + end end. %% Assert non-empty batch because we have a is_empty check earlier. @@ -500,7 +510,7 @@ do_send(#{inflight := Inflight, connect_module := Module, connection := Connection, mountpoint := Mountpoint, - if_record_metrics := IfRecordMetrics} = State, QAckRef, Batch) -> + if_record_metrics := IfRecordMetrics} = State, QAckRef, [_ | _] = Batch) -> ExportMsg = fun(Message) -> bridges_metrics_inc(IfRecordMetrics, 'bridge.mqtt.message_sent'), emqx_bridge_msg:to_export(Module, Mountpoint, Message) @@ -511,7 +521,7 @@ do_send(#{inflight := Inflight, send_ack_ref => map_set(Refs), batch => Batch}]}}; {error, Reason} -> - ?LOG(info, "batch_produce_failed ~p", [Reason]), + ?LOG(info, "mqtt_bridge_produce_failed ~p", [Reason]), {error, State} end. @@ -543,7 +553,9 @@ do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> end. %% Drop the consecutive header of the inflight list having empty send_ack_ref -drop_acked_batches(_Q, []) -> []; +drop_acked_batches(_Q, []) -> + ?tp(debug, inflight_drained, #{}), + []; drop_acked_batches(Q, [#{send_ack_ref := Refs, q_ack_ref := QAckRef} | Rest] = All) -> case maps:size(Refs) of diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl new file mode 100644 index 000000000..6b8db31ea --- /dev/null +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl @@ -0,0 +1,40 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_stub_conn). + +-behaviour(emqx_bridge_connect). + +%% behaviour callbacks +-export([ start/1 + , send/2 + , stop/1 + ]). + +-type ack_ref() :: emqx_bridge_worker:ack_ref(). +-type batch() :: emqx_bridge_worker:batch(). + +start(Cfg) -> + {ok, Cfg}. + +stop(_) -> ok. + +%% @doc Callback for `emqx_bridge_connect' behaviour +-spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}. +send(#{stub_pid := Pid}, Batch) -> + Ref = make_ref(), + Pid ! {stub_message, self(), Ref, Batch}, + {ok, Ref}. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl index 7cbf0987c..1bba71971 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl @@ -16,20 +16,19 @@ -module(emqx_bridge_worker_SUITE). --export([ all/0 - , init_per_suite/1 - , end_per_suite/1]). --export([ t_rpc/1 - , t_mqtt/1 - , t_mngr/1]). +-compile(export_all). +-compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(wait(For, Timeout), emqx_ct_helpers:wait_for(?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)). +-define(SNK_WAIT(WHAT), ?assertMatch({ok, _}, ?block_until(#{?snk_kind := WHAT}, 2000, 1000))). + receive_messages(Count) -> receive_messages(Count, []). @@ -45,10 +44,15 @@ receive_messages(Count, Msgs) -> Msgs end. -all() -> [ t_rpc - , t_mqtt - , t_mngr - ]. +all() -> + lists:filtermap( + fun({FunName, _Arity}) -> + case atom_to_list(FunName) of + "t_" ++ _ -> {true, FunName}; + _ -> false + end + end, + ?MODULE:module_info(exports)). init_per_suite(Config) -> case node() of @@ -174,3 +178,109 @@ t_mqtt(Config) when is_list(Config) -> after ok = emqx_bridge_worker:stop(Pid) end. + +t_stub_normal(Config) when is_list(Config) -> + Cfg = #{forwards => [<<"t_stub_normal/#">>], + connect_module => emqx_bridge_stub_conn, + forward_mountpoint => <<"forwarded">>, + start_type => auto, + stub_pid => self() + }, + {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId">>, + snabbkaffe:start_trace(), + try + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + {ok, _PacketId} = emqtt:publish(ConnPid, <<"t_stub_normal/one">>, <<"hello">>, ?QOS_1), + receive + {stub_message, WorkerPid, BatchRef, _Batch} -> + WorkerPid ! {batch_ack, BatchRef}, + ok + end, + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid) + after + snabbkaffe:stop(), + ok = emqx_bridge_worker:stop(Pid) + end. + +t_stub_overflow(Config) when is_list(Config) -> + Topic = <<"t_stub_overflow/one">>, + MaxInflight = 20, + Cfg = #{forwards => [Topic], + connect_module => emqx_bridge_stub_conn, + forward_mountpoint => <<"forwarded">>, + start_type => auto, + stub_pid => self(), + max_inflight => MaxInflight + }, + {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId">>, + snabbkaffe:start_trace(), + try + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight * 2)), + ?SNK_WAIT(inflight_full), + Acks = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks), + Acks2 = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, Acks2), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid) + after + snabbkaffe:stop(), + ok = emqx_bridge_worker:stop(Worker) + end. + +t_stub_random_order(Config) when is_list(Config) -> + Topic = <<"t_stub_random_order/a">>, + MaxInflight = 10, + Cfg = #{forwards => [Topic], + connect_module => emqx_bridge_stub_conn, + forward_mountpoint => <<"forwarded">>, + start_type => auto, + stub_pid => self(), + max_inflight => MaxInflight + }, + {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId">>, + snabbkaffe:start_trace(), + try + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight)), + Acks = stub_receive(MaxInflight), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, + lists:reverse(Acks)), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid) + after + snabbkaffe:stop(), + ok = emqx_bridge_worker:stop(Worker) + end. + +stub_receive(N) -> + stub_receive(N, []). + +stub_receive(0, Acc) -> lists:reverse(Acc); +stub_receive(N, Acc) -> + receive + {stub_message, WorkerPid, BatchRef, _Batch} -> + stub_receive(N - 1, [{WorkerPid, BatchRef} | Acc]) + after + 5000 -> + lists:reverse(Acc) + end.