diff --git a/Makefile b/Makefile index f117d5e9d..0112776bb 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.5.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.3.0 +export EMQX_DASHBOARD_VERSION ?= v1.5.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 87a4b47e0..1bec82504 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.3.1-alpha.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.3.1-alpha.4"). +-define(EMQX_RELEASE_EE, "5.3.1-alpha.5"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 1da84451d..1935a1b5a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -40,7 +40,8 @@ '/actions/:id/enable/:enable'/2, '/actions/:id/:operation'/2, '/nodes/:node/actions/:id/:operation'/2, - '/actions_probe'/2 + '/actions_probe'/2, + '/action_types'/2 ]). %% BpAPI @@ -79,7 +80,8 @@ paths() -> "/actions/:id/enable/:enable", "/actions/:id/:operation", "/nodes/:node/actions/:id/:operation", - "/actions_probe" + "/actions_probe", + "/action_types" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -338,6 +340,27 @@ schema("/actions_probe") -> 400 => error_schema(['TEST_FAILED'], "bridge test failed") } } + }; +schema("/action_types") -> + #{ + 'operationId' => '/action_types', + get => #{ + tags => [<<"actions">>], + desc => ?DESC("desc_api10"), + summary => <<"List available action types">>, + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_examples( + array(emqx_bridge_v2_schema:types_sc()), + #{ + <<"types">> => + #{ + summary => <<"Action types">>, + value => emqx_bridge_v2_schema:types() + } + } + ) + } + } }. '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> @@ -486,6 +509,9 @@ schema("/actions_probe") -> redact(BadRequest) end. +'/action_types'(get, _Request) -> + ?OK(emqx_bridge_v2_schema:types()). + maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) -> case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, #{raw_config := RawConf}} -> @@ -709,8 +735,10 @@ format_resource( #{ type := Type, name := Name, + status := Status, + error := Error, raw_config := RawConf, - resource_data := ResourceData + resource_data := _ResourceData }, Node ) -> @@ -719,14 +747,16 @@ format_resource( RawConf#{ type => Type, name => maps:get(<<"name">>, RawConf, Name), - node => Node + node => Node, + status => Status, + error => Error }, - format_resource_data(ResourceData) + format_bridge_status_and_error(#{status => Status, error => Error}) ) ). -format_resource_data(ResData) -> - maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)). +format_bridge_status_and_error(Data) -> + maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)). format_resource_data(error, undefined, Result) -> Result; diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d6d8eb9a1..1d059903a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -30,9 +30,18 @@ post_request/0 ]). +-export([types/0, types_sc/0]). + -export([enterprise_api_schemas/1]). +-export_type([action_type/0]). + +%% Should we explicitly list them here so dialyzer may be more helpful? +-type action_type() :: atom(). + -if(?EMQX_RELEASE_EDITION == ee). +-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when + Method :: string(). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use %% `call_hocon' from `nodetool' to generate initial configurations. @@ -55,6 +64,8 @@ enterprise_fields_actions() -> -else. +-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when + Method :: string(). enterprise_api_schemas(_Method) -> []. enterprise_fields_actions() -> []. @@ -129,6 +140,14 @@ desc(actions) -> desc(_) -> undefined. +-spec types() -> [action_type()]. +types() -> + proplists:get_keys(?MODULE:fields(actions)). + +-spec types_sc() -> ?ENUM([action_type()]). +types_sc() -> + hoconsc:enum(types()). + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index bf2ac51a2..9879fe1e6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -236,6 +236,14 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. +init_per_testcase(t_action_types, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + init_mocks(); + Nodes -> + [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + end, + Config; init_per_testcase(_TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> @@ -260,8 +268,14 @@ end_per_testcase(_TestCase, Config) -> -define(CONNECTOR_IMPL, emqx_bridge_v2_dummy_connector). init_mocks() -> - meck:new(emqx_connector_ee_schema, [passthrough, no_link]), - meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + case emqx_release:edition() of + ee -> + meck:new(emqx_connector_ee_schema, [passthrough, no_link]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + ok; + ce -> + ok + end, meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( @@ -289,7 +303,7 @@ init_mocks() -> ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId) end), - [?CONNECTOR_IMPL, emqx_connector_ee_schema]. + ok. clear_resources() -> lists:foreach( @@ -886,6 +900,14 @@ t_cascade_delete_actions(Config) -> ), {ok, 200, []} = request_json(get, uri([?ROOT]), Config). +t_action_types(Config) -> + Res = request_json(get, uri(["action_types"]), Config), + ?assertMatch({ok, 200, _}, Res), + {ok, 200, Types} = Res, + ?assert(is_list(Types), #{types => Types}), + ?assert(lists:all(fun is_binary/1, Types), #{types => Types}), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 278a0420a..5a2b6b000 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -145,6 +145,39 @@ create_bridge(Config, Overrides) -> ct:pal("creating bridge with config: ~p", [BridgeConfig]), emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig). +list_bridges_api() -> + Params = [], + Path = emqx_mgmt_api_test_util:api_path(["actions"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("listing bridges (via http)"), + Res = + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("list bridges result: ~p", [Res]), + Res. + +get_bridge_api(BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Params = [], + Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]), + Res = + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]), + Res. + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 58a16ea67..6adb66357 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -29,25 +29,27 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - _ = application:load(emqx_conf), - ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()), - application:ensure_all_started(telemetry), - application:ensure_all_started(wolff), - application:ensure_all_started(brod), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_kafka, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(), - Config. + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()). - -apps_to_start_and_stop() -> - [ - emqx, - emqx_conf, - emqx_connector, - emqx_bridge, - emqx_rule_engine - ]. +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. t_create_remove_list(_) -> [] = emqx_bridge_v2:list(), @@ -165,6 +167,24 @@ t_unknown_topic(_Config) -> ok end ), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_api() + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) + ), ok. check_send_message_with_bridge(BridgeName) -> diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index a758681ff..83737e746 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -21,6 +21,7 @@ -define(CLUSTER_MFA, cluster_rpc_mfa). -define(CLUSTER_COMMIT, cluster_rpc_commit). +-define(DEFAULT_INIT_TXN_ID, -1). -record(cluster_rpc_mfa, { tnx_id :: pos_integer(), diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 934d7ef7a..5bc330afa 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -44,7 +44,9 @@ read_next_mfa/1, trans_query/1, trans_status/0, - on_leave_clean/0 + on_leave_clean/0, + get_commit_lag/0, + get_commit_lag/1 ]). -export([ @@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) -> -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> -1; + [] -> ?DEFAULT_INIT_TXN_ID; [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId end. +%% @doc Return the commit lag of *this* node. +-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}. +get_commit_lag() -> + {atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]), + Result. + +get_commit_lag(Node) -> + LatestId = get_cluster_tnx_id(), + LatestNode = + case mnesia:read(?CLUSTER_MFA, LatestId) of + [#?CLUSTER_MFA{initiator = N}] -> N; + _ -> undefined + end, + MyId = get_node_tnx_id(Node), + #{my_id => MyId, latest => LatestId, latest_node => LatestNode}. + %% Checks whether the Mnesia tables used by this module are waiting to %% be loaded and from where. --spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}. +-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}. get_tables_status() -> maps:from_list([ {Tab, do_get_tables_status(Tab)} @@ -249,13 +267,16 @@ do_get_tables_status(Tab) -> TabNodes = proplists:get_value(all_nodes, Props), KnownDown = mnesia_recover:get_mnesia_downs(), LocalNode = node(), - case proplists:get_value(load_node, Props) of + %% load_node. Returns the name of the node that Mnesia loaded the table from. + %% The structure of the returned value is unspecified, but can be useful for debugging purposes. + LoadedFrom = proplists:get_value(load_node, Props), + case LoadedFrom of unknown -> {waiting, TabNodes -- [LocalNode | KnownDown]}; LocalNode -> - {disc, LocalNode}; + {loaded, local}; Node -> - {network, Node} + {loaded, Node} end. %% Regardless of what MFA is returned, consider it a success), diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 7addb3823..74a7a8f2e 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -26,8 +26,6 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). --define(DEFAULT_INIT_TXN_ID, -1). - start(_StartType, _StartArgs) -> try ok = init_conf() @@ -52,31 +50,32 @@ unset_config_loaded() -> %% This function is named 'override' due to historical reasons. get_override_config_file() -> Node = node(), + Data = #{ + wall_clock => erlang:statistics(wall_clock), + node => Node, + release => emqx_release:version_with_prefix() + }, case emqx_app:init_load_done() of false -> - {error, #{node => Node, msg => "init_conf_load_not_done"}}; + {error, Data#{msg => "init_conf_load_not_done"}}; true -> case erlang:whereis(emqx_config_handler) of undefined -> - {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + {error, Data#{msg => "emqx_config_handler_not_ready"}}; _ -> Fun = fun() -> TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), - WallClock = erlang:statistics(wall_clock), Conf = emqx_config_handler:get_raw_cluster_override_conf(), HasDeprecateFile = emqx_config:has_deprecated_file(), - #{ - wall_clock => WallClock, + Data#{ conf => Conf, tnx_id => TnxId, - node => Node, - has_deprecated_file => HasDeprecateFile, - release => emqx_release:version_with_prefix() + has_deprecated_file => HasDeprecateFile } end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of {atomic, Res} -> {ok, Res}; - {aborted, Reason} -> {error, #{node => Node, msg => Reason}} + {aborted, Reason} -> {error, Data#{msg => Reason}} end end end. @@ -105,7 +104,7 @@ init_load(TnxId) -> ok = emqx_app:set_config_loader(emqx_conf), ok; Module -> - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_init_config_load", reason => "Some application has set another config loader", loader => Module @@ -126,7 +125,7 @@ sync_cluster_conf() -> case cluster_nodes() of [] -> %% The first core nodes is self. - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_sync_cluster_conf", reason => "This is a single node, or the first node in the cluster" }), @@ -138,70 +137,94 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), - {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), - NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), - case (Failed =/= [] orelse NotReady =/= []) of - true when Ready =/= [] -> - %% Some core nodes failed to reply. - Warning = #{ - nodes => Nodes, - failed => Failed, - not_ready => NotReady, - msg => "ignored_nodes_when_sync_cluster_conf" - }, - ?SLOG(warning, Warning); - true when Failed =/= [] -> - %% There are core nodes running but no one was able to reply. - ?SLOG(error, #{ - msg => "failed_to_sync_cluster_conf", - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }); - true -> - %% There are core nodes booting up - ?SLOG(info, #{ - msg => "peer_not_ready_for_config_sync", - reason => "The 'not_ready' peer node(s) are loading configs", - nodes => Nodes, - not_ready => NotReady - }); - false -> - ok - end, - case Ready of + {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), + LogData = #{peer_nodes => Nodes, self_node => node()}, + case Failed ++ NotReady of [] -> - case should_proceed_with_boot() of - true -> - %% Act as if this node is alone, so it can - %% finish the boot sequence and load the - %% config for other nodes to copy it. - ?SLOG(info, #{ - msg => "skip_sync_cluster_conf", - loading_from_disk => true, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - {ok, ?DEFAULT_INIT_TXN_ID}; - false -> - %% retry in some time - Jitter = rand:uniform(2000), - Timeout = 10000 + Jitter, - timer:sleep(Timeout), - ?SLOG(warning, #{ - msg => "sync_cluster_conf_retry", - timeout => Timeout, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - sync_cluster_conf() - end; + ok; _ -> + ?SLOG( + warning, + LogData#{ + msg => "cluster_config_fetch_failures", + failed_nodes => Failed, + booting_nodes => NotReady + } + ) + end, + MyRole = mria_rlog:role(), + case Ready of + [] when MyRole =:= replicant -> + %% replicant should never boot without copying from a core node + delay_and_retry(LogData#{role => replicant}); + [] -> + %% none of the nodes are ready, either delay-and-retry or boot without wait + TableStatus = tx_commit_table_status(), + sync_cluster_conf5(TableStatus, LogData); + _ -> + %% copy config from the best node in the Ready list sync_cluster_conf3(Ready) end. +%% None of the peer nodes are responsive, so we have to make a decision +%% based on the commit lagging (if the commit table is loaded). +%% +%% It could be that the peer nodes are also booting up, +%% however we cannot always wait because it may run into a dead-lock. +%% +%% Giving up wait here implies that some changes made to the peer node outside +%% of cluster-rpc MFAs will be lost. +%% e.g. stop all nodes, manually change cluster.hocon in one node +%% then boot all nodes around the same time, the changed cluster.hocon may +%% get lost if the node happen to copy config from others. +sync_cluster_conf5({loaded, local}, LogData) -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "Commit table loaded locally from disk, assuming that I have the latest config" + }), + {ok, ?DEFAULT_INIT_TXN_ID}; +sync_cluster_conf5({loaded, From}, LogData) -> + case get_commit_lag() of + #{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "I have the latest cluster config commit", + commit_loaded_from => From, + lagging_info => Lagging + }), + {ok, ?DEFAULT_INIT_TXN_ID}; + #{my_id := _MyId, latest := _Latest} = Lagging -> + delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From}) + end; +sync_cluster_conf5({waiting, Waiting}, LogData) -> + %% this may never happen? since we waited for table before + delay_and_retry(LogData#{table_pending => Waiting}). + +get_commit_lag() -> + emqx_cluster_rpc:get_commit_lag(). + +delay_and_retry(LogData) -> + Timeout = sync_delay_timeout(), + ?SLOG(warning, LogData#{ + msg => "sync_cluster_conf_retry", + explain => + "Cannot boot alone due to potentially stale data. " + "Will try sync cluster config again after delay", + delay => Timeout + }), + timer:sleep(Timeout), + sync_cluster_conf(). + +-ifdef(TEST). +sync_delay_timeout() -> + Jitter = rand:uniform(200), + 1_000 + Jitter. +-else. +sync_delay_timeout() -> + Jitter = rand:uniform(2000), + 10_000 + Jitter. +-endif. + %% @private Filter out the nodes which are running a newer version than this node. sync_cluster_conf3(Ready) -> case lists:filter(fun is_older_or_same_version/1, Ready) of @@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) -> ), ?SLOG(warning, #{ msg => "all_available_nodes_running_newer_version", - hint => - "Booting this node without syncing cluster config from peer core nodes " + explain => + "Booting this node without syncing cluster config from core nodes " "because other nodes are running a newer version", - peer_nodes => NodesAndVersions + versions => NodesAndVersions }), {ok, ?DEFAULT_INIT_TXN_ID}; Ready2 -> @@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) -> [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, HasDeprecatedFile = has_deprecated_file(Info), - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "sync_cluster_conf_success", synced_from_node => Node, has_deprecated_file => HasDeprecatedFile, @@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) -> ok = sync_data_from_node(Node), {ok, TnxId}. -should_proceed_with_boot() -> +tx_commit_table_status() -> TablesStatus = emqx_cluster_rpc:get_tables_status(), - LocalNode = node(), - case maps:get(?CLUSTER_COMMIT, TablesStatus) of - {disc, LocalNode} -> - %% Loading locally; let this node finish its boot sequence - %% so others can copy the config from this one. - true; - _ -> - %% Loading from another node or still waiting for nodes to - %% be up. Try again. - false - end. + maps:get(?CLUSTER_COMMIT, TablesStatus). conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true; conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) -> diff --git a/changes/ce/fix-11897.en.md b/changes/ce/fix-11897.en.md new file mode 100644 index 000000000..383129b4a --- /dev/null +++ b/changes/ce/fix-11897.en.md @@ -0,0 +1 @@ +Fix config sync wait-loop race condition when cluster nodes boot around the same time. diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index 4211f37e4..4f385a9ba 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1-alpha.4 +version: 5.3.1-alpha.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1-alpha.4 +appVersion: 5.3.1-alpha.5 diff --git a/rel/i18n/emqx_bridge_v2_api.hocon b/rel/i18n/emqx_bridge_v2_api.hocon index 1f2c2bd8d..23a75712a 100644 --- a/rel/i18n/emqx_bridge_v2_api.hocon +++ b/rel/i18n/emqx_bridge_v2_api.hocon @@ -54,6 +54,12 @@ desc_api9.desc: desc_api9.label: """Test Bridge Creation""" +desc_api10.desc: +"""Lists the available action types.""" + +desc_api10.label: +"""List action types""" + desc_bridge_metrics.desc: """Get bridge metrics by id."""