diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index bdbeffb4b..d81fc5d88 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -78,7 +78,7 @@ push(Action, Topic, Dest, Opts) -> Worker = gproc_pool:pick_worker(?POOL, Topic), Prio = designate_prio(Action, Opts), Context = mk_push_context(Opts), - Worker ! ?PUSH(Prio, {Action, Topic, Dest, Context}), + _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of {MRef, _} -> MRef; @@ -148,7 +148,7 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> ok = send_replies(Errors, Batch), NState = State#{stash := Stash3}, %% TODO: postpone if only ?PRIO_BG operations left? - case stash_empty(Stash3) of + case is_stash_empty(Stash3) of true -> NState; false -> @@ -209,7 +209,8 @@ send_replies(Errors, Batch) -> replyctx_send(_Result, []) -> noreply; replyctx_send(Result, {MRef, Pid}) -> - Pid ! {MRef, Result}. + _ = erlang:send(Pid, {MRef, Result}), + ok. %% @@ -221,7 +222,7 @@ run_batch(Batch) -> stash_new() -> #{}. -stash_empty(Stash) -> +is_stash_empty(Stash) -> maps:size(Stash) =:= 0. stash_drain(Stash) ->