diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl index 4692561e7..b3fd1316f 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.erl @@ -124,24 +124,31 @@ safe_stop(Pid, StopF, Timeout) -> end. send(Conn, Msgs) -> - send(Conn, Msgs, undefined). -send(_Conn, [], PktId) -> - {ok, PktId}; -send(#{client_pid := ClientPid} = Conn, [Msg | Rest], _PktId) -> + send(Conn, Msgs, []). + +send(_Conn, [], []) -> + %% all messages in the batch are QoS-0 + Ref = make_ref(), + %% QoS-0 messages do not have packet ID + %% the batch ack is simulated with a loop-back message + self() ! {batch_ack, Ref}, + {ok, Ref}; +send(_Conn, [], PktIds) -> + %% PktIds is not an empty list if there is any non-QoS-0 message in the batch, + %% And the worker should wait for all acks + {ok, PktIds}; +send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) -> case emqtt:publish(ClientPid, Msg) of ok -> - Ref = make_ref(), - self() ! {batch_ack, Ref}, - send(Conn, Rest, Ref); + send(Conn, Rest, PktIds); {ok, PktId} -> - send(Conn, Rest, PktId); + send(Conn, Rest, [PktId | PktIds]); {error, Reason} -> %% NOTE: There is no partial sucess of a batch and recover from the middle %% only to retry all messages in one batch {error, Reason} end. - handle_puback(#{packet_id := PktId, reason_code := RC}, Parent) when RC =:= ?RC_SUCCESS; RC =:= ?RC_NO_MATCHING_SUBSCRIBERS -> diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 95b3aa861..157431817 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -320,7 +320,7 @@ idle(state_timeout, reconnect, State) -> connecting(State); idle(info, {batch_ack, Ref}, State) -> - {ok, NewState} = do_ack(State, Ref), + NewState = handle_batch_ack(State, Ref), {keep_state, NewState}; idle(Type, Content, State) -> @@ -355,7 +355,7 @@ connected(info, {disconnected, Conn, Reason}, keep_state_and_data end; connected(info, {batch_ack, Ref}, State) -> - {ok, NewState} = do_ack(State, Ref), + NewState = handle_batch_ack(State, Ref), {keep_state, NewState, {next_event, internal, maybe_send}}; connected(Type, Content, State) -> common(connected, Type, Content, State). @@ -506,30 +506,57 @@ do_send(#{inflight := Inflight, emqx_bridge_msg:to_export(Module, Mountpoint, Message) end, case Module:send(Connection, [ExportMsg(M) || M <- Batch]) of - {ok, Ref} -> + {ok, Refs} -> {ok, State#{inflight := Inflight ++ [#{q_ack_ref => QAckRef, - send_ack_ref => Ref, + send_ack_ref => map_set(Refs), batch => Batch}]}}; {error, Reason} -> - ?LOG(info, "Batch produce failed~p", [Reason]), + ?LOG(info, "batch_produce_failed ~p", [Reason]), {error, State} end. +%% map as set, ack-reference -> 1 +map_set(Ref) when is_reference(Ref) -> + %% QoS-0 or RPC call returns a reference + map_set([Ref]); +map_set(List) -> + map_set(List, #{}). -do_ack(#{inflight := []} = State, Ref) -> - ?LOG(error, "Can't be found from the inflight:~p", [Ref]), - {ok, State}; +map_set([], Set) -> Set; +map_set([H | T], Set) -> map_set(T, Set#{H => 1}). -do_ack(#{inflight := [#{send_ack_ref := Ref, - q_ack_ref := QAckRef}| Rest], replayq := Q} = State, Ref) -> - ok = replayq:ack(Q, QAckRef), - {ok, State#{inflight => Rest}}; +handle_batch_ack(#{inflight := Inflight0, replayq := Q} = State, Ref) -> + Inflight1 = do_ack(Inflight0, Ref), + Inflight = drop_acked_batches(Q, Inflight1), + State#{inflight := Inflight}. -do_ack(#{inflight := [#{q_ack_ref := QAckRef, - batch := Batch}| Rest], replayq := Q} = State, Ref) -> - ok = replayq:ack(Q, QAckRef), - NewQ = replayq:append(Q, Batch), - do_ack(State#{replayq => NewQ, inflight => Rest}, Ref). +do_ack([], Ref) -> + ?LOG(debug, "stale_batch_ack_reference ~p", [Ref]), + []; +do_ack([#{send_ack_ref := Refs} = First | Rest], Ref) -> + case maps:is_key(Ref, Refs) of + true -> + NewRefs = maps:without([Ref], Refs), + [First#{send_ack_ref := NewRefs} | Rest]; + false -> + [First | do_ack(Rest, Ref)] + end. + +%% Drop the consecutive header of the inflight list having empty send_ack_ref +drop_acked_batches(_Q, []) -> []; +drop_acked_batches(Q, [#{send_ack_ref := Refs, + q_ack_ref := QAckRef} | Rest] = All) -> + case maps:size(Refs) of + 0 -> + %% all messages are acked by bridge target + %% now it's safe to ack replayq (delete from disk) + ok = replayq:ack(Q, QAckRef), + %% continue to check more sent batches + drop_acked_batches(Q, Rest); + _ -> + %% the head (oldest) inflight batch is not acked, keep waiting + All + end. subscribe_local_topics(Topics, Name) -> lists:foreach(fun(Topic) -> subscribe_local_topic(Topic, Name) end, Topics).