diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 38454581d..c82168add 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -337,7 +337,7 @@ connecting(#{reconnect_delay_ms := ReconnectDelayMs} = State) -> end. connected(state_timeout, connected, #{inflight := Inflight} = State) -> - case retry_inflight(State, Inflight) of + case retry_inflight(State#{inflight := []}, Inflight) of {ok, NewState} -> {keep_state, NewState, {next_event, internal, maybe_send}}; {error, NewState} -> @@ -348,10 +348,10 @@ connected(internal, maybe_send, State) -> {keep_state, NewState}; connected(info, {disconnected, Conn, Reason}, - #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) -> + #{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) -> + ?tp(info, disconnected, #{name => Name, reason => Reason}), case Conn =:= maps:get(client_pid, Connection, undefined) of true -> - ?LOG(info, "Bridge ~p diconnected~nreason=~p", [Name, Reason]), {next_state, idle, State#{connection => undefined}, {state_timeout, ReconnectDelayMs, reconnect}}; false -> keep_state_and_data @@ -434,12 +434,14 @@ do_connect(#{forwards := Forwards, subscriptions := Subs, connect_module := ConnectModule, connect_cfg := ConnectCfg, + inflight := Inflight, name := Name} = State) -> ok = subscribe_local_topics(Forwards, Name), case emqx_bridge_connect:start(ConnectModule, ConnectCfg#{subscriptions => Subs}) of {ok, Conn} -> - ?LOG(info, "Bridge ~p is connecting......", [Name]), - {ok, eval_bridge_handler(State#{connection => Conn}, connected)}; + Res = eval_bridge_handler(State#{connection => Conn}, connected), + ?tp(info, connected, #{name => Name, inflight => length(Inflight)}), + {ok, Res}; {error, Reason} -> {error, Reason, State} end. @@ -475,10 +477,12 @@ collect(Acc) -> %% Retry all inflight (previously sent but not acked) batches. retry_inflight(State, []) -> {ok, State}; -retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) -> - case do_send(State#{inflight := Inflight}, QAckRef, Batch) of - {ok, State1} -> retry_inflight(State1, Inflight); - {error, State1} -> {error, State1} +retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) -> + case do_send(State, QAckRef, Batch) of + {ok, State1} -> + retry_inflight(State1, Rest); + {error, #{inflight := NewInf} = State1} -> + {error, State1#{inflight := NewInf ++ OldInf}} end. pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) -> diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl index 6b8db31ea..13c9fa704 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_stub_conn.erl @@ -34,7 +34,7 @@ stop(_) -> ok. %% @doc Callback for `emqx_bridge_connect' behaviour -spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}. -send(#{stub_pid := Pid}, Batch) -> +send(#{client_pid := Pid}, Batch) -> Ref = make_ref(), Pid ! {stub_message, self(), Ref, Batch}, {ok, Ref}. diff --git a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl index fbd1aae39..4ca4a2bb2 100644 --- a/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl +++ b/apps/emqx_bridge_mqtt/test/emqx_bridge_worker_SUITE.erl @@ -191,7 +191,7 @@ t_stub_normal(Config) when is_list(Config) -> connect_module => emqx_bridge_stub_conn, forward_mountpoint => <<"forwarded">>, start_type => auto, - stub_pid => self() + client_pid => self() }, {ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), ClientId = <<"ClientId">>, @@ -218,7 +218,7 @@ t_stub_overflow(Config) when is_list(Config) -> connect_module => emqx_bridge_stub_conn, forward_mountpoint => <<"forwarded">>, start_type => auto, - stub_pid => self(), + client_pid => self(), max_inflight => MaxInflight }, {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), @@ -250,7 +250,7 @@ t_stub_random_order(Config) when is_list(Config) -> connect_module => emqx_bridge_stub_conn, forward_mountpoint => <<"forwarded">>, start_type => auto, - stub_pid => self(), + client_pid => self(), max_inflight => MaxInflight }, {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), @@ -273,6 +273,53 @@ t_stub_random_order(Config) when is_list(Config) -> ok = emqx_bridge_worker:stop(Worker) end. +t_stub_retry_inflight(Config) when is_list(Config) -> + Topic = <<"to_stub_retry_inflight/a">>, + MaxInflight = 10, + Cfg = #{forwards => [Topic], + connect_module => emqx_bridge_stub_conn, + forward_mountpoint => <<"forwarded">>, + reconnect_delay_ms => 10, + start_type => auto, + client_pid => self(), + max_inflight => MaxInflight + }, + {ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg), + ClientId = <<"ClientId2">>, + try + case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of + {ok, #{inflight := 0}} -> ok; + Other -> ct:fail("~p", [Other]) + end, + {ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]), + {ok, _} = emqtt:connect(ConnPid), + lists:foreach( + fun(I) -> + Data = integer_to_binary(I), + _ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1) + end, lists:seq(1, MaxInflight)), + %% receive acks but do not ack + Acks1 = stub_receive(MaxInflight), + ?assertEqual(MaxInflight, length(Acks1)), + %% simulate a disconnect + Worker ! {disconnected, self(), test}, + ?SNK_WAIT(disconnected), + case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of + {ok, _} -> ok; + Error -> ct:fail("~p", [Error]) + end, + %% expect worker to retry inflight, so to receive acks again + Acks2 = stub_receive(MaxInflight), + ?assertEqual(MaxInflight, length(Acks2)), + lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end, + lists:reverse(Acks2)), + ?SNK_WAIT(inflight_drained), + ?SNK_WAIT(replayq_drained), + emqtt:disconnect(ConnPid) + after + ok = emqx_bridge_worker:stop(Worker) + end. + stub_receive(N) -> stub_receive(N, []).