From 72791b569e3302e5473ffa978e7dcb09cfe8d885 Mon Sep 17 00:00:00 2001 From: Gilbert Date: Fri, 28 Dec 2018 20:04:52 +0800 Subject: [PATCH] Improve app src (#2114) * Add replayq in emqx.app.src * Fix publish_readq_msg badmatch --- src/emqx_bridge.erl | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 46107e877..55bf162eb 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -448,12 +448,11 @@ en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ, NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)), State#state{writeq = [Msg], replayq = NewReplayQ}. -publish_readq_msg(_ClientPid, [], ReadQ) -> - {ok, ReadQ}; -publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], ReadQ) -> - io:format("~n replay msg: ~p ~n", [Msg]), +publish_readq_msg(_ClientPid, [], NewReadQ) -> + {ok, NewReadQ}; +publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) -> {ok, PktId} = emqx_client:publish(ClientPid, Msg), - publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | ReadQ]). + publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]). delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> ok = replayq:ack(ReplayQ, AckRef),