diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index a694a7134..e10ed399c 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -16,6 +16,7 @@ -module(emqx_router_syncer). +-include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). -behaviour(gen_server). @@ -25,6 +26,8 @@ -export([push/4]). -export([wait/1]). +-export([stats/0]). + -export([ init/1, handle_call/3, @@ -40,6 +43,16 @@ -define(MAX_BATCH_SIZE, 1000). -define(MIN_SYNC_INTERVAL, 1). +%% How long (ms) to idle after observing a batch sync error? +%% Should help to avoid excessive retries in situations when errors are caused by +%% conditions that take some time to resolve (e.g. restarting an upstream core node). +-define(ERROR_DELAY, 10). + +%% How soon (ms) to retry last failed batch sync attempt? +%% Only matter in absence of new operations, otherwise batch sync is triggered as +%% soon as `?ERROR_DELAY` is over. +-define(ERROR_RETRY_INTERVAL, 500). + -define(PRIO_HI, 1). -define(PRIO_LO, 2). -define(PRIO_BG, 3). @@ -117,16 +130,36 @@ mk_push_context(_) -> %% +-type stats() :: #{ + size := non_neg_integer(), + n_add := non_neg_integer(), + n_delete := non_neg_integer(), + prio_highest := non_neg_integer() | undefined, + prio_lowest := non_neg_integer() | undefined +}. + +-spec stats() -> [stats()]. +stats() -> + Workers = gproc_pool:active_workers(?POOL), + [gen_server:call(Pid, stats, infinity) || {_Name, Pid} <- Workers]. + +%% + init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{stash => stash_new()}}. +handle_call(stats, _From, State = #{stash := Stash}) -> + {reply, stash_stats(Stash), State}; handle_call(_Call, _From, State) -> {reply, ignored, State}. handle_cast(_Msg, State) -> {noreply, State}. +handle_info({timeout, _TRef, retry}, State) -> + NState = run_batch_loop([], maps:remove(retry_timer, State)), + {noreply, NState}; handle_info(Push = ?PUSH(_, _), State) -> %% NOTE: Wait a bit to collect potentially overlapping operations. ok = timer:sleep(?MIN_SYNC_INTERVAL), @@ -142,26 +175,41 @@ run_batch_loop(Incoming, State = #{stash := Stash0}) -> Stash1 = stash_add(Incoming, Stash0), Stash2 = stash_drain(Stash1), {Batch, Stash3} = mk_batch(Stash2), - ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, #{ - size => maps:size(Batch), - stashed => maps:size(Stash3), - n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Batch)), - n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Batch)), - prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Batch), - prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Batch) - }), - %% TODO: retry if error? - Errors = run_batch(Batch), - ok = send_replies(Errors, Batch), - NState = State#{stash := Stash3}, - %% TODO: postpone if only ?PRIO_BG operations left? - case is_stash_empty(Stash3) of - true -> - NState; - false -> - run_batch_loop([], NState) + ?tp_ignore_side_effects_in_prod(router_syncer_new_batch, batch_stats(Batch, Stash3)), + case run_batch(Batch) of + Status = #{} -> + ok = send_replies(Status, Batch), + NState = cancel_retry_timer(State#{stash := Stash3}), + %% TODO: postpone if only ?PRIO_BG operations left? + case is_stash_empty(Stash3) of + true -> + NState; + false -> + run_batch_loop([], NState) + end; + BatchError -> + ?SLOG(warning, #{ + msg => "router_batch_sync_failed", + reason => BatchError, + batch => batch_stats(Batch, Stash3) + }), + NState = State#{stash := Stash2}, + ok = timer:sleep(?ERROR_DELAY), + ensure_retry_timer(NState) end. +ensure_retry_timer(State = #{retry_timer := _TRef}) -> + State; +ensure_retry_timer(State) -> + TRef = emqx_utils:start_timer(?ERROR_RETRY_INTERVAL, retry), + State#{retry_timer => TRef}. + +cancel_retry_timer(State = #{retry_timer := TRef}) -> + ok = emqx_utils:cancel_timer(TRef), + maps:remove(retry_timer, State); +cancel_retry_timer(State) -> + State. + %% mk_batch(Stash) when map_size(Stash) =< ?MAX_BATCH_SIZE -> @@ -222,7 +270,7 @@ replyctx_send(Result, {MRef, Pid}) -> %% run_batch(Batch) when map_size(Batch) > 0 -> - emqx_router:do_batch(Batch); + catch emqx_router:do_batch(Batch); run_batch(_Empty) -> #{}. @@ -275,6 +323,23 @@ merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), ?ROUTEOP(_Action2, _Prio2, Ctx2 %% +batch_stats(Batch, Stash) -> + BatchStats = stash_stats(Batch), + BatchStats#{ + stashed => maps:size(Stash) + }. + +stash_stats(Stash) -> + #{ + size => maps:size(Stash), + n_add => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == add end, Stash)), + n_delete => maps:size(maps:filter(fun(_, ?ROUTEOP(A)) -> A == delete end, Stash)), + prio_highest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> min(P, M) end, none, Stash), + prio_lowest => maps:fold(fun(_, ?ROUTEOP(_, P), M) -> max(P, M) end, 0, Stash) + }. + +%% + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 70b4eaf51..96ca0b297 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -39,21 +39,21 @@ groups() -> {group, batch_sync_replicants}, {group, batch_sync_off} ], - GroupBase = [ - {group, cluster}, - t_concurrent_routing_updates - ], ClusterTCs = [ t_cluster_routing, t_slow_rlog_routing_consistency ], + SingleTCs = [t_concurrent_routing_updates], + BatchSyncTCs = lists:duplicate(5, t_concurrent_routing_updates_with_errors), [ {routing_schema_v1, [], GroupVsn}, {routing_schema_v2, [], GroupVsn}, - {batch_sync_on, [], GroupBase}, - {batch_sync_replicants, [], GroupBase}, - {batch_sync_off, [], GroupBase}, - {cluster, [], ClusterTCs} + {batch_sync_on, [], [{group, cluster}, {group, single_batch_on}]}, + {batch_sync_replicants, [], [{group, cluster}, {group, single}]}, + {batch_sync_off, [], [{group, cluster}, {group, single}]}, + {cluster, [], ClusterTCs}, + {single_batch_on, [], SingleTCs ++ BatchSyncTCs}, + {single, [], SingleTCs} ]. init_per_group(routing_schema_v1, Config) -> @@ -74,10 +74,38 @@ init_per_group(cluster, Config) -> {emqx_routing_SUITE3, #{apps => [mk_emqx_appspec(3, Config)], role => replicant}} ], Nodes = emqx_cth_cluster:start(NodeSpecs, #{work_dir => WorkDir}), - [{cluster, Nodes} | Config]. + [{cluster, Nodes} | Config]; +init_per_group(GroupName, Config) when + GroupName =:= single_batch_on; + GroupName =:= single +-> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + Apps = emqx_cth_suite:start( + [ + {emqx, #{ + config => mk_config_broker(Config), + %% NOTE + %% Artificially increasing pool workers contention by forcing small pool size. + before_start => fun() -> + % NOTE + % This one is actually defined on `emqx_conf_schema` level, but used + % in `emqx_broker`. Thus we have to resort to this ugly hack. + emqx_config:force_put([node, broker_pool_size], 2), + emqx_app:set_config_loader(?MODULE) + end + }} + ], + #{work_dir => WorkDir} + ), + [{group_apps, Apps} | Config]. end_per_group(cluster, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)); +end_per_group(GroupName, Config) when + GroupName =:= single_batch_on; + GroupName =:= single +-> + emqx_cth_suite:stop(?config(group_apps, Config)); end_per_group(_, _Config) -> ok. @@ -226,29 +254,10 @@ unsubscribe(C, Topic) -> ]). t_concurrent_routing_updates(init, Config) -> - WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), - Apps = emqx_cth_suite:start( - [ - {emqx, #{ - config => mk_config_broker(Config), - %% NOTE - %% Artificially increasing pool workers contention by forcing small pool size. - before_start => fun() -> - % NOTE - % This one is actually defined on `emqx_conf_schema` level, but used - % in `emqx_broker`. Thus we have to resort to this ugly hack. - emqx_config:force_put([node, broker_pool_size], 2), - emqx_app:set_config_loader(?MODULE) - end - }} - ], - #{work_dir => WorkDir} - ), ok = snabbkaffe:start_trace(), - [{tc_apps, Apps} | Config]; -t_concurrent_routing_updates('end', Config) -> - ok = snabbkaffe:stop(), - ok = emqx_cth_suite:stop(?config(tc_apps, Config)). + Config; +t_concurrent_routing_updates('end', _Config) -> + ok = snabbkaffe:stop(). t_concurrent_routing_updates(_Config) -> NClients = 400, @@ -270,6 +279,51 @@ t_concurrent_routing_updates(_Config) -> ?assertEqual([], ets:tab2list(?SUBSCRIBER)), ?assertEqual([], emqx_router:topics()). +t_concurrent_routing_updates_with_errors(init, Config) -> + ok = snabbkaffe:start_trace(), + ok = meck:new(emqx_router, [passthrough, no_history]), + Config; +t_concurrent_routing_updates_with_errors('end', _Config) -> + ok = meck:unload(emqx_router), + ok = snabbkaffe:stop(). + +t_concurrent_routing_updates_with_errors(_Config) -> + NClients = 100, + NRTopics = 80, + MCommands = 6, + PSyncError = 0.1, + Port = get_mqtt_tcp_port(node()), + %% Crash the batch sync operation with some small probability. + ok = meck:expect(emqx_router, mria_batch_run, fun(Vsn, Batch) -> + case rand:uniform() < PSyncError of + false -> meck:passthrough([Vsn, Batch]); + true -> error(overload) + end + end), + Clients = [ + spawn_link(?MODULE, run_concurrent_client, [I, Port, MCommands, NRTopics]) + || I <- lists:seq(1, NClients) + ], + ok = lists:foreach(fun ping_concurrent_client/1, Clients), + 0 = ?retry( + _Interval = 500, + _NTimes = 10, + 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()]) + ), + Subscribers = ets:tab2list(?SUBSCRIBER), + Topics = maps:keys(maps:from_list(Subscribers)), + ?assertEqual(lists:sort(Topics), lists:sort(emqx_router:topics())), + ok = lists:foreach(fun stop_concurrent_client/1, Clients), + ok = timer:sleep(100), + 0 = ?retry( + 500, + 10, + 0 = lists:sum([S || #{size := S} <- emqx_router_syncer:stats()]) + ), + ct:pal("Trace: ~p", [?of_kind(router_syncer_new_batch, snabbkaffe:collect_trace())]), + ?assertEqual([], ets:tab2list(?SUBSCRIBER)), + ?assertEqual([], emqx_router:topics()). + run_concurrent_client(I, Port, MCommands, NRTopics) -> % _ = rand:seed(default, I), Ctx = #{