Merge pull request #4526 from zmstone/fix-mqtt-bridge-retry

fix(emqx_bridge_mqtt): fix retry_inflight
This commit is contained in:
Zaiming (Stone) Shi 2021-04-13 12:04:14 +02:00 committed by GitHub
commit 24fdd7aef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 13 deletions

View File

@ -337,7 +337,7 @@ connecting(#{reconnect_delay_ms := ReconnectDelayMs} = State) ->
end.
connected(state_timeout, connected, #{inflight := Inflight} = State) ->
case retry_inflight(State, Inflight) of
case retry_inflight(State#{inflight := []}, Inflight) of
{ok, NewState} ->
{keep_state, NewState, {next_event, internal, maybe_send}};
{error, NewState} ->
@ -348,10 +348,10 @@ connected(internal, maybe_send, State) ->
{keep_state, NewState};
connected(info, {disconnected, Conn, Reason},
#{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) ->
#{connection := Connection, name := Name, reconnect_delay_ms := ReconnectDelayMs} = State) ->
?tp(info, disconnected, #{name => Name, reason => Reason}),
case Conn =:= maps:get(client_pid, Connection, undefined) of
true ->
?LOG(info, "Bridge ~p diconnected~nreason=~p", [Name, Reason]),
{next_state, idle, State#{connection => undefined}, {state_timeout, ReconnectDelayMs, reconnect}};
false ->
keep_state_and_data
@ -434,12 +434,14 @@ do_connect(#{forwards := Forwards,
subscriptions := Subs,
connect_module := ConnectModule,
connect_cfg := ConnectCfg,
inflight := Inflight,
name := Name} = State) ->
ok = subscribe_local_topics(Forwards, Name),
case emqx_bridge_connect:start(ConnectModule, ConnectCfg#{subscriptions => Subs}) of
{ok, Conn} ->
?LOG(info, "Bridge ~p is connecting......", [Name]),
{ok, eval_bridge_handler(State#{connection => Conn}, connected)};
Res = eval_bridge_handler(State#{connection => Conn}, connected),
?tp(info, connected, #{name => Name, inflight => length(Inflight)}),
{ok, Res};
{error, Reason} ->
{error, Reason, State}
end.
@ -475,10 +477,12 @@ collect(Acc) ->
%% Retry all inflight (previously sent but not acked) batches.
retry_inflight(State, []) -> {ok, State};
retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Inflight]) ->
case do_send(State#{inflight := Inflight}, QAckRef, Batch) of
{ok, State1} -> retry_inflight(State1, Inflight);
{error, State1} -> {error, State1}
retry_inflight(State, [#{q_ack_ref := QAckRef, batch := Batch} | Rest] = OldInf) ->
case do_send(State, QAckRef, Batch) of
{ok, State1} ->
retry_inflight(State1, Rest);
{error, #{inflight := NewInf} = State1} ->
{error, State1#{inflight := NewInf ++ OldInf}}
end.
pop_and_send(#{inflight := Inflight, max_inflight := Max } = State) ->

View File

@ -34,7 +34,7 @@ stop(_) -> ok.
%% @doc Callback for `emqx_bridge_connect' behaviour
-spec send(_, batch()) -> {ok, ack_ref()} | {error, any()}.
send(#{stub_pid := Pid}, Batch) ->
send(#{client_pid := Pid}, Batch) ->
Ref = make_ref(),
Pid ! {stub_message, self(), Ref, Batch},
{ok, Ref}.

View File

@ -191,7 +191,7 @@ t_stub_normal(Config) when is_list(Config) ->
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self()
client_pid => self()
},
{ok, Pid} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId">>,
@ -218,7 +218,7 @@ t_stub_overflow(Config) when is_list(Config) ->
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
client_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
@ -250,7 +250,7 @@ t_stub_random_order(Config) when is_list(Config) ->
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
start_type => auto,
stub_pid => self(),
client_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
@ -273,6 +273,53 @@ t_stub_random_order(Config) when is_list(Config) ->
ok = emqx_bridge_worker:stop(Worker)
end.
t_stub_retry_inflight(Config) when is_list(Config) ->
Topic = <<"to_stub_retry_inflight/a">>,
MaxInflight = 10,
Cfg = #{forwards => [Topic],
connect_module => emqx_bridge_stub_conn,
forward_mountpoint => <<"forwarded">>,
reconnect_delay_ms => 10,
start_type => auto,
client_pid => self(),
max_inflight => MaxInflight
},
{ok, Worker} = emqx_bridge_worker:start_link(?FUNCTION_NAME, Cfg),
ClientId = <<"ClientId2">>,
try
case ?block_until(#{?snk_kind := connected, inflight := 0}, 2000, 1000) of
{ok, #{inflight := 0}} -> ok;
Other -> ct:fail("~p", [Other])
end,
{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(ConnPid),
lists:foreach(
fun(I) ->
Data = integer_to_binary(I),
_ = emqtt:publish(ConnPid, Topic, Data, ?QOS_1)
end, lists:seq(1, MaxInflight)),
%% receive acks but do not ack
Acks1 = stub_receive(MaxInflight),
?assertEqual(MaxInflight, length(Acks1)),
%% simulate a disconnect
Worker ! {disconnected, self(), test},
?SNK_WAIT(disconnected),
case ?block_until(#{?snk_kind := connected, inflight := MaxInflight}, 2000, 20) of
{ok, _} -> ok;
Error -> ct:fail("~p", [Error])
end,
%% expect worker to retry inflight, so to receive acks again
Acks2 = stub_receive(MaxInflight),
?assertEqual(MaxInflight, length(Acks2)),
lists:foreach(fun({Pid, Ref}) -> Pid ! {batch_ack, Ref} end,
lists:reverse(Acks2)),
?SNK_WAIT(inflight_drained),
?SNK_WAIT(replayq_drained),
emqtt:disconnect(ConnPid)
after
ok = emqx_bridge_worker:stop(Worker)
end.
stub_receive(N) ->
stub_receive(N, []).