Merge pull request #12243 from keynslug/fix/EMQX-11483/route-async-repl

fix(router): rely on local sync state updates to propagate routes
This commit is contained in:
Andrew Mayorov 2023-12-29 16:31:00 +01:00 committed by GitHub
commit a87df28dfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 150 additions and 76 deletions

View File

@ -139,6 +139,7 @@ subscribe(Topic, SubId, SubOpts0) when ?IS_TOPIC(Topic), ?IS_SUBID(SubId), is_ma
%% New
false ->
ok = emqx_broker_helper:register_sub(SubPid, SubId),
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
%% Existed
true ->
@ -153,29 +154,31 @@ with_subid(undefined, SubOpts) ->
with_subid(SubId, SubOpts) ->
maps:put(subid, SubId, SubOpts).
do_subscribe(Topic, SubPid, SubOpts) ->
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
do_subscribe2(Topic, SubPid, SubOpts).
do_subscribe2(Topic, SubPid, SubOpts) when is_binary(Topic) ->
do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
%% FIXME: subscribe shard bug
%% https://emqx.atlassian.net/browse/EMQX-10214
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
0 ->
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
call(pick(Topic), {subscribe, Topic});
I ->
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
call(pick({Topic, I}), {subscribe, Topic, I})
end;
do_subscribe2(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
%% NOTE
%% We are relying on the local state to minimize global routing state changes,
%% thus it's important that some operations on ETS tables on the same topic
%% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
%% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
%% `unsubscribe` codepath. So we have to pick a worker according to the topic,
%% but not shard. If there are topics with a high number of shards, then the
%% load across the pool will be unbalanced.
call(pick(Topic), {subscribe, Topic, SubPid, I});
do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
is_binary(RealTopic)
->
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
emqx_shared_sub:subscribe(Group, RealTopic, SubPid).
%% Returns the subscription options to store for a subscriber placed in
%% shard `ShardIdx`. Shard 0 leaves the options untouched; any other index
%% is recorded under the `shard` key.
with_shard_idx(0, Opts) ->
    Opts;
with_shard_idx(ShardIdx, Opts) ->
    Opts#{shard => ShardIdx}.
%%--------------------------------------------------------------------
%% Unsubscribe API
%%--------------------------------------------------------------------
@ -204,15 +207,20 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when
is_binary(Topic), is_pid(SubPid), is_map(SubOpts)
->
_ = emqx_broker_helper:reclaim_seq(Topic),
case maps:get(shard, SubOpts, 0) of
0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
cast(pick(Topic), {unsubscribed, Topic});
I ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
cast(pick({Topic, I}), {unsubscribed, Topic, I})
end;
I = maps:get(shard, SubOpts, 0),
case I of
0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
_ -> ok
end,
%% NOTE
%% We are relying on the local state to minimize global routing state changes,
%% thus it's important that some operations on ETS tables on the same topic
%% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of
%% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of
%% `unsubscribe` codepath. So we have to pick a worker according to the topic,
%% but not shard. If there are topics with a high number of shards, then the
%% load across the pool will be unbalanced.
cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
is_binary(Group), is_binary(Topic), is_pid(SubPid)
->
@ -387,7 +395,6 @@ subscriber_down(SubPid) ->
fun(Topic) ->
case lookup_value(?SUBOPTION, {Topic, SubPid}) of
SubOpts when is_map(SubOpts) ->
_ = emqx_broker_helper:reclaim_seq(Topic),
true = ets:delete(?SUBOPTION, {Topic, SubPid}),
do_unsubscribe2(Topic, SubPid, SubOpts);
undefined ->
@ -499,49 +506,38 @@ init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
%% Synchronous requests handled by this worker. Every clause replies and
%% leaves `State` unchanged.
%% NOTE(review): this view shows 2-, 3- and 4-element `subscribe` requests
%% side by side; confirm which request shapes callers are expected to send.

%% `{subscribe, Topic}`: add a route for `Topic` and reply with the result of
%% `emqx_router:do_add_route/1`.
handle_call({subscribe, Topic}, _From, State) ->
Ok = emqx_router:do_add_route(Topic),
{reply, Ok, State};
%% `{subscribe, Topic, I}`: the process dictionary remembers which
%% `{Topic, I}` shards this worker has already seen. On first sight the shard
%% marker `{Topic, {shard, I}}` is written to ?SUBSCRIBER and a
%% `{subscribe, Topic}` cast is sent to the worker picked for `Topic`;
%% otherwise the reply is simply `ok`.
handle_call({subscribe, Topic, I}, _From, State) ->
Shard = {Topic, I},
Ok =
case get(Shard) of
undefined ->
_ = put(Shard, true),
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {subscribe, Topic});
true ->
ok
end,
{reply, Ok, State};
%% `{subscribe, Topic, SubPid, 0}`: membership is checked BEFORE the insert,
%% so a route is added only when `Topic` had no ?SUBSCRIBER entries yet.
handle_call({subscribe, Topic, SubPid, 0}, _From, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
Result = maybe_add_route(Existed, Topic),
{reply, Result, State};
%% `{subscribe, Topic, SubPid, I}` with any other shard index: same
%% check-then-insert sequence, but both the shard marker under `Topic` and the
%% subscriber under `{shard, Topic, I}` are written in one `ets:insert/2` call.
handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, [
{Topic, {shard, I}},
{{shard, Topic, I}, SubPid}
]),
Result = maybe_add_route(Existed, Topic),
{reply, Result, State};
%% Anything else is logged as an error and answered with `ignored`.
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} -> ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason})
end,
handle_cast({unsubscribed, Topic, SubPid, 0}, State) ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
Exists = ets:member(?SUBSCRIBER, Topic),
_Result = maybe_delete_route(Exists, Topic),
{noreply, State};
handle_cast({unsubscribed, Topic}, State) ->
case ets:member(?SUBSCRIBER, Topic) of
false ->
_ = emqx_router:do_delete_route(Topic),
ok;
true ->
ok
end,
{noreply, State};
handle_cast({unsubscribed, Topic, I}, State) ->
handle_cast({unsubscribed, Topic, SubPid, I}, State) ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
false ->
_ = erase({Topic, I}),
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {unsubscribed, Topic});
ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}});
true ->
ok
true
end,
Exists = ets:member(?SUBSCRIBER, Topic),
_Result = maybe_delete_route(Exists, Topic),
{noreply, State};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
@ -598,3 +594,15 @@ do_dispatch({shard, I}, Topic, Msg) ->
0,
subscribers({shard, Topic, I})
).
%%
%% Adds a route for `Topic` unless the caller already observed local
%% subscriber entries for it (`true`), in which case nothing is done and
%% `ok` is returned. Otherwise returns whatever
%% `emqx_router:do_add_route/1` returns.
maybe_add_route(true, _Topic) ->
    ok;
