diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 133b3b621..5d3ae3e91 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -114,12 +114,12 @@ init([Options]) -> ReconnectInterval = get_value(reconnect_interval, Options, 30000), Mountpoint = format_mountpoint(get_value(mountpoint, Options)), QueueOptions = get_value(queue, Options), - {ok, #state{mountpoint = Mountpoint, - queue_option = QueueOptions, - readq = [], - writeq = [], - options = Options, - reconnect_interval = ReconnectInterval}}. + {ok, #state{mountpoint = Mountpoint, + queue_option = QueueOptions, + readq = [], + writeq = [], + options = Options, + reconnect_interval = ReconnectInterval}}. handle_call(start_bridge, _From, State = #state{client_pid = undefined}) -> {Msg, NewState} = bridge(start, State), @@ -228,16 +228,19 @@ handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) -> %%---------------------------------------------------------------- %% received local node message %%---------------------------------------------------------------- -handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, +handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}}, State = #state{client_pid = undefined, - mountpoint = Mountpoint}) -> + mountpoint = Mountpoint}) + when QoS =< 1 -> Msg = #mqtt_msg{qos = 1, retain = Retain, topic = mountpoint(Mountpoint, Topic), payload = Payload}, {noreply, en_writeq({undefined, Msg}, State)}; -handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}}, - State = #state{client_pid = Pid, mountpoint = Mountpoint}) -> +handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}}, + State = #state{client_pid = Pid, + mountpoint = Mountpoint}) + when QoS =< 1 -> Msg = #mqtt_msg{qos = 1, retain = Retain, topic = mountpoint(Mountpoint, Topic), @@ -347,7 +350,6 @@ format_mountpoint(undefined) -> format_mountpoint(Prefix) -> binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - en_writeq(Msg, State = #state{replayq = ReplayQ, queue_option = #{mem_cache := false}}) -> NewReplayQ = replayq:append(ReplayQ, [Msg]), @@ -369,16 +371,21 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) -> publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]). delete(PktId, State = #state{ replayq = ReplayQ, - queue_option = #{ mem_cache := false }}) -> + readq = [], + queue_option = #{ mem_cache := false}}) -> {NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}), + logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]), + ok = replayq:ack(NewReplayQ, NewAckRef), case Msgs of - [{PktId, Msg}] -> - logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]), - replayq:ack(ReplayQ, NewAckRef), - State#state{ replayq = NewReplayQ, ackref = NewAckRef}; - _ -> + [{PktId, _Msg}] -> self() ! pop, - State + State#state{ replayq = NewReplayQ, ackref = NewAckRef }; + [{_PktId, _Msg}] -> + NewReplayQ1 = replayq:append(NewReplayQ, Msgs), + self() ! pop, + State#state{ replayq = NewReplayQ1, ackref = NewAckRef }; + _Empty -> + State#state{ replayq = NewReplayQ, ackref = NewAckRef} end; delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) -> ok = replayq:ack(ReplayQ, AckRef), @@ -388,8 +395,16 @@ delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref delete(PktId, State = #state{readq = [], writeq = WriteQ}) -> State#state{writeq = lists:keydelete(PktId, 1, WriteQ)}; -delete(PktId, State = #state{readq = ReadQ}) -> - State#state{readq = lists:keydelete(PktId, 1, ReadQ)}. +delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) -> + NewReadQ = lists:keydelete(PktId, 1, ReadQ), + case NewReadQ of + [] -> + ok = replayq:ack(ReplayQ, AckRef), + self() ! pop; + _NewReadQ -> + ok + end, + State#state{ readq = NewReadQ }. bridge(Action, State = #state{options = Options, replayq = ReplayQ, @@ -397,7 +412,7 @@ bridge(Action, State = #state{options = Options, = QueueOption = #{batch_size := BatchSize}}) when BatchSize > 0 -> - case emqx_client:start_link([{owner, self()}|options(Options)]) of + case emqx_client:start_link([{owner, self()} | options(Options)]) of {ok, ClientPid} -> case emqx_client:connect(ClientPid) of {ok, _} ->