fix(routing): add heuristic for routing schema in empty cluster
When the cluster is empty but comprised of different versions of EMQX, try to additionally ask the cluster nodes for the routing schema in use, to be able to make more informed decisions.
This commit is contained in:
parent
c2ace30318
commit
ba276d807f
|
@ -654,7 +654,8 @@ init_schema() ->
|
|||
ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
|
||||
ok = emqx_trie:wait_for_tables(),
|
||||
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
|
||||
Schema = choose_schema_vsn(ConfSchema),
|
||||
ClusterSchema = discover_cluster_schema_vsn(),
|
||||
Schema = choose_schema_vsn(ConfSchema, ClusterSchema),
|
||||
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
|
||||
case Schema of
|
||||
ConfSchema ->
|
||||
|
@ -669,8 +670,7 @@ init_schema() ->
|
|||
configured => ConfSchema,
|
||||
reason =>
|
||||
"Could not use configured routing storage schema because "
|
||||
"there are already non-empty routing tables pertaining to "
|
||||
"another schema."
|
||||
"cluster is already running with a different schema."
|
||||
})
|
||||
end.
|
||||
|
||||
|
@ -679,31 +679,89 @@ deinit_schema() ->
|
|||
_ = persistent_term:erase(?PT_SCHEMA_VSN),
|
||||
ok.
|
||||
|
||||
-spec choose_schema_vsn(schemavsn()) -> schemavsn().
|
||||
choose_schema_vsn(ConfType) ->
|
||||
IsEmptyIndex = emqx_trie:empty(),
|
||||
IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS),
|
||||
case {IsEmptyIndex, IsEmptyFilters} of
|
||||
{true, true} ->
|
||||
ConfType;
|
||||
{false, true} ->
|
||||
v1;
|
||||
{true, false} ->
|
||||
v2;
|
||||
{false, false} ->
|
||||
-spec discover_cluster_schema_vsn() -> schemavsn() | undefined.
|
||||
discover_cluster_schema_vsn() ->
|
||||
discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
|
||||
|
||||
discover_cluster_schema_vsn([]) ->
|
||||
undefined;
|
||||
discover_cluster_schema_vsn(Nodes) ->
|
||||
Responses = lists:zipwith(
|
||||
fun
|
||||
(Node, {ok, Schema}) ->
|
||||
{Node, Schema, configured};
|
||||
(Node, {error, {exception, undef, _Stacktrace}}) ->
|
||||
%% No such function on the remote node, assuming it doesn't know about v2 routing.
|
||||
{Node, v1, legacy};
|
||||
(Node, {error, {exception, badarg, _Stacktrace}}) ->
|
||||
%% Likely, persistent term is not defined yet.
|
||||
{Node, unknown, starting};
|
||||
(Node, Error) ->
|
||||
{Node, unknown, Error}
|
||||
end,
|
||||
Nodes,
|
||||
emqx_router_proto_v1:get_routing_schema_vsn(Nodes)
|
||||
),
|
||||
case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of
|
||||
[Vsn] when Vsn =:= v1; Vsn =:= v2 ->
|
||||
Vsn;
|
||||
[] ->
|
||||
?SLOG(notice, #{
|
||||
msg => "cluster_routing_schema_discovery_failed",
|
||||
responses => Responses,
|
||||
reason =>
|
||||
"Could not determine configured routing storage schema in the cluster."
|
||||
}),
|
||||
undefined;
|
||||
[_ | _] ->
|
||||
?SLOG(critical, #{
|
||||
msg => "conflicting_routing_schemas_configured_in_cluster",
|
||||
responses => Responses,
|
||||
reason =>
|
||||
"There are nodes in the cluster with different configured routing "
|
||||
"storage schemas. This probably means that some nodes use v1 schema "
|
||||
"and some use v2, independently of each other. The routing is likely "
|
||||
"broken. Manual intervention required."
|
||||
}),
|
||||
error(conflicting_routing_schemas_configured_in_cluster)
|
||||
end.
|
||||
|
||||
-spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn().
|
||||
choose_schema_vsn(ConfSchema, ClusterSchema) ->
|
||||
case detect_table_schema_vsn() of
|
||||
[ClusterSchema] ->
|
||||
%% Table contents match configured schema in the cluster.
|
||||
ClusterSchema;
|
||||
[Schema] when ClusterSchema =:= undefined ->
|
||||
%% There are existing records following some schema, we have to use it.
|
||||
Schema;
|
||||
[] ->
|
||||
%% No records in the tables, use schema configured in the cluster if any,
|
||||
%% otherwise use configured.
|
||||
emqx_maybe:define(ClusterSchema, ConfSchema);
|
||||
ConlictingSchemas ->
|
||||
?SLOG(critical, #{
|
||||
msg => "conflicting_routing_schemas_detected_in_cluster",
|
||||
configured => ConfType,
|
||||
detected => ConlictingSchemas,
|
||||
configured => ConfSchema,
|
||||
configured_in_cluster => ClusterSchema,
|
||||
reason =>
|
||||
"There are records in the routing tables related to both v1 "
|
||||
"and v2 storage schemas. This probably means that some nodes "
|
||||
"in the cluster use v1 schema and some use v2, independently "
|
||||
"of each other. The routing is likely broken. Manual intervention "
|
||||
"and full cluster restart is required. This node will shut down."
|
||||
"There are records in the routing tables either related to both v1 "
|
||||
"and v2 storage schemas, or conflicting with storage schema assumed "
|
||||
"by the cluster. This probably means that some nodes in the cluster "
|
||||
"use v1 schema and some use v2, independently of each other. The "
|
||||
"routing is likely broken. Manual intervention and full cluster "
|
||||
"restart is required. This node will shut down."
|
||||
}),
|
||||
error(conflicting_routing_schemas_detected_in_cluster)
|
||||
end.
|
||||
|
||||
detect_table_schema_vsn() ->
|
||||
lists:flatten([
|
||||
[v1 || _NonEmptyTrieIndex = not emqx_trie:empty()],
|
||||
[v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)]
|
||||
]).
|
||||
|
||||
is_empty(Tab) ->
|
||||
ets:first(Tab) =:= '$end_of_table'.
|
||||
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_router_proto_v1).
|
||||
|
||||
-behaviour(emqx_bpapi).
|
||||
|
||||
-export([introduced_in/0]).
|
||||
|
||||
-export([
|
||||
get_routing_schema_vsn/1
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
-define(TIMEOUT, 3_000).
|
||||
|
||||
introduced_in() ->
|
||||
"5.6.0".
|
||||
|
||||
-spec get_routing_schema_vsn([node()]) ->
|
||||
[emqx_rpc:erpc(emqx_router:schema_vsn())].
|
||||
get_routing_schema_vsn(Nodes) ->
|
||||
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT).
|
|
@ -30,7 +30,8 @@ all() ->
|
|||
{group, routing_schema_v1},
|
||||
{group, routing_schema_v2},
|
||||
t_routing_schema_switch_v1,
|
||||
t_routing_schema_switch_v2
|
||||
t_routing_schema_switch_v2,
|
||||
t_routing_schema_consistent_clean_cluster
|
||||
].
|
||||
|
||||
groups() ->
|
||||
|
@ -477,6 +478,60 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) ->
|
|||
ok = emqx_cth_cluster:stop(Nodes)
|
||||
end.
|
||||
|
||||
t_routing_schema_consistent_clean_cluster(Config) ->
|
||||
WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config),
|
||||
% Start first node with routing schema v1
|
||||
[Node1] = emqx_cth_cluster:start(
|
||||
[
|
||||
{routing_schema_consistent1, #{
|
||||
apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, v1)]
|
||||
}}
|
||||
],
|
||||
#{work_dir => WorkDir}
|
||||
),
|
||||
% Start rest of nodes with routing schema v2
|
||||
NodesRest = emqx_cth_cluster:start(
|
||||
[
|
||||
{routing_schema_consistent2, #{
|
||||
apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, v2)],
|
||||
base_port => 20000,
|
||||
join_to => Node1
|
||||
}},
|
||||
{routing_schema_consistent3, #{
|
||||
apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, v2)],
|
||||
base_port => 20100,
|
||||
join_to => Node1
|
||||
}}
|
||||
],
|
||||
#{work_dir => WorkDir}
|
||||
),
|
||||
Nodes = [Node1 | NodesRest],
|
||||
try
|
||||
% Verify that cluser is still on v1
|
||||
?assertEqual(
|
||||
[{ok, v1} || _ <- Nodes],
|
||||
erpc:multicall(Nodes, emqx_router, get_schema_vsn, [])
|
||||
),
|
||||
% Wait for all nodes to agree on cluster state
|
||||
?retry(
|
||||
500,
|
||||
10,
|
||||
?assertEqual(
|
||||
[{ok, Nodes} || _ <- Nodes],
|
||||
erpc:multicall(Nodes, emqx, running_nodes, [])
|
||||
)
|
||||
),
|
||||
C1 = start_client(Node1),
|
||||
C2 = start_client(hd(NodesRest)),
|
||||
ok = subscribe(C2, <<"t/#">>),
|
||||
{ok, _} = publish(C1, <<"t/a/b/c">>, <<"yayconsistency">>),
|
||||
?assertReceive({pub, C2, #{topic := <<"t/a/b/c">>, payload := <<"yayconsistency">>}}),
|
||||
ok = emqtt:stop(C1),
|
||||
ok = emqtt:stop(C2)
|
||||
after
|
||||
ok = emqx_cth_cluster:stop(Nodes)
|
||||
end.
|
||||
|
||||
t_slow_rlog_routing_consistency(init, Config) ->
|
||||
[Core1, _Core2, _Replicant] = ?config(cluster, Config),
|
||||
MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]),
|
||||
|
|
Loading…
Reference in New Issue