fix(route-sync): ensure batch sync preserve idemopotency
This commit is contained in:
parent
e21a3497c7
commit
2f98f1faaf
|
@ -312,23 +312,21 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
Stash#{Route => {Action, Prio, Ctx}};
|
Stash#{Route => {Action, Prio, Ctx}};
|
||||||
RouteOp ->
|
RouteOp ->
|
||||||
case merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)) of
|
RouteOpMerged = merge_route_op(RouteOp, ?ROUTEOP(Action, Prio, Ctx)),
|
||||||
undefined ->
|
Stash#{Route := RouteOpMerged}
|
||||||
maps:remove(Route, Stash);
|
|
||||||
RouteOpMerged ->
|
|
||||||
Stash#{Route := RouteOpMerged}
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
|
merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) ->
|
||||||
%% NOTE: This should not happen anyway.
|
%% NOTE: This should not happen anyway.
|
||||||
_ = replyctx_send(ignored, Ctx1),
|
_ = replyctx_send(ignored, Ctx1),
|
||||||
DestOp;
|
DestOp;
|
||||||
merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2)) ->
|
merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) ->
|
||||||
%% NOTE: Operations cancel each other.
|
%% NOTE: Latter cancel the former.
|
||||||
|
%% Strictly speaking, in ideal conditions we could just cancel both, because they
|
||||||
|
%% essentially do not change the global state. However, we decided to stay on the
|
||||||
|
%% safe side and cancel only the former, making batch syncs idempotent.
|
||||||
_ = replyctx_send(ok, Ctx1),
|
_ = replyctx_send(ok, Ctx1),
|
||||||
_ = replyctx_send(ok, Ctx2),
|
DestOp.
|
||||||
undefined.
|
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
|
@ -398,13 +396,13 @@ batch_test() ->
|
||||||
[
|
[
|
||||||
{2, ignored},
|
{2, ignored},
|
||||||
{1, ok},
|
{1, ok},
|
||||||
{4, ok},
|
|
||||||
{5, ok},
|
{5, ok},
|
||||||
{7, ok},
|
{7, ignored},
|
||||||
|
{4, ok},
|
||||||
{9, ok},
|
{9, ok},
|
||||||
{11, ok},
|
|
||||||
{8, ok},
|
{8, ok},
|
||||||
{13, ok}
|
{13, ignored},
|
||||||
|
{11, ok}
|
||||||
],
|
],
|
||||||
emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
|
emqx_utils_stream:consume(emqx_utils_stream:mqueue(0))
|
||||||
).
|
).
|
||||||
|
|
Loading…
Reference in New Issue