fix(bridge_mqtt): fix inflight reference booking

Prior to this change, the inflight batches are referenced
by the last packet ID for non-QoS-0 messages, other packet
IDs sent back from downstream causes an error log:
"Can't be found from the inflight"

Even worse, the batch is appended back to the queue for retry.
This commit is contained in:
Zaiming Shi 2021-04-08 14:26:38 +02:00 committed by Zaiming (Stone) Shi
parent 2b8c1efd1d
commit ae688e2c90
2 changed files with 60 additions and 26 deletions

View File

@ -124,24 +124,31 @@ safe_stop(Pid, StopF, Timeout) ->
end.
send(Conn, Msgs) ->
send(Conn, Msgs, undefined).
send(_Conn, [], PktId) ->
{ok, PktId};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], _PktId) ->
send(Conn, Msgs, []).
send(_Conn, [], []) ->
%% all messages in the batch are QoS-0
Ref = make_ref(),
%% QoS-0 messages do not have packet ID
%% the batch ack is simulated with a loop-back message
self() ! {batch_ack, Ref},
{ok, Ref};
send(_Conn, [], PktIds) ->
%% PktIds is not an empty list if there is any non-QoS-0 message in the batch,
%% And the worker should wait for all acks
{ok, PktIds};
send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
case emqtt:publish(ClientPid, Msg) of
ok ->
Ref = make_ref(),
self() ! {batch_ack, Ref},
send(Conn, Rest, Ref);
send(Conn, Rest, PktIds);
{ok, PktId} ->
send(Conn, Rest, PktId);
send(Conn, Rest, [PktId | PktIds]);
{error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
when RC =:= ?RC_SUCCESS;
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->

View File

@ -320,7 +320,7 @@ idle(state_timeout, reconnect, State) ->
connecting(State);
idle(info, {batch_ack, Ref}, State) ->
{ok, NewState} = do_ack(State, Ref),
NewState = handle_batch_ack(State, Ref),
{keep_state, NewState};
idle(Type, Content, State) ->
@ -355,7 +355,7 @@ connected(info, {disconnected, Conn, Reason},
keep_state_and_data
end;
connected(info, {batch_ack, Ref}, State) ->
{ok, NewState} = do_ack(State, Ref),
NewState = handle_batch_ack(State, Ref),
{keep_state, NewState, {next_event, internal, maybe_send}};
connected(Type, Content, State) ->
common(connected, Type, Content, State).
@ -506,30 +506,57 @@ do_send(#{inflight := Inflight,
emqx_bridge_msg:to_export(Module, Mountpoint, Message)
end,
case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of
{ok, Ref} ->
{ok, Refs} ->
{ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef,
send_ack_ref => Ref,
send_ack_ref => map_set(Refs),
batch => Batch}]}};
{error, Reason} ->
?LOG(info, "Batch produce failed~p", [Reason]),
?LOG(info, "batch_produce_failed ~p", [Reason]),
{error, State}
end.
%% map as set, ack-reference -> 1
map_set(Ref) when is_reference(Ref) ->
%% QoS-0 or RPC call returns a reference
map_set([Ref]);
map_set(List) ->
map_set(List, #{}).
do_ack(#{inflight := []} = State, Ref) ->
?LOG(error, "Can't be found from the inflight:~p", [Ref]),
{ok, State};
map_set([], Set) -> Set;
map_set([H | T], Set) -> map_set(T, Set#{H => 1}).
do_ack(#{inflight := [#{send_ack_ref := Ref,
q_ack_ref := QAckRef}| Rest], replayq := Q} = State, Ref) ->
ok = replayq:ack(Q, QAckRef),
{ok, State#{inflight => Rest}};
handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) ->
Inflight1 = do_ack(Inflight0, Ref),
Inflight = drop_acked_batches(Q, Inflight1),
State#{inflight := Inflight}.
do_ack(#{inflight := [#{q_ack_ref := QAckRef,
batch := Batch}| Rest], replayq := Q} = State, Ref) ->
ok = replayq:ack(Q, QAckRef),
NewQ = replayq:append(Q, Batch),
do_ack(State#{replayq => NewQ, inflight => Rest}, Ref).
do_ack([], Ref) ->
?LOG(debug, "stale_batch_ack_reference ~p", [Ref]),
[];
do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) ->
case maps:is_key(Ref, Refs) of
true ->
NewRefs = maps:without([Ref], Refs),
[First#{send_ack_ref := NewRefs} | Rest];
false ->
[First | do_ack(Rest, Ref)]
end.
%% Drop the consecutive header of the inflight list having empty send_ack_ref
drop_acked_batches(_Q, []) -> [];
drop_acked_batches(Q, [#{send_ack_ref := Refs,
q_ack_ref := QAckRef} | Rest] = All) ->
case maps:size(Refs) of
0 ->
%% all messages are acked by bridge target
%% now it's safe to ack replayq (delete from disk)
ok = replayq:ack(Q, QAckRef),
%% continue to check more sent batches
drop_acked_batches(Q, Rest);
_ ->
%% the head (oldest) inflight batch is not acked, keep waiting
All
end.
subscribe_local_topics(Topics, Name) ->
lists:foreach(fun(Topic) -> subscribe_local_topic(Topic, Name) end, Topics).