From d6f731c4fce99e62a7977d0bb5e2d374f08fda2d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 10 Jan 2024 19:18:24 +0100 Subject: [PATCH] fix(route-sync): use public function as mria activity target --- apps/emqx/src/emqx_router.erl | 70 +++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 54667065a..3b191ac36 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -46,9 +46,9 @@ ]). %% Mria Activity RPC targets -% -export([ -% mria_insert_route/2 -% ]). +-export([ + mria_batch_run/2 +]). -export([do_batch/1]). @@ -96,9 +96,12 @@ -export_type([dest/0]). -type group() :: binary(). - -type dest() :: node() | {group(), node()}. +%% Operation :: {add, ...} | {delete, ...}. +-type batch() :: #{batch_route() => _Operation :: tuple()}. +-type batch_route() :: {emqx_types:topic(), dest()}. + -record(routeidx, { entry :: '$1' | emqx_topic_index:key(dest()), unused = [] :: nil() @@ -241,11 +244,7 @@ mria_delete_route(v2, Topic, Dest, Ctx) -> mria_delete_route(v1, Topic, Dest, Ctx) -> mria_delete_route_v1(Topic, Dest, Ctx). --spec do_batch(Batch) -> Errors when - %% Operation :: {add, ...} | {delete, ...}. - Batch :: #{Route => _Operation :: tuple()}, - Errors :: #{Route => _Error}, - Route :: {emqx_types:topic(), dest()}. +-spec do_batch(batch()) -> #{batch_route() => _Error}. do_batch(Batch) -> Nodes = batch_get_dest_nodes(Batch), ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), @@ -257,30 +256,15 @@ mria_batch(v1, Batch) -> mria_batch_v1(Batch). mria_batch_v2(Batch) -> - mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]). + mria:async_dirty(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v2, Batch]). mria_batch_v1(Batch) -> - {atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]), - Res. - -mria_batch_run(SchemaVsn, Batch) -> - maps:fold( - fun({Topic, Dest}, Op, Errors) -> - case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of - ok -> - Errors; - Error -> - Errors#{{Topic, Dest} => Error} - end - end, - #{}, - Batch - ). - -mria_batch_operation(SchemaVsn, add, Topic, Dest) -> - mria_insert_route(SchemaVsn, Topic, Dest, batch); -mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> - mria_delete_route(SchemaVsn, Topic, Dest, batch). + case mria:transaction(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v1, Batch]) of + {atomic, Result} -> + Result; + Error -> + Error + end. batch_get_dest_nodes(Batch) -> maps:fold( @@ -368,6 +352,30 @@ call(Router, Msg) -> pick(Topic) -> gproc_pool:pick_worker(router_pool, Topic). +%%-------------------------------------------------------------------- +%% Route batch RPC targets +%%-------------------------------------------------------------------- + +-spec mria_batch_run(schemavsn(), batch()) -> #{batch_route() => _Error}. +mria_batch_run(SchemaVsn, Batch) -> + maps:fold( + fun({Topic, Dest}, Op, Errors) -> + case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of + ok -> + Errors; + Error -> + Errors#{{Topic, Dest} => Error} + end + end, + #{}, + Batch + ). + +mria_batch_operation(SchemaVsn, add, Topic, Dest) -> + mria_insert_route(SchemaVsn, Topic, Dest, batch); +mria_batch_operation(SchemaVsn, delete, Topic, Dest) -> + mria_delete_route(SchemaVsn, Topic, Dest, batch). + %%-------------------------------------------------------------------- %% Schema v1 %% --------------------------------------------------------------------