diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 7c78d43d9..b80468104 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -16,6 +16,7 @@ {emqx_conf,1}. {emqx_conf,2}. {emqx_conf,3}. +{emqx_conf,4}. {emqx_connector,1}. {emqx_dashboard,1}. {emqx_delayed,1}. diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 042bf8d3c..53f309856 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -35,6 +35,17 @@ tnx_id :: pos_integer() | '$1' }). +-define(SUGGESTION(Node), + lists:flatten( + io_lib:format( + "run `./bin/emqx_ctl conf cluster_sync fix`" + " on ~p(config leader) to force sync the configs, " + "if this node has been lagging for more than 3 minutes.", + [Node] + ) + ) +). + -define(READONLY_KEYS, [cluster, rpc, node]). -endif. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index e42505b49..7f7a44faf 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -28,12 +28,14 @@ reset/0, status/0, is_initiator/1, + find_leader/0, skip_failed_commit/1, fast_forward_to_commit/2, on_mria_stop/1, force_leave_clean/1, wait_for_cluster_rpc/0, - maybe_init_tnx_id/2 + maybe_init_tnx_id/2, + update_mfa/3 ]). -export([ commit/2, @@ -41,6 +43,7 @@ get_cluster_tnx_id/0, get_node_tnx_id/1, init_mfa/2, + force_sync_tnx_id/3, latest_tnx_id/0, make_initiate_call_req/3, read_next_mfa/1, @@ -227,6 +230,17 @@ status() -> is_initiator(Opts) -> ?KIND_INITIATE =:= maps:get(kind, Opts, ?KIND_INITIATE). +find_leader() -> + {atomic, Status} = status(), + case Status of + [#{node := N} | _] -> + N; + [] -> + %% running nodes already sort. + [N | _] = emqx:running_nodes(), + N + end. + %% DO NOT delete this on_leave_clean/0, It's use when rpc before v560. on_leave_clean() -> on_leave_clean(node()). @@ -500,12 +514,14 @@ do_initiate(MFA, State = #{node := Node}, Count, Failure0) -> end. stale_view_of_cluster_msg(Meta, Count) -> + Node = find_leader(), Reason = Meta#{ - msg => stale_view_of_cluster_state, - retry_times => Count + msg => stale_view_of_cluster, + retry_times => Count, + suggestion => ?SUGGESTION(Node) }, ?SLOG(warning, Reason), - Reason. + {error, Reason}. %% The entry point of a config change transaction. init_mfa(Node, MFA) -> @@ -532,11 +548,41 @@ init_mfa(Node, MFA) -> {retry, Meta} end. +force_sync_tnx_id(Node, MFA, NodeTnxId) -> + mnesia:write_lock_table(?CLUSTER_MFA), + case get_node_tnx_id(Node) of + NodeTnxId -> + TnxId = NodeTnxId + 1, + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + lists:foreach( + fun(N) -> + ok = emqx_cluster_rpc:commit(N, NodeTnxId) + end, + mria:running_nodes() + ); + NewTnxId -> + Fmt = "aborted_force_sync, tnx_id(~w) is not the latest(~w)", + Reason = emqx_utils:format(Fmt, [NodeTnxId, NewTnxId]), + mnesia:abort({error, Reason}) + end. + +update_mfa(Node, MFA, LatestId) -> + case transaction(fun ?MODULE:force_sync_tnx_id/3, [Node, MFA, LatestId]) of + {atomic, ok} -> ok; + {aborted, Error} -> Error + end. + transaction(Func, Args) -> mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). trans_status() -> - mnesia:foldl( + List = mnesia:foldl( fun(Rec, Acc) -> #cluster_rpc_commit{node = Node, tnx_id = TnxId} = Rec, case mnesia:read(?CLUSTER_MFA, TnxId) of @@ -559,8 +605,22 @@ trans_status() -> end, [], ?CLUSTER_COMMIT + ), + Nodes = mria:running_nodes(), + IndexNodes = lists:zip(Nodes, lists:seq(1, length(Nodes))), + lists:sort( + fun(#{node := NA, tnx_id := IdA}, #{node := NB, tnx_id := IdB}) -> + {IdA, index_nodes(NA, IndexNodes)} > {IdB, index_nodes(NB, IndexNodes)} + end, + List ). +index_nodes(Node, IndexNodes) -> + case lists:keyfind(Node, 1, IndexNodes) of + false -> 0; + {_, Index} -> Index + end. + trans_query(TnxId) -> case mnesia:read(?CLUSTER_MFA, TnxId) of [] -> diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl index 5b2552ad9..c4ea64aa6 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl @@ -94,6 +94,9 @@ del_stale_mfa(MaxHistory) -> ), delete_stale_mfa(mnesia:last(?CLUSTER_MFA), DoneId, MaxHistory). +%% Do nothing when cluster_rpc_commit is empty. +delete_stale_mfa(_, infinity, _Count) -> + ok; delete_stale_mfa('$end_of_table', _DoneId, _Count) -> ok; delete_stale_mfa(CurrId, DoneId, Count) when CurrId > DoneId -> diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl index 9ac4298bb..0749559b3 100644 --- a/apps/emqx_conf/src/emqx_conf.erl +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -64,7 +64,7 @@ get_raw(KeyPath) -> %% @doc Returns all values in the cluster. -spec get_all(emqx_utils_maps:config_key_path()) -> #{node() => term()}. get_all(KeyPath) -> - {ResL, []} = emqx_conf_proto_v3:get_all(KeyPath), + {ResL, []} = emqx_conf_proto_v4:get_all(KeyPath), maps:from_list(ResL). %% @doc Returns the specified node's KeyPath, or exception if not found @@ -72,14 +72,14 @@ get_all(KeyPath) -> get_by_node(Node, KeyPath) when Node =:= node() -> emqx:get_config(KeyPath); get_by_node(Node, KeyPath) -> - emqx_conf_proto_v3:get_config(Node, KeyPath). + emqx_conf_proto_v4:get_config(Node, KeyPath). %% @doc Returns the specified node's KeyPath, or the default value if not found -spec get_by_node(node(), emqx_utils_maps:config_key_path(), term()) -> term(). get_by_node(Node, KeyPath, Default) when Node =:= node() -> emqx:get_config(KeyPath, Default); get_by_node(Node, KeyPath, Default) -> - emqx_conf_proto_v3:get_config(Node, KeyPath, Default). + emqx_conf_proto_v4:get_config(Node, KeyPath, Default). %% @doc Returns the specified node's KeyPath, or config_not_found if key path not found -spec get_node_and_config(emqx_utils_maps:config_key_path()) -> term(). @@ -94,7 +94,7 @@ get_node_and_config(KeyPath) -> ) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. update(KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v3:update(KeyPath, UpdateReq, Opts). + emqx_conf_proto_v4:update(KeyPath, UpdateReq, Opts). %% @doc Update the specified node's key path in local-override.conf. -spec update( @@ -107,7 +107,7 @@ update(KeyPath, UpdateReq, Opts) -> update(Node, KeyPath, UpdateReq, Opts0) when Node =:= node() -> emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); update(Node, KeyPath, UpdateReq, Opts) -> - emqx_conf_proto_v3:update(Node, KeyPath, UpdateReq, Opts). + emqx_conf_proto_v4:update(Node, KeyPath, UpdateReq, Opts). %% @doc Mark the specified key path as tombstone tombstone(KeyPath, Opts) -> @@ -117,7 +117,7 @@ tombstone(KeyPath, Opts) -> -spec remove(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. remove(KeyPath, Opts) -> - emqx_conf_proto_v3:remove_config(KeyPath, Opts). + emqx_conf_proto_v4:remove_config(KeyPath, Opts). %% @doc remove the specified node's key path in local-override.conf. -spec remove(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -125,13 +125,13 @@ remove(KeyPath, Opts) -> remove(Node, KeyPath, Opts) when Node =:= node() -> emqx:remove_config(KeyPath, Opts#{override_to => local}); remove(Node, KeyPath, Opts) -> - emqx_conf_proto_v3:remove_config(Node, KeyPath, Opts). + emqx_conf_proto_v4:remove_config(Node, KeyPath, Opts). %% @doc reset all value of key path in cluster-override.conf or local-override.conf. -spec reset(emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. reset(KeyPath, Opts) -> - emqx_conf_proto_v3:reset(KeyPath, Opts). + emqx_conf_proto_v4:reset(KeyPath, Opts). %% @doc reset the specified node's key path in local-override.conf. -spec reset(node(), emqx_utils_maps:config_key_path(), emqx_config:update_opts()) -> @@ -139,7 +139,7 @@ reset(KeyPath, Opts) -> reset(Node, KeyPath, Opts) when Node =:= node() -> emqx:reset_config(KeyPath, Opts#{override_to => local}); reset(Node, KeyPath, Opts) -> - emqx_conf_proto_v3:reset(Node, KeyPath, Opts). + emqx_conf_proto_v4:reset(Node, KeyPath, Opts). %% @doc Called from build script. %% TODO: move to a external escript after all refactoring is done diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 75e106c54..e6bdb5487 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -139,7 +139,7 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> - {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), + {Results, Failed} = emqx_conf_proto_v4:get_override_config_file(Nodes), {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), LogData = #{peer_nodes => Nodes, self_node => node()}, case Failed ++ NotReady of @@ -300,7 +300,7 @@ conf_sort({ok, _}, {ok, _}) -> false. sync_data_from_node(Node) -> - case emqx_conf_proto_v3:sync_data_from_node(Node) of + case emqx_conf_proto_v4:sync_data_from_node(Node) of {ok, DataBin} -> case zip:unzip(DataBin, [{cwd, emqx:data_dir()}]) of {ok, []} -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index f2b9d63b0..e730f23c0 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -18,13 +18,15 @@ -include("emqx_conf.hrl"). -include_lib("emqx_auth/include/emqx_authn_chains.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_schema.hrl"). -export([ load/0, admins/1, conf/1, audit/3, - unload/0 + unload/0, + mark_fix_log/2 ]). -export([keys/0, get_config/0, get_config/1, load_config/2]). @@ -38,6 +40,7 @@ -define(UPDATE_READONLY_KEYS_PROHIBITED, <<"Cannot update read-only key '~s'.">>). -define(SCHEMA_VALIDATION_CONF_ROOT_BIN, <<"schema_validation">>). -define(MESSAGE_TRANSFORMATION_CONF_ROOT_BIN, <<"message_transformation">>). +-define(TIMEOUT, 30000). -dialyzer({no_match, [load/0]}). @@ -90,13 +93,22 @@ admins(["skip", Node0]) -> Node = list_to_existing_atom(Node0), emqx_cluster_rpc:skip_failed_commit(Node), status(); -admins(["tnxid", TnxId0]) -> - %% changed to 'inspect' in 5.6 - %% TODO: delete this clause in 5.7 - admins(["inspect", TnxId0]); admins(["inspect", TnxId0]) -> TnxId = list_to_integer(TnxId0), print(emqx_cluster_rpc:query(TnxId)); +admins(["fix"]) -> + {atomic, Status} = emqx_cluster_rpc:status(), + case mria_rlog:role() of + core -> + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_lagging(Status, #{fix => true}), + StoppedNodes =/= [] andalso + emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), + ok; + Role -> + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print("Run fix command on ~p(core) node, but current is ~p~n", [Leader, Role]) + end; admins(["fast_forward"]) -> status(), Nodes = mria:running_nodes(), @@ -118,6 +130,38 @@ admins(["fast_forward", Node0, ToTnxId]) -> admins(_) -> emqx_ctl:usage(usage_sync()). +fix_lagging_with_raw(ToTnxId, Node, Keys) -> + Confs = lists:foldl( + fun(Key, Acc) -> + KeyRaw = atom_to_binary(Key), + Acc#{KeyRaw => emqx_conf_proto_v4:get_raw_config(Node, [Key])} + end, + #{}, + Keys + ), + case mark_fix_begin(Node, ToTnxId) of + ok -> + case load_config_from_raw(Confs, #{mode => replace}) of + ok -> waiting_for_fix_finish(); + Error0 -> Error0 + end; + {error, Reason} -> + emqx_ctl:warning("mark fix begin failed: ~s~n", [Reason]) + end. + +mark_fix_begin(Node, TnxId) -> + {atomic, Status} = emqx_cluster_rpc:status(), + MFA = {?MODULE, mark_fix_log, [Status]}, + emqx_cluster_rpc:update_mfa(Node, MFA, TnxId). + +mark_fix_log(Status, Opts) -> + ?SLOG(warning, #{ + msg => cluster_config_sync_triggered, + status => emqx_utils:redact(Status), + opts => Opts + }), + ok. + audit(Level, From, Log) -> ?AUDIT(Level, redact(Log#{from => From})). @@ -159,29 +203,72 @@ usage_sync() -> "WARNING: This results in inconsistent configs among the clustered nodes."}, {"conf cluster_sync fast_forward [node] ", "Fast-forward config change to the given commit ID on the given node.\n" - "WARNING: This results in inconsistent configs among the clustered nodes."} + "WARNING: This results in inconsistent configs among the clustered nodes."}, + {"conf cluster_sync fix", + "Sync the node with the most comprehensive configuration to other node.\n" + "WARNING: typically the config leader(with the highest tnxid)."} ]. status() -> - emqx_ctl:print("-----------------------------------------------\n"), {atomic, Status} = emqx_cluster_rpc:status(), - lists:foreach( - fun(S) -> - #{ - node := Node, - tnx_id := TnxId, - mfa := {M, F, A}, - created_at := CreatedAt - } = S, - emqx_ctl:print( - "~p:[~w] CreatedAt:~p ~p:~p/~w\n", - [Node, TnxId, CreatedAt, M, F, length(A)] - ) - end, - Status - ), + status(Status). + +status(Status) -> + emqx_ctl:print("-----------------------------------------------\n"), + #{stopped_nodes := StoppedNodes} = emqx_mgmt_cli:cluster_info(), + maybe_fix_lagging(Status, #{fix => false}), + StoppedNodes =/= [] andalso emqx_ctl:warning("Found stopped nodes: ~p~n", [StoppedNodes]), emqx_ctl:print("-----------------------------------------------\n"). +maybe_fix_lagging(Status, #{fix := Fix}) -> + %% find inconsistent in conf, but fix in raw way. + %% because the raw conf is hard to be compared. (e.g, 1000ms vs 1s) + AllConfs = find_running_confs(), + case find_lagging(Status, AllConfs) of + {inconsistent_tnx_id_key, ToTnxId, Target, InconsistentKeys} when Fix -> + _ = fix_lagging_with_raw(ToTnxId, Target, InconsistentKeys), + ok; + {inconsistent_tnx_id_key, _ToTnxId, Target, InconsistentKeys} -> + emqx_ctl:warning("Inconsistent keys: ~p~n", [InconsistentKeys]), + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs); + {inconsistent_tnx_id, ToTnxId, Target} when Fix -> + print_tnx_id_status(Status), + case mark_fix_begin(Target, ToTnxId) of + ok -> + waiting_for_fix_finish(), + emqx_ctl:print("Forward tnxid to ~w successfully~n", [ToTnxId + 1]); + Error -> + Error + end; + {inconsistent_tnx_id, _ToTnxId, _Target} -> + print_tnx_id_status(Status), + Leader = emqx_cluster_rpc:find_leader(), + emqx_ctl:print(?SUGGESTION(Leader)); + {inconsistent_key, ToTnxId, InconsistentKeys} -> + [{Target, _} | _] = AllConfs, + print_inconsistent_conf(InconsistentKeys, Target, Status, AllConfs), + emqx_ctl:warning("All configuration synchronized(tnx_id=~w)~n", [ + ToTnxId + ]), + emqx_ctl:warning( + "but inconsistent keys were found: ~p, which come from environment variables or etc/emqx.conf.~n", + [InconsistentKeys] + ), + emqx_ctl:warning( + "Configuring different values (excluding node.name) through environment variables and etc/emqx.conf" + " is allowed but not recommended.~n" + ), + Fix andalso emqx_ctl:warning("So this fix will not make any changes.~n"), + ok; + {consistent, Msg} -> + emqx_ctl:print(Msg) + end. + +print_tnx_id_status(List0) -> + emqx_ctl:print("No inconsistent configuration found but has inconsistent tnxId ~n"), + List1 = lists:map(fun(#{node := Node, tnx_id := TnxId}) -> {Node, TnxId} end, List0), + emqx_ctl:print("~p~n", [List1]). + print_keys(Keys) -> SortKeys = lists:sort(Keys), emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- SortKeys]]). @@ -500,8 +587,7 @@ filter_readonly_config(Raw) -> try RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), - ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], - {ok, maps:without(ReadOnlyKeys, Raw)} + {ok, maps:without([atom_to_binary(K) || K <- ?READONLY_KEYS], Raw)} catch throw:Error -> ?SLOG(error, #{ @@ -588,3 +674,246 @@ warning(_, Format, Args) -> emqx_ctl:warning(Format, Args). print(#{log := none}, _, _) -> ok; print(_, Format, Args) -> emqx_ctl:print(Format, Args). + +waiting_for_fix_finish() -> + timer:sleep(1000), + waiting_for_sync_finish(1). + +waiting_for_sync_finish(10) -> + emqx_ctl:warning( + "Config is still not in sync after 10s.~n" + "It may need more time, and check the logs in the lagging nodes.~n" + ); +waiting_for_sync_finish(Sec) -> + {atomic, Status} = emqx_cluster_rpc:status(), + case lists:usort([TnxId || #{tnx_id := TnxId} <- Status]) of + [_] -> + emqx_ctl:warning("sync successfully in ~ws ~n", [Sec]); + _ -> + Res = lists:sort([{TnxId, Node} || #{node := Node, tnx_id := TnxId} <- Status]), + emqx_ctl:warning("sync status: ~p~n", [Res]), + timer:sleep(1000), + waiting_for_sync_finish(Sec + 1) + end. + +find_lagging(Status, AllConfs) -> + case find_highest_node(Status) of + {same_tnx_id, TnxId} -> + %% check the conf is the same or not + [{_, TargetConf} | OtherConfs] = AllConfs, + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> + Msg = + <<"All configuration synchronized(tnx_id=", + (integer_to_binary(TnxId))/binary, ") successfully\n">>, + {consistent, Msg}; + InconsistentKeys -> + {inconsistent_key, TnxId, InconsistentKeys} + end; + {ok, TargetId, Target} -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + case find_inconsistent_key(TargetConf, OtherConfs) of + [] -> {inconsistent_tnx_id, TargetId, Target}; + ChangedKeys -> {inconsistent_tnx_id_key, TargetId, Target, ChangedKeys} + end + end. + +find_inconsistent_key(TargetConf, OtherConfs) -> + lists:usort( + lists:foldl( + fun({_Node, OtherConf}, Changed) -> + lists:filtermap( + fun({K, V}) -> changed(K, V, TargetConf) end, + maps:to_list(OtherConf) + ) ++ Changed + end, + [], + OtherConfs + ) + ). + +find_highest_node([]) -> + {same_tnx_id, 0}; +find_highest_node(Status) -> + Ids = [{Id, Node} || #{tnx_id := Id, node := Node} <- Status], + case lists:usort(fun({A, _}, {B, _}) -> A >= B end, Ids) of + [{TnxId, _}] -> + {same_tnx_id, TnxId}; + [{TnxId, Target} | _] -> + {ok, TnxId, Target} + end. + +changed(K, V, Conf) -> + case maps:find(K, Conf) of + {ok, V1} when V =:= V1 -> false; + _ -> {true, K} + end. + +find_running_confs() -> + lists:map( + fun(Node) -> + Conf = emqx_conf_proto_v4:get_config(Node, []), + {Node, maps:without(?READONLY_KEYS, Conf)} + end, + mria:running_nodes() + ). + +print_inconsistent_conf(Keys, Target, Status, AllConfs) -> + {value, {_, TargetConf}, OtherConfs} = lists:keytake(Target, 1, AllConfs), + TargetTnxId = get_tnx_id(Target, Status), + lists:foreach( + fun(Key) -> + lists:foreach( + fun({Node, OtherConf}) -> + TargetV = maps:get(Key, TargetConf), + PrevV = maps:get(Key, OtherConf), + NodeTnxId = get_tnx_id(Node, Status), + Options = #{ + key => Key, + node => {Node, NodeTnxId}, + target => {Target, TargetTnxId} + }, + print_inconsistent_conf(TargetV, PrevV, Options) + end, + OtherConfs + ) + end, + Keys + ). + +get_tnx_id(Node, Status) -> + case lists:filter(fun(#{node := Node0}) -> Node0 =:= Node end, Status) of + [] -> 0; + [#{tnx_id := TnxId}] -> TnxId + end. + +print_inconsistent_conf(SameConf, SameConf, _Options) -> + ok; +print_inconsistent_conf(New = #{}, Old = #{}, Options) -> + #{ + added := Added, + removed := Removed, + changed := Changed + } = emqx_utils_maps:diff_maps(New, Old), + RemovedFmt = "~ts(~w)'s ~s has deleted certain keys, but they are still present on ~ts(~w).~n", + print_inconsistent(Removed, RemovedFmt, Options), + AddedFmt = "~ts(~w)'s ~s has new setting, but it has not been applied to ~ts(~w).~n", + print_inconsistent(Added, AddedFmt, Options), + ChangedFmt = + "~ts(~w)'s ~s has been updated, but the changes have not been applied to ~ts(~w).~n", + print_inconsistent(Changed, ChangedFmt, Options); +%% authentication rewrite topic_metrics is list(not map). +print_inconsistent_conf(New, Old, Options) -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:print("~ts(tnx_id=~w)'s ~s is diff from ~ts(tnx_id=~w).~n", [ + Node, NodeTnxId, Key, Target, TargetTnxId + ]), + emqx_ctl:print("~ts:~n", [Node]), + print_hocon(Old), + emqx_ctl:print("~ts:~n", [Target]), + print_hocon(New). + +print_inconsistent(Conf, Fmt, Options) when Conf =/= #{} -> + #{ + key := Key, + target := {Target, TargetTnxId}, + node := {Node, NodeTnxId} + } = Options, + emqx_ctl:warning(Fmt, [Target, TargetTnxId, Key, Node, NodeTnxId]), + NodeRawConf = emqx_conf_proto_v4:get_raw_config(Node, [Key]), + TargetRawConf = emqx_conf_proto_v4:get_raw_config(Target, [Key]), + {TargetConf, NodeConf} = + maps:fold( + fun(SubKey, _, {NewAcc, OldAcc}) -> + SubNew0 = maps:get(atom_to_binary(SubKey), NodeRawConf, undefined), + SubOld0 = maps:get(atom_to_binary(SubKey), TargetRawConf, undefined), + {SubNew1, SubOld1} = remove_identical_value(SubNew0, SubOld0), + {NewAcc#{SubKey => SubNew1}, OldAcc#{SubKey => SubOld1}} + end, + {#{}, #{}}, + Conf + ), + %% zones.default is a virtual zone. It will be changed when mqtt changes, + %% so we can't retrieve the raw data for zones.default(always undefined). + case TargetConf =:= NodeConf of + true -> ok; + false -> print_hocon(#{Target => #{Key => TargetConf}, Node => #{Key => NodeConf}}) + end; +print_inconsistent(_Conf, _Format, _Options) -> + ok. + +remove_identical_value(New = #{}, Old = #{}) -> + maps:fold( + fun(K, NewV, {Acc1, Acc2}) -> + case maps:find(K, Old) of + {ok, NewV} -> + {maps:remove(K, Acc1), maps:remove(K, Acc2)}; + {ok, OldV} -> + {NewV1, OldV1} = remove_identical_value(NewV, OldV), + {maps:put(K, NewV1, Acc1), maps:put(K, OldV1, Acc2)} + end + end, + {New, Old}, + New + ); +remove_identical_value(New, Old) -> + {New, Old}. + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +find_inconsistent_test() -> + Status = [ + #{node => 'node1', tnx_id => 1}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 2} + ], + Stats = #{<<"enable">> => false}, + + %% chose the highest tnx_id node + Mqtt = #{ + <<"await_rel_timeout">> => <<"300s">>, + <<"exclusive_subscription">> => false, + <<"idle_timeout">> => <<"15s">> + }, + TargetMqtt1 = Mqtt#{<<"idle_timeout">> => <<"16s">>}, + Confs0 = [ + {node1, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"11s">>}, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt#{<<"idle_timeout">> => <<"17s">>}, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => TargetMqtt1, <<"stats">> => Stats}} + ], + ?assertEqual( + {inconsistent_tnx_id_key, 3, node2, [<<"mqtt">>]}, find_lagging(Status, Confs0) + ), + + %% conf is the same, no changed + NoDiffConfs = [ + {node1, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node2, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}}, + {node3, #{<<"mqtt">> => Mqtt, <<"stats">> => Stats}} + ], + ?assertEqual({inconsistent_tnx_id, 3, node2}, find_lagging(Status, NoDiffConfs)), + + %% same tnx_id + SameStatus = [ + #{node => 'node1', tnx_id => 3}, + #{node => 'node2', tnx_id => 3}, + #{node => 'node3', tnx_id => 3} + ], + %% same conf + ?assertEqual( + {consistent, <<"All configuration synchronized(tnx_id=3) successfully\n">>}, + find_lagging(SameStatus, NoDiffConfs) + ), + %% diff conf same tnx_id use the first one + ?assertEqual( + {inconsistent_key, 3, [<<"mqtt">>]}, + find_lagging(SameStatus, Confs0) + ), + ok. + +-endif. diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl index 14d578ec1..8e5421d57 100644 --- a/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v3.erl @@ -20,6 +20,7 @@ -export([ introduced_in/0, + deprecated_since/0, sync_data_from_node/1, get_config/2, get_config/3, @@ -43,6 +44,9 @@ introduced_in() -> "5.1.1". +deprecated_since() -> + "5.7.1". + -spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). sync_data_from_node(Node) -> rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). diff --git a/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl b/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl new file mode 100644 index 000000000..8810ef8fd --- /dev/null +++ b/apps/emqx_conf/src/proto/emqx_conf_proto_v4.erl @@ -0,0 +1,124 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_proto_v4). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + sync_data_from_node/1, + get_config/2, + get_config/3, + get_all/1, + + update/3, + update/4, + remove_config/2, + remove_config/3, + + reset/2, + reset/3, + + get_override_config_file/1 +]). + +-export([get_hocon_config/1, get_hocon_config/2]). +-export([get_raw_config/2]). + +-include_lib("emqx/include/bpapi.hrl"). + +introduced_in() -> + "5.7.1". + +-spec sync_data_from_node(node()) -> {ok, binary()} | emqx_rpc:badrpc(). +sync_data_from_node(Node) -> + rpc:call(Node, emqx_conf_app, sync_data_from_node, [], 20000). +-type update_config_key_path() :: [emqx_utils_maps:config_key(), ...]. + +-spec get_config(node(), emqx_utils_maps:config_key_path()) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_config, [KeyPath]). + +-spec get_config(node(), emqx_utils_maps:config_key_path(), _Default) -> + term() | emqx_rpc:badrpc(). +get_config(Node, KeyPath, Default) -> + rpc:call(Node, emqx, get_config, [KeyPath, Default]). + +-spec get_all(emqx_utils_maps:config_key_path()) -> emqx_rpc:multicall_result(). +get_all(KeyPath) -> + rpc:multicall(emqx_conf, get_node_and_config, [KeyPath], 5000). + +-spec update( + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts) -> + emqx_cluster_rpc:multicall(emqx, update_config, [KeyPath, UpdateReq, Opts]). + +-spec update( + node(), + update_config_key_path(), + emqx_config:update_request(), + emqx_config:update_opts() +) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +update(Node, KeyPath, UpdateReq, Opts) -> + rpc:call(Node, emqx, update_config, [KeyPath, UpdateReq, Opts], 5000). + +-spec remove_config(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove_config(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, remove_config, [KeyPath, Opts]). + +-spec remove_config(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +remove_config(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, remove_config, [KeyPath, Opts], 5000). + +-spec reset(update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts) -> + emqx_cluster_rpc:multicall(emqx, reset_config, [KeyPath, Opts]). + +-spec reset(node(), update_config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} + | {error, emqx_config:update_error()} + | emqx_rpc:badrpc(). +reset(Node, KeyPath, Opts) -> + rpc:call(Node, emqx, reset_config, [KeyPath, Opts]). + +-spec get_override_config_file([node()]) -> emqx_rpc:multicall_result(). +get_override_config_file(Nodes) -> + rpc:multicall(Nodes, emqx_conf_app, get_override_config_file, [], 20000). + +-spec get_hocon_config(node()) -> map() | {badrpc, _}. +get_hocon_config(Node) -> + rpc:call(Node, emqx_conf_cli, get_config, []). + +-spec get_raw_config(node(), update_config_key_path()) -> map() | {badrpc, _}. +get_raw_config(Node, KeyPath) -> + rpc:call(Node, emqx, get_raw_config, [KeyPath]). + +-spec get_hocon_config(node(), binary()) -> map() | {badrpc, _}. +get_hocon_config(Node, Key) -> + rpc:call(Node, emqx_conf_cli, get_config, [Key]). diff --git a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index 97b80ab6b..5dd005c02 100644 --- a/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -141,13 +141,15 @@ t_commit_ok_but_apply_fail_on_other_node(_Config) -> Res1 = gen_server:call(?NODE2, Call), Res2 = gen_server:call(?NODE3, Call), %% Node2 is retry on tnx_id 1, and should not run Next MFA. - ?assertEqual( - {init_failure, #{ - msg => stale_view_of_cluster_state, - retry_times => 2, - cluster_tnx_id => 2, - node_tnx_id => 1 - }}, + ?assertMatch( + {init_failure, + {error, #{ + msg := stale_view_of_cluster, + retry_times := 2, + cluster_tnx_id := 2, + node_tnx_id := 1, + suggestion := _ + }}}, Res1 ), ?assertEqual(Res1, Res2), diff --git a/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl new file mode 100644 index 000000000..093fe591f --- /dev/null +++ b/apps/emqx_conf/test/emqx_conf_cluster_sync_SUITE.erl @@ -0,0 +1,158 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_cluster_sync_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include("emqx_conf.hrl"). + +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + WorkDir = ?config(priv_dir, Config), + Cluster = mk_cluster_spec(#{}), + Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), + [{cluster_nodes, Nodes} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_cluster:stop(?config(cluster_nodes, Config)). + +t_fix(Config) -> + [Node1, Node2] = ?config(cluster_nodes, Config), + ?ON(Node1, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node2, ?assertMatch({atomic, []}, emqx_cluster_rpc:status())), + ?ON(Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 100}, #{})), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertEqual(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + ?ON( + Node1, + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + ), + %% fix normal, nothing changed + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + %% fix inconsistent_key. tnx_id is the same, so nothing changed. + emqx_conf_proto_v4:update(Node1, [<<"mqtt">>], #{<<"max_topic_levels">> => 99}, #{}), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 1}, + #{node := Node1, tnx_id := 1} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(100, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id_key. tnx_id and key are updated. + ?ON(Node1, fake_mfa(2, Node1, {?MODULE, undef, []})), + %% 2 -> fake_mfa, 3-> mark_begin_log, 4-> mqtt 5 -> zones + ?ON(Node2, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 5}, + #{node := Node1, tnx_id := 5} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(99, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + + %% fix inconsistent_tnx_id. tnx_id is updated. + {ok, _} = ?ON( + Node1, emqx_conf_proto_v4:update([<<"mqtt">>], #{<<"max_topic_levels">> => 98}, #{}) + ), + ?ON(Node2, fake_mfa(7, Node2, {?MODULE, undef1, []})), + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node1, [mqtt, max_topic_levels])), + ?assertMatch(98, emqx_conf_proto_v4:get_config(Node2, [mqtt, max_topic_levels])), + %% unchanged + ?ON(Node1, begin + ok = emqx_conf_cli:admins(["fix"]), + ?assertMatch( + {atomic, [ + #{node := Node2, tnx_id := 8}, + #{node := Node1, tnx_id := 8} + ]}, + emqx_cluster_rpc:status() + ) + end), + ok. + +fake_mfa(TnxId, Node, MFA) -> + Func = fun() -> + MFARec = #cluster_rpc_mfa{ + tnx_id = TnxId, + mfa = MFA, + initiator = Node, + created_at = erlang:localtime() + }, + ok = mnesia:write(?CLUSTER_MFA, MFARec, write), + ok = emqx_cluster_rpc:commit(Node, TnxId) + end, + {atomic, ok} = mria:transaction(?CLUSTER_RPC_SHARD, Func, []), + ok. + +mk_cluster_spec(Opts) -> + Conf = #{ + listeners => #{ + tcp => #{default => <<"marked_for_deletion">>}, + ssl => #{default => <<"marked_for_deletion">>}, + ws => #{default => <<"marked_for_deletion">>}, + wss => #{default => <<"marked_for_deletion">>} + } + }, + Apps = [ + {emqx, #{config => Conf}}, + {emqx_conf, #{config => Conf}} + ], + [ + {emqx_authz_api_cluster_SUITE1, Opts#{role => core, apps => Apps}}, + {emqx_authz_api_cluster_SUITE2, Opts#{role => core, apps => Apps}} + ]. diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index 56e1f9bc1..f2c74968e 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -314,15 +314,15 @@ global_zone_configs(get, _Params, _Req) -> {200, get_zones()}; global_zone_configs(put, #{body := Body}, _Req) -> PrevZones = get_zones(), - Res = + {Res, Error} = maps:fold( - fun(Path, Value, Acc) -> + fun(Path, Value, {Acc, Error}) -> PrevValue = maps:get(Path, PrevZones), case Value =/= PrevValue of true -> case emqx_conf:update([Path], Value, ?OPTS) of {ok, #{raw_config := RawConf}} -> - Acc#{Path => RawConf}; + {Acc#{Path => RawConf}, Error}; {error, Reason} -> ?SLOG(error, #{ msg => "update_global_zone_failed", @@ -330,18 +330,18 @@ global_zone_configs(put, #{body := Body}, _Req) -> path => Path, value => Value }), - Acc + {Acc, Error#{Path => Reason}} end; false -> - Acc#{Path => Value} + {Acc#{Path => Value}, Error} end end, - #{}, + {#{}, #{}}, Body ), case maps:size(Res) =:= maps:size(Body) of true -> {200, Res}; - false -> {400, #{code => 'UPDATE_FAILED'}} + false -> {400, #{code => 'UPDATE_FAILED', message => ?ERR_MSG(Error)}} end. config_reset(post, _Params, Req) -> @@ -428,9 +428,9 @@ get_configs_v2(QueryStr) -> Conf = case maps:find(<<"key">>, QueryStr) of error -> - emqx_conf_proto_v3:get_hocon_config(Node); + emqx_conf_proto_v4:get_hocon_config(Node); {ok, Key} -> - emqx_conf_proto_v3:get_hocon_config(Node, atom_to_binary(Key)) + emqx_conf_proto_v4:get_hocon_config(Node, atom_to_binary(Key)) end, { 200, diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 8d327efe6..2742167f5 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -44,7 +44,8 @@ pem_cache/1, olp/1, data/1, - ds/1 + ds/1, + cluster_info/0 ]). -spec load() -> ok. @@ -53,7 +54,7 @@ load() -> lists:foreach(fun(Cmd) -> emqx_ctl:register_command(Cmd, {?MODULE, Cmd}, []) end, Cmds). is_cmd(Fun) -> - not lists:member(Fun, [init, load, module_info]). + not lists:member(Fun, [init, load, module_info, cluster_info]). %%-------------------------------------------------------------------- %% @doc Node status diff --git a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl index 10906183b..066d4a45d 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_configs_SUITE.erl @@ -240,11 +240,11 @@ t_configs_node({'init', Config}) -> (bad_node, _) -> {badrpc, bad} end, meck:expect(emqx_management_proto_v5, get_full_config, F), - meck:expect(emqx_conf_proto_v3, get_hocon_config, F2), + meck:expect(emqx_conf_proto_v4, get_hocon_config, F2), meck:expect(hocon_pp, do, fun(Conf, _) -> Conf end), Config; t_configs_node({'end', _}) -> - meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v3, hocon_pp]); + meck:unload([emqx, emqx_management_proto_v5, emqx_conf_proto_v4, hocon_pp]); t_configs_node(_) -> Node = atom_to_list(node()), diff --git a/changes/ce/feat-13202.en.md b/changes/ce/feat-13202.en.md new file mode 100644 index 000000000..6871f5acc --- /dev/null +++ b/changes/ce/feat-13202.en.md @@ -0,0 +1 @@ +Introduce `emqx_cli conf cluster_sync fix` to address cluster inconsistencies. It synchronizes the configuration of the node with the largest `tnx_id` to all nodes.