fix(route-sync): use public function as mria activity target

This commit is contained in:
Andrew Mayorov 2024-01-10 19:18:24 +01:00
parent 7d037cfe91
commit d6f731c4fc
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
1 changed files with 39 additions and 31 deletions

View File

@ -46,9 +46,9 @@
]). ]).
%% Mria Activity RPC targets %% Mria Activity RPC targets
% -export([ -export([
% mria_insert_route/2 mria_batch_run/2
% ]). ]).
-export([do_batch/1]). -export([do_batch/1]).
@ -96,9 +96,12 @@
-export_type([dest/0]). -export_type([dest/0]).
-type group() :: binary(). -type group() :: binary().
-type dest() :: node() | {group(), node()}. -type dest() :: node() | {group(), node()}.
%% Operation :: {add, ...} | {delete, ...}.
-type batch() :: #{batch_route() => _Operation :: tuple()}.
-type batch_route() :: {emqx_types:topic(), dest()}.
-record(routeidx, { -record(routeidx, {
entry :: '$1' | emqx_topic_index:key(dest()), entry :: '$1' | emqx_topic_index:key(dest()),
unused = [] :: nil() unused = [] :: nil()
@ -241,11 +244,7 @@ mria_delete_route(v2, Topic, Dest, Ctx) ->
mria_delete_route(v1, Topic, Dest, Ctx) -> mria_delete_route(v1, Topic, Dest, Ctx) ->
mria_delete_route_v1(Topic, Dest, Ctx). mria_delete_route_v1(Topic, Dest, Ctx).
-spec do_batch(Batch) -> Errors when -spec do_batch(batch()) -> #{batch_route() => _Error}.
%% Operation :: {add, ...} | {delete, ...}.
Batch :: #{Route => _Operation :: tuple()},
Errors :: #{Route => _Error},
Route :: {emqx_types:topic(), dest()}.
do_batch(Batch) -> do_batch(Batch) ->
Nodes = batch_get_dest_nodes(Batch), Nodes = batch_get_dest_nodes(Batch),
ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)), ok = lists:foreach(fun emqx_router_helper:monitor/1, ordsets:to_list(Nodes)),
@ -257,30 +256,15 @@ mria_batch(v1, Batch) ->
mria_batch_v1(Batch). mria_batch_v1(Batch).
mria_batch_v2(Batch) -> mria_batch_v2(Batch) ->
mria:async_dirty(?ROUTE_SHARD, fun mria_batch_run/2, [v2, Batch]). mria:async_dirty(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v2, Batch]).
mria_batch_v1(Batch) -> mria_batch_v1(Batch) ->
{atomic, Res} = mria:transaction(?ROUTE_SHARD, fun mria_batch_run/2, [v1, Batch]), case mria:transaction(?ROUTE_SHARD, fun ?MODULE:mria_batch_run/2, [v1, Batch]) of
Res. {atomic, Result} ->
Result;
mria_batch_run(SchemaVsn, Batch) ->
maps:fold(
fun({Topic, Dest}, Op, Errors) ->
case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of
ok ->
Errors;
Error -> Error ->
Errors#{{Topic, Dest} => Error} Error
end end.
end,
#{},
Batch
).
mria_batch_operation(SchemaVsn, add, Topic, Dest) ->
mria_insert_route(SchemaVsn, Topic, Dest, batch);
mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
mria_delete_route(SchemaVsn, Topic, Dest, batch).
batch_get_dest_nodes(Batch) -> batch_get_dest_nodes(Batch) ->
maps:fold( maps:fold(
@ -368,6 +352,30 @@ call(Router, Msg) ->
pick(Topic) -> pick(Topic) ->
gproc_pool:pick_worker(router_pool, Topic). gproc_pool:pick_worker(router_pool, Topic).
%%--------------------------------------------------------------------
%% Route batch RPC targets
%%--------------------------------------------------------------------
-spec mria_batch_run(schemavsn(), batch()) -> #{batch_route() => _Error}.
mria_batch_run(SchemaVsn, Batch) ->
maps:fold(
fun({Topic, Dest}, Op, Errors) ->
case mria_batch_operation(SchemaVsn, batch_get_action(Op), Topic, Dest) of
ok ->
Errors;
Error ->
Errors#{{Topic, Dest} => Error}
end
end,
#{},
Batch
).
mria_batch_operation(SchemaVsn, add, Topic, Dest) ->
mria_insert_route(SchemaVsn, Topic, Dest, batch);
mria_batch_operation(SchemaVsn, delete, Topic, Dest) ->
mria_delete_route(SchemaVsn, Topic, Dest, batch).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Schema v1 %% Schema v1
%% -------------------------------------------------------------------- %% --------------------------------------------------------------------