diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index ef9aac628..9cad50714 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -654,7 +654,8 @@ init_schema() -> ok = mria:wait_for_tables([?ROUTE_TAB, ?ROUTE_TAB_FILTERS]), ok = emqx_trie:wait_for_tables(), ConfSchema = emqx_config:get([broker, routing, storage_schema]), - Schema = choose_schema_vsn(ConfSchema), + ClusterSchema = discover_cluster_schema_vsn(), + Schema = choose_schema_vsn(ConfSchema, ClusterSchema), ok = persistent_term:put(?PT_SCHEMA_VSN, Schema), case Schema of ConfSchema -> @@ -669,8 +670,7 @@ init_schema() -> configured => ConfSchema, reason => "Could not use configured routing storage schema because " - "there are already non-empty routing tables pertaining to " - "another schema." + "cluster is already running with a different schema." }) end. @@ -679,31 +679,89 @@ deinit_schema() -> _ = persistent_term:erase(?PT_SCHEMA_VSN), ok. --spec choose_schema_vsn(schemavsn()) -> schemavsn(). -choose_schema_vsn(ConfType) -> - IsEmptyIndex = emqx_trie:empty(), - IsEmptyFilters = is_empty(?ROUTE_TAB_FILTERS), - case {IsEmptyIndex, IsEmptyFilters} of - {true, true} -> - ConfType; - {false, true} -> - v1; - {true, false} -> - v2; - {false, false} -> +-spec discover_cluster_schema_vsn() -> schemavsn() | undefined. +discover_cluster_schema_vsn() -> + discover_cluster_schema_vsn(emqx:running_nodes() -- [node()]). + +discover_cluster_schema_vsn([]) -> + undefined; +discover_cluster_schema_vsn(Nodes) -> + Responses = lists:zipwith( + fun + (Node, {ok, Schema}) -> + {Node, Schema, configured}; + (Node, {error, {exception, undef, _Stacktrace}}) -> + %% No such function on the remote node, assuming it doesn't know about v2 routing. + {Node, v1, legacy}; + (Node, {error, {exception, badarg, _Stacktrace}}) -> + %% Likely, persistent term is not defined yet. + {Node, unknown, starting}; + (Node, Error) -> + {Node, unknown, Error} + end, + Nodes, + emqx_router_proto_v1:get_routing_schema_vsn(Nodes) + ), + case lists:usort([Vsn || {_Node, Vsn, _} <- Responses, Vsn /= unknown]) of + [Vsn] when Vsn =:= v1; Vsn =:= v2 -> + Vsn; + [] -> + ?SLOG(notice, #{ + msg => "cluster_routing_schema_discovery_failed", + responses => Responses, + reason => + "Could not determine configured routing storage schema in the cluster." + }), + undefined; + [_ | _] -> + ?SLOG(critical, #{ + msg => "conflicting_routing_schemas_configured_in_cluster", + responses => Responses, + reason => + "There are nodes in the cluster with different configured routing " + "storage schemas. This probably means that some nodes use v1 schema " + "and some use v2, independently of each other. The routing is likely " + "broken. Manual intervention required." + }), + error(conflicting_routing_schemas_configured_in_cluster) + end. + +-spec choose_schema_vsn(schemavsn(), schemavsn() | undefined) -> schemavsn(). +choose_schema_vsn(ConfSchema, ClusterSchema) -> + case detect_table_schema_vsn() of + [ClusterSchema] -> + %% Table contents match configured schema in the cluster. + ClusterSchema; + [Schema] when ClusterSchema =:= undefined -> + %% There are existing records following some schema, we have to use it. + Schema; + [] -> + %% No records in the tables, use schema configured in the cluster if any, + %% otherwise use configured. + emqx_maybe:define(ClusterSchema, ConfSchema); + ConlictingSchemas -> ?SLOG(critical, #{ msg => "conflicting_routing_schemas_detected_in_cluster", - configured => ConfType, + detected => ConlictingSchemas, + configured => ConfSchema, + configured_in_cluster => ClusterSchema, reason => - "There are records in the routing tables related to both v1 " - "and v2 storage schemas. This probably means that some nodes " - "in the cluster use v1 schema and some use v2, independently " - "of each other. The routing is likely broken. Manual intervention " - "and full cluster restart is required. This node will shut down." + "There are records in the routing tables either related to both v1 " + "and v2 storage schemas, or conflicting with storage schema assumed " + "by the cluster. This probably means that some nodes in the cluster " + "use v1 schema and some use v2, independently of each other. The " + "routing is likely broken. Manual intervention and full cluster " + "restart is required. This node will shut down." }), error(conflicting_routing_schemas_detected_in_cluster) end. +detect_table_schema_vsn() -> + lists:flatten([ + [v1 || _NonEmptyTrieIndex = not emqx_trie:empty()], + [v2 || _NonEmptyFilterTab = not is_empty(?ROUTE_TAB_FILTERS)] + ]). + is_empty(Tab) -> ets:first(Tab) =:= '$end_of_table'. diff --git a/apps/emqx/src/proto/emqx_router_proto_v1.erl b/apps/emqx/src/proto/emqx_router_proto_v1.erl new file mode 100644 index 000000000..f1c08f893 --- /dev/null +++ b/apps/emqx/src/proto/emqx_router_proto_v1.erl @@ -0,0 +1,37 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_proto_v1). + +-behaviour(emqx_bpapi). + +-export([introduced_in/0]). + +-export([ + get_routing_schema_vsn/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 3_000). + +introduced_in() -> + "5.6.0". + +-spec get_routing_schema_vsn([node()]) -> + [emqx_rpc:erpc(emqx_router:schema_vsn())]. +get_routing_schema_vsn(Nodes) -> + erpc:multicall(Nodes, emqx_router, get_schema_vsn, [], ?TIMEOUT). diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 86c7dd1e2..5112059ca 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -30,7 +30,8 @@ all() -> {group, routing_schema_v1}, {group, routing_schema_v2}, t_routing_schema_switch_v1, - t_routing_schema_switch_v2 + t_routing_schema_switch_v2, + t_routing_schema_consistent_clean_cluster ]. groups() -> @@ -477,6 +478,60 @@ t_routing_schema_switch(VFrom, VTo, WorkDir) -> ok = emqx_cth_cluster:stop(Nodes) end. +t_routing_schema_consistent_clean_cluster(Config) -> + WorkDir = emqx_cth_suite:work_dir(?FUNCTION_NAME, Config), + % Start first node with routing schema v1 + [Node1] = emqx_cth_cluster:start( + [ + {routing_schema_consistent1, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(1, v1)] + }} + ], + #{work_dir => WorkDir} + ), + % Start rest of nodes with routing schema v2 + NodesRest = emqx_cth_cluster:start( + [ + {routing_schema_consistent2, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(2, v2)], + base_port => 20000, + join_to => Node1 + }}, + {routing_schema_consistent3, #{ + apps => [mk_genrpc_appspec(), mk_emqx_appspec(3, v2)], + base_port => 20100, + join_to => Node1 + }} + ], + #{work_dir => WorkDir} + ), + Nodes = [Node1 | NodesRest], + try + % Verify that cluser is still on v1 + ?assertEqual( + [{ok, v1} || _ <- Nodes], + erpc:multicall(Nodes, emqx_router, get_schema_vsn, []) + ), + % Wait for all nodes to agree on cluster state + ?retry( + 500, + 10, + ?assertEqual( + [{ok, Nodes} || _ <- Nodes], + erpc:multicall(Nodes, emqx, running_nodes, []) + ) + ), + C1 = start_client(Node1), + C2 = start_client(hd(NodesRest)), + ok = subscribe(C2, <<"t/#">>), + {ok, _} = publish(C1, <<"t/a/b/c">>, <<"yayconsistency">>), + ?assertReceive({pub, C2, #{topic := <<"t/a/b/c">>, payload := <<"yayconsistency">>}}), + ok = emqtt:stop(C1), + ok = emqtt:stop(C2) + after + ok = emqx_cth_cluster:stop(Nodes) + end. + t_slow_rlog_routing_consistency(init, Config) -> [Core1, _Core2, _Replicant] = ?config(cluster, Config), MnesiaHook = rpc:call(Core1, persistent_term, get, [{mnesia_hook, post_commit}]),