From a8cd609ff4cfee39bef210ab5484b8ffc2bb28a2 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Dec 2023 17:34:46 +0100 Subject: [PATCH 1/5] chore(broker): simplify subscribe code path --- apps/emqx/src/emqx_broker.erl | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index cc9cb98a6..df498a5df 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -139,6 +139,7 @@ subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_ma %% New false -> ok = emqx_broker_helper:register_sub(SubPid, SubId), + true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); %% Existed true -> @@ -153,11 +154,7 @@ with_subid(undefined, SubOpts) -> with_subid(SubId, SubOpts) -> maps:put(subid, SubId, SubOpts). -do_subscribe(Topic, SubPid, SubOpts) -> - true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - do_subscribe2(Topic, SubPid, SubOpts). 
- -do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) -> +do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% FIXME: subscribe shard bug %% https://emqx.atlassian.net/browse/EMQX-10214 case emqx_broker_helper:get_sub_shard(SubPid, Topic) of @@ -170,7 +167,7 @@ do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), call(pick({Topic, I}), {subscribe, Topic, I}) end; -do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when +do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), From 331bfaa535f54fac085f129f0bdd79ccbdafa81c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Dec 2023 17:34:02 +0100 Subject: [PATCH 2/5] fix(broker): avoid reclaiming per-topic counter twice --- apps/emqx/src/emqx_broker.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index df498a5df..43617a038 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -384,7 +384,6 @@ subscriber_down(SubPid) -> fun(Topic) -> case lookup_value(?SUBOPTION, {Topic, SubPid}) of SubOpts when is_map(SubOpts) -> - _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {Topic, SubPid}), do_unsubscribe2(Topic, SubPid, SubOpts); undefined -> From b9627c420f2397cd1b11fd8ea307781b2b643e58 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Thu, 28 Dec 2023 19:11:33 +0100 Subject: [PATCH 3/5] fix(router): rely on local sync state updates to propagate routes Instead of (potentially lagging) global table state itself. Local state should be the only source of truth anyway. Moreover, synchronize parts of local state update logic through the broker pool to ensure consistency. 
The problem was that the `emqx_router:has_route/2` check may observe a stale route whose deletion has not yet been replicated from the core node to the local replicant node. For example: 1. The only subscriber to a given topic A unsubscribes from a replicant node. 2. The route to A is deleted by the emqx_broker. 3. Mria makes an RPC to a core node; it succeeds and returns. 4. The client resubscribes or another client subscribes to the same topic A. 5. emqx_broker tries to add a route again: `emqx_router:do_add_route(Topic)`. 6. `emqx_router` checks if the route is present. 7. The stale route is present because its deletion has not been replicated yet, so no route is added. 8. The route deletion is replicated locally, but it's too late: we already have a local subscriber without a route. Co-Authored-By: Serge Tupchii --- apps/emqx/src/emqx_broker.erl | 102 +++++++++++++------------- apps/emqx/src/emqx_router.erl | 9 +-- apps/emqx/test/emqx_routing_SUITE.erl | 88 +++++++++++++++++++--- 3 files changed, 130 insertions(+), 69 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 43617a038..103e358f6 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -157,22 +157,20 @@ with_subid(SubId, SubOpts) -> do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% FIXME: subscribe shard bug %% https://emqx.atlassian.net/browse/EMQX-10214 - case emqx_broker_helper:get_sub_shard(SubPid, Topic) of - 0 -> - true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), - call(pick(Topic), {subscribe, Topic}); - I -> - true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), - call(pick({Topic, I}), {subscribe, Topic, I}) - end; + I = emqx_broker_helper:get_sub_shard(SubPid, Topic), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), + call(pick(Topic), {subscribe, 
Topic, SubPid, I}); do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), emqx_shared_sub:subscribe(Group, RealTopic, SubPid). +with_shard_idx(0, SubOpts) -> + SubOpts; +with_shard_idx(I, SubOpts) -> + maps:put(shard, I, SubOpts). + %%-------------------------------------------------------------------- %% Unsubscribe API %%-------------------------------------------------------------------- @@ -201,15 +199,12 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), - cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + I = maps:get(shard, SubOpts, 0), + case I of + 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); + _ -> ok + end, + cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -495,49 +490,38 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. 
-handle_call({subscribe, Topic}, _From, State) -> - Ok = emqx_router:do_add_route(Topic), - {reply, Ok, State}; -handle_call({subscribe, Topic, I}, _From, State) -> - Shard = {Topic, I}, - Ok = - case get(Shard) of - undefined -> - _ = put(Shard, true), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {subscribe, Topic}); - true -> - ok - end, - {reply, Ok, State}; +handle_call({subscribe, Topic, SubPid, 0}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, [ + {Topic, {shard, I}}, + {{shard, Topic, I}, SubPid} + ]), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. 
-handle_cast({subscribe, Topic}, State) -> - case emqx_router:do_add_route(Topic) of - ok -> ok; - {error, Reason} -> ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason}) - end, +handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; -handle_cast({unsubscribed, Topic}, State) -> - case ets:member(?SUBSCRIBER, Topic) of - false -> - _ = emqx_router:do_delete_route(Topic), - ok; - true -> - ok - end, - {noreply, State}; -handle_cast({unsubscribed, Topic, I}, State) -> +handle_cast({unsubscribed, Topic, SubPid, I}, State) -> + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - _ = erase({Topic, I}), - true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {unsubscribed, Topic}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); true -> - ok + true end, + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -594,3 +578,15 @@ do_dispatch({shard, I}, Topic, Msg) -> 0, subscribers({shard, Topic, I}) ). + +%% + +maybe_add_route(_Existed = false, Topic) -> + emqx_router:do_add_route(Topic); +maybe_add_route(_Existed = true, _Topic) -> + ok. + +maybe_delete_route(_Exists = false, Topic) -> + emqx_router:do_delete_route(Topic); +maybe_delete_route(_Exists = true, _Topic) -> + ok. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3b92d4f1c..defa398a8 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -160,13 +160,8 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. 
do_add_route(Topic, Dest) when is_binary(Topic) -> - case has_route(Topic, Dest) of - true -> - ok; - false -> - ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_schema_vsn(), Topic, Dest) - end. + ok = emqx_router_helper:monitor(Dest), + mria_insert_route(get_schema_vsn(), Topic, Dest). mria_insert_route(v2, Topic, Dest) -> mria_insert_route_v2(Topic, Dest); diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index c9ad63cf1..10ca866fe 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). all() -> [ @@ -33,7 +34,8 @@ all() -> groups() -> TCs = [ - t_cluster_routing + t_cluster_routing, + t_slow_rlog_routing_consistency ], [ {routing_schema_v1, [], TCs}, @@ -54,11 +56,10 @@ end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). init_per_testcase(TC, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]), - [{work_dir, WorkDir} | Config]. + emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config). -end_per_testcase(_TC, _Config) -> - ok. +end_per_testcase(TC, Config) -> + emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config). mk_emqx_appspec(GroupName, N) -> {emqx, #{ @@ -182,14 +183,15 @@ unsubscribe(C, Topic) -> %% t_routing_schema_switch_v1(Config) -> - t_routing_schema_switch(_From = v2, _To = v1, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v2, _To = v1, WorkDir). t_routing_schema_switch_v2(Config) -> - t_routing_schema_switch(_From = v1, _To = v2, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v1, _To = v2, WorkDir). 
-t_routing_schema_switch(VFrom, VTo, Config) -> +t_routing_schema_switch(VFrom, VTo, WorkDir) -> % Start first node with routing schema VTo (e.g. v1) - WorkDir = ?config(work_dir, Config), [Node1] = emqx_cth_cluster:start( [ {routing_schema_switch1, #{ @@ -254,8 +256,76 @@ t_routing_schema_switch(VFrom, VTo, Config) -> ok = emqx_cth_cluster:stop(Nodes) end. +t_slow_rlog_routing_consistency(init, Config) -> + [Core1, _Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]), + [{original_mnesia_hook, MnesiaHook} | Config]; +t_slow_rlog_routing_consistency('end', Config) -> + [Core1, Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = ?config(original_mnesia_hook, Config), + ok = register_mria_hook(MnesiaHook, [Core1, Core2]). + +t_slow_rlog_routing_consistency(Config) -> + [Core1, Core2, Replicant] = ?config(cluster, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/", ClientId/binary>>, + Self = self(), + ?assertEqual(ok, rpc:call(Replicant, emqx_broker, do_subscribe, [Topic, Self, #{}])), + %% Wait for normal route replication (must be fast enough) + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant]) + end, + 2_000 + ), + DelayMs = 3_000, + slowdown_mria_rlog(?config(original_mnesia_hook, Config), [Core1, Core2], DelayMs), + {ok, _} = rpc:call(Replicant, mnesia_subscr, subscribe, [Self, {table, ?ROUTE_TAB, simple}]), + UnSubSubFun = fun() -> + %% Unsubscribe must remove a route, but the effect + %% is expected to be delayed on the replicant node + ok = emqx_broker:do_unsubscribe(Topic, Self, #{}), + %% Wait a little (less than introduced delay), + %% just to reduce the risk of delete/add routes ops being re-ordered + timer:sleep(100), + %% Subscribe must add a route again, even though the previous + %% route may still be present on the replicant at the time of + %% this 
re-subscription + ok = emqx_broker:do_subscribe(Topic, Self, #{}) + end, + ?assertEqual(ok, erpc:call(Replicant, UnSubSubFun)), + receive + %% Can't match route record, since table name =/= record name, + {mnesia_table_event, {write, {?ROUTE_TAB, Topic, Replicant}, _}} -> + %% Event is reported before Mnesia writes a record, need to wait again... + timer:sleep(100), + ?assert(rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant])) + after DelayMs * 3 -> + ct:pal("Received messages: ~p", [process_info(Self, messages)]), + ct:fail("quick re-subscribe failed to add a route") + end. + %% get_mqtt_tcp_port(Node) -> {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. + +slowdown_mria_rlog(MnesiaHook, Nodes, DelayMs) -> + MnesiaHook1 = fun(Tid, CommitData) -> + spawn(fun() -> + timer:sleep(DelayMs), + MnesiaHook(Tid, CommitData) + end), + ok + end, + register_mria_hook(MnesiaHook1, Nodes). + +register_mria_hook(MnesiaHook, Nodes) -> + [ok, ok] = [ + rpc:call(N, mnesia_hook, register_hook, [post_commit, MnesiaHook]) + || N <- Nodes + ], + ok. 
From 5c91984ad75465f9bce605027ff00ddd0d55afa9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 29 Dec 2023 14:04:48 +0100 Subject: [PATCH 4/5] chore: leave comment describing need for tighter synchronization --- apps/emqx/src/emqx_broker.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 103e358f6..856a3f7ae 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -159,6 +159,14 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% https://emqx.atlassian.net/browse/EMQX-10214 I = emqx_broker_helper:get_sub_shard(SubPid, Topic), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. call(pick(Topic), {subscribe, Topic, SubPid, I}); do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) @@ -204,6 +212,14 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); _ -> ok end, + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. 
So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) From 42da329b340d3929d7d876418a55b1a9a53728b7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 29 Dec 2023 14:05:50 +0100 Subject: [PATCH 5/5] chore: add changelog entry --- changes/ce/fix-12243.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-12243.en.md diff --git a/changes/ce/fix-12243.en.md b/changes/ce/fix-12243.en.md new file mode 100644 index 000000000..682655551 --- /dev/null +++ b/changes/ce/fix-12243.en.md @@ -0,0 +1 @@ +Fixed a family of subtle race conditions that could lead to inconsistencies in the global routing state.