diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl
index 88999823b..90ba7e605 100644
--- a/apps/emqx/src/emqx_router.erl
+++ b/apps/emqx/src/emqx_router.erl
@@ -656,16 +656,16 @@ init_schema() ->
     ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]),
     ok = emqx_trie:wait_for_tables(),
     ConfSchema = emqx_config:get([broker, routing, storage_schema]),
-    ClusterState = discover_cluster_schema_vsn(),
-    Schema = choose_schema_vsn(ConfSchema, ClusterState),
+    {ClusterSchema, ClusterState} = discover_cluster_schema_vsn(),
+    Schema = choose_schema_vsn(ConfSchema, ClusterSchema, ClusterState),
     ok = persistent_term:put(?PT_SCHEMA_VSN, Schema),
-    case Schema of
-        ConfSchema ->
+    case Schema =:= ConfSchema of
+        true ->
             ?SLOG(info, #{
                 msg => "routing_schema_used",
                 schema => Schema
             });
-        _ ->
+        false ->
             ?SLOG(notice, #{
                 msg => "configured_routing_schema_ignored",
                 schema_in_use => Schema,
@@ -687,6 +687,7 @@ discover_cluster_schema_vsn() ->
     discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]).
 
 discover_cluster_schema_vsn([]) ->
-    undefined;
+    %% Single node: no peers to ask, so no cluster schema and no responses.
+    {undefined, []};
 discover_cluster_schema_vsn(Nodes) ->
     Responses = lists:zipwith(
@@ -709,68 +710,77 @@ discover_cluster_schema_vsn(Nodes) ->
         [Vsn] when Vsn =:= v1; Vsn =:= v2 ->
             {Vsn, Responses};
         [] ->
-            ?SLOG(notice, #{
+            ?SLOG(warning, #{
                 msg => "cluster_routing_schema_discovery_failed",
                 responses => Responses,
                 reason =>
-                    "Could not determine configured routing storage schema in the cluster."
+                    "Could not determine configured routing storage schema in peer nodes."
             }),
             {undefined, Responses};
         [_ | _] ->
+            Desc = schema_conflict_reason(config, Responses),
+            io:format(standard_error, "Error: ~ts~n", [Desc]),
             ?SLOG(critical, #{
-                msg => "conflicting_routing_schemas_configured_in_cluster",
+                msg => "conflicting_routing_schemas_in_cluster",
                 responses => Responses,
-                reason =>
-                    "There are nodes in the cluster with different configured routing "
-                    "storage schemas. This probably means that some nodes use v1 schema "
-                    "and some use v2, independently of each other. The routing is likely "
-                    "broken. Manual intervention required.",
-                action => mk_conflict_resolution_action(Responses)
+                description => Desc
             }),
             error(conflicting_routing_schemas_configured_in_cluster)
     end.
 
 -spec choose_schema_vsn(
     schemavsn(),
-    _ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]}
+    _ClusterSchema :: schemavsn() | undefined,
+    _ClusterState :: [{node(), schemavsn() | undefined, _Details}]
 ) -> schemavsn().
-choose_schema_vsn(ConfSchema, {ClusterSchema, State}) ->
+choose_schema_vsn(ConfSchema, ClusterSchema, State) ->
     case detect_table_schema_vsn() of
-        [ClusterSchema] ->
-            %% Table contents match configured schema in the cluster.
-            ClusterSchema;
-        [Schema] when ClusterSchema =:= undefined ->
-            %% There are existing records following some schema, we have to use it.
-            Schema;
         [] ->
             %% No records in the tables, use schema configured in the cluster if any,
             %% otherwise use configured.
             emqx_maybe:define(ClusterSchema, ConfSchema);
