fix(router): rely on local sync state updates to propagate routes

Instead of (potentially lagging) global table state itself. Local
state should be the only source of truth anyway. Moreover,
synchronize parts of local state update logic through the broker
pool to ensure consistency.

The problem was that the `emqx_router:has_route/2` check may observe a
stale route whose deletion has not yet been replicated from the core
node to the local replicant node.

For example:
1. The only subscriber to a given topic A unsubscribes from
   a replicant node.
2. The route to A is deleted by the emqx_broker.
3. Mria makes RPC to a core node, it succeeds and returns.
4. The client resubscribes or another client subscribes to the same
   topic A.
5. emqx_broker tries to add a route again:
   `emqx_router:do_add_route(Topic)`.
6. `emqx_router` checks if the route is present.
7. The stale route is still present because the deletion has not been
   replicated yet, so no route is added.
8. Route deletion is replicated locally but it's too late:
   we already have a local subscriber without a route.

Co-Authored-By: Serge Tupchii <serge.tupchii@protonmail.com>
This commit is contained in:
Andrew Mayorov 2023-12-28 19:11:33 +01:00
parent 331bfaa535
commit b9627c420f
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 130 additions and 69 deletions

View File

@ -157,22 +157,20 @@ with_subid(SubId, SubOpts) ->
do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) ->
%% FIXME: subscribe shard bug
%% https://emqx.atlassian.net/browse/EMQX-10214
case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
0 ->
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
call(pick(Topic), {subscribe, Topic});
I ->
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, maps:put(shard, I, SubOpts)}),
call(pick({Topic, I}), {subscribe, Topic, I})
end;
I = emqx_broker_helper:get_sub_shard(SubPid, Topic),
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}),
call(pick(Topic), {subscribe, Topic, SubPid, I});
do_subscribe(Topic = #share{group = Group, topic = RealTopic}, SubPid, SubOpts) when
is_binary(RealTopic)
->
true = ets:insert(?SUBOPTION, {{Topic, SubPid}, SubOpts}),
emqx_shared_sub:subscribe(Group, RealTopic, SubPid).
%% Tags subscription options with the shard index the subscriber was put in.
%% Shard 0 means "not sharded", so the options are returned untouched;
%% any other index is stored under the `shard` key.
with_shard_idx(0, SubOpts) ->
    SubOpts;
with_shard_idx(ShardIdx, SubOpts) ->
    SubOpts#{shard => ShardIdx}.
%%--------------------------------------------------------------------
%% Unsubscribe API
%%--------------------------------------------------------------------
@ -201,15 +199,12 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when
is_binary(Topic), is_pid(SubPid), is_map(SubOpts)
->
_ = emqx_broker_helper:reclaim_seq(Topic),
case maps:get(shard, SubOpts, 0) of
0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
cast(pick(Topic), {unsubscribed, Topic});
I ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
cast(pick({Topic, I}), {unsubscribed, Topic, I})
end;
I = maps:get(shard, SubOpts, 0),
case I of
0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts);
_ -> ok
end,
cast(pick(Topic), {unsubscribed, Topic, SubPid, I});
do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when
is_binary(Group), is_binary(Topic), is_pid(SubPid)
->
@ -495,49 +490,38 @@ init([Pool, Id]) ->
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
handle_call({subscribe, Topic}, _From, State) ->
Ok = emqx_router:do_add_route(Topic),
{reply, Ok, State};
handle_call({subscribe, Topic, I}, _From, State) ->
Shard = {Topic, I},
Ok =
case get(Shard) of
undefined ->
_ = put(Shard, true),
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {subscribe, Topic});
true ->
ok
end,
{reply, Ok, State};
%% Non-sharded subscribe (shard index 0).
%% `Existed` is sampled BEFORE the insert, so it reflects whether the topic
%% already had any ?SUBSCRIBER entry; a route is requested only if it had none.
%% The reply is whatever maybe_add_route/2 returns.
handle_call({subscribe, Topic, SubPid, 0}, _From, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
Result = maybe_add_route(Existed, Topic),
{reply, Result, State};
%% Sharded subscribe (any shard index not matched by the clause above).
%% Inserts both the per-topic shard marker `{Topic, {shard, I}}` and the
%% subscriber entry under the shard key `{shard, Topic, I}` in one ets:insert/2
%% call, then requests a route only if the topic had no entry beforehand.
handle_call({subscribe, Topic, SubPid, I}, _From, State) ->
Existed = ets:member(?SUBSCRIBER, Topic),
true = ets:insert(?SUBSCRIBER, [
{Topic, {shard, I}},
{{shard, Topic, I}, SubPid}
]),
Result = maybe_add_route(Existed, Topic),
{reply, Result, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} -> ?SLOG(error, #{msg => "failed_to_add_route", reason => Reason})
end,
handle_cast({unsubscribed, Topic, SubPid, 0}, State) ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
Exists = ets:member(?SUBSCRIBER, Topic),
_Result = maybe_delete_route(Exists, Topic),
{noreply, State};
handle_cast({unsubscribed, Topic}, State) ->
case ets:member(?SUBSCRIBER, Topic) of
false ->
_ = emqx_router:do_delete_route(Topic),
ok;
true ->
ok
end,
{noreply, State};
handle_cast({unsubscribed, Topic, I}, State) ->
handle_cast({unsubscribed, Topic, SubPid, I}, State) ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
false ->
_ = erase({Topic, I}),
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {unsubscribed, Topic});
ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}});
true ->
ok
true
end,
Exists = ets:member(?SUBSCRIBER, Topic),
_Result = maybe_delete_route(Exists, Topic),
{noreply, State};
handle_cast(Msg, State) ->
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
@ -594,3 +578,15 @@ do_dispatch({shard, I}, Topic, Msg) ->
0,
subscribers({shard, Topic, I})
).
%%
%% Adds a route for Topic unless the topic already had local subscribers.
%% The first argument is the local "subscriber existed before this
%% subscription" flag: `true` means nothing to do, `false` delegates to
%% emqx_router:do_add_route/1 and returns its result.
maybe_add_route(true, _Topic) ->
    ok;
