From b9627c420f2397cd1b11fd8ea307781b2b643e58 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 28 Dec 2023 19:11:33 +0100 Subject: [PATCH] fix(router): rely on local sync state updates to propagate routes Instead of (potentially lagging) global table state itself. Local state should be the only source of truth anyway. Moreover, synchronize parts of local state update logic through the broker pool to ensure consistency. The problem was that `emqx_router:has_route/2` check may observe a stale route, which deletion is not yet replicated from the core node to the local replicant node. For example: 1. The only one subscriber per a given topic A unsubscribes from a replicant node. 2. The route to A is deleted by the emqx_broker. 3. Mria makes RPC to a core node, it succeeds and returns. 4. The client resubscribes or another client subscribes to the same topic A. 5. emqx_broker tries to add a route again: `emqx_router:do_add_route(Topic)`. 6. `emqx_router` checks if the route is present. 7. The stale route is present because deletion not replicated yet, so no route is being added. 8. Route deletion is replicated locally but it's too late: we already have a local subscriber without a route. Co-Authored-By: Serge Tupchii --- apps/emqx/src/emqx_broker.erl | 102 +++++++++++++------------- apps/emqx/src/emqx_router.erl | 9 +-- apps/emqx/test/emqx_routing_SUITE.erl | 88 +++++++++++++++++++--- 3 files changed, 130 insertions(+), 69 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 43617a038..103e358f6 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -157,22 +157,20 @@ with_subid(SubId, SubOpts) -> do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% FIXME: subscribe shard bug %% https://emqx.atlassian.net/browse/EMQX-10214 - case emqx_broker_helper:get_sub_shard(SubPid, Topic) of - 0 -> - true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), - call(pick(Topic), {subscribe, Topic}); - I -> - true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), - call(pick({Topic, I}), {subscribe, Topic, I}) - end; + I = emqx_broker_helper:get_sub_shard(SubPid, Topic), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), + call(pick(Topic), {subscribe, Topic, SubPid, I}); do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), emqx_shared_sub:subscribe(Group, RealTopic, SubPid). +with_shard_idx(0, SubOpts) -> + SubOpts; +with_shard_idx(I, SubOpts) -> + maps:put(shard, I, SubOpts). + %%-------------------------------------------------------------------- %% Unsubscribe API %%-------------------------------------------------------------------- @@ -201,15 +199,12 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), - cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + I = maps:get(shard, SubOpts, 0), + case I of + 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); + _ -> ok + end, + cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -495,49 +490,38 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({subscribe, Topic}, _From, State) -> - Ok = emqx_router:do_add_route(Topic), - {reply, Ok, State}; -handle_call({subscribe, Topic, I}, _From, State) -> - Shard = {Topic, I}, - Ok = - case get(Shard) of - undefined -> - _ = put(Shard, true), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {subscribe, Topic}); - true -> - ok - end, - {reply, Ok, State}; +handle_call({subscribe, Topic, SubPid, 0}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, [ + {Topic, {shard, I}}, + {{shard, Topic, I}, SubPid} + ]), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. -handle_cast({subscribe, Topic}, State) -> - case emqx_router:do_add_route(Topic) of - ok -> ok; - {error, Reason} -> ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason}) - end, +handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; -handle_cast({unsubscribed, Topic}, State) -> - case ets:member(?SUBSCRIBER, Topic) of - false -> - _ = emqx_router:do_delete_route(Topic), - ok; - true -> - ok - end, - {noreply, State}; -handle_cast({unsubscribed, Topic, I}, State) -> +handle_cast({unsubscribed, Topic, SubPid, I}, State) -> + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - _ = erase({Topic, I}), - true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {unsubscribed, Topic}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); true -> - ok + true end, + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -594,3 +578,15 @@ do_dispatch({shard, I}, Topic, Msg) -> 0, subscribers({shard, Topic, I}) ). + +%% + +maybe_add_route(_Existed = false, Topic) -> + emqx_router:do_add_route(Topic); +maybe_add_route(_Existed = true, _Topic) -> + ok. + +maybe_delete_route(_Exists = false, Topic) -> + emqx_router:do_delete_route(Topic); +maybe_delete_route(_Exists = true, _Topic) -> + ok. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3b92d4f1c..defa398a8 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -160,13 +160,8 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, Dest) when is_binary(Topic) -> - case has_route(Topic, Dest) of - true -> - ok; - false -> - ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_schema_vsn(), Topic, Dest) - end. + ok = emqx_router_helper:monitor(Dest), + mria_insert_route(get_schema_vsn(), Topic, Dest). mria_insert_route(v2, Topic, Dest) -> mria_insert_route_v2(Topic, Dest); diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index c9ad63cf1..10ca866fe 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). all() -> [ @@ -33,7 +34,8 @@ all() -> groups() -> TCs = [ - t_cluster_routing + t_cluster_routing, + t_slow_rlog_routing_consistency ], [ {routing_schema_v1, [], TCs}, @@ -54,11 +56,10 @@ end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). init_per_testcase(TC, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]), - [{work_dir, WorkDir} | Config]. + emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config). -end_per_testcase(_TC, _Config) -> - ok. +end_per_testcase(TC, Config) -> + emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config). mk_emqx_appspec(GroupName, N) -> {emqx, #{ @@ -182,14 +183,15 @@ unsubscribe(C, Topic) -> %% t_routing_schema_switch_v1(Config) -> - t_routing_schema_switch(_From = v2, _To = v1, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v2, _To = v1, WorkDir). t_routing_schema_switch_v2(Config) -> - t_routing_schema_switch(_From = v1, _To = v2, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v1, _To = v2, WorkDir). -t_routing_schema_switch(VFrom, VTo, Config) -> +t_routing_schema_switch(VFrom, VTo, WorkDir) -> % Start first node with routing schema VTo (e.g. v1) - WorkDir = ?config(work_dir, Config), [Node1] = emqx_cth_cluster:start( [ {routing_schema_switch1, #{ @@ -254,8 +256,76 @@ t_routing_schema_switch(VFrom, VTo, Config) -> ok = emqx_cth_cluster:stop(Nodes) end. +t_slow_rlog_routing_consistency(init, Config) -> + [Core1, _Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]), + [{original_mnesia_hook, MnesiaHook} | Config]; +t_slow_rlog_routing_consistency('end', Config) -> + [Core1, Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = ?config(original_mnesia_hook, Config), + ok = register_mria_hook(MnesiaHook, [Core1, Core2]). + +t_slow_rlog_routing_consistency(Config) -> + [Core1, Core2, Replicant] = ?config(cluster, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/", ClientId/binary>>, + Self = self(), + ?assertEqual(ok, rpc:call(Replicant, emqx_broker, do_subscribe, [Topic, Self, #{}])), + %% Wait for normal route replication (must be fast enough) + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant]) + end, + 2_000 + ), + DelayMs = 3_000, + slowdown_mria_rlog(?config(original_mnesia_hook, Config), [Core1, Core2], DelayMs), + {ok, _} = rpc:call(Replicant, mnesia_subscr, subscribe, [Self, {table, ?ROUTE_TAB, simple}]), + UnSubSubFun = fun() -> + %% Unsubscribe must remove a route, but the effect + %% is expected to be delayed on the replicant node + ok = emqx_broker:do_unsubscribe(Topic, Self, #{}), + %% Wait a little (less than introduced delay), + %% just to reduce the risk of delete/add routes ops being re-ordered + timer:sleep(100), + %% Subscribe must add a route again, even though the previosus + %% route may be still present on the replicant at the time of + %% this re-subscription + ok = emqx_broker:do_subscribe(Topic, Self, #{}) + end, + ?assertEqual(ok, erpc:call(Replicant, UnSubSubFun)), + receive + %% Can't match route record, since table name =/= record name, + {mnesia_table_event, {write, {?ROUTE_TAB, Topic, Replicant}, _}} -> + %% Event is reported before Mnesia writes a record, need to wait again... + timer:sleep(100), + ?assert(rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant])) + after DelayMs * 3 -> + ct:pal("Received messages: ~p", [process_info(Self, messages)]), + ct:fail("quick re-subscribe failed to add a route") + end. + %% get_mqtt_tcp_port(Node) -> {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. + +slowdown_mria_rlog(MnesiaHook, Nodes, DelayMs) -> + MnesiaHook1 = fun(Tid, CommitData) -> + spawn(fun() -> + timer:sleep(DelayMs), + MnesiaHook(Tid, CommitData) + end), + ok + end, + register_mria_hook(MnesiaHook1, Nodes). + +register_mria_hook(MnesiaHook, Nodes) -> + [ok, ok] = [ + rpc:call(N, mnesia_hook, register_hook, [post_commit, MnesiaHook]) + || N <- Nodes + ], + ok.