-        ConlictingSchemas ->
-            Reason =
-                "There are records in the routing tables either related to both v1 "
-                "and v2 storage schemas, or conflicting with storage schema assumed "
-                "by the cluster. This probably means that some nodes in the cluster "
-                "use v1 schema and some use v2, independently of each other. The "
-                "routing is likely broken. Manual intervention and full cluster "
-                "restart is required. This node will shut down.",
-            Action = mk_conflict_resolution_action(State),
+        [Schema] when Schema =:= ClusterSchema ->
+            %% Table contents match configured schema in the cluster.
+            Schema;
+        [Schema] when ClusterSchema =:= undefined ->
+            %% There are existing records following some schema, we have to use it.
+            Schema;
+        _Conflicting when ClusterSchema =/= undefined ->
+            %% The records conflict with the schema the peers agreed on: either
+            %% both v1 and v2 records exist, or the only records present follow
+            %% the other schema. Use what the peer nodes agreed on, because it
+            %% could be THIS node which caused the conflict.
+            %%
+            %% The stale records will be left over, but they are harmless.
+            ClusterSchema;
+        [v1, v2] ->
+            %% Conflicting records and no peer agreement to fall back on: refuse to boot.
+            Desc = schema_conflict_reason(records, State),
+            io:format(standard_error, "Error: ~ts~n", [Desc]),
             ?SLOG(critical, #{
-                msg => "conflicting_routing_schemas_detected_in_cluster",
-                detected => ConlictingSchemas,
-                configured => ConfSchema,
-                configured_in_cluster => ClusterSchema,
-                reason => Reason,
-                action => Action
+                msg => "conflicting_routing_storage_in_cluster",
+                description => Desc
             }),
-            io:format(
-                standard_error,
-                "Error: conflicting routing schemas detected in the cluster.\n~s\n~s\n",
-                [Reason, Action]
-            ),
             error(conflicting_routing_schemas_detected_in_cluster)
     end.
 
+%% Build the text printed to stderr and logged when this node refuses to boot:
+%% what was observed (`config`: peers resolved to conflicting schema versions,
+%% `records`: conflicting routing records), the cause, and the resolution steps.
+schema_conflict_reason(Type, State) ->
+    Observe =
+        case Type of
+            config ->
+                "Peer nodes have route storage schema resolved into conflicting versions.\n";
+            records ->
+                "There are conflicting routing records found.\n"
+        end,
+    Cause =
+        "\nThis was caused by a race-condition when the cluster was rolling upgraded "
+        "from an older version to 5.4.0, 5.4.1, 5.5.0 or 5.5.1."
+        "\nThis node cannot boot before the conflicts are resolved.\n",
+    Observe ++ Cause ++ mk_conflict_resolution_action(State).
+
 detect_table_schema_vsn() ->
     lists:flatten([
         [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()],
@@ -784,23 +794,27 @@ mk_conflict_resolution_action(State) ->
     NodesV1 = [Node || {Node, v1, _} <- State],
     NodesUnknown = [Node || {Node, unknown, _} <- State],
     Format =
-        "Following EMQX nodes are running with conflicting schema:"
-        "\n ~p"
-        "Please take the following steps to resolve the conflict:"
-        "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`"
-        "\n 2. Wait until they are safe to restart."
-        "\n This could take some time, depending on the number of clients and their subscriptions."
-        "\n The following conditions should be both true for each of the nodes in order to proceed:"
-        "\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`."
-        "\n * `$ emqx ctl topics list` prints `No topics.`"
-        "\n 3. Upgrade the nodes to version ~s."
-        "\n 4. Restart the nodes.",
+        "There are two ways to resolve the conflict:"
+        "\n"
+        "\nA: Full cluster restart: stop ALL running nodes one by one "
+        "and restart them in the reversed order."
+        "\n"
+        "\nB: Force v1 nodes to clean up their routes."
+        "\n Following EMQX nodes are running with v1 schema: ~0p."
+        "\n 1. Stop listeners with command \"emqx eval 'emqx_listener:stop()'\" in all v1 nodes"
+        "\n 2. Wait until they are safe to restart."
+        "\n This could take some time, depending on the number of clients and their subscriptions."
+        "\n Below conditions should be true for each of the nodes in order to proceed:"
+        "\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`."
+        "\n b) Command 'emqx ctl topics list' prints `No topics.`"
+        "\n 3. Upgrade the nodes to 5.6.0 or newer"
+        "\n 4. Restart the node",
     FormatUnkown =
-        "Additionally, the following nodes were unreachable during startup: ~p."
-        "It is strongly advised to include them in the manual resolution procedure as well.",
-    Message = io_lib:format(Format, [NodesV1, emqx_release:version_with_prefix()]),
+        "Additionally, the following nodes were unreachable during startup: ~0p."
+        " It is strongly advised to include them in the manual resolution procedure as well.",
+    Message = io_lib:format(Format, [NodesV1]),
     MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []],
-    unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown).
+    unicode:characters_to_list([Message, "\n", MessageUnknown]).
 
 %%--------------------------------------------------------------------
 %% gen_server callbacks