diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index df2c3ff07..133b3b621 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -368,6 +368,18 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) -> {ok, PktId} = emqx_client:publish(ClientPid, Msg), publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]). +delete(PktId, State = #state{ replayq = ReplayQ, + queue_option = #{ mem_cache := false }}) -> + {NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}), + case Msgs of + [{PktId, Msg}] -> + logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]), + replayq:ack(ReplayQ, NewAckRef), + State#state{ replayq = NewReplayQ, ackref = NewAckRef}; + _ -> + self() ! pop, + State + end; delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> ok = replayq:ack(ReplayQ, AckRef), self() ! pop, @@ -380,8 +392,11 @@ delete(PktId, State = #state{readq = ReadQ}) -> State#state{readq = lists:keydelete(PktId, 1, ReadQ)}. bridge(Action, State = #state{options = Options, - replayq = ReplayQ, - queue_option = QueueOption}) -> + replayq = ReplayQ, + queue_option + = QueueOption + = #{batch_size := BatchSize}}) + when BatchSize > 0 -> case emqx_client:start_link([{owner, self()}|options(Options)]) of {ok, ClientPid} -> case emqx_client:connect(ClientPid) of @@ -406,7 +421,10 @@ bridge(Action, State = #state{options = Options, {error, Reason} -> emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]), {<<"start bridge failed">>, State} - end. + end; +bridge(Action, State) -> + emqx_logger:error("[Bridge] ~p failed! error: batch_size should greater than zero", [Action]), + {<<"Open Replayq failed">>, State}. open_replayq(undefined, #{batch_size := BatchSize, replayq_dir := ReplayqDir,