Fix mem cache bug (#2129)
This commit is contained in:
parent
0d9929fdaa
commit
dee88fb018
|
@ -368,6 +368,18 @@ publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
|
||||||
{ok, PktId} = emqx_client:publish(ClientPid, Msg),
|
{ok, PktId} = emqx_client:publish(ClientPid, Msg),
|
||||||
publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
|
publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
|
||||||
|
|
||||||
|
delete(PktId, State = #state{ replayq = ReplayQ,
|
||||||
|
queue_option = #{ mem_cache := false }}) ->
|
||||||
|
{NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}),
|
||||||
|
case Msgs of
|
||||||
|
[{PktId, Msg}] ->
|
||||||
|
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msg]),
|
||||||
|
replayq:ack(ReplayQ, NewAckRef),
|
||||||
|
State#state{ replayq = NewReplayQ, ackref = NewAckRef};
|
||||||
|
_ ->
|
||||||
|
self() ! pop,
|
||||||
|
State
|
||||||
|
end;
|
||||||
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
|
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
|
||||||
ok = replayq:ack(ReplayQ, AckRef),
|
ok = replayq:ack(ReplayQ, AckRef),
|
||||||
self() ! pop,
|
self() ! pop,
|
||||||
|
@ -380,8 +392,11 @@ delete(PktId, State = #state{readq = ReadQ}) ->
|
||||||
State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
|
State#state{readq = lists:keydelete(PktId, 1, ReadQ)}.
|
||||||
|
|
||||||
bridge(Action, State = #state{options = Options,
|
bridge(Action, State = #state{options = Options,
|
||||||
replayq = ReplayQ,
|
replayq = ReplayQ,
|
||||||
queue_option = QueueOption}) ->
|
queue_option
|
||||||
|
= QueueOption
|
||||||
|
= #{batch_size := BatchSize}})
|
||||||
|
when BatchSize > 0 ->
|
||||||
case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
case emqx_client:start_link([{owner, self()}|options(Options)]) of
|
||||||
{ok, ClientPid} ->
|
{ok, ClientPid} ->
|
||||||
case emqx_client:connect(ClientPid) of
|
case emqx_client:connect(ClientPid) of
|
||||||
|
@ -406,7 +421,10 @@ bridge(Action, State = #state{options = Options,
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]),
|
emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]),
|
||||||
{<<"start bridge failed">>, State}
|
{<<"start bridge failed">>, State}
|
||||||
end.
|
end;
|
||||||
|
bridge(Action, State) ->
|
||||||
|
emqx_logger:error("[Bridge] ~p failed! error: batch_size should greater than zero", [Action]),
|
||||||
|
{<<"Open Replayq failed">>, State}.
|
||||||
|
|
||||||
open_replayq(undefined, #{batch_size := BatchSize,
|
open_replayq(undefined, #{batch_size := BatchSize,
|
||||||
replayq_dir := ReplayqDir,
|
replayq_dir := ReplayqDir,
|
||||||
|
|
Loading…
Reference in New Issue