From ba276d807f819afe52e2115feb5678987d376416 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Fri, 22 Mar 2024 14:54:41 +0100 Subject: [PATCH] fix(routing): add heuristic for routing schema in empty cluster When the cluster is empty but comprised of different versions of EMQX, try to additionally ask the cluster nodes for the routing schema in use, to be able to make more informed decisions. --- apps/emqx/src/emqx_router.erl | 100 +++++++++++++++---- apps/emqx/src/proto/emqx_router_proto_v1.erl | 37 +++++++ apps/emqx/test/emqx_routing_SUITE.erl | 57 ++++++++++- 3 files changed, 172 insertions(+), 22 deletions(-) create mode 100644 apps/emqx/src/proto/emqx_router_proto_v1.erl diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index ef9aac628..9cad50714 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -654,7 +654,8 @@ init_schema() -> ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), - Schema = choose_schema_vsn(ConfSchema), + ClusterSchema = discover_cluster_schema_vsn(), + Schema = choose_schema_vsn(ConfSchema, ClusterSchema), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), case Schema of ConfSchema -> @@ -669,8 +670,7 @@ init_schema() -> configured => ConfSchema, reason => "Could not use configured routing storage schema because " - "there are already non-empty routing tables pertaining to " - "another schema." + "cluster is already running with a different schema." }) end. @@ -679,31 +679,89 @@ deinit_schema() -> _ = persistent_term:erase(?PT_SCHEMA_VSN), ok. --spec choose_schema_vsn(schemavsn()) -> schemavsn(). 
-choose_schema_vsn(ConfType) -> - IsEmptyIndex = emqx_trie:empty(), - IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS), - case {IsEmptyIndex, IsEmptyFilters} of - {true, true} -> - ConfType; - {false, true} -> - v1; - {true, false} -> - v2; - {false, false} -> +-spec discover_cluster_schema_vsn() -> schemavsn() | undefined. +discover_cluster_schema_vsn() -> + discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). + +discover_cluster_schema_vsn([]) -> + undefined; +discover_cluster_schema_vsn(Nodes) -> + Responses = lists:zipwith( + fun + (Node, {ok, Schema}) -> + {Node, Schema, configured}; + (Node, {error, {exception, undef, _Stacktrace}}) -> + %% No such function on the remote node, assuming it doesn't know about v2 routing. + {Node, v1, legacy}; + (Node, {error, {exception, badarg, _Stacktrace}}) -> + %% Likely, persistent term is not defined yet. + {Node, unknown, starting}; + (Node, Error) -> + {Node, unknown, Error} + end, + Nodes, + emqx_router_proto_v1:get_routing_schema_vsn(Nodes) + ), + case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of + [Vsn] when Vsn =:= v1; Vsn =:= v2 -> + Vsn; + [] -> + ?SLOG(notice, #{ + msg => "cluster_routing_schema_discovery_failed", + responses => Responses, + reason => + "Could not determine configured routing storage schema in the cluster." + }), + undefined; + [_ | _] -> + ?SLOG(critical, #{ + msg => "conflicting_routing_schemas_configured_in_cluster", + responses => Responses, + reason => + "There are nodes in the cluster with different configured routing " + "storage schemas. This probably means that some nodes use v1 schema " + "and some use v2, independently of each other. The routing is likely " + "broken. Manual intervention required." + }), + error(conflicting_routing_schemas_configured_in_cluster) + end. + +-spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn(). 
+choose_schema_vsn(ConfSchema, ClusterSchema) -> + case detect_table_schema_vsn() of + [ClusterSchema] -> + %% Table contents match configured schema in the cluster. + ClusterSchema; + [Schema] when ClusterSchema =:= undefined -> + %% There are existing records following some schema, we have to use it. + Schema; + [] -> + %% No records in the tables, use schema configured in the cluster if any, + %% otherwise use configured. + emqx_maybe:define(ClusterSchema, ConfSchema); + ConlictingSchemas -> ?SLOG(critical, #{ msg => "conflicting_routing_schemas_detected_in_cluster", - configured => ConfType, + detected => ConlictingSchemas, + configured => ConfSchema, + configured_in_cluster => ClusterSchema, reason => - "There are records in the routing tables related to both v1 " - "and v2 storage schemas. This probably means that some nodes " - "in the cluster use v1 schema and some use v2, independently " - "of each other. The routing is likely broken. Manual intervention " - "and full cluster restart is required. This node will shut down." + "There are records in the routing tables either related to both v1 " + "and v2 storage schemas, or conflicting with storage schema assumed " + "by the cluster. This probably means that some nodes in the cluster " + "use v1 schema and some use v2, independently of each other. The " + "routing is likely broken. Manual intervention and full cluster " + "restart is required. This node will shut down." }), error(conflicting_routing_schemas_detected_in_cluster) end. +detect_table_schema_vsn() -> + lists:flatten([ + [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()], + [v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)] + ]). + is_empty(Tab) -> ets:first(Tab) =:= '$end_of_table'. 
diff --git a/apps/emqx/src/proto/emqx_router_proto_v1.erl b/apps/emqx/src/proto/emqx_router_proto_v1.erl new file mode 100644 index 000000000..f1c08f893 --- /dev/null +++ b/apps/emqx/src/proto/emqx_router_proto_v1.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([ + get_routing_schema_vsn/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 3_000). + +introduced_in() -> + "5.6.0". + +-spec get_routing_schema_vsn([node()]) -> + [emqx_rpc:erpc(emqx_router:schemavsn())]. +get_routing_schema_vsn(Nodes) -> + erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 86c7dd1e2..5112059ca 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -30,7 +30,8 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2 + t_routing_schema_switch_v2, + t_routing_schema_consistent_clean_cluster ]. groups() -> @@ -477,6 +478,60 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> ok = emqx_cth_cluster:stop(Nodes) end.
+t_routing_schema_consistent_clean_cluster(Config) -> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + % Start first node with routing schema v1 + [Node1] = emqx_cth_cluster:start( + [ + {routing_schema_consistent1, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, v1)] + }} + ], + #{work_dir => WorkDir} + ), + % Start rest of nodes with routing schema v2 + NodesRest = emqx_cth_cluster:start( + [ + {routing_schema_consistent2, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, v2)], + base_port => 20000, + join_to => Node1 + }}, + {routing_schema_consistent3, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, v2)], + base_port => 20100, + join_to => Node1 + }} + ], + #{work_dir => WorkDir} + ), + Nodes = [Node1 | NodesRest], + try + % Verify that cluster is still on v1 + ?assertEqual( + [{ok, v1} || _ <- Nodes], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertEqual( + [{ok, Nodes} || _ <- Nodes], + erpc:multicall(Nodes, emqx, running_nodes, []) + ) + ), + C1 = start_client(Node1), + C2 = start_client(hd(NodesRest)), + ok = subscribe(C2, <<"t/#">>), + {ok, _} = publish(C1, <<"t/a/b/c">>, <<"yayconsistency">>), + ?assertReceive({pub, C2, #{topic := <<"t/a/b/c">>, payload := <<"yayconsistency">>}}), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2) + after + ok = emqx_cth_cluster:stop(Nodes) + end. + t_slow_rlog_routing_consistency(init, Config) -> [Core1, _Core2, _Replicant] = ?config(cluster, Config), MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]),