maybe_add_route(false, Topic) ->
    emqx_router:do_add_route(Topic).
%% Deletes the route for Topic once no local subscribers remain.
%% The first argument is the local "subscribers still exist" flag: `true`
%% means the route must stay, `false` delegates to
%% emqx_router:do_delete_route/1 and returns its result.
maybe_delete_route(true, _Topic) ->
    ok;
maybe_delete_route(false, Topic) ->
    emqx_router:do_delete_route(Topic).

View File

@ -160,13 +160,8 @@ do_add_route(Topic) when is_binary(Topic) ->
-spec do_add_route(emqx_types:topic(), dest()) -> ok | {error, term()}.
do_add_route(Topic, Dest) when is_binary(Topic) ->
case has_route(Topic, Dest) of
true ->
ok;
false ->
ok = emqx_router_helper:monitor(Dest),
mria_insert_route(get_schema_vsn(), Topic, Dest)
end.
mria_insert_route(get_schema_vsn(), Topic, Dest).
mria_insert_route(v2, Topic, Dest) ->
mria_insert_route_v2(Topic, Dest);

View File

@ -22,6 +22,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
-include_lib("emqx/include/emqx_router.hrl").
all() ->
[
@ -33,7 +34,8 @@ all() ->
groups() ->
TCs = [
t_cluster_routing
t_cluster_routing,
t_slow_rlog_routing_consistency
],
[
{routing_schema_v1, [], TCs},
@ -54,11 +56,10 @@ end_per_group(_GroupName, Config) ->
emqx_cth_cluster:stop(?config(cluster, Config)).
init_per_testcase(TC, Config) ->
WorkDir = filename:join([?config(priv_dir, Config), ?MODULE, TC]),
[{work_dir, WorkDir} | Config].
emqx_common_test_helpers:init_per_testcase(?MODULE, TC, Config).
end_per_testcase(_TC, _Config) ->
ok.
end_per_testcase(TC, Config) ->
emqx_common_test_helpers:end_per_testcase(?MODULE, TC, Config).
mk_emqx_appspec(GroupName, N) ->
{emqx, #{
@ -182,14 +183,15 @@ unsubscribe(C, Topic) ->
%%
t_routing_schema_switch_v1(Config) ->
t_routing_schema_switch(_From = v2, _To = v1, Config).
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
t_routing_schema_switch(_From = v2, _To = v1, WorkDir).
t_routing_schema_switch_v2(Config) ->
t_routing_schema_switch(_From = v1, _To = v2, Config).
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
t_routing_schema_switch(_From = v1, _To = v2, WorkDir).
t_routing_schema_switch(VFrom, VTo, Config) ->
t_routing_schema_switch(VFrom, VTo, WorkDir) ->
% Start first node with routing schema VTo (e.g. v1)
WorkDir = ?config(work_dir, Config),
[Node1] = emqx_cth_cluster:start(
[
{routing_schema_switch1, #{
@ -254,8 +256,76 @@ t_routing_schema_switch(VFrom, VTo, Config) ->
ok = emqx_cth_cluster:stop(Nodes)
end.
%% Per-testcase setup/teardown for t_slow_rlog_routing_consistency/1.
%% `init` remembers the `post_commit` Mnesia hook currently stored on the first
%% core node, so that `end` can re-register it on both core nodes afterwards.
t_slow_rlog_routing_consistency(init, Config) ->
    [CoreNode, _, _] = ?config(cluster, Config),
    HookKey = {mnesia_hook, post_commit},
    SavedHook = rpc:call(CoreNode, persistent_term, get, [HookKey]),
    [{original_mnesia_hook, SavedHook} | Config];
t_slow_rlog_routing_consistency('end', Config) ->
    SavedHook = ?config(original_mnesia_hook, Config),
    [CoreA, CoreB, _] = ?config(cluster, Config),
    ok = register_mria_hook(SavedHook, [CoreA, CoreB]).
%% Regression test: a quick unsubscribe followed by a re-subscribe on the
%% replicant must still result in a route, even when the post-commit hook on
%% the core nodes is artificially delayed by DelayMs.
%% The test process itself acts as the subscriber and also receives the Mnesia
%% table events it subscribes to on the replicant.
t_slow_rlog_routing_consistency(Config) ->
[Core1, Core2, Replicant] = ?config(cluster, Config),
ClientId = atom_to_binary(?FUNCTION_NAME),
Topic = <<"t/", ClientId/binary>>,
Self = self(),
?assertEqual(ok, rpc:call(Replicant, emqx_broker, do_subscribe, [Topic, Self, #{}])),
%% Wait for normal route replication (must be fast enough)
emqx_common_test_helpers:wait_for(
?FUNCTION_NAME,
?LINE,
fun() ->
rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant])
end,
2_000
),
DelayMs = 3_000,
%% From here on, the original post-commit hook on both cores runs DelayMs late.
slowdown_mria_rlog(?config(original_mnesia_hook, Config), [Core1, Core2], DelayMs),
%% Subscribe this process to route table events on the replicant, so the
%% `receive` below can observe the route being written again.
{ok, _} = rpc:call(Replicant, mnesia_subscr, subscribe, [Self, {table, ?ROUTE_TAB, simple}]),
UnSubSubFun = fun() ->
%% Unsubscribe must remove a route, but the effect
%% is expected to be delayed on the replicant node
ok = emqx_broker:do_unsubscribe(Topic, Self, #{}),
%% Wait a little (less than introduced delay),
%% just to reduce the risk of delete/add routes ops being re-ordered
timer:sleep(100),
%% Subscribe must add a route again, even though the previous
%% route may be still present on the replicant at the time of
%% this re-subscription
ok = emqx_broker:do_subscribe(Topic, Self, #{})
end,
?assertEqual(ok, erpc:call(Replicant, UnSubSubFun)),
receive
%% Can't match on the route record, since table name =/= record name
{mnesia_table_event, {write, {?ROUTE_TAB, Topic, Replicant}, _}} ->
%% Event is reported before Mnesia writes a record, need to wait again...
timer:sleep(100),
?assert(rpc:call(Replicant, emqx_router, has_route, [Topic, Replicant]))
after DelayMs * 3 ->
%% No write event within three times the injected delay: dump the
%% mailbox for diagnostics and fail the testcase.
ct:pal("Received messages: ~p", [process_info(Self, messages)]),
ct:fail("quick re-subscribe failed to add a route")
end.
%%
%% Returns the port of the default TCP listener configured on Node.
%% The `bind` config value is expected to be a 2-tuple whose second element is
%% the port; anything else crashes with a badmatch.
get_mqtt_tcp_port(Node) ->
    BindPath = [listeners, tcp, default, bind],
    Bind = erpc:call(Node, emqx_config, get, [BindPath]),
    {_Addr, Port} = Bind,
    Port.
%% Replaces the `post_commit` hook on Nodes with a wrapper that returns `ok`
%% immediately and runs the original MnesiaHook from a separate process after
%% sleeping for DelayMs milliseconds.
slowdown_mria_rlog(MnesiaHook, Nodes, DelayMs) ->
    DelayedHook = fun(Tid, CommitData) ->
        Deferred = fun() ->
            timer:sleep(DelayMs),
            MnesiaHook(Tid, CommitData)
        end,
        %% Deliberately not linked: the caller of the hook must not wait for,
        %% or be affected by, the delayed invocation.
        _Pid = spawn(Deferred),
        ok
    end,
    register_mria_hook(DelayedHook, Nodes).
%% Registers MnesiaHook as the `post_commit` Mnesia hook on every node in Nodes.
%%
%% Works for any number of nodes (the previous `[ok, ok] = ...` match only
%% accepted exactly two). Returns `ok` when every registration returned `ok`;
%% otherwise crashes with a badmatch carrying the `{Node, Result}` pairs that
%% failed.
register_mria_hook(MnesiaHook, Nodes) ->
    Results = [
        {N, rpc:call(N, mnesia_hook, register_hook, [post_commit, MnesiaHook])}
     || N <- Nodes
    ],
    %% Every node is attempted before the results are checked, so one failing
    %% node does not prevent the hook from being installed on the others.
    [] = [Failure || {_Node, Result} = Failure <- Results, Result =/= ok],
    ok.