maybe_add_route(false, Topic) ->
    emqx_router:do_add_route(Topic).
%% Deletes the route for `Topic` unless local subscriber entries for it
%% still exist (`true`), in which case nothing is done and `ok` is returned.
%% Otherwise returns whatever `emqx_router:do_delete_route/1` returns.
maybe_delete_route(true, _Topic) ->
    ok;
maybe_delete_route(false, Topic) ->
    emqx_router:do_delete_route(Topic).

View File

@ -160,13 +160,8 @@ do_add_route(Topic) when is_binary(Topic) ->
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_add_route(Topic, Dest) when is_binary(Topic) ->
case has_route(Topic, Dest) of
true ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
mria_insert_route(get_schema_vsn(), Topic, Dest)
end.
ok = emqx_router_helper:monitor(Dest),
mria_insert_route(get_schema_vsn(), Topic, Dest).
mria_insert_route(v2, Topic, Dest) ->
mria_insert_route_v2(Topic, Dest);

View File

@ -22,6 +22,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_router.hrl").
all() ->
[
@ -33,7 +34,8 @@ all() ->
groups() ->
TCs = [
t_cluster_routing
t_cluster_routing,
t_slow_rlog_routing_consistency
],
[
{routing_schema_v1, [], TCs},
@ -54,11 +56,10 @@ end_per_group(_GroupName, Config) ->
emqx_cth_cluster:stop(?config(cluster, Config)).
init_per_testcase(TC, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]),
[{work_dir, WorkDir} | Config].
emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
end_per_testcase(_TC, _Config) ->
ok.
end_per_testcase(TC, Config) ->
emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
mk_emqx_appspec(GroupName, N) ->
{emqx, #{
@ -182,14 +183,15 @@ unsubscribe(C, Topic) ->
%%
t_routing_schema_switch_v1(Config) ->
t_routing_schema_switch(_From = v2, _To = v1, Config).
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
t_routing_schema_switch(_From = v2, _To = v1, WorkDir).
t_routing_schema_switch_v2(Config) ->
t_routing_schema_switch(_From = v1, _To = v2, Config).
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
t_routing_schema_switch(_From = v1, _To = v2, WorkDir).
t_routing_schema_switch(VFrom, VTo, Config) ->
t_routing_schema_switch(VFrom, VTo, WorkDir) ->
% Start first node with routing schema VTo (e.g. v1)
WorkDir = ?config(work_dir, Config),
[Node1] = emqx_cth_cluster:start(
[
{routing_schema_switch1, #{
@ -254,8 +256,76 @@ t_routing_schema_switch(VFrom, VTo, Config) ->
ok = emqx_cth_cluster:stop(Nodes)
end.
%% Per-testcase setup / teardown for `t_slow_rlog_routing_consistency/1`.
%%
%% `init`: reads the `post_commit` hook currently stored in `persistent_term`
%% on the first cluster node and stashes it in the config under
%% `original_mnesia_hook`.
%% `'end'`: re-registers that stashed hook on the first two cluster nodes.
%%
%% Both clauses require the `cluster` config entry to be a three-element list.
t_slow_rlog_routing_consistency(init, Config) ->
    [FirstCore, _SecondCore, _Replicant] = ?config(cluster, Config),
    HookKey = {mnesia_hook, post_commit},
    OriginalHook = rpc:call(FirstCore, persistent_term, get, [HookKey]),
    [{original_mnesia_hook, OriginalHook} | Config];
t_slow_rlog_routing_consistency('end', Config) ->
    [FirstCore, SecondCore, _Replicant] = ?config(cluster, Config),
    OriginalHook = ?config(original_mnesia_hook, Config),
    ok = register_mria_hook(OriginalHook, [FirstCore, SecondCore]).
%% Checks that a quick unsubscribe followed by a re-subscribe on `Replicant`
%% still results in a route for the topic being written there, while the
%% `post_commit` hooks on `Core1` and `Core2` are delayed by `DelayMs` via
%% `slowdown_mria_rlog/3`.
%% Fails the testcase if no matching route write event reaches this process
%% within `DelayMs * 3` milliseconds.
t_slow_rlog_routing_consistency(Config) ->
[Core1, Core2, Replicant] = ?config(cluster, Config),
ClientId = atom_to_binary(?FUNCTION_NAME),
Topic = <<"t/", ClientId/binary>>,
Self = self(),
?assertEqual(ok, rpc:call(Replicant, emqx_broker, do_subscribe, [Topic, Self, #{}])),
%% Wait for normal route replication (must be fast enough)
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant])
end,
2_000
),
DelayMs = 3_000,
slowdown_mria_rlog(?config(original_mnesia_hook, Config), [Core1, Core2], DelayMs),
%% Subscribe this test process to simple events of the route table on the
%% replicant, so that the re-added route shows up as a message below.
{ok, _} = rpc:call(Replicant, mnesia_subscr, subscribe, [Self, {table, ?ROUTE_TAB, simple}]),
UnSubSubFun = fun() ->
%% Unsubscribe must remove a route, but the effect
%% is expected to be delayed on the replicant node
ok = emqx_broker:do_unsubscribe(Topic, Self, #{}),
%% Wait a little (less than introduced delay),
%% just to reduce the risk of delete/add routes ops being re-ordered
timer:sleep(100),
%% Subscribe must add a route again, even though the previous
%% route may be still present on the replicant at the time of
%% this re-subscription
ok = emqx_broker:do_subscribe(Topic, Self, #{})
end,
%% The unsubscribe + re-subscribe sequence is executed on the replicant itself.
?assertEqual(ok, erpc:call(Replicant, UnSubSubFun)),
receive
%% Can't match route record, since table name =/= record name,
{mnesia_table_event, {write, {?ROUTE_TAB, Topic, Replicant}, _}} ->
%% Event is reported before Mnesia writes a record, need to wait again...
timer:sleep(100),
?assert(rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant]))
%% Allow up to three times the injected delay before giving up.
after DelayMs * 3 ->
ct:pal("Received messages: ~p", [process_info(Self, messages)]),
ct:fail("quick re-subscribe failed to add a route")
end.
%%
%% Returns the second element of the `bind` setting of the default TCP
%% listener, as read from `emqx_config` on `Node`.
%% Crashes with `badmatch` if that setting is not a two-element tuple.
get_mqtt_tcp_port(Node) ->
    BindPath = [listeners, tcp, default, bind],
    {_, TcpPort} = erpc:call(Node, emqx_config, get, [BindPath]),
    TcpPort.
%% Registers, on every node in `Nodes`, a `post_commit` hook that wraps
%% `MnesiaHook`: each invocation returns `ok` immediately and runs the
%% wrapped hook in a freshly spawned process after sleeping `DelayMs`
%% milliseconds.
slowdown_mria_rlog(MnesiaHook, Nodes, DelayMs) ->
    DelayedHook = fun(Tid, CommitData) ->
        Deferred = fun() ->
            timer:sleep(DelayMs),
            MnesiaHook(Tid, CommitData)
        end,
        _Pid = spawn(Deferred),
        ok
    end,
    register_mria_hook(DelayedHook, Nodes).
%% Registers `MnesiaHook` as the `post_commit` hook on every node in `Nodes`
%% and returns `ok`.
%%
%% Works for any number of nodes (including none) rather than exactly two.
%% Registration is attempted on ALL nodes before results are checked, so one
%% failing node does not prevent the hook from being (re)installed on the
%% others. If any call returns something other than `ok` (e.g. `{badrpc, _}`),
%% the function crashes with a `badmatch` listing the `{Node, Result}` pairs
%% that failed.
register_mria_hook(MnesiaHook, Nodes) ->
    Results = [
        {N, rpc:call(N, mnesia_hook, register_hook, [post_commit, MnesiaHook])}
     || N <- Nodes
    ],
    [] = [Failure || {_Node, Res} = Failure <- Results, Res =/= ok],
    ok.

View File

@ -0,0 +1 @@
Fixed a family of subtle race conditions that could lead to inconsistencies in the global routing state.