Fix bridge bug (#2160)

* Fix bridge bug

* Fix ack bug

* Limit bridge QoS less than 1
This commit is contained in:
Gilbert 2019-01-22 09:42:32 +08:00 committed by turtleDeng
parent 067d28dcb6
commit 55ec358cd6
1 changed files with 36 additions and 21 deletions

View File

@ -114,12 +114,12 @@ init([Options]) ->
ReconnectInterval = get_value(reconnect_interval, Options, 30000),
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
QueueOptions = get_value(queue, Options),
{ok, #state{mountpoint = Mountpoint,
queue_option = QueueOptions,
readq = [],
writeq = [],
options = Options,
reconnect_interval = ReconnectInterval}}.
{ok, #state{mountpoint = Mountpoint,
queue_option = QueueOptions,
readq = [],
writeq = [],
options = Options,
reconnect_interval = ReconnectInterval}}.
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
{Msg, NewState} = bridge(start, State),
@ -228,16 +228,19 @@ handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) ->
%%----------------------------------------------------------------
%% received local node message
%%----------------------------------------------------------------
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = undefined,
mountpoint = Mountpoint}) ->
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
payload = Payload},
{noreply, en_writeq({undefined, Msg}, State)};
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid, mountpoint = Mountpoint}) ->
handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid,
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
@ -347,7 +350,6 @@ format_mountpoint(undefined) ->
format_mountpoint(Prefix) ->
binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
en_writeq(Msg, State = #state{replayq = ReplayQ,
queue_option = #{mem_cache := false}}) ->
NewReplayQ = replayq:append(ReplayQ, [Msg]),
@ -369,16 +371,21 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
delete(PktId, State = #state{ replayq = ReplayQ,
queue_option = #{ mem_cache := false }}) ->
readq = [],
queue_option = #{ mem_cache := false}}) ->
{NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}),
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]),
ok = replayq:ack(NewReplayQ, NewAckRef),
case Msgs of
[{PktId, Msg}] ->
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]),
replayq:ack(ReplayQ, NewAckRef),
State#state{ replayq = NewReplayQ, ackref = NewAckRef};
_ ->
[{PktId, _Msg}] ->
self() ! pop,
State
State#state{ replayq = NewReplayQ, ackref = NewAckRef };
[{_PktId, _Msg}] ->
NewReplayQ1 = replayq:append(NewReplayQ, Msgs),
self() ! pop,
State#state{ replayq = NewReplayQ1, ackref = NewAckRef };
_Empty ->
State#state{ replayq = NewReplayQ, ackref = NewAckRef}
end;
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
ok = replayq:ack(ReplayQ, AckRef),
@ -388,8 +395,16 @@ delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref
delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
State#state{writeq = lists:keydelete(PktId, 1, WriteQ)};
delete(PktId, State = #state{readq = ReadQ}) ->
State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) ->
NewReadQ = lists:keydelete(PktId, 1, ReadQ),
case NewReadQ of
[] ->
ok = replayq:ack(ReplayQ, AckRef),
self() ! pop;
_NewReadQ ->
ok
end,
State#state{ readq = NewReadQ }.
bridge(Action, State = #state{options = Options,
replayq = ReplayQ,
@ -397,7 +412,7 @@ bridge(Action, State = #state{options = Options,
= QueueOption
= #{batch_size := BatchSize}})
when BatchSize > 0 ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
case emqx_client:start_link([{owner, self()} | options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->