diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index ad076fc31..dccc681c8 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -312,23 +312,21 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> undefined -> Stash#{Route => {Action, Prio, Ctx}}; RouteOp -> - case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of - undefined -> - maps:remove(Route, Stash); - RouteOpMerged -> - Stash#{Route := RouteOpMerged} - end + RouteOpMerged = merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)), + Stash#{Route := RouteOpMerged} end. merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> %% NOTE: This should not happen anyway. _ = replyctx_send(ignored, Ctx1), DestOp; -merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) -> - %% NOTE: Operations cancel each other. +merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) -> + %% NOTE: Latter cancel the former. + %% Strictly speaking, in ideal conditions we could just cancel both, because they + %% essentially do not change the global state. However, we decided to stay on the + %% safe side and cancel only the former, making batch syncs idempotent. _ = replyctx_send(ok, Ctx1), - _ = replyctx_send(ok, Ctx2), - undefined. + DestOp. %% @@ -398,13 +396,13 @@ batch_test() -> [ {2, ignored}, {1, ok}, - {4, ok}, {5, ok}, - {7, ok}, + {7, ignored}, + {4, ok}, {9, ok}, - {11, ok}, {8, ok}, - {13, ok} + {13, ignored}, + {11, ok} ], emqx_utils_stream:consume(emqx_utils_stream:mqueue(0)) ).