diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index cc9cb98a6..856a3f7ae 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -139,6 +139,7 @@ subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_ma %% New false -> ok = emqx_broker_helper:register_sub(SubPid, SubId), + true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts)); %% Existed true -> @@ -153,29 +154,31 @@ with_subid(undefined, SubOpts) -> with_subid(SubId, SubOpts) -> maps:put(subid, SubId, SubOpts). -do_subscribe(Topic, SubPid, SubOpts) -> - true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - do_subscribe2(Topic, SubPid, SubOpts). - -do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) -> +do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% FIXME: subscribe shard bug %% https://emqx.atlassian.net/browse/EMQX-10214 - case emqx_broker_helper:get_sub_shard(SubPid, Topic) of - 0 -> - true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), - call(pick(Topic), {subscribe, Topic}); - I -> - true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}), - call(pick({Topic, I}), {subscribe, Topic, I}) - end; -do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when + I = emqx_broker_helper:get_sub_shard(SubPid, Topic), + true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. + call(pick(Topic), {subscribe, Topic, SubPid, I}); +do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when is_binary(RealTopic) -> true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}), emqx_shared_sub:subscribe(Group, RealTopic, SubPid). +with_shard_idx(0, SubOpts) -> + SubOpts; +with_shard_idx(I, SubOpts) -> + maps:put(shard, I, SubOpts). + %%-------------------------------------------------------------------- %% Unsubscribe API %%-------------------------------------------------------------------- @@ -204,15 +207,20 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(shard, SubOpts, 0) of - 0 -> - true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - emqx_exclusive_subscription:unsubscribe(Topic, SubOpts), - cast(pick(Topic), {unsubscribed, Topic}); - I -> - true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + I = maps:get(shard, SubOpts, 0), + case I of + 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); + _ -> ok + end, + %% NOTE + %% We are relying on the local state to minimize global routing state changes, + %% thus it's important that some operations on ETS tables on the same topic + %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of + %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of + %% `unsubscribe` codepath. So we have to pick a worker according to the topic, + %% but not shard. If there are topics with high number of shards, then the + %% load across the pool will be unbalanced. + cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -387,7 +395,6 @@ subscriber_down(SubPid) -> fun(Topic) -> case lookup_value(?SUBOPTION, {Topic, SubPid}) of SubOpts when is_map(SubOpts) -> - _ = emqx_broker_helper:reclaim_seq(Topic), true = ets:delete(?SUBOPTION, {Topic, SubPid}), do_unsubscribe2(Topic, SubPid, SubOpts); undefined -> @@ -499,49 +506,38 @@ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({subscribe, Topic}, _From, State) -> - Ok = emqx_router:do_add_route(Topic), - {reply, Ok, State}; -handle_call({subscribe, Topic, I}, _From, State) -> - Shard = {Topic, I}, - Ok = - case get(Shard) of - undefined -> - _ = put(Shard, true), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {subscribe, Topic}); - true -> - ok - end, - {reply, Ok, State}; +handle_call({subscribe, Topic, SubPid, 0}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, Topic), + true = ets:insert(?SUBSCRIBER, [ + {Topic, {shard, I}}, + {{shard, Topic, I}, SubPid} + ]), + Result = maybe_add_route(Existed, Topic), + {reply, Result, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. -handle_cast({subscribe, Topic}, State) -> - case emqx_router:do_add_route(Topic) of - ok -> ok; - {error, Reason} -> ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason}) - end, +handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; -handle_cast({unsubscribed, Topic}, State) -> - case ets:member(?SUBSCRIBER, Topic) of - false -> - _ = emqx_router:do_delete_route(Topic), - ok; - true -> - ok - end, - {noreply, State}; -handle_cast({unsubscribed, Topic, I}, State) -> +handle_cast({unsubscribed, Topic, SubPid, I}, State) -> + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - _ = erase({Topic, I}), - true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), - cast(pick(Topic), {unsubscribed, Topic}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); true -> - ok + true end, + Exists = ets:member(?SUBSCRIBER, Topic), + _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -598,3 +594,15 @@ do_dispatch({shard, I}, Topic, Msg) -> 0, subscribers({shard, Topic, I}) ). + +%% + +maybe_add_route(_Existed = false, Topic) -> + emqx_router:do_add_route(Topic); +maybe_add_route(_Existed = true, _Topic) -> + ok. + +maybe_delete_route(_Exists = false, Topic) -> + emqx_router:do_delete_route(Topic); +maybe_delete_route(_Exists = true, _Topic) -> + ok. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3b92d4f1c..defa398a8 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -160,13 +160,8 @@ do_add_route(Topic) when is_binary(Topic) -> -spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}. do_add_route(Topic, Dest) when is_binary(Topic) -> - case has_route(Topic, Dest) of - true -> - ok; - false -> - ok = emqx_router_helper:monitor(Dest), - mria_insert_route(get_schema_vsn(), Topic, Dest) - end. + ok = emqx_router_helper:monitor(Dest), + mria_insert_route(get_schema_vsn(), Topic, Dest). mria_insert_route(v2, Topic, Dest) -> mria_insert_route_v2(Topic, Dest); diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index c9ad63cf1..10ca866fe 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx/include/asserts.hrl"). +-include_lib("emqx/include/emqx_router.hrl"). all() -> [ @@ -33,7 +34,8 @@ all() -> groups() -> TCs = [ - t_cluster_routing + t_cluster_routing, + t_slow_rlog_routing_consistency ], [ {routing_schema_v1, [], TCs}, @@ -54,11 +56,10 @@ end_per_group(_GroupName, Config) -> emqx_cth_cluster:stop(?config(cluster, Config)). init_per_testcase(TC, Config) -> - WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]), - [{work_dir, WorkDir} | Config]. + emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config). -end_per_testcase(_TC, _Config) -> - ok. +end_per_testcase(TC, Config) -> + emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config). mk_emqx_appspec(GroupName, N) -> {emqx, #{ @@ -182,14 +183,15 @@ unsubscribe(C, Topic) -> %% t_routing_schema_switch_v1(Config) -> - t_routing_schema_switch(_From = v2, _To = v1, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v2, _To = v1, WorkDir). t_routing_schema_switch_v2(Config) -> - t_routing_schema_switch(_From = v1, _To = v2, Config). + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + t_routing_schema_switch(_From = v1, _To = v2, WorkDir). -t_routing_schema_switch(VFrom, VTo, Config) -> +t_routing_schema_switch(VFrom, VTo, WorkDir) -> % Start first node with routing schema VTo (e.g. v1) - WorkDir = ?config(work_dir, Config), [Node1] = emqx_cth_cluster:start( [ {routing_schema_switch1, #{ @@ -254,8 +256,76 @@ t_routing_schema_switch(VFrom, VTo, Config) -> ok = emqx_cth_cluster:stop(Nodes) end. +t_slow_rlog_routing_consistency(init, Config) -> + [Core1, _Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]), + [{original_mnesia_hook, MnesiaHook} | Config]; +t_slow_rlog_routing_consistency('end', Config) -> + [Core1, Core2, _Replicant] = ?config(cluster, Config), + MnesiaHook = ?config(original_mnesia_hook, Config), + ok = register_mria_hook(MnesiaHook, [Core1, Core2]). + +t_slow_rlog_routing_consistency(Config) -> + [Core1, Core2, Replicant] = ?config(cluster, Config), + ClientId = atom_to_binary(?FUNCTION_NAME), + Topic = <<"t/", ClientId/binary>>, + Self = self(), + ?assertEqual(ok, rpc:call(Replicant, emqx_broker, do_subscribe, [Topic, Self, #{}])), + %% Wait for normal route replication (must be fast enough) + emqx_common_test_helpers:wait_for( + ?FUNCTION_NAME, + ?LINE, + fun() -> + rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant]) + end, + 2_000 + ), + DelayMs = 3_000, + slowdown_mria_rlog(?config(original_mnesia_hook, Config), [Core1, Core2], DelayMs), + {ok, _} = rpc:call(Replicant, mnesia_subscr, subscribe, [Self, {table, ?ROUTE_TAB, simple}]), + UnSubSubFun = fun() -> + %% Unsubscribe must remove a route, but the effect + %% is expected to be delayed on the replicant node + ok = emqx_broker:do_unsubscribe(Topic, Self, #{}), + %% Wait a little (less than introduced delay), + %% just to reduce the risk of delete/add routes ops being re-ordered + timer:sleep(100), + %% Subscribe must add a route again, even though the previosus + %% route may be still present on the replicant at the time of + %% this re-subscription + ok = emqx_broker:do_subscribe(Topic, Self, #{}) + end, + ?assertEqual(ok, erpc:call(Replicant, UnSubSubFun)), + receive + %% Can't match route record, since table name =/= record name, + {mnesia_table_event, {write, {?ROUTE_TAB, Topic, Replicant}, _}} -> + %% Event is reported before Mnesia writes a record, need to wait again... + timer:sleep(100), + ?assert(rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant])) + after DelayMs * 3 -> + ct:pal("Received messages: ~p", [process_info(Self, messages)]), + ct:fail("quick re-subscribe failed to add a route") + end. + %% get_mqtt_tcp_port(Node) -> {_, Port} = erpc:call(Node, emqx_config, get, [[listeners, tcp, default, bind]]), Port. + +slowdown_mria_rlog(MnesiaHook, Nodes, DelayMs) -> + MnesiaHook1 = fun(Tid, CommitData) -> + spawn(fun() -> + timer:sleep(DelayMs), + MnesiaHook(Tid, CommitData) + end), + ok + end, + register_mria_hook(MnesiaHook1, Nodes). + +register_mria_hook(MnesiaHook, Nodes) -> + [ok, ok] = [ + rpc:call(N, mnesia_hook, register_hook, [post_commit, MnesiaHook]) + || N <- Nodes + ], + ok. diff --git a/changes/ce/fix-12243.en.md b/changes/ce/fix-12243.en.md new file mode 100644 index 000000000..682655551 --- /dev/null +++ b/changes/ce/fix-12243.en.md @@ -0,0 +1 @@ +Fixed a family of subtle race conditions that could lead to inconsistencies in the global routing state.