fix(route schema): allow boot if all peer nodes agree on one version

This commit is contained in:
zmstone 2024-03-25 16:15:07 +01:00
parent fe3cc25855
commit 3aff9eb2a4
1 changed files with 65 additions and 56 deletions

View File

@ -656,16 +656,16 @@ init_schema() ->
ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
ok = emqx_trie:wait_for_tables(),
ConfSchema = emqx_config:get([broker, routing, storage_schema]),
ClusterState = discover_cluster_schema_vsn(),
Schema = choose_schema_vsn(ConfSchema, ClusterState),
{ClusterSchema, ClusterState} = discover_cluster_schema_vsn(),
Schema = choose_schema_vsn(ConfSchema, ClusterSchema, ClusterState),
ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
case Schema of
ConfSchema ->
case Schema =:= ConfSchema of
true ->
?SLOG(info, #{
msg => "routing_schema_used",
schema => Schema
});
_ ->
false ->
?SLOG(notice, #{
msg => "configured_routing_schema_ignored",
schema_in_use => Schema,
@ -687,6 +687,7 @@ discover_cluster_schema_vsn() ->
discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
discover_cluster_schema_vsn([]) ->
%% single node
undefined;
discover_cluster_schema_vsn(Nodes) ->
Responses = lists:zipwith(
@ -709,68 +710,72 @@ discover_cluster_schema_vsn(Nodes) ->
[Vsn] when Vsn =:= v1; Vsn =:= v2 ->
{Vsn, Responses};
[] ->
?SLOG(notice, #{
?SLOG(warning, #{
msg => "cluster_routing_schema_discovery_failed",
responses => Responses,
reason =>
"Could not determine configured routing storage schema in the cluster."
"Could not determine configured routing storage schema in peer nodes."
}),
{undefined, Responses};
[_ | _] ->
Desc = schema_conflict_reason(config, Responses),
io:format(standard_error, "Error: ~ts~n", [Desc]),
?SLOG(critical, #{
msg => "conflicting_routing_schemas_configured_in_cluster",
msg => "conflicting_routing_schemas_in_cluster",
responses => Responses,
reason =>
"There are nodes in the cluster with different configured routing "
"storage schemas. This probably means that some nodes use v1 schema "
"and some use v2, independently of each other. The routing is likely "
"broken. Manual intervention required.",
action => mk_conflict_resolution_action(Responses)
description => Desc
}),
error(conflicting_routing_schemas_configured_in_cluster)
end.
-spec choose_schema_vsn(
schemavsn(),
_ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]}
_ClusterSchema :: schemavsn() | undefined,
_ClusterState :: [{node(), schemavsn() | undefined, _Details}]
) -> schemavsn().
choose_schema_vsn(ConfSchema, {ClusterSchema, State}) ->
choose_schema_vsn(ConfSchema, ClusterSchema, State) ->
case detect_table_schema_vsn() of
[ClusterSchema] ->
%% Table contents match configured schema in the cluster.
ClusterSchema;
[Schema] when ClusterSchema =:= undefined ->
%% There are existing records following some schema, we have to use it.
Schema;
[] ->
%% No records in the tables, use schema configured in the cluster if any,
%% otherwise use configured.
emqx_maybe:define(ClusterSchema, ConfSchema);
ConlictingSchemas ->
Reason =
"There are records in the routing tables either related to both v1 "
"and v2 storage schemas, or conflicting with storage schema assumed "
"by the cluster. This probably means that some nodes in the cluster "
"use v1 schema and some use v2, independently of each other. The "
"routing is likely broken. Manual intervention and full cluster "
"restart is required. This node will shut down.",
Action = mk_conflict_resolution_action(State),
[Schema] when Schema =:= ClusterSchema ->
%% Table contents match configured schema in the cluster.
Schema;
[Schema] when ClusterSchema =:= undefined ->
%% There are existing records following some schema, we have to use it.
Schema;
[v1, v2] when ClusterSchema =/= undefined ->
%% There are existing records in both v1 and v2 schema,
%% we have to use what the peer nodes agreed on.
%% because it could be HTIS node which caused cnoflict.
%%
%% The stale records will be left-over, but harmless
ClusterSchema;
[v1, v2] ->
Desc = schema_conflict_reason(records, State),
io:format(standard_error, "Error: ~ts~n", [Desc]),
?SLOG(critical, #{
msg => "conflicting_routing_schemas_detected_in_cluster",
detected => ConlictingSchemas,
configured => ConfSchema,
configured_in_cluster => ClusterSchema,
reason => Reason,
action => Action
msg => "conflicting_routing_storage_in_cluster",
description => Desc
}),
io:format(
standard_error,
"Error: conflicting routing schemas detected in the cluster.\n~s\n~s\n",
[Reason, Action]
),
error(conflicting_routing_schemas_detected_in_cluster)
end.
schema_conflict_reason(Type, State) ->
Observe =
case Type of
config ->
"Peer nodes have route storage schema resolved into conflicting versions.\n";
records ->
"There are conflicting routing records found.\n"
end,
Cause =
"\nThis was caused by a race-condition when the cluster was rolling upgraded "
"from an older version to 5.4.0, 5.4.1, 5.5.0 or 5.5.1."
"\nThis node cannot boot before the conflicts are resolved.\n",
Observe ++ Cause ++ mk_conflict_resolution_action(State).
detect_table_schema_vsn() ->
lists:flatten([
[v1 || _NonEmptyTrieIndex = not emqx_trie:empty()],
@ -784,23 +789,27 @@ mk_conflict_resolution_action(State) ->
NodesV1 = [Node || {Node, v1, _} <- State],
NodesUnknown = [Node || {Node, unknown, _} <- State],
Format =
"Following EMQX nodes are running with conflicting schema:"
"\n ~p"
"Please take the following steps to resolve the conflict:"
"\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`"
"\n 2. Wait until they are safe to restart."
"\n This could take some time, depending on the number of clients and their subscriptions."
"\n The following conditions should be both true for each of the nodes in order to proceed:"
"\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`."
"\n * `$ emqx ctl topics list` prints `No topics.`"
"\n 3. Upgrade the nodes to version ~s."
"\n 4. Restart the nodes.",
"There are two ways to resolve the conflict:"
"\n"
"\nA: Full cluster restart: stop ALL running nodes one by one "
"and restart them in the reversed order."
"\n"
"\nB: Force v1 nodes to clean up their routes."
"\n Following EMQX nodes are running with v1 schema: ~0p."
"\n 1. Stop listeners with command \"emqx eval 'emqx_listener:stop()'\" in all v1 nodes"
"\n 2. Wait until they are safe to restart."
"\n This could take some time, depending on the number of clients and their subscriptions."
"\n Below conditions should be true for each of the nodes in order to proceed:"
"\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`."
"\n b) Command 'emqx ctl topics list' prints No topics.`"
"\n 3. Upgrade the nodes to 5.6.0 or newer"
"\n 4. Restart the node",
FormatUnkown =
"Additionally, the following nodes were unreachable during startup: ~p."
"Additionally, the following nodes were unreachable during startup: ~0p."
"It is strongly advised to include them in the manual resolution procedure as well.",
Message = io_lib:format(Format, [NodesV1, emqx_release:version_with_prefix()]),
Message = io_lib:format(Format, [NodesV1]),
MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []],
unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown).
unicode:characters_to_list([Message, "\n", MessageUnknown]).
%%--------------------------------------------------------------------
%% gen_server callbacks