Merge branch 'release-53' into sync-r53-to-m-20231109

commit 371a49304d

Makefile
@@ -20,8 +20,8 @@ endif
 
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
-export EMQX_DASHBOARD_VERSION ?= v1.5.0
-export EMQX_EE_DASHBOARD_VERSION ?= e1.3.0
+export EMQX_DASHBOARD_VERSION ?= v1.5.1
+export EMQX_EE_DASHBOARD_VERSION ?= e1.3.1
 
 PROFILE ?= emqx
 REL_PROFILES := emqx emqx-enterprise
@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.3.1-alpha.1").
 
 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.3.1-alpha.4").
+-define(EMQX_RELEASE_EE, "5.3.1-alpha.5").
 
 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").
@@ -40,7 +40,8 @@
     '/actions/:id/enable/:enable'/2,
     '/actions/:id/:operation'/2,
     '/nodes/:node/actions/:id/:operation'/2,
-    '/actions_probe'/2
+    '/actions_probe'/2,
+    '/action_types'/2
 ]).
 
 %% BpAPI
@@ -79,7 +80,8 @@ paths() ->
         "/actions/:id/enable/:enable",
         "/actions/:id/:operation",
         "/nodes/:node/actions/:id/:operation",
-        "/actions_probe"
+        "/actions_probe",
+        "/action_types"
     ].
 
 error_schema(Code, Message) when is_atom(Code) ->
@@ -338,6 +340,27 @@ schema("/actions_probe") ->
                 400 => error_schema(['TEST_FAILED'], "bridge test failed")
             }
         }
+    };
+schema("/action_types") ->
+    #{
+        'operationId' => '/action_types',
+        get => #{
+            tags => [<<"actions">>],
+            desc => ?DESC("desc_api10"),
+            summary => <<"List available action types">>,
+            responses => #{
+                200 => emqx_dashboard_swagger:schema_with_examples(
+                    array(emqx_bridge_v2_schema:types_sc()),
+                    #{
+                        <<"types">> =>
+                            #{
+                                summary => <<"Action types">>,
+                                value => emqx_bridge_v2_schema:types()
+                            }
+                    }
+                )
+            }
+        }
     }.
 
 '/actions'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) ->
@@ -486,6 +509,9 @@ schema("/actions_probe") ->
             redact(BadRequest)
     end.
 
+'/action_types'(get, _Request) ->
+    ?OK(emqx_bridge_v2_schema:types()).
+
 maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) ->
     case emqx_bridge:lookup(BridgeType, BridgeName) of
         {ok, #{raw_config := RawConf}} ->
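
The hunks above register the /action_types route, its Swagger schema, and the GET handler that returns the known action types. For context, a minimal sketch of calling the new endpoint over HTTP with OTP's httpc; the listener address 127.0.0.1:18083, the /api/v5 base path, and the pre-created API key and secret are assumptions, not part of this change:

%% Sketch only: query the new endpoint with OTP's httpc.
%% Assumed (not in this diff): listener on 127.0.0.1:18083, base path /api/v5,
%% and an API key/secret created up front.
list_action_types(ApiKey, ApiSecret) ->
    {ok, _} = application:ensure_all_started(inets),
    Auth = "Basic " ++ base64:encode_to_string(ApiKey ++ ":" ++ ApiSecret),
    Url = "http://127.0.0.1:18083/api/v5/action_types",
    {ok, {{_, 200, _}, _Headers, Body}} =
        httpc:request(get, {Url, [{"authorization", Auth}]}, [], []),
    %% Body is a JSON array of action type names.
    Body.
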
@@ -709,8 +735,10 @@ format_resource(
     #{
         type := Type,
         name := Name,
+        status := Status,
+        error := Error,
         raw_config := RawConf,
-        resource_data := ResourceData
+        resource_data := _ResourceData
     },
     Node
 ) ->
@@ -719,14 +747,16 @@ format_resource(
         RawConf#{
             type => Type,
             name => maps:get(<<"name">>, RawConf, Name),
-            node => Node
+            node => Node,
+            status => Status,
+            error => Error
         },
-        format_resource_data(ResourceData)
+        format_bridge_status_and_error(#{status => Status, error => Error})
     )
 ).
 
-format_resource_data(ResData) ->
-    maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
+format_bridge_status_and_error(Data) ->
+    maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], Data)).
 
 format_resource_data(error, undefined, Result) ->
     Result;
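
The formatting refactor above now takes status and error from the bridge record itself and folds only over those two keys. A standalone sketch of the same maps:with/maps:fold pattern; the helper names and the atom-to-binary clause are illustrative, not the module's API:

%% Illustrative stand-in for the status/error formatting above:
%% keep only the status and error keys, drop an undefined error,
%% and render atoms as binaries.
format_status_and_error(Data) ->
    maps:fold(fun do_format/3, #{}, maps:with([status, error], Data)).

do_format(error, undefined, Acc) ->
    Acc;
do_format(K, V, Acc) when is_atom(V) ->
    Acc#{K => atom_to_binary(V, utf8)};
do_format(K, V, Acc) ->
    Acc#{K => V}.
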
@@ -30,9 +30,18 @@
     post_request/0
 ]).
 
+-export([types/0, types_sc/0]).
+
 -export([enterprise_api_schemas/1]).
 
+-export_type([action_type/0]).
+
+%% Should we explicitly list them here so dialyzer may be more helpful?
+-type action_type() :: atom().
+
 -if(?EMQX_RELEASE_EDITION == ee).
+-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
+    Method :: string().
 enterprise_api_schemas(Method) ->
     %% We *must* do this to ensure the module is really loaded, especially when we use
     %% `call_hocon' from `nodetool' to generate initial configurations.
@@ -55,6 +64,8 @@ enterprise_fields_actions() ->
 
 -else.
 
+-spec enterprise_api_schemas(Method) -> [{_Type :: binary(), ?R_REF(module(), Method)}] when
+    Method :: string().
 enterprise_api_schemas(_Method) -> [].
 
 enterprise_fields_actions() -> [].
@@ -129,6 +140,14 @@ desc(actions) ->
 desc(_) ->
     undefined.
 
+-spec types() -> [action_type()].
+types() ->
+    proplists:get_keys(?MODULE:fields(actions)).
+
+-spec types_sc() -> ?ENUM([action_type()]).
+types_sc() ->
+    hoconsc:enum(types()).
+
 -ifdef(TEST).
 -include_lib("hocon/include/hocon_types.hrl").
 schema_homogeneous_test() ->
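
types/0 and types_sc/0 above derive the list of action types from the schema's own actions root. A rough, self-contained equivalent with a made-up fields list (the type names below are placeholders for illustration):

%% Rough equivalent of types/0 and types_sc/0 over a made-up fields list.
fields(actions) ->
    [{type_a, placeholder_schema}, {type_b, placeholder_schema}].

types() ->
    %% -> [type_a, type_b]
    proplists:get_keys(fields(actions)).

types_sc() ->
    %% an enum schema over the known action types
    hoconsc:enum(types()).
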
@@ -236,6 +236,14 @@ end_per_group(_, Config) ->
     emqx_cth_suite:stop(?config(group_apps, Config)),
     ok.
 
+init_per_testcase(t_action_types, Config) ->
+    case ?config(cluster_nodes, Config) of
+        undefined ->
+            init_mocks();
+        Nodes ->
+            [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
+    end,
+    Config;
 init_per_testcase(_TestCase, Config) ->
     case ?config(cluster_nodes, Config) of
         undefined ->
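
The new init_per_testcase clause primes the mocks on every clustered node with erpc:call/4 in a comprehension. Where the per-node results should also be checked, OTP's erpc:multicall/4 is an equivalent option; a hedged sketch, not part of this change:

%% Sketch: run init_mocks/0 on every peer node and check each result.
%% erpc:multicall/4 returns one result per node, in the same order as Nodes.
init_mocks_everywhere(Nodes) ->
    Results = erpc:multicall(Nodes, ?MODULE, init_mocks, []),
    true = lists:all(fun({ok, ok}) -> true; (_) -> false end, Results),
    ok.
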
@@ -260,8 +268,14 @@ end_per_testcase(_TestCase, Config) ->
 
 -define(CONNECTOR_IMPL, emqx_bridge_v2_dummy_connector).
 init_mocks() ->
-    meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
-    meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
+    case emqx_release:edition() of
+        ee ->
+            meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
+            meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR_IMPL),
+            ok;
+        ce ->
+            ok
+    end,
     meck:new(?CONNECTOR_IMPL, [non_strict, no_link]),
     meck:expect(?CONNECTOR_IMPL, callback_mode, 0, async_if_possible),
     meck:expect(
@@ -289,7 +303,7 @@ init_mocks() ->
     ok = meck:expect(?CONNECTOR_IMPL, on_get_channels, fun(ResId) ->
         emqx_bridge_v2:get_channels_for_connector(ResId)
     end),
-    [?CONNECTOR_IMPL, emqx_connector_ee_schema].
+    ok.
 
 clear_resources() ->
     lists:foreach(
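
init_mocks/0 now mocks the enterprise-only schema module only on enterprise builds and returns ok instead of a list of mocked modules. The edition guard, reduced to a reusable sketch (the function name is illustrative):

%% Reduced sketch of the edition guard used above (illustrative name):
%% only mock the EE-only schema module when it exists in the build.
maybe_mock_ee_schema(ConnectorImpl) ->
    case emqx_release:edition() of
        ee ->
            meck:new(emqx_connector_ee_schema, [passthrough, no_link]),
            meck:expect(emqx_connector_ee_schema, resource_type, 1, ConnectorImpl),
            ok;
        ce ->
            ok
    end.
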
@@ -886,6 +900,14 @@ t_cascade_delete_actions(Config) ->
     ),
     {ok, 200, []} = request_json(get, uri([?ROOT]), Config).
 
+t_action_types(Config) ->
+    Res = request_json(get, uri(["action_types"]), Config),
+    ?assertMatch({ok, 200, _}, Res),
+    {ok, 200, Types} = Res,
+    ?assert(is_list(Types), #{types => Types}),
+    ?assert(lists:all(fun is_binary/1, Types), #{types => Types}),
+    ok.
+
 %%% helpers
 listen_on_random_port() ->
     SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
@@ -145,6 +145,39 @@ create_bridge(Config, Overrides) ->
     ct:pal("creating bridge with config: ~p", [BridgeConfig]),
     emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
 
+list_bridges_api() ->
+    Params = [],
+    Path = emqx_mgmt_api_test_util:api_path(["actions"]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("listing bridges (via http)"),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("list bridges result: ~p", [Res]),
+    Res.
+
+get_bridge_api(BridgeType, BridgeName) ->
+    BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
+    Params = [],
+    Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
+    AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
+    Opts = #{return_all => true},
+    ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
+    Res =
+        case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
+            {ok, {Status, Headers, Body0}} ->
+                {ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
+            Error ->
+                Error
+        end,
+    ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
+    Res.
+
 create_bridge_api(Config) ->
     create_bridge_api(Config, _Overrides = #{}).
@@ -29,25 +29,27 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
-    _ = application:load(emqx_conf),
-    ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()),
-    application:ensure_all_started(telemetry),
-    application:ensure_all_started(wolff),
-    application:ensure_all_started(brod),
+    Apps = emqx_cth_suite:start(
+        [
+            emqx,
+            emqx_conf,
+            emqx_connector,
+            emqx_bridge_kafka,
+            emqx_bridge,
+            emqx_rule_engine,
+            emqx_management,
+            {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
+        ],
+        #{work_dir => emqx_cth_suite:work_dir(Config)}
+    ),
+    {ok, _} = emqx_common_test_http:create_default_app(),
     emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(),
-    Config.
+    [{apps, Apps} | Config].
 
-end_per_suite(_Config) ->
-    emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()).
-
-apps_to_start_and_stop() ->
-    [
-        emqx,
-        emqx_conf,
-        emqx_connector,
-        emqx_bridge,
-        emqx_rule_engine
-    ].
+end_per_suite(Config) ->
+    Apps = ?config(apps, Config),
+    emqx_cth_suite:stop(Apps),
+    ok.
 
 t_create_remove_list(_) ->
     [] = emqx_bridge_v2:list(),
@@ -165,6 +167,24 @@ t_unknown_topic(_Config) ->
             ok
         end
     ),
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, [
+                #{
+                    <<"status">> := <<"disconnected">>,
+                    <<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
+                }
+            ]}},
+        emqx_bridge_v2_testlib:list_bridges_api()
+    ),
+    ?assertMatch(
+        {ok,
+            {{_, 200, _}, _, #{
+                <<"status">> := <<"disconnected">>,
+                <<"node_status">> := [#{<<"status">> := <<"disconnected">>}]
+            }}},
+        emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName)
+    ),
     ok.
 
 check_send_message_with_bridge(BridgeName) ->
@@ -21,6 +21,7 @@
 
 -define(CLUSTER_MFA, cluster_rpc_mfa).
 -define(CLUSTER_COMMIT, cluster_rpc_commit).
+-define(DEFAULT_INIT_TXN_ID, -1).
 
 -record(cluster_rpc_mfa, {
     tnx_id :: pos_integer(),
@@ -44,7 +44,9 @@
     read_next_mfa/1,
     trans_query/1,
     trans_status/0,
-    on_leave_clean/0
+    on_leave_clean/0,
+    get_commit_lag/0,
+    get_commit_lag/1
 ]).
 
 -export([
@@ -231,13 +233,29 @@ make_initiate_call_req(M, F, A) ->
 -spec get_node_tnx_id(node()) -> integer().
 get_node_tnx_id(Node) ->
     case mnesia:wread({?CLUSTER_COMMIT, Node}) of
-        [] -> -1;
+        [] -> ?DEFAULT_INIT_TXN_ID;
         [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
     end.
 
+%% @doc Return the commit lag of *this* node.
+-spec get_commit_lag() -> #{my_id := pos_integer(), latest := pos_integer()}.
+get_commit_lag() ->
+    {atomic, Result} = transaction(fun ?MODULE:get_commit_lag/1, [node()]),
+    Result.
+
+get_commit_lag(Node) ->
+    LatestId = get_cluster_tnx_id(),
+    LatestNode =
+        case mnesia:read(?CLUSTER_MFA, LatestId) of
+            [#?CLUSTER_MFA{initiator = N}] -> N;
+            _ -> undefined
+        end,
+    MyId = get_node_tnx_id(Node),
+    #{my_id => MyId, latest => LatestId, latest_node => LatestNode}.
+
 %% Checks whether the Mnesia tables used by this module are waiting to
 %% be loaded and from where.
--spec get_tables_status() -> #{atom() => {waiting, [node()]} | {disc | network, node()}}.
+-spec get_tables_status() -> #{atom() => {waiting, [node()]} | {loaded, local | node()}}.
 get_tables_status() ->
     maps:from_list([
         {Tab, do_get_tables_status(Tab)}
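
get_commit_lag/0 exposes how far this node's cluster-rpc commit id is behind the latest known commit; the config-sync boot path uses it to decide whether starting without copying config is safe. An illustrative predicate over the returned map (the helper name is made up; the guard mirrors the caller added later in this commit):

%% Illustrative predicate over the lag map returned by get_commit_lag/0.
%% e.g. #{my_id => 7, latest => 7, latest_node => 'core1@host'} -> true
have_latest_commit(#{my_id := MyId, latest := Latest}) ->
    MyId >= Latest orelse Latest =:= 0.
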
@@ -249,13 +267,16 @@ do_get_tables_status(Tab) ->
     TabNodes = proplists:get_value(all_nodes, Props),
     KnownDown = mnesia_recover:get_mnesia_downs(),
     LocalNode = node(),
-    case proplists:get_value(load_node, Props) of
+    %% load_node. Returns the name of the node that Mnesia loaded the table from.
+    %% The structure of the returned value is unspecified, but can be useful for debugging purposes.
+    LoadedFrom = proplists:get_value(load_node, Props),
+    case LoadedFrom of
         unknown ->
            {waiting, TabNodes -- [LocalNode | KnownDown]};
         LocalNode ->
-            {disc, LocalNode};
+            {loaded, local};
         Node ->
-            {network, Node}
+            {loaded, Node}
     end.
 
 %% Regardless of what MFA is returned, consider it a success),
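
With the -spec change, get_tables_status/0 now reports {loaded, local}, {loaded, Node} or {waiting, Nodes} per table. A small illustrative consumer of those shapes (not part of the module):

%% Illustrative consumer of the new get_tables_status/0 return values.
describe_table_status({loaded, local}) ->
    "loaded from the local disc copy";
describe_table_status({loaded, Node}) when is_atom(Node) ->
    "copied over the network from " ++ atom_to_list(Node);
describe_table_status({waiting, Nodes}) ->
    "waiting for " ++ integer_to_list(length(Nodes)) ++ " node(s)".
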
@@ -26,8 +26,6 @@
 -include_lib("emqx/include/logger.hrl").
 -include("emqx_conf.hrl").
 
--define(DEFAULT_INIT_TXN_ID, -1).
-
 start(_StartType, _StartArgs) ->
     try
         ok = init_conf()
@@ -52,31 +50,32 @@ unset_config_loaded() ->
 %% This function is named 'override' due to historical reasons.
 get_override_config_file() ->
     Node = node(),
+    Data = #{
+        wall_clock => erlang:statistics(wall_clock),
+        node => Node,
+        release => emqx_release:version_with_prefix()
+    },
     case emqx_app:init_load_done() of
         false ->
-            {error, #{node => Node, msg => "init_conf_load_not_done"}};
+            {error, Data#{msg => "init_conf_load_not_done"}};
         true ->
             case erlang:whereis(emqx_config_handler) of
                 undefined ->
-                    {error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
+                    {error, Data#{msg => "emqx_config_handler_not_ready"}};
                 _ ->
                     Fun = fun() ->
                         TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
-                        WallClock = erlang:statistics(wall_clock),
                         Conf = emqx_config_handler:get_raw_cluster_override_conf(),
                        HasDeprecateFile = emqx_config:has_deprecated_file(),
-                        #{
-                            wall_clock => WallClock,
+                        Data#{
                             conf => Conf,
                             tnx_id => TnxId,
-                            node => Node,
-                            has_deprecated_file => HasDeprecateFile,
-                            release => emqx_release:version_with_prefix()
+                            has_deprecated_file => HasDeprecateFile
                         }
                     end,
                     case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
                         {atomic, Res} -> {ok, Res};
-                        {aborted, Reason} -> {error, #{node => Node, msg => Reason}}
+                        {aborted, Reason} -> {error, Data#{msg => Reason}}
                     end
             end
     end.
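
The refactor above builds one base Data map (wall_clock, node, release) and extends it at each return site with Erlang's map-update syntax; Data#{K => V} copies the map and adds or overwrites K, leaving the original untouched. A tiny self-contained illustration:

%% Map-update syntax as used above: Data#{K => V} returns a copy with K set.
map_update_example() ->
    Data = #{node => node(), release => "v5.3.1"},
    Error = Data#{msg => "init_conf_load_not_done"},
    %% Error has node, release and msg; Data itself is unchanged.
    {Data, Error}.
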
@@ -105,7 +104,7 @@ init_load(TnxId) ->
             ok = emqx_app:set_config_loader(emqx_conf),
             ok;
         Module ->
-            ?SLOG(debug, #{
+            ?SLOG(info, #{
                 msg => "skip_init_config_load",
                 reason => "Some application has set another config loader",
                 loader => Module
@@ -126,7 +125,7 @@ sync_cluster_conf() ->
     case cluster_nodes() of
         [] ->
             %% The first core nodes is self.
-            ?SLOG(debug, #{
+            ?SLOG(info, #{
                 msg => "skip_sync_cluster_conf",
                 reason => "This is a single node, or the first node in the cluster"
             }),
@@ -138,70 +137,94 @@ sync_cluster_conf()
 %% @private Some core nodes are running, try to sync the cluster config from them.
 sync_cluster_conf2(Nodes) ->
     {Results, Failed} = emqx_conf_proto_v3:get_override_config_file(Nodes),
-    {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
-    NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
-    case (Failed =/= [] orelse NotReady =/= []) of
-        true when Ready =/= [] ->
-            %% Some core nodes failed to reply.
-            Warning = #{
-                nodes => Nodes,
-                failed => Failed,
-                not_ready => NotReady,
-                msg => "ignored_nodes_when_sync_cluster_conf"
-            },
-            ?SLOG(warning, Warning);
-        true when Failed =/= [] ->
-            %% There are core nodes running but no one was able to reply.
-            ?SLOG(error, #{
-                msg => "failed_to_sync_cluster_conf",
-                nodes => Nodes,
-                failed => Failed,
-                not_ready => NotReady
-            });
-        true ->
-            %% There are core nodes booting up
-            ?SLOG(info, #{
-                msg => "peer_not_ready_for_config_sync",
-                reason => "The 'not_ready' peer node(s) are loading configs",
-                nodes => Nodes,
-                not_ready => NotReady
-            });
-        false ->
-            ok
-    end,
-    case Ready of
+    {Ready, NotReady} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
+    LogData = #{peer_nodes => Nodes, self_node => node()},
+    case Failed ++ NotReady of
         [] ->
-            case should_proceed_with_boot() of
-                true ->
-                    %% Act as if this node is alone, so it can
-                    %% finish the boot sequence and load the
-                    %% config for other nodes to copy it.
-                    ?SLOG(info, #{
-                        msg => "skip_sync_cluster_conf",
-                        loading_from_disk => true,
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady
-                    }),
-                    {ok, ?DEFAULT_INIT_TXN_ID};
-                false ->
-                    %% retry in some time
-                    Jitter = rand:uniform(2000),
-                    Timeout = 10000 + Jitter,
-                    timer:sleep(Timeout),
-                    ?SLOG(warning, #{
-                        msg => "sync_cluster_conf_retry",
-                        timeout => Timeout,
-                        nodes => Nodes,
-                        failed => Failed,
-                        not_ready => NotReady
-                    }),
-                    sync_cluster_conf()
-            end;
+            ok;
         _ ->
+            ?SLOG(
+                warning,
+                LogData#{
+                    msg => "cluster_config_fetch_failures",
+                    failed_nodes => Failed,
+                    booting_nodes => NotReady
+                }
+            )
+    end,
+    MyRole = mria_rlog:role(),
+    case Ready of
+        [] when MyRole =:= replicant ->
+            %% replicant should never boot without copying from a core node
+            delay_and_retry(LogData#{role => replicant});
+        [] ->
+            %% none of the nodes are ready, either delay-and-retry or boot without wait
+            TableStatus = tx_commit_table_status(),
+            sync_cluster_conf5(TableStatus, LogData);
+        _ ->
+            %% copy config from the best node in the Ready list
             sync_cluster_conf3(Ready)
     end.
 
+%% None of the peer nodes are responsive, so we have to make a decision
+%% based on the commit lagging (if the commit table is loaded).
+%%
+%% It could be that the peer nodes are also booting up,
+%% however we cannot always wait because it may run into a dead-lock.
+%%
+%% Giving up wait here implies that some changes made to the peer node outside
+%% of cluster-rpc MFAs will be lost.
+%% e.g. stop all nodes, manually change cluster.hocon in one node
+%% then boot all nodes around the same time, the changed cluster.hocon may
+%% get lost if the node happen to copy config from others.
+sync_cluster_conf5({loaded, local}, LogData) ->
+    ?SLOG(info, LogData#{
+        msg => "skip_copy_cluster_config_from_peer_nodes",
+        explain => "Commit table loaded locally from disk, assuming that I have the latest config"
+    }),
+    {ok, ?DEFAULT_INIT_TXN_ID};
+sync_cluster_conf5({loaded, From}, LogData) ->
+    case get_commit_lag() of
+        #{my_id := MyId, latest := Latest} = Lagging when MyId >= Latest orelse Latest =:= 0 ->
+            ?SLOG(info, LogData#{
+                msg => "skip_copy_cluster_config_from_peer_nodes",
+                explain => "I have the latest cluster config commit",
+                commit_loaded_from => From,
+                lagging_info => Lagging
+            }),
+            {ok, ?DEFAULT_INIT_TXN_ID};
+        #{my_id := _MyId, latest := _Latest} = Lagging ->
+            delay_and_retry(LogData#{lagging_info => Lagging, commit_loaded_from => From})
+    end;
+sync_cluster_conf5({waiting, Waiting}, LogData) ->
+    %% this may never happen? since we waited for table before
+    delay_and_retry(LogData#{table_pending => Waiting}).
+
+get_commit_lag() ->
+    emqx_cluster_rpc:get_commit_lag().
+
+delay_and_retry(LogData) ->
+    Timeout = sync_delay_timeout(),
+    ?SLOG(warning, LogData#{
+        msg => "sync_cluster_conf_retry",
+        explain =>
+            "Cannot boot alone due to potentially stale data. "
+            "Will try sync cluster config again after delay",
+        delay => Timeout
+    }),
+    timer:sleep(Timeout),
+    sync_cluster_conf().
+
+-ifdef(TEST).
+sync_delay_timeout() ->
+    Jitter = rand:uniform(200),
+    1_000 + Jitter.
+-else.
+sync_delay_timeout() ->
+    Jitter = rand:uniform(2000),
+    10_000 + Jitter.
+-endif.
+
 %% @private Filter out the nodes which are running a newer version than this node.
 sync_cluster_conf3(Ready) ->
     case lists:filter(fun is_older_or_same_version/1, Ready) of
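
Taken together, the reworked boot path decides roughly as follows: a replicant always delays and retries; a core node whose commit table was loaded from its own disc boots alone; one whose table was copied from a peer boots alone only if it is not lagging behind the latest commit; anything else sleeps for sync_delay_timeout() and retries. A compressed, illustrative decision table (the function and atoms below are not the module's API):

%% Compressed, illustrative decision table for the reworked boot-wait logic.
boot_decision(replicant, _TableStatus, _Lag) ->
    retry_later;
boot_decision(core, {loaded, local}, _Lag) ->
    boot_alone;
boot_decision(core, {loaded, _Peer}, #{my_id := MyId, latest := Latest}) when
    MyId >= Latest orelse Latest =:= 0
->
    boot_alone;
boot_decision(core, _TableStatus, _Lag) ->
    retry_later.
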
@@ -217,10 +240,10 @@ sync_cluster_conf3(Ready) ->
             ),
             ?SLOG(warning, #{
                 msg => "all_available_nodes_running_newer_version",
-                hint =>
-                    "Booting this node without syncing cluster config from peer core nodes "
+                explain =>
+                    "Booting this node without syncing cluster config from core nodes "
                     "because other nodes are running a newer version",
-                peer_nodes => NodesAndVersions
+                versions => NodesAndVersions
             }),
             {ok, ?DEFAULT_INIT_TXN_ID};
         Ready2 ->
@@ -246,7 +269,7 @@ sync_cluster_conf4(Ready) ->
     [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
     #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
     HasDeprecatedFile = has_deprecated_file(Info),
-    ?SLOG(debug, #{
+    ?SLOG(info, #{
         msg => "sync_cluster_conf_success",
         synced_from_node => Node,
         has_deprecated_file => HasDeprecatedFile,
@@ -263,19 +286,9 @@ sync_cluster_conf4(Ready) ->
     ok = sync_data_from_node(Node),
     {ok, TnxId}.
 
-should_proceed_with_boot() ->
+tx_commit_table_status() ->
     TablesStatus = emqx_cluster_rpc:get_tables_status(),
-    LocalNode = node(),
-    case maps:get(?CLUSTER_COMMIT, TablesStatus) of
-        {disc, LocalNode} ->
-            %% Loading locally; let this node finish its boot sequence
-            %% so others can copy the config from this one.
-            true;
-        _ ->
-            %% Loading from another node or still waiting for nodes to
-            %% be up. Try again.
-            false
-    end.
+    maps:get(?CLUSTER_COMMIT, TablesStatus).
 
 conf_sort({ok, #{tnx_id := Id1}}, {ok, #{tnx_id := Id2}}) when Id1 > Id2 -> true;
 conf_sort({ok, #{tnx_id := Id, wall_clock := W1}}, {ok, #{tnx_id := Id, wall_clock := W2}}) ->
@@ -0,0 +1 @@
+Fix config sync wait-loop race condition when cluster nodes boot around the same time.
@@ -14,8 +14,8 @@ type: application
 
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
-version: 5.3.1-alpha.4
+version: 5.3.1-alpha.5
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application.
-appVersion: 5.3.1-alpha.4
+appVersion: 5.3.1-alpha.5
@@ -54,6 +54,12 @@ desc_api9.desc:
 desc_api9.label:
 """Test Bridge Creation"""
 
+desc_api10.desc:
+"""Lists the available action types."""
+
+desc_api10.label:
+"""List action types"""
+
 desc_bridge_metrics.desc:
 """Get bridge metrics by id."""