From d061d64c70dcbca4c400c02df9381fefe595ecd1 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 6 Nov 2023 10:56:48 +0300 Subject: [PATCH 1/7] fix(plugins): fix backward compatibility --- apps/emqx/src/emqx_hookpoints.erl | 47 ++++++++++++++----- apps/emqx_plugins/test/emqx_plugins_SUITE.erl | 40 ++++++++++++++++ changes/ce/fix-11886.en.md | 3 ++ 3 files changed, 77 insertions(+), 13 deletions(-) create mode 100644 changes/ce/fix-11886.en.md diff --git a/apps/emqx/src/emqx_hookpoints.erl b/apps/emqx/src/emqx_hookpoints.erl index 1a1452a57..ba125101e 100644 --- a/apps/emqx/src/emqx_hookpoints.erl +++ b/apps/emqx/src/emqx_hookpoints.erl @@ -16,6 +16,8 @@ -module(emqx_hookpoints). +-include("logger.hrl"). + -type callback_result() :: stop | any(). -type fold_callback_result(Acc) :: {stop, Acc} | {ok, Acc} | stop | any(). @@ -62,12 +64,16 @@ 'delivery.dropped', 'delivery.completed', 'cm.channel.unregistered', - 'tls_handshake.psk_lookup', + 'tls_handshake.psk_lookup' +]). +%% Our template plugin used this hookpoints before its 5.1.0 version, +%% so we keep them here +-define(DEPRECATED_HOOKPOINTS, [ %% This is a deprecated hookpoint renamed to 'client.authorize' - %% However, our template plugin used this hookpoint before its 5.1.0 version, - %% so we keep it here - 'client.check_acl' + 'client.check_acl', + %% Misspelled hookpoint + 'session.takeovered' ]). %%----------------------------------------------------------------------------- @@ -206,27 +212,42 @@ when %% API %%----------------------------------------------------------------------------- -default_hookpoints() -> - ?HOOKPOINTS. +%% Binary hookpoint names are dynamic and used for bridges +-type registered_hookpoint() :: atom(). +-type registered_hookpoint_status() :: valid | deprecated. +-spec default_hookpoints() -> #{registered_hookpoint() => registered_hookpoint_status()}. +default_hookpoints() -> + maps:merge( + maps:from_keys(?HOOKPOINTS, valid), + maps:from_keys(?DEPRECATED_HOOKPOINTS, deprecated) + ). + +-spec register_hookpoints() -> ok. register_hookpoints() -> register_hookpoints(default_hookpoints()). -register_hookpoints(HookPoints) -> - persistent_term:put(?MODULE, maps:from_keys(HookPoints, true)). +-spec register_hookpoints( + [registered_hookpoint()] | #{registered_hookpoint() => registered_hookpoint_status()} +) -> ok. +register_hookpoints(HookPoints) when is_list(HookPoints) -> + register_hookpoints(maps:from_keys(HookPoints, valid)); +register_hookpoints(HookPoints) when is_map(HookPoints) -> + persistent_term:put(?MODULE, HookPoints). +-spec verify_hookpoint(registered_hookpoint() | binary()) -> ok | no_return(). verify_hookpoint(HookPoint) when is_binary(HookPoint) -> ok; verify_hookpoint(HookPoint) -> - case maps:is_key(HookPoint, registered_hookpoints()) of - true -> - ok; - false -> - error({invalid_hookpoint, HookPoint}) + case maps:find(HookPoint, registered_hookpoints()) of + {ok, valid} -> ok; + {ok, deprecated} -> ?SLOG(warning, #{msg => deprecated_hookpoint, hookpoint => HookPoint}); + error -> error({invalid_hookpoint, HookPoint}) end. %%----------------------------------------------------------------------------- %% Internal API %%----------------------------------------------------------------------------- +-spec registered_hookpoints() -> #{registered_hookpoint() => registered_hookpoint_status()}. registered_hookpoints() -> persistent_term:get(?MODULE, #{}). diff --git a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl index 5680aa047..3e9850129 100644 --- a/apps/emqx_plugins/test/emqx_plugins_SUITE.erl +++ b/apps/emqx_plugins/test/emqx_plugins_SUITE.erl @@ -29,6 +29,16 @@ ). -define(EMQX_PLUGIN_TEMPLATE_VSN, "5.1.0"). -define(EMQX_PLUGIN_TEMPLATE_TAG, "5.1.0"). + +-define(EMQX_PLUGIN_TEMPLATES_LEGACY, [ + #{ + vsn => "5.0.0", + tag => "5.0.0", + release_name => "emqx_plugin_template", + app_name => emqx_plugin_template + } +]). + -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_RELEASE_NAME, "elixir_plugin_template"). -define(EMQX_ELIXIR_PLUGIN_TEMPLATE_URL, "https://github.com/emqx/emqx-elixir-plugin/releases/download/" @@ -290,6 +300,36 @@ t_start_restart_and_stop(Config) -> ?assertEqual([], emqx_plugins:list()), ok. +t_legacy_plugins({init, Config}) -> + Config; +t_legacy_plugins({'end', _Config}) -> + ok; +t_legacy_plugins(Config) -> + lists:foreach( + fun(LegacyPlugin) -> + test_legacy_plugin(LegacyPlugin, Config) + end, + ?EMQX_PLUGIN_TEMPLATES_LEGACY + ). + +test_legacy_plugin(#{app_name := AppName} = LegacyPlugin, _Config) -> + #{package := Package} = get_demo_plugin_package(LegacyPlugin#{ + shdir => emqx_plugins:install_dir(), git_url => ?EMQX_PLUGIN_TEMPLATE_URL + }), + NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), + ok = emqx_plugins:ensure_installed(NameVsn), + %% start + ok = emqx_plugins:ensure_started(NameVsn), + ok = assert_app_running(AppName, true), + ok = assert_app_running(map_sets, true), + %% stop + ok = emqx_plugins:ensure_stopped(NameVsn), + ok = assert_app_running(AppName, false), + ok = assert_app_running(map_sets, false), + ok = emqx_plugins:ensure_uninstalled(NameVsn), + ?assertEqual([], emqx_plugins:list()), + ok. + t_enable_disable({init, Config}) -> #{package := Package} = get_demo_plugin_package(), NameVsn = filename:basename(Package, ?PACKAGE_SUFFIX), diff --git a/changes/ce/fix-11886.en.md b/changes/ce/fix-11886.en.md new file mode 100644 index 000000000..a4f7617a1 --- /dev/null +++ b/changes/ce/fix-11886.en.md @@ -0,0 +1,3 @@ +Fixed backward plugin compatibility. + +Currently, EMQX validates hookpoint names, so invalid hookspoints cannot be used for registering hooks. However, older versions of plugin templates used some misspelled hookpoints, and so could the real plugins. We allow the old hookpoints to be used for registering hooks, but issue a warning that they are deprecated. As before, these hooks are never called. From f5456135aa8864b343adc18d5aa65913700ece87 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 8 Nov 2023 09:45:50 -0300 Subject: [PATCH 2/7] fix(bridge_v2_api): take status and error from bridge, not the connector Fixes https://emqx.atlassian.net/browse/EMQX-11284 Fixes https://emqx.atlassian.net/browse/EMQX-11298 --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 14 +++-- .../test/emqx_bridge_v2_testlib.erl | 33 ++++++++++++ .../emqx_bridge_v2_kafka_producer_SUITE.erl | 54 +++++++++++++------ 3 files changed, 79 insertions(+), 22 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 1da84451d..a8d634963 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -709,8 +709,10 @@ format_resource( #{ type := Type, name := Name, + status := Status, + error := Error, raw_config := RawConf, - resource_data := ResourceData + resource_data := _ResourceData }, Node ) -> @@ -719,14 +721,16 @@ format_resource( RawConf#{ type => Type, name => maps:get(<<"name">>, RawConf, Name), - node => Node + node => Node, + status => Status, + error => Error }, - format_resource_data(ResourceData) + format_bridge_status_and_error(#{status => Status, error => Error}) ) ). -format_resource_data(ResData) -> - maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)). +format_bridge_status_and_error(Data) -> + maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)). format_resource_data(error, undefined, Result) -> Result; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 278a0420a..5a2b6b000 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -145,6 +145,39 @@ create_bridge(Config, Overrides) -> ct:pal("creating bridge with config: ~p", [BridgeConfig]), emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig). +list_bridges_api() -> + Params = [], + Path = emqx_mgmt_api_test_util:api_path(["actions"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("listing bridges (via http)"), + Res = + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("list bridges result: ~p", [Res]), + Res. + +get_bridge_api(BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Params = [], + Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]), + Res = + case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}}; + Error -> + Error + end, + ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]), + Res. + create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 58a16ea67..6adb66357 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -29,25 +29,27 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - _ = application:load(emqx_conf), - ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()), - application:ensure_all_started(telemetry), - application:ensure_all_started(wolff), - application:ensure_all_started(brod), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_kafka, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(), - Config. + [{apps, Apps} | Config]. -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()). - -apps_to_start_and_stop() -> - [ - emqx, - emqx_conf, - emqx_connector, - emqx_bridge, - emqx_rule_engine - ]. +end_per_suite(Config) -> + Apps = ?config(apps, Config), + emqx_cth_suite:stop(Apps), + ok. t_create_remove_list(_) -> [] = emqx_bridge_v2:list(), @@ -165,6 +167,24 @@ t_unknown_topic(_Config) -> ok end ), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_api() + ), + ?assertMatch( + {ok, + {{_, 200, _}, _, #{ + <<"status">> := <<"disconnected">>, + <<"node_status">> := [#{<<"status">> := <<"disconnected">>}] + }}}, + emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) + ), ok. check_send_message_with_bridge(BridgeName) -> From f9e9748cecf5e54025b9077fbd8f6c10fbcdc988 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Nov 2023 17:06:08 +0100 Subject: [PATCH 3/7] fix(cluster-rpc): boot from local config if table loaded When EMQX boots up, it tries to get latest config from peer (core type) nodes, if none of the nodes are replying, the node will decide to boot with local config (and replay the committed changes) if the commit table is loaded from disk locally (an indication of the data being latest), otherwise it will sleep for 1-2 seconds and retry. This lead to a race condition, e.g. in a two nodes cluster: 1. node1 boots up 2. node2 boots up and copy mnesia table from node1 3. node1 restart before node2 can sync cluster.hocon from it 4. node1 boots up and copy mnesia table from node2 Now that both node1 and node2 has the mnesia `load_node` pointing to each other (i.e. not a local disk load). Prior to this fix, the nodes would wait for each other in a dead loop. This commit fixes the issue by allowing node to boot with local config if it does not have a lagging. --- apps/emqx_conf/include/emqx_conf.hrl | 1 + apps/emqx_conf/src/emqx_cluster_rpc.erl | 33 ++++- apps/emqx_conf/src/emqx_conf_app.erl | 189 +++++++++++++----------- changes/ce/fix-11897.en.md | 1 + 4 files changed, 130 insertions(+), 94 deletions(-) create mode 100644 changes/ce/fix-11897.en.md diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl index 4ae2b1df9..2b4d48173 100644 --- a/apps/emqx_conf/include/emqx_conf.hrl +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -21,6 +21,7 @@ -define(CLUSTER_MFA, cluster_rpc_mfa). -define(CLUSTER_COMMIT, cluster_rpc_commit). +-define(DEFAULT_INIT_TXN_ID, -1). -record(cluster_rpc_mfa, { tnx_id :: pos_integer(), diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 934d7ef7a..5bc330afa 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -44,7 +44,9 @@ read_next_mfa/1, trans_query/1, trans_status/0, - on_leave_clean/0 + on_leave_clean/0, + get_commit_lag/0, + get_commit_lag/1 ]). -export([ @@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) -> -spec get_node_tnx_id(node()) -> integer(). get_node_tnx_id(Node) -> case mnesia:wread({?CLUSTER_COMMIT, Node}) of - [] -> -1; + [] -> ?DEFAULT_INIT_TXN_ID; [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId end. +%% @doc Return the commit lag of *this* node. +-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}. +get_commit_lag() -> + {atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]), + Result. + +get_commit_lag(Node) -> + LatestId = get_cluster_tnx_id(), + LatestNode = + case mnesia:read(?CLUSTER_MFA, LatestId) of + [#?CLUSTER_MFA{initiator = N}] -> N; + _ -> undefined + end, + MyId = get_node_tnx_id(Node), + #{my_id => MyId, latest => LatestId, latest_node => LatestNode}. + %% Checks whether the Mnesia tables used by this module are waiting to %% be loaded and from where. --spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}. +-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}. get_tables_status() -> maps:from_list([ {Tab, do_get_tables_status(Tab)} @@ -249,13 +267,16 @@ do_get_tables_status(Tab) -> TabNodes = proplists:get_value(all_nodes, Props), KnownDown = mnesia_recover:get_mnesia_downs(), LocalNode = node(), - case proplists:get_value(load_node, Props) of + %% load_node. Returns the name of the node that Mnesia loaded the table from. + %% The structure of the returned value is unspecified, but can be useful for debugging purposes. + LoadedFrom = proplists:get_value(load_node, Props), + case LoadedFrom of unknown -> {waiting, TabNodes -- [LocalNode | KnownDown]}; LocalNode -> - {disc, LocalNode}; + {loaded, local}; Node -> - {network, Node} + {loaded, Node} end. %% Regardless of what MFA is returned, consider it a success), diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index 7addb3823..a2a2dc649 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -26,8 +26,6 @@ -include_lib("emqx/include/logger.hrl"). -include("emqx_conf.hrl"). --define(DEFAULT_INIT_TXN_ID, -1). - start(_StartType, _StartArgs) -> try ok = init_conf() @@ -52,31 +50,32 @@ unset_config_loaded() -> %% This function is named 'override' due to historical reasons. get_override_config_file() -> Node = node(), + Data = #{ + wall_clock => erlang:statistics(wall_clock), + node => Node, + release => emqx_release:version_with_prefix() + }, case emqx_app:init_load_done() of false -> - {error, #{node => Node, msg => "init_conf_load_not_done"}}; + {error, Data#{msg => "init_conf_load_not_done"}}; true -> case erlang:whereis(emqx_config_handler) of undefined -> - {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + {error, #{msg => "emqx_config_handler_not_ready"}}; _ -> Fun = fun() -> TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), - WallClock = erlang:statistics(wall_clock), Conf = emqx_config_handler:get_raw_cluster_override_conf(), HasDeprecateFile = emqx_config:has_deprecated_file(), - #{ - wall_clock => WallClock, + Data#{ conf => Conf, tnx_id => TnxId, - node => Node, - has_deprecated_file => HasDeprecateFile, - release => emqx_release:version_with_prefix() + has_deprecated_file => HasDeprecateFile } end, case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of {atomic, Res} -> {ok, Res}; - {aborted, Reason} -> {error, #{node => Node, msg => Reason}} + {aborted, Reason} -> {error, Data#{msg => Reason}} end end end. @@ -105,7 +104,7 @@ init_load(TnxId) -> ok = emqx_app:set_config_loader(emqx_conf), ok; Module -> - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_init_config_load", reason => "Some application has set another config loader", loader => Module @@ -126,7 +125,7 @@ sync_cluster_conf() -> case cluster_nodes() of [] -> %% The first core nodes is self. - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "skip_sync_cluster_conf", reason => "This is a single node, or the first node in the cluster" }), @@ -138,70 +137,94 @@ sync_cluster_conf() -> %% @private Some core nodes are running, try to sync the cluster config from them. sync_cluster_conf2(Nodes) -> {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes), - {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), - NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), - case (Failed =/= [] orelse NotReady =/= []) of - true when Ready =/= [] -> - %% Some core nodes failed to reply. - Warning = #{ - nodes => Nodes, - failed => Failed, - not_ready => NotReady, - msg => "ignored_nodes_when_sync_cluster_conf" - }, - ?SLOG(warning, Warning); - true when Failed =/= [] -> - %% There are core nodes running but no one was able to reply. - ?SLOG(error, #{ - msg => "failed_to_sync_cluster_conf", - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }); - true -> - %% There are core nodes booting up - ?SLOG(info, #{ - msg => "peer_not_ready_for_config_sync", - reason => "The 'not_ready' peer node(s) are loading configs", - nodes => Nodes, - not_ready => NotReady - }); - false -> - ok - end, - case Ready of + {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), + LogData = #{peer_nodes => Nodes, self_node => node()}, + case Failed ++ NotReady of [] -> - case should_proceed_with_boot() of - true -> - %% Act as if this node is alone, so it can - %% finish the boot sequence and load the - %% config for other nodes to copy it. - ?SLOG(info, #{ - msg => "skip_sync_cluster_conf", - loading_from_disk => true, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - {ok, ?DEFAULT_INIT_TXN_ID}; - false -> - %% retry in some time - Jitter = rand:uniform(2000), - Timeout = 10000 + Jitter, - timer:sleep(Timeout), - ?SLOG(warning, #{ - msg => "sync_cluster_conf_retry", - timeout => Timeout, - nodes => Nodes, - failed => Failed, - not_ready => NotReady - }), - sync_cluster_conf() - end; + ok; _ -> + ?SLOG( + warning, + LogData#{ + msg => "cluster_config_fetch_failures", + failed_nodes => Failed, + booting_nodes => NotReady + } + ) + end, + MyRole = mria_rlog:role(), + case Ready of + [] when MyRole =:= replicant -> + %% replicant should never boot without copying from a core node + delay_and_retry(LogData#{role => replicant}); + [] -> + %% none of the nodes are ready, either delay-and-retry or boot without wait + TableStatus = tx_commit_table_status(), + sync_cluster_conf5(TableStatus, LogData); + _ -> + %% copy config from the best node in the Ready list sync_cluster_conf3(Ready) end. +%% None of the peer nodes are responsive, so we have to make a decision +%% based on the commit lagging (if the commit table is loaded). +%% +%% It could be that the peer nodes are also booting up, +%% however we cannot always wait because it may run into a dead-lock. +%% +%% Giving up wait here implies that some changes made to the peer node outside +%% of cluster-rpc MFAs will be lost. +%% e.g. stop all nodes, manually change cluster.hocon in one node +%% then boot all nodes around the same time, the changed cluster.hocon may +%% get lost if the node happen to copy config from others. +sync_cluster_conf5({loaded, local}, LogData) -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "Commit table loaded locally from disk, assuming that I have the latest config" + }), + {ok, ?DEFAULT_INIT_TXN_ID}; +sync_cluster_conf5({loaded, From}, LogData) -> + case get_commit_lag() of + #{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 -> + ?SLOG(info, LogData#{ + msg => "skip_copy_cluster_config_from_peer_nodes", + explain => "I have the latest cluster config commit", + commit_loaded_from => From, + lagging_info => Lagging + }), + {ok, ?DEFAULT_INIT_TXN_ID}; + #{my_id := _MyId, latest := _Latest} = Lagging -> + delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From}) + end; +sync_cluster_conf5({waiting, Waiting}, LogData) -> + %% this may never happen? since we waited for table before + delay_and_retry(LogData#{table_pending => Waiting}). + +get_commit_lag() -> + emqx_cluster_rpc:get_commit_lag(). + +delay_and_retry(LogData) -> + Timeout = sync_delay_timeout(), + ?SLOG(warning, LogData#{ + msg => "sync_cluster_conf_retry", + explain => + "Cannot boot alone due to potentially stale data. " + "Will try sync cluster config again after delay", + delay => Timeout + }), + timer:sleep(Timeout), + sync_cluster_conf(). + +-ifdef(TEST). +sync_delay_timeout() -> + Jitter = rand:uniform(200), + 1_000 + Jitter. +-else. +sync_delay_timeout() -> + Jitter = rand:uniform(2000), + 10_000 + Jitter. +-endif. + %% @private Filter out the nodes which are running a newer version than this node. sync_cluster_conf3(Ready) -> case lists:filter(fun is_older_or_same_version/1, Ready) of @@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) -> ), ?SLOG(warning, #{ msg => "all_available_nodes_running_newer_version", - hint => - "Booting this node without syncing cluster config from peer core nodes " + explain => + "Booting this node without syncing cluster config from core nodes " "because other nodes are running a newer version", - peer_nodes => NodesAndVersions + versions => NodesAndVersions }), {ok, ?DEFAULT_INIT_TXN_ID}; Ready2 -> @@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) -> [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, HasDeprecatedFile = has_deprecated_file(Info), - ?SLOG(debug, #{ + ?SLOG(info, #{ msg => "sync_cluster_conf_success", synced_from_node => Node, has_deprecated_file => HasDeprecatedFile, @@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) -> ok = sync_data_from_node(Node), {ok, TnxId}. -should_proceed_with_boot() -> +tx_commit_table_status() -> TablesStatus = emqx_cluster_rpc:get_tables_status(), - LocalNode = node(), - case maps:get(?CLUSTER_COMMIT, TablesStatus) of - {disc, LocalNode} -> - %% Loading locally; let this node finish its boot sequence - %% so others can copy the config from this one. - true; - _ -> - %% Loading from another node or still waiting for nodes to - %% be up. Try again. - false - end. + maps:get(?CLUSTER_COMMIT, TablesStatus). conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true; conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) -> diff --git a/changes/ce/fix-11897.en.md b/changes/ce/fix-11897.en.md new file mode 100644 index 000000000..383129b4a --- /dev/null +++ b/changes/ce/fix-11897.en.md @@ -0,0 +1 @@ +Fix config sync wait-loop race condition when cluster nodes boot around the same time. From 8540566eba02560880d16192060e107a5a6ab454 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 8 Nov 2023 13:02:19 -0300 Subject: [PATCH 4/7] chore: prepare to tag `e5.3.1-alpha.5` --- apps/emqx/include/emqx_release.hrl | 2 +- deploy/charts/emqx-enterprise/Chart.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 87a4b47e0..1bec82504 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.3.1-alpha.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.3.1-alpha.4"). +-define(EMQX_RELEASE_EE, "5.3.1-alpha.5"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/deploy/charts/emqx-enterprise/Chart.yaml b/deploy/charts/emqx-enterprise/Chart.yaml index 4211f37e4..4f385a9ba 100644 --- a/deploy/charts/emqx-enterprise/Chart.yaml +++ b/deploy/charts/emqx-enterprise/Chart.yaml @@ -14,8 +14,8 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. -version: 5.3.1-alpha.4 +version: 5.3.1-alpha.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. -appVersion: 5.3.1-alpha.4 +appVersion: 5.3.1-alpha.5 From 1e3500ffd2d4dc2f01aaafaa10734d1ec3cd2ee6 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Wed, 8 Nov 2023 21:19:22 +0100 Subject: [PATCH 5/7] fix(emqx_conf_app): ensure log data fields Co-authored-by: Thales Macedo Garitezi --- apps/emqx_conf/src/emqx_conf_app.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl index a2a2dc649..74a7a8f2e 100644 --- a/apps/emqx_conf/src/emqx_conf_app.erl +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -61,7 +61,7 @@ get_override_config_file() -> true -> case erlang:whereis(emqx_config_handler) of undefined -> - {error, #{msg => "emqx_config_handler_not_ready"}}; + {error, Data#{msg => "emqx_config_handler_not_ready"}}; _ -> Fun = fun() -> TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), From 5d25daee88ec3db7ce86938ba3a8b44cee34b2de Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Thu, 9 Nov 2023 10:17:58 +0800 Subject: [PATCH 6/7] chore: upgrade dashboard to e1.3.1 for ee and v1.5.1 for ce --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 1d9137c41..d0d9127b8 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 -export EMQX_DASHBOARD_VERSION ?= v1.5.0 -export EMQX_EE_DASHBOARD_VERSION ?= e1.3.0 +export EMQX_DASHBOARD_VERSION ?= v1.5.1 +export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1 PROFILE ?= emqx REL_PROFILES := emqx emqx-enterprise From eabd09051aa77714c53ed912d459d94003e1191d Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 9 Nov 2023 13:54:57 -0300 Subject: [PATCH 7/7] feat(actions_api): add `/action_types` API --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 30 +++++++++++++++++-- .../src/schema/emqx_bridge_v2_schema.erl | 19 ++++++++++++ .../test/emqx_bridge_v2_api_SUITE.erl | 28 +++++++++++++++-- rel/i18n/emqx_bridge_v2_api.hocon | 6 ++++ 4 files changed, 78 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index a8d634963..1935a1b5a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -40,7 +40,8 @@ '/actions/:id/enable/:enable'/2, '/actions/:id/:operation'/2, '/nodes/:node/actions/:id/:operation'/2, - '/actions_probe'/2 + '/actions_probe'/2, + '/action_types'/2 ]). %% BpAPI @@ -79,7 +80,8 @@ paths() -> "/actions/:id/enable/:enable", "/actions/:id/:operation", "/nodes/:node/actions/:id/:operation", - "/actions_probe" + "/actions_probe", + "/action_types" ]. error_schema(Code, Message) when is_atom(Code) -> @@ -338,6 +340,27 @@ schema("/actions_probe") -> 400 => error_schema(['TEST_FAILED'], "bridge test failed") } } + }; +schema("/action_types") -> + #{ + 'operationId' => '/action_types', + get => #{ + tags => [<<"actions">>], + desc => ?DESC("desc_api10"), + summary => <<"List available action types">>, + responses => #{ + 200 => emqx_dashboard_swagger:schema_with_examples( + array(emqx_bridge_v2_schema:types_sc()), + #{ + <<"types">> => + #{ + summary => <<"Action types">>, + value => emqx_bridge_v2_schema:types() + } + } + ) + } + } }. '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> @@ -486,6 +509,9 @@ schema("/actions_probe") -> redact(BadRequest) end. +'/action_types'(get, _Request) -> + ?OK(emqx_bridge_v2_schema:types()). + maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) -> case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, #{raw_config := RawConf}} -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d6d8eb9a1..1d059903a 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -30,9 +30,18 @@ post_request/0 ]). +-export([types/0, types_sc/0]). + -export([enterprise_api_schemas/1]). +-export_type([action_type/0]). + +%% Should we explicitly list them here so dialyzer may be more helpful? +-type action_type() :: atom(). + -if(?EMQX_RELEASE_EDITION == ee). +-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when + Method :: string(). enterprise_api_schemas(Method) -> %% We *must* do this to ensure the module is really loaded, especially when we use %% `call_hocon' from `nodetool' to generate initial configurations. @@ -55,6 +64,8 @@ enterprise_fields_actions() -> -else. +-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when + Method :: string(). enterprise_api_schemas(_Method) -> []. enterprise_fields_actions() -> []. @@ -129,6 +140,14 @@ desc(actions) -> desc(_) -> undefined. +-spec types() -> [action_type()]. +types() -> + proplists:get_keys(?MODULE:fields(actions)). + +-spec types_sc() -> ?ENUM([action_type()]). +types_sc() -> + hoconsc:enum(types()). + -ifdef(TEST). -include_lib("hocon/include/hocon_types.hrl"). schema_homogeneous_test() -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index bf2ac51a2..9879fe1e6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -236,6 +236,14 @@ end_per_group(_, Config) -> emqx_cth_suite:stop(?config(group_apps, Config)), ok. +init_per_testcase(t_action_types, Config) -> + case ?config(cluster_nodes, Config) of + undefined -> + init_mocks(); + Nodes -> + [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] + end, + Config; init_per_testcase(_TestCase, Config) -> case ?config(cluster_nodes, Config) of undefined -> @@ -260,8 +268,14 @@ end_per_testcase(_TestCase, Config) -> -define(CONNECTOR_IMPL, emqx_bridge_v2_dummy_connector). init_mocks() -> - meck:new(emqx_connector_ee_schema, [passthrough, no_link]), - meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + case emqx_release:edition() of + ee -> + meck:new(emqx_connector_ee_schema, [passthrough, no_link]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL), + ok; + ce -> + ok + end, meck:new(?CONNECTOR_IMPL, [non_strict, no_link]), meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible), meck:expect( @@ -289,7 +303,7 @@ init_mocks() -> ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) -> emqx_bridge_v2:get_channels_for_connector(ResId) end), - [?CONNECTOR_IMPL, emqx_connector_ee_schema]. + ok. clear_resources() -> lists:foreach( @@ -886,6 +900,14 @@ t_cascade_delete_actions(Config) -> ), {ok, 200, []} = request_json(get, uri([?ROOT]), Config). +t_action_types(Config) -> + Res = request_json(get, uri(["action_types"]), Config), + ?assertMatch({ok, 200, _}, Res), + {ok, 200, Types} = Res, + ?assert(is_list(Types), #{types => Types}), + ?assert(lists:all(fun is_binary/1, Types), #{types => Types}), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], diff --git a/rel/i18n/emqx_bridge_v2_api.hocon b/rel/i18n/emqx_bridge_v2_api.hocon index 1f2c2bd8d..23a75712a 100644 --- a/rel/i18n/emqx_bridge_v2_api.hocon +++ b/rel/i18n/emqx_bridge_v2_api.hocon @@ -54,6 +54,12 @@ desc_api9.desc: desc_api9.label: """Test Bridge Creation""" +desc_api10.desc: +"""Lists the available action types.""" + +desc_api10.label: +"""List action types""" + desc_bridge_metrics.desc: """Get bridge metrics by id."""