From ba276d807f819afe52e2115feb5678987d376416 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Mar 2024 14:54:41 +0100 Subject: [PATCH 01/15] fix(routing): add heuristic for routing schema in empty cluster When the cluster is empty but comprised of different versions of EMQX, try to additionally ask the cluster nodes for the routing schema in use, to be able to make more informed decisions. --- apps/emqx/src/emqx_router.erl | 100 +++++++++++++++---- apps/emqx/src/proto/emqx_router_proto_v1.erl | 37 +++++++ apps/emqx/test/emqx_routing_SUITE.erl | 57 ++++++++++- 3 files changed, 172 insertions(+), 22 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_router_proto_v1.erl diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index ef9aac628..9cad50714 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -654,7 +654,8 @@ init_schema() -> ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), - Schema = choose_schema_vsn(ConfSchema), + ClusterSchema = discover_cluster_schema_vsn(), + Schema = choose_schema_vsn(ConfSchema, ClusterSchema), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), case Schema of ConfSchema -> @@ -669,8 +670,7 @@ init_schema() -> configured => ConfSchema, reason => "Could not use configured routing storage schema because " - "there are already non-empty routing tables pertaining to " - "another schema." + "cluster is already running with a different schema." }) end. @@ -679,31 +679,89 @@ deinit_schema() -> _ = persistent_term:erase(?PT_SCHEMA_VSN), ok. --spec choose_schema_vsn(schemavsn()) -> schemavsn(). -choose_schema_vsn(ConfType) -> - IsEmptyIndex = emqx_trie:empty(), - IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS), - case {IsEmptyIndex, IsEmptyFilters} of - {true, true} -> - ConfType; - {false, true} -> - v1; - {true, false} -> - v2; - {false, false} -> +-spec discover_cluster_schema_vsn() -> schemavsn() | undefined. +discover_cluster_schema_vsn() -> + discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). + +discover_cluster_schema_vsn([]) -> + undefined; +discover_cluster_schema_vsn(Nodes) -> + Responses = lists:zipwith( + fun + (Node, {ok, Schema}) -> + {Node, Schema, configured}; + (Node, {error, {exception, undef, _Stacktrace}}) -> + %% No such function on the remote node, assuming it doesn't know about v2 routing. + {Node, v1, legacy}; + (Node, {error, {exception, badarg, _Stacktrace}}) -> + %% Likely, persistent term is not defined yet. + {Node, unknown, starting}; + (Node, Error) -> + {Node, unknown, Error} + end, + Nodes, + emqx_router_proto_v1:get_routing_schema_vsn(Nodes) + ), + case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of + [Vsn] when Vsn =:= v1; Vsn =:= v2 -> + Vsn; + [] -> + ?SLOG(notice, #{ + msg => "cluster_routing_schema_discovery_failed", + responses => Responses, + reason => + "Could not determine configured routing storage schema in the cluster." + }), + undefined; + [_ | _] -> + ?SLOG(critical, #{ + msg => "conflicting_routing_schemas_configured_in_cluster", + responses => Responses, + reason => + "There are nodes in the cluster with different configured routing " + "storage schemas. This probably means that some nodes use v1 schema " + "and some use v2, independently of each other. The routing is likely " + "broken. Manual intervention required." + }), + error(conflicting_routing_schemas_configured_in_cluster) + end. + +-spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn(). +choose_schema_vsn(ConfSchema, ClusterSchema) -> + case detect_table_schema_vsn() of + [ClusterSchema] -> + %% Table contents match configured schema in the cluster. + ClusterSchema; + [Schema] when ClusterSchema =:= undefined -> + %% There are existing records following some schema, we have to use it. + Schema; + [] -> + %% No records in the tables, use schema configured in the cluster if any, + %% otherwise use configured. + emqx_maybe:define(ClusterSchema, ConfSchema); + ConlictingSchemas -> ?SLOG(critical, #{ msg => "conflicting_routing_schemas_detected_in_cluster", - configured => ConfType, + detected => ConlictingSchemas, + configured => ConfSchema, + configured_in_cluster => ClusterSchema, reason => - "There are records in the routing tables related to both v1 " - "and v2 storage schemas. This probably means that some nodes " - "in the cluster use v1 schema and some use v2, independently " - "of each other. The routing is likely broken. Manual intervention " - "and full cluster restart is required. This node will shut down." + "There are records in the routing tables either related to both v1 " + "and v2 storage schemas, or conflicting with storage schema assumed " + "by the cluster. This probably means that some nodes in the cluster " + "use v1 schema and some use v2, independently of each other. The " + "routing is likely broken. Manual intervention and full cluster " + "restart is required. This node will shut down." }), error(conflicting_routing_schemas_detected_in_cluster) end. +detect_table_schema_vsn() -> + lists:flatten([ + [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()], + [v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)] + ]). + is_empty(Tab) -> ets:first(Tab) =:= '$end_of_table'. diff --git a/apps/emqx/src/proto/emqx_router_proto_v1.erl b/apps/emqx/src/proto/emqx_router_proto_v1.erl new file mode 100644 index 000000000..f1c08f893 --- /dev/null +++ b/apps/emqx/src/proto/emqx_router_proto_v1.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([ + get_routing_schema_vsn/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 3_000). + +introduced_in() -> + "5.6.0". + +-spec get_routing_schema_vsn([node()]) -> + [emqx_rpc:erpc(emqx_router:schema_vsn())]. +get_routing_schema_vsn(Nodes) -> + erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 86c7dd1e2..5112059ca 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -30,7 +30,8 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2 + t_routing_schema_switch_v2, + t_routing_schema_consistent_clean_cluster ]. groups() -> @@ -477,6 +478,60 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> ok = emqx_cth_cluster:stop(Nodes) end. +t_routing_schema_consistent_clean_cluster(Config) -> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + % Start first node with routing schema v1 + [Node1] = emqx_cth_cluster:start( + [ + {routing_schema_consistent1, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, v1)] + }} + ], + #{work_dir => WorkDir} + ), + % Start rest of nodes with routing schema v2 + NodesRest = emqx_cth_cluster:start( + [ + {routing_schema_consistent2, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, v2)], + base_port => 20000, + join_to => Node1 + }}, + {routing_schema_consistent3, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, v2)], + base_port => 20100, + join_to => Node1 + }} + ], + #{work_dir => WorkDir} + ), + Nodes = [Node1 | NodesRest], + try + % Verify that cluser is still on v1 + ?assertEqual( + [{ok, v1} || _ <- Nodes], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertEqual( + [{ok, Nodes} || _ <- Nodes], + erpc:multicall(Nodes, emqx, running_nodes, []) + ) + ), + C1 = start_client(Node1), + C2 = start_client(hd(NodesRest)), + ok = subscribe(C2, <<"t/#">>), + {ok, _} = publish(C1, <<"t/a/b/c">>, <<"yayconsistency">>), + ?assertReceive({pub, C2, #{topic := <<"t/a/b/c">>, payload := <<"yayconsistency">>}}), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2) + after + ok = emqx_cth_cluster:stop(Nodes) + end. + t_slow_rlog_routing_consistency(init, Config) -> [Core1, _Core2, _Replicant] = ?config(cluster, Config), MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]), From b81a11b790ae4ea1f0073e30492e14ff85a9c6e7 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Mar 2024 15:14:53 +0100 Subject: [PATCH 02/15] chore(routing): mark function that is now an RPC target --- apps/emqx/src/emqx_router.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 9cad50714..0736a7082 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -645,6 +645,8 @@ match_to_route(M) -> -type schemavsn() :: v1 | v2. +%% @doc Get the schema version in use. +%% BPAPI RPC Target @ emqx_router_proto -spec get_schema_vsn() -> schemavsn(). get_schema_vsn() -> persistent_term:get(?PT_SCHEMA_VSN). From a0f500d1d06b5eb355670b8abe1db8a4fd89ce46 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Mar 2024 15:21:03 +0100 Subject: [PATCH 03/15] chore: add changelog entry --- changes/ce/fix-12768.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ce/fix-12768.en.md diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md new file mode 100644 index 000000000..1de56afee --- /dev/null +++ b/changes/ce/fix-12768.en.md @@ -0,0 +1 @@ +When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. From be1886fb9109064fa46570052d118731f0f029a9 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 22 Mar 2024 15:56:44 +0100 Subject: [PATCH 04/15] chore: make dyalizer happy and update changelog --- apps/emqx/src/emqx_router.erl | 4 ++-- apps/emqx/src/proto/emqx_router_proto_v1.erl | 2 +- changes/ce/fix-12768.en.md | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 0736a7082..380dddc55 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -92,9 +92,11 @@ ]). -export_type([dest/0]). +-export_type([schemavsn/0]). -type group() :: binary(). -type dest() :: node() | {group(), node()}. +-type schemavsn() :: v1 | v2. %% Operation :: {add, ...} | {delete, ...}. -type batch() :: #{batch_route() => _Operation :: tuple()}. @@ -643,8 +645,6 @@ match_to_route(M) -> -define(PT_SCHEMA_VSN, {?MODULE, schemavsn}). --type schemavsn() :: v1 | v2. - %% @doc Get the schema version in use. %% BPAPI RPC Target @ emqx_router_proto -spec get_schema_vsn() -> schemavsn(). diff --git a/apps/emqx/src/proto/emqx_router_proto_v1.erl b/apps/emqx/src/proto/emqx_router_proto_v1.erl index f1c08f893..78b39f506 100644 --- a/apps/emqx/src/proto/emqx_router_proto_v1.erl +++ b/apps/emqx/src/proto/emqx_router_proto_v1.erl @@ -32,6 +32,6 @@ introduced_in() -> "5.6.0". -spec get_routing_schema_vsn([node()]) -> - [emqx_rpc:erpc(emqx_router:schema_vsn())]. + [emqx_rpc:erpc(emqx_router:schemavsn())]. get_routing_schema_vsn(Nodes) -> erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT). diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md index 1de56afee..7d101f1a7 100644 --- a/changes/ce/fix-12768.en.md +++ b/changes/ce/fix-12768.en.md @@ -1 +1,3 @@ +Fixed an issue which may occur when upgrading from versions prior to 5.4 to 5.4 or later. + When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. From cbc84900b2c52ce6920768aba4f2625c7b0ac003 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Mar 2024 16:24:52 +0100 Subject: [PATCH 05/15] chore: update bpapi versions dump --- apps/emqx/priv/bpapi.versions | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 11ac8f582..fa3a1ef85 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -66,6 +66,7 @@ {emqx_resource,2}. {emqx_retainer,1}. {emqx_retainer,2}. +{emqx_router,1}. {emqx_rule_engine,1}. {emqx_shared_sub,1}. {emqx_slow_subs,1}. From 0210f83f06bfec5cbeb2531ad011b4dc4fa56cb6 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Fri, 22 Mar 2024 16:36:45 +0100 Subject: [PATCH 06/15] chore: update changelog --- changes/ce/fix-12768.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md index 7d101f1a7..c0445c3b8 100644 --- a/changes/ce/fix-12768.en.md +++ b/changes/ce/fix-12768.en.md @@ -1,3 +1,3 @@ -Fixed an issue which may occur when upgrading from versions prior to 5.4 to 5.4 or later. +Fixed an issue which may occur when performing rolling upgrade, especially when upgrading from a version earlier than 5.4.0. When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. From eb9e3aa9e68e03472775ecc7dd307d092540f4be Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Mon, 25 Mar 2024 12:00:06 +0100 Subject: [PATCH 07/15] chore: update changelog entry to include manual resolution steps --- changes/ce/fix-12768.en.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md index c0445c3b8..38b5011b3 100644 --- a/changes/ce/fix-12768.en.md +++ b/changes/ce/fix-12768.en.md @@ -1,3 +1,13 @@ Fixed an issue which may occur when performing rolling upgrade, especially when upgrading from a version earlier than 5.4.0. When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. + +In case you get the following message about broken routing during rolling upgrade: "There are records in the routing tables either related to both v1 and v2 storage schemas, or conflicting with storage schema assumed by the cluster. This probably means that some nodes in the cluster use v1 schema and some use v2, independently of each other. The routing is likely broken. Manual intervention and full cluster restart is required. This node will shut down.", please follow the steps below to resolve the issue. +1. Stop listeners on legacy nodes: `$ emqx eval 'emqx_listener:stop()'` +2. Wait until they are safe to restart. + This could take some time, depending on the number of clients and their subscriptions. + Those conditions should be true for both nodes in order to proceed: + * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`. + * `$ emqx ctl topics list` prints `No topics.` +3. Upgrade the nodes to the latest version. +4. Restart the nodes. From 849fe0c2c8f05ee60c1243bb69c08ef96ec8a04d Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 25 Mar 2024 13:43:46 +0100 Subject: [PATCH 08/15] feat(routing): add schema conflict resolution procedure In the log message printed when a schema conflict in cluster routing is detected. --- apps/emqx/src/emqx_router.erl | 43 +++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 380dddc55..c8d82dda3 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -656,8 +656,8 @@ init_schema() -> ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), - ClusterSchema = discover_cluster_schema_vsn(), - Schema = choose_schema_vsn(ConfSchema, ClusterSchema), + ClusterState = discover_cluster_schema_vsn(), + Schema = choose_schema_vsn(ConfSchema, ClusterState), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), case Schema of ConfSchema -> @@ -681,7 +681,8 @@ deinit_schema() -> _ = persistent_term:erase(?PT_SCHEMA_VSN), ok. --spec discover_cluster_schema_vsn() -> schemavsn() | undefined. +-spec discover_cluster_schema_vsn() -> + {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}. discover_cluster_schema_vsn() -> discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). @@ -723,13 +724,17 @@ discover_cluster_schema_vsn(Nodes) -> "There are nodes in the cluster with different configured routing " "storage schemas. This probably means that some nodes use v1 schema " "and some use v2, independently of each other. The routing is likely " - "broken. Manual intervention required." + "broken. Manual intervention required.", + action => mk_conflict_resolution_action(Responses) }), error(conflicting_routing_schemas_configured_in_cluster) end. --spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn(). -choose_schema_vsn(ConfSchema, ClusterSchema) -> +-spec choose_schema_vsn( + schemavsn(), + _ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]} +) -> schemavsn(). +choose_schema_vsn(ConfSchema, {ClusterSchema, State}) -> case detect_table_schema_vsn() of [ClusterSchema] -> %% Table contents match configured schema in the cluster. @@ -753,7 +758,8 @@ choose_schema_vsn(ConfSchema, ClusterSchema) -> "by the cluster. This probably means that some nodes in the cluster " "use v1 schema and some use v2, independently of each other. The " "routing is likely broken. Manual intervention and full cluster " - "restart is required. This node will shut down." + "restart is required. This node will shut down.", + action => mk_conflict_resolution_action(State) }), error(conflicting_routing_schemas_detected_in_cluster) end. @@ -767,6 +773,29 @@ detect_table_schema_vsn() -> is_empty(Tab) -> ets:first(Tab) =:= '$end_of_table'. +mk_conflict_resolution_action(State) -> + NodesV1 = [Node || {Node, v1, _} <- State], + NodesUnknown = [Node || {Node, unknown, _} <- State], + Format = + "Following EMQX nodes are running with conflicting schema:" + "\n ~p" + "Please take the following steps to resolve the conflict:" + "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`" + "\n 2. Wait until they are safe to restart." + "\n This could take some time, depending on the number of clients and their subscriptions." + "\n Those conditions should be true for each of the nodes in order to proceed:" + "\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`." + "\n * `$ emqx ctl topics list` prints `No topics.`" + "\n 3. Upgrade the nodes to the latest version." + "\n 4. Restart the nodes.", + FormatUnkown = + "Additionally, following nodes were unreachable during startup:" + "\n ~p" + "It's strongly advised to include them in the manual resolution procedure as well.", + Message = io_lib:format(Format, [NodesV1]), + MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []], + unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- From fe3cc2585521025671f635fc84d518180ec04cf2 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Mon, 25 Mar 2024 15:09:01 +0100 Subject: [PATCH 09/15] chore: update instructions for routing schema conflict --- apps/emqx/src/emqx_router.erl | 38 ++++++++++++++++++++--------------- changes/ce/fix-12768.en.md | 10 --------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index c8d82dda3..88999823b 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -707,7 +707,7 @@ discover_cluster_schema_vsn(Nodes) -> ), case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of [Vsn] when Vsn =:= v1; Vsn =:= v2 -> - Vsn; + {Vsn, Responses}; [] -> ?SLOG(notice, #{ msg => "cluster_routing_schema_discovery_failed", @@ -715,7 +715,7 @@ discover_cluster_schema_vsn(Nodes) -> reason => "Could not determine configured routing storage schema in the cluster." }), - undefined; + {undefined, Responses}; [_ | _] -> ?SLOG(critical, #{ msg => "conflicting_routing_schemas_configured_in_cluster", @@ -747,20 +747,27 @@ choose_schema_vsn(ConfSchema, {ClusterSchema, State}) -> %% otherwise use configured. emqx_maybe:define(ClusterSchema, ConfSchema); ConlictingSchemas -> + Reason = + "There are records in the routing tables either related to both v1 " + "and v2 storage schemas, or conflicting with storage schema assumed " + "by the cluster. This probably means that some nodes in the cluster " + "use v1 schema and some use v2, independently of each other. The " + "routing is likely broken. Manual intervention and full cluster " + "restart is required. This node will shut down.", + Action = mk_conflict_resolution_action(State), ?SLOG(critical, #{ msg => "conflicting_routing_schemas_detected_in_cluster", detected => ConlictingSchemas, configured => ConfSchema, configured_in_cluster => ClusterSchema, - reason => - "There are records in the routing tables either related to both v1 " - "and v2 storage schemas, or conflicting with storage schema assumed " - "by the cluster. This probably means that some nodes in the cluster " - "use v1 schema and some use v2, independently of each other. The " - "routing is likely broken. Manual intervention and full cluster " - "restart is required. This node will shut down.", - action => mk_conflict_resolution_action(State) + reason => Reason, + action => Action }), + io:format( + standard_error, + "Error: conflicting routing schemas detected in the cluster.\n~s\n~s\n", + [Reason, Action] + ), error(conflicting_routing_schemas_detected_in_cluster) end. @@ -783,16 +790,15 @@ mk_conflict_resolution_action(State) -> "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`" "\n 2. Wait until they are safe to restart." "\n This could take some time, depending on the number of clients and their subscriptions." - "\n Those conditions should be true for each of the nodes in order to proceed:" + "\n The following conditions should be both true for each of the nodes in order to proceed:" "\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`." "\n * `$ emqx ctl topics list` prints `No topics.`" - "\n 3. Upgrade the nodes to the latest version." + "\n 3. Upgrade the nodes to version ~s." "\n 4. Restart the nodes.", FormatUnkown = - "Additionally, following nodes were unreachable during startup:" - "\n ~p" - "It's strongly advised to include them in the manual resolution procedure as well.", - Message = io_lib:format(Format, [NodesV1]), + "Additionally, the following nodes were unreachable during startup: ~p." + "It is strongly advised to include them in the manual resolution procedure as well.", + Message = io_lib:format(Format, [NodesV1, emqx_release:version_with_prefix()]), MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []], unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown). diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md index 38b5011b3..c0445c3b8 100644 --- a/changes/ce/fix-12768.en.md +++ b/changes/ce/fix-12768.en.md @@ -1,13 +1,3 @@ Fixed an issue which may occur when performing rolling upgrade, especially when upgrading from a version earlier than 5.4.0. When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. - -In case you get the following message about broken routing during rolling upgrade: "There are records in the routing tables either related to both v1 and v2 storage schemas, or conflicting with storage schema assumed by the cluster. This probably means that some nodes in the cluster use v1 schema and some use v2, independently of each other. The routing is likely broken. Manual intervention and full cluster restart is required. This node will shut down.", please follow the steps below to resolve the issue. -1. Stop listeners on legacy nodes: `$ emqx eval 'emqx_listener:stop()'` -2. Wait until they are safe to restart. - This could take some time, depending on the number of clients and their subscriptions. - Those conditions should be true for both nodes in order to proceed: - * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`. - * `$ emqx ctl topics list` prints `No topics.` -3. Upgrade the nodes to the latest version. -4. Restart the nodes. From 3aff9eb2a4c22be7c56a8c10f15ec0a6227d20dc Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 25 Mar 2024 16:15:07 +0100 Subject: [PATCH 10/15] fix(route schema): allow boot if all peer nodes agree on one version --- apps/emqx/src/emqx_router.erl | 121 ++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 88999823b..90ba7e605 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -656,16 +656,16 @@ init_schema() -> ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), - ClusterState = discover_cluster_schema_vsn(), - Schema = choose_schema_vsn(ConfSchema, ClusterState), + {ClusterSchema, ClusterState} = discover_cluster_schema_vsn(), + Schema = choose_schema_vsn(ConfSchema, ClusterSchema, ClusterState), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), - case Schema of - ConfSchema -> + case Schema =:= ConfSchema of + true -> ?SLOG(info, #{ msg => "routing_schema_used", schema => Schema }); - _ -> + false -> ?SLOG(notice, #{ msg => "configured_routing_schema_ignored", schema_in_use => Schema, @@ -687,6 +687,7 @@ discover_cluster_schema_vsn() -> discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). discover_cluster_schema_vsn([]) -> + %% single node undefined; discover_cluster_schema_vsn(Nodes) -> Responses = lists:zipwith( @@ -709,68 +710,72 @@ discover_cluster_schema_vsn(Nodes) -> [Vsn] when Vsn =:= v1; Vsn =:= v2 -> {Vsn, Responses}; [] -> - ?SLOG(notice, #{ + ?SLOG(warning, #{ msg => "cluster_routing_schema_discovery_failed", responses => Responses, reason => - "Could not determine configured routing storage schema in the cluster." + "Could not determine configured routing storage schema in peer nodes." }), {undefined, Responses}; [_ | _] -> + Desc = schema_conflict_reason(config, Responses), + io:format(standard_error, "Error: ~ts~n", [Desc]), ?SLOG(critical, #{ - msg => "conflicting_routing_schemas_configured_in_cluster", + msg => "conflicting_routing_schemas_in_cluster", responses => Responses, - reason => - "There are nodes in the cluster with different configured routing " - "storage schemas. This probably means that some nodes use v1 schema " - "and some use v2, independently of each other. The routing is likely " - "broken. Manual intervention required.", - action => mk_conflict_resolution_action(Responses) + description => Desc }), error(conflicting_routing_schemas_configured_in_cluster) end. -spec choose_schema_vsn( schemavsn(), - _ClusterState :: {schemavsn() | undefined, [{node(), schemavsn() | undefined, _Details}]} + _ClusterSchema :: schemavsn() | undefined, + _ClusterState :: [{node(), schemavsn() | undefined, _Details}] ) -> schemavsn(). -choose_schema_vsn(ConfSchema, {ClusterSchema, State}) -> +choose_schema_vsn(ConfSchema, ClusterSchema, State) -> case detect_table_schema_vsn() of - [ClusterSchema] -> - %% Table contents match configured schema in the cluster. - ClusterSchema; - [Schema] when ClusterSchema =:= undefined -> - %% There are existing records following some schema, we have to use it. - Schema; [] -> %% No records in the tables, use schema configured in the cluster if any, %% otherwise use configured. emqx_maybe:define(ClusterSchema, ConfSchema); - ConlictingSchemas -> - Reason = - "There are records in the routing tables either related to both v1 " - "and v2 storage schemas, or conflicting with storage schema assumed " - "by the cluster. This probably means that some nodes in the cluster " - "use v1 schema and some use v2, independently of each other. The " - "routing is likely broken. Manual intervention and full cluster " - "restart is required. This node will shut down.", - Action = mk_conflict_resolution_action(State), + [Schema] when Schema =:= ClusterSchema -> + %% Table contents match configured schema in the cluster. + Schema; + [Schema] when ClusterSchema =:= undefined -> + %% There are existing records following some schema, we have to use it. + Schema; + [v1, v2] when ClusterSchema =/= undefined -> + %% There are existing records in both v1 and v2 schema, + %% we have to use what the peer nodes agreed on. + %% because it could be HTIS node which caused cnoflict. + %% + %% The stale records will be left-over, but harmless + ClusterSchema; + [v1, v2] -> + Desc = schema_conflict_reason(records, State), + io:format(standard_error, "Error: ~ts~n", [Desc]), ?SLOG(critical, #{ - msg => "conflicting_routing_schemas_detected_in_cluster", - detected => ConlictingSchemas, - configured => ConfSchema, - configured_in_cluster => ClusterSchema, - reason => Reason, - action => Action + msg => "conflicting_routing_storage_in_cluster", + description => Desc }), - io:format( - standard_error, - "Error: conflicting routing schemas detected in the cluster.\n~s\n~s\n", - [Reason, Action] - ), error(conflicting_routing_schemas_detected_in_cluster) end. +schema_conflict_reason(Type, State) -> + Observe = + case Type of + config -> + "Peer nodes have route storage schema resolved into conflicting versions.\n"; + records -> + "There are conflicting routing records found.\n" + end, + Cause = + "\nThis was caused by a race-condition when the cluster was rolling upgraded " + "from an older version to 5.4.0, 5.4.1, 5.5.0 or 5.5.1." + "\nThis node cannot boot before the conflicts are resolved.\n", + Observe ++ Cause ++ mk_conflict_resolution_action(State). + detect_table_schema_vsn() -> lists:flatten([ [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()], @@ -784,23 +789,27 @@ mk_conflict_resolution_action(State) -> NodesV1 = [Node || {Node, v1, _} <- State], NodesUnknown = [Node || {Node, unknown, _} <- State], Format = - "Following EMQX nodes are running with conflicting schema:" - "\n ~p" - "Please take the following steps to resolve the conflict:" - "\n 1. Stop listeners on those nodes: `$ emqx eval 'emqx_listener:stop()'`" - "\n 2. Wait until they are safe to restart." - "\n This could take some time, depending on the number of clients and their subscriptions." - "\n The following conditions should be both true for each of the nodes in order to proceed:" - "\n * `$ emqx eval 'ets:info(emqx_subscriber, size)'` prints `0`." - "\n * `$ emqx ctl topics list` prints `No topics.`" - "\n 3. Upgrade the nodes to version ~s." - "\n 4. Restart the nodes.", + "There are two ways to resolve the conflict:" + "\n" + "\nA: Full cluster restart: stop ALL running nodes one by one " + "and restart them in the reversed order." + "\n" + "\nB: Force v1 nodes to clean up their routes." + "\n Following EMQX nodes are running with v1 schema: ~0p." + "\n 1. Stop listeners with command \"emqx eval 'emqx_listener:stop()'\" in all v1 nodes" + "\n 2. Wait until they are safe to restart." + "\n This could take some time, depending on the number of clients and their subscriptions." + "\n Below conditions should be true for each of the nodes in order to proceed:" + "\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`." + "\n b) Command 'emqx ctl topics list' prints No topics.`" + "\n 3. Upgrade the nodes to 5.6.0 or newer" + "\n 4. Restart the node", FormatUnkown = - "Additionally, the following nodes were unreachable during startup: ~p." + "Additionally, the following nodes were unreachable during startup: ~0p." "It is strongly advised to include them in the manual resolution procedure as well.", - Message = io_lib:format(Format, [NodesV1, emqx_release:version_with_prefix()]), + Message = io_lib:format(Format, [NodesV1]), MessageUnknown = [io_lib:format(FormatUnkown, [NodesUnknown]) || NodesUnknown =/= []], - unicode:characters_to_list(Message ++ "\n" ++ MessageUnknown). + unicode:characters_to_list([Message, "\n", MessageUnknown]). %%-------------------------------------------------------------------- %% gen_server callbacks From c93145cb978efa35420c276c080dfff6ec55bbf2 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Mon, 25 Mar 2024 16:40:53 +0100 Subject: [PATCH 11/15] fix: function clause --- apps/emqx/src/emqx_router.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 90ba7e605..f7468fbfa 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -686,9 +686,11 @@ deinit_schema() -> discover_cluster_schema_vsn() -> discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). +-spec discover_cluster_schema_vsn([node()]) -> + {schemavsn() | undefined, _State :: [{node(), schemavsn() | undefined, _Details}]}. discover_cluster_schema_vsn([]) -> %% single node - undefined; + {undefined, []}; discover_cluster_schema_vsn(Nodes) -> Responses = lists:zipwith( fun From d38545010a095400563261e39a554286c881272e Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 25 Mar 2024 16:42:46 +0100 Subject: [PATCH 12/15] fix: case_clause --- apps/emqx/src/emqx_router.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index f7468fbfa..89bd06352 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -747,14 +747,14 @@ choose_schema_vsn(ConfSchema, ClusterSchema, State) -> [Schema] when ClusterSchema =:= undefined -> %% There are existing records following some schema, we have to use it. Schema; - [v1, v2] when ClusterSchema =/= undefined -> + _Conflicting when ClusterSchema =/= undefined -> %% There are existing records in both v1 and v2 schema, %% we have to use what the peer nodes agreed on. %% because it could be HTIS node which caused cnoflict. %% %% The stale records will be left-over, but harmless ClusterSchema; - [v1, v2] -> + _Conflicting -> Desc = schema_conflict_reason(records, State), io:format(standard_error, "Error: ~ts~n", [Desc]), ?SLOG(critical, #{ From 9411d6078e248c05f50b7a79679d18af86b74a84 Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 25 Mar 2024 16:50:51 +0100 Subject: [PATCH 13/15] docs: remove unnecessary restart instruction --- apps/emqx/src/emqx_router.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 89bd06352..a417abb26 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -804,8 +804,7 @@ mk_conflict_resolution_action(State) -> "\n Below conditions should be true for each of the nodes in order to proceed:" "\n a) Command 'ets:info(emqx_subscriber, size)' prints `0`." "\n b) Command 'emqx ctl topics list' prints No topics.`" - "\n 3. Upgrade the nodes to 5.6.0 or newer" - "\n 4. Restart the node", + "\n 3. Upgrade the nodes to 5.6.0 or newer.", FormatUnkown = "Additionally, the following nodes were unreachable during startup: ~0p." "It is strongly advised to include them in the manual resolution procedure as well.", From d32a1a892a7737195ad6d55ff72468063413303d Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 25 Mar 2024 16:56:48 +0100 Subject: [PATCH 14/15] docs: add change log --- changes/ce/fix-12768.en.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/changes/ce/fix-12768.en.md b/changes/ce/fix-12768.en.md index c0445c3b8..2dbb249ba 100644 --- a/changes/ce/fix-12768.en.md +++ b/changes/ce/fix-12768.en.md @@ -1,3 +1,6 @@ Fixed an issue which may occur when performing rolling upgrade, especially when upgrading from a version earlier than 5.4.0. When the cluster is empty (more precisely, routing tables are empty), try to additionally ask the cluster nodes for the routing schema in use, to make more informed decision about routing storage schema upon startup. This should make routing storage schema less likely to diverge across cluster nodes, especially when the cluster is composed of different versions of EMQX. + +The version also logs instructions for how to manually resolve if conflict is detected in a running cluster. + From b8b0b809b4e2c9b3d342f656a638a130e977259a Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 25 Mar 2024 17:07:56 +0100 Subject: [PATCH 15/15] chore: add a warning log if conflict is detected --- apps/emqx/src/emqx_router.erl | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index a417abb26..d2014f3a3 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -750,9 +750,19 @@ choose_schema_vsn(ConfSchema, ClusterSchema, State) -> _Conflicting when ClusterSchema =/= undefined -> %% There are existing records in both v1 and v2 schema, %% we have to use what the peer nodes agreed on. - %% because it could be HTIS node which caused cnoflict. + %% because it could be THIS node which caused the cnoflict. %% %% The stale records will be left-over, but harmless + Desc = + "Conflicting schema version detected for routing records, but " + "all the peer nodes are running the same version, so this node " + "will use the same schema but discard the harmless stale records. " + "This warning will go away after the next full cluster (non-rolling) restart.", + ?SLOG(warning, #{ + msg => "conflicting_routing_storage_detected", + resolved => ClusterSchema, + description => Desc + }), ClusterSchema; _Conflicting -> Desc = schema_conflict_reason(records, State),