feat(cluster link api): add status to responses

Fixes https://emqx.atlassian.net/browse/EMQX-12627
This commit is contained in:
Thales Macedo Garitezi 2024-07-16 16:47:25 -03:00
parent 0b1f0db73c
commit ba3cbe02e3
5 changed files with 524 additions and 40 deletions

View File

@ -10,6 +10,7 @@
{emqx_bridge,5}.
{emqx_bridge,6}.
{emqx_broker,1}.
{emqx_cluster_link,1}.
{emqx_cm,1}.
{emqx_cm,2}.
{emqx_cm,3}.

View File

@ -8,10 +8,14 @@
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/http_api.hrl").
-include_lib("emqx_utils/include/emqx_utils_api.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/logger.hrl").
-export([
api_spec/0,
paths/0,
namespace/0,
fields/1,
schema/1
]).
@ -23,6 +27,10 @@
-define(CONF_PATH, [cluster, links]).
-define(TAGS, [<<"Cluster">>]).
-type cluster_name() :: binary().
namespace() -> "cluster_link".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -40,7 +48,7 @@ schema("/cluster/links") ->
description => "Get cluster links configuration",
tags => ?TAGS,
responses =>
#{200 => links_config_schema()}
#{200 => links_config_schema_response()}
},
post =>
#{
@ -49,7 +57,7 @@ schema("/cluster/links") ->
'requestBody' => link_config_schema(),
responses =>
#{
200 => link_config_schema(),
200 => link_config_schema_response(),
400 =>
emqx_dashboard_swagger:error_codes(
[?BAD_REQUEST, ?ALREADY_EXISTS],
@ -68,7 +76,7 @@ schema("/cluster/links/:name") ->
parameters => [param_path_name()],
responses =>
#{
200 => link_config_schema(),
200 => link_config_schema_response(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
)
@ -95,7 +103,7 @@ schema("/cluster/links/:name") ->
'requestBody' => update_link_config_schema(),
responses =>
#{
200 => link_config_schema(),
200 => link_config_schema_response(),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Cluster link not found">>
),
@ -107,44 +115,30 @@ schema("/cluster/links/:name") ->
}
}.
fields(link_config_response) ->
[
{node, hoconsc:mk(binary(), #{desc => ?DESC("node")})},
{status, hoconsc:mk(status(), #{desc => ?DESC("status")})}
| emqx_cluster_link_schema:fields("link")
].
%%--------------------------------------------------------------------
%% API Handler funcs
%%--------------------------------------------------------------------
'/cluster/links'(get, _Params) ->
?OK(get_raw());
handle_list();
'/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) ->
with_link(
Name,
return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)),
fun() ->
case emqx_cluster_link_config:create(Body) of
{ok, Res} ->
?CREATED(Res);
{error, Reason} ->
Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end
end
fun() -> handle_create(Name, Body) end
).
'/cluster/links/:name'(get, #{bindings := #{name := Name}}) ->
with_link(Name, fun(Link) -> ?OK(Link) end, not_found());
with_link(Name, fun(Link) -> handle_lookup(Name, Link) end, not_found());
'/cluster/links/:name'(put, #{bindings := #{name := Name}, body := Params0}) ->
with_link(
Name,
fun(Link) ->
Params = Params0#{<<"name">> => Name},
case emqx_cluster_link_config:update_one_link(Params) of
{ok, Res} ->
?OK(Res);
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end
end,
not_found()
);
with_link(Name, fun() -> handle_update(Name, Params0) end, not_found());
'/cluster/links/:name'(delete, #{bindings := #{name := Name}}) ->
with_link(
Name,
@ -164,6 +158,48 @@ schema("/cluster/links/:name") ->
%% Internal funcs
%%--------------------------------------------------------------------
handle_list() ->
Links = get_raw(),
NodeResults = get_all_link_status_cluster(),
NameToStatus = collect_all_status(NodeResults),
EmptyStatus = #{status => inconsistent, node_status => []},
Response =
lists:map(
fun(#{<<"name">> := Name} = Link) ->
Status = maps:get(Name, NameToStatus, EmptyStatus),
maps:merge(Link, Status)
end,
Links
),
?OK(Response).
handle_create(Name, Params) ->
case emqx_cluster_link_config:create(Params) of
{ok, Link} ->
?CREATED(add_status(Name, Link));
{error, Reason} ->
Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end.
handle_lookup(Name, Link) ->
?OK(add_status(Name, Link)).
add_status(Name, Link) ->
NodeResults = get_link_status_cluster(Name),
Status = collect_single_status(NodeResults),
maps:merge(Link, Status).
handle_update(Name, Params0) ->
Params = Params0#{<<"name">> => Name},
case emqx_cluster_link_config:update_one_link(Params) of
{ok, Link} ->
?OK(add_status(Name, Link));
{error, Reason} ->
Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])),
?BAD_REQUEST(Message)
end.
get_raw() ->
#{<<"cluster">> := #{<<"links">> := Links}} =
emqx_config:fill_defaults(
@ -172,15 +208,130 @@ get_raw() ->
),
Links.
links_config_schema() ->
emqx_cluster_link_schema:links_schema(
get_all_link_status_cluster() ->
case emqx_cluster_link_mqtt:get_all_resources_cluster() of
{error, BadResults} ->
?SLOG(warning, #{
msg => "cluster_link_api_all_status_bad_erpc_results",
results => BadResults
}),
[];
{ok, NodeResults} ->
NodeResults
end.
get_link_status_cluster(Name) ->
case emqx_cluster_link_mqtt:get_resource_cluster(Name) of
{error, BadResults} ->
?SLOG(warning, #{
msg => "cluster_link_api_lookup_status_bad_erpc_results",
results => BadResults
}),
[];
{ok, NodeResults} ->
NodeResults
end.
-spec collect_all_status([{node(), #{cluster_name() => _}}]) ->
#{
examples => #{<<"example">> => links_config_example()}
cluster_name() => #{
node := node(),
status := emqx_resource:resource_status() | inconsistent
}
}.
collect_all_status(NodeResults) ->
Reindexed = lists:foldl(
fun({Node, AllLinkData}, Acc) ->
maps:fold(
fun(Name, Data, AccIn) ->
collect_all_status1(Node, Name, Data, AccIn)
end,
Acc,
AllLinkData
)
end,
#{},
NodeResults
),
maps:fold(
fun(Name, NodeToData, Acc) ->
OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)],
SummaryStatus =
case lists:usort(OnlyStatus) of
[SameStatus] -> SameStatus;
_ -> inconsistent
end,
NodeStatus = lists:map(
fun({Node, #{status := S}}) ->
#{node => Node, status => S}
end,
maps:to_list(NodeToData)
),
Acc#{
Name => #{
status => SummaryStatus,
node_status => NodeStatus
}
}
end,
#{},
Reindexed
).
collect_all_status1(Node, Name, Data, Acc) ->
maps:update_with(
Name,
fun(Old) -> Old#{Node => Data} end,
#{Node => Data},
Acc
).
collect_single_status(NodeResults) ->
NodeStatus =
lists:map(
fun
({Node, {ok, #{status := S}}}) ->
#{node => Node, status => S};
({Node, {error, _}}) ->
#{node => Node, status => ?status_disconnected};
({Node, _}) ->
#{node => Node, status => inconsistent}
end,
NodeResults
),
OnlyStatus = [S || #{status := S} <- NodeStatus],
SummaryStatus =
case lists:usort(OnlyStatus) of
[SameStatus] -> SameStatus;
_ -> inconsistent
end,
#{
status => SummaryStatus,
node_status => NodeStatus
}.
links_config_schema_response() ->
hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, link_config_response)), #{
examples => #{<<"example">> => links_config_response_example()}
}).
link_config_schema() ->
hoconsc:mk(emqx_cluster_link_schema:link_schema(), #{
examples => #{<<"example">> => hd(links_config_example())}
}).
link_config_schema_response() ->
hoconsc:mk(
hoconsc:ref(?MODULE, link_config_response),
#{
examples => #{
<<"example">> => hd(links_config_response_example())
}
}
).
link_config_schema() ->
emqx_cluster_link_schema:link_schema().
status() ->
hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]).
param_path_name() ->
{name,
@ -197,6 +348,22 @@ param_path_name() ->
update_link_config_schema() ->
proplists:delete(name, emqx_cluster_link_schema:fields("link")).
links_config_response_example() ->
lists:map(
fun(LinkEx) ->
LinkEx#{
<<"status">> => <<"connected">>,
<<"node_status">> => [
#{
<<"node">> => <<"emqx1@emqx.net">>,
<<"status">> => <<"connected">>
}
]
}
end,
links_config_example()
).
links_config_example() ->
[
#{
@ -229,7 +396,8 @@ with_link(Name, FoundFn, NotFoundFn) ->
case emqx_cluster_link_config:link_raw(Name) of
undefined ->
NotFoundFn();
Link = #{} ->
Link0 = #{} ->
Link = fill_defaults_single(Link0),
{arity, Arity} = erlang:fun_info(FoundFn, arity),
case Arity of
1 -> FoundFn(Link);
@ -237,6 +405,14 @@ with_link(Name, FoundFn, NotFoundFn) ->
end
end.
fill_defaults_single(Link0) ->
#{<<"cluster">> := #{<<"links">> := [Link]}} =
emqx_config:fill_defaults(
#{<<"cluster">> => #{<<"links">> => [Link0]}},
#{obfuscate_sensitive_values => true}
),
Link.
return(Response) ->
fun() -> Response end.

View File

@ -46,6 +46,16 @@
forward/2
]).
-export([
get_all_resources_cluster/0,
get_resource_cluster/1
]).
%% BpAPI / RPC Targets
-export([
get_resource_local_v1/1,
get_all_resources_local_v1/0
]).
-define(MSG_CLIENTID_SUFFIX, ":msg:").
-define(MQTT_HOST_OPTS, #{default_port => 1883}).
@ -80,6 +90,8 @@
-define(PUB_TIMEOUT, 10_000).
-type cluster_name() :: binary().
-spec ensure_msg_fwd_resource(map()) ->
{ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}.
ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) ->
@ -89,10 +101,57 @@ ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf)
},
emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1).
-spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}.
-spec remove_msg_fwd_resource(cluster_name()) -> ok | {error, Reason :: term()}.
remove_msg_fwd_resource(ClusterName) ->
emqx_resource:remove_local(?MSG_RES_ID(ClusterName)).
-spec get_all_resources_cluster() ->
{ok, [{node(), #{cluster_name() => emqx_resource:resource_data()}}]}
| {error, [term()]}.
get_all_resources_cluster() ->
Nodes = emqx:running_nodes(),
Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes),
sequence_multicall_results(Nodes, Results).
-spec get_resource_cluster(cluster_name()) ->
{ok, [{node(), {ok, emqx_resource:resource_data()} | {error, not_found}}]}
| {error, [term()]}.
get_resource_cluster(ClusterName) ->
Nodes = emqx:running_nodes(),
Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName),
sequence_multicall_results(Nodes, Results).
%% RPC Target in `emqx_cluster_link_proto_v1'.
-spec get_resource_local_v1(cluster_name()) ->
{ok, emqx_resource:resource_data()} | {error, not_found}.
get_resource_local_v1(ClusterName) ->
case emqx_resource:get_instance(?MSG_RES_ID(ClusterName)) of
{ok, _ResourceGroup, ResourceData} ->
{ok, ResourceData};
{error, not_found} ->
{error, not_found}
end.
%% RPC Target in `emqx_cluster_link_proto_v1'.
-spec get_all_resources_local_v1() -> #{cluster_name() => emqx_resource:resource_data()}.
get_all_resources_local_v1() ->
lists:foldl(
fun
(?MSG_RES_ID(Name) = Id, Acc) ->
case emqx_resource:get_instance(Id) of
{ok, ?RES_GROUP, ResourceData} ->
Acc#{Name => ResourceData};
_ ->
Acc
end;
(_Id, Acc) ->
%% Doesn't follow the naming pattern; manually crafted?
Acc
end,
#{},
emqx_resource:list_group_instances(?RES_GROUP)
).
%%--------------------------------------------------------------------
%% emqx_resource callbacks (message forwarding)
%%--------------------------------------------------------------------
@ -419,3 +478,16 @@ emqtt_client_opts(ClientIdSuffix, ClusterConf) ->
#{clientid := BaseClientId} = Opts = emqx_cluster_link_config:mk_emqtt_options(ClusterConf),
ClientId = emqx_bridge_mqtt_lib:clientid_base([BaseClientId, ClientIdSuffix]),
Opts#{clientid => ClientId}.
-spec sequence_multicall_results([node()], emqx_rpc:erpc_multicall(term())) ->
{ok, [{node(), term()}]} | {error, [term()]}.
sequence_multicall_results(Nodes, Results) ->
case lists:partition(fun is_ok/1, lists:zip(Nodes, Results)) of
{OkResults, []} ->
{ok, [{Node, Res} || {Node, {ok, Res}} <- OkResults]};
{_OkResults, BadResults} ->
{error, BadResults}
end.
is_ok({_Node, {ok, _}}) -> true;
is_ok(_) -> false.

View File

@ -0,0 +1,31 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_cluster_link_proto_v1).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
get_resource/2,
get_all_resources/1
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
introduced_in() ->
"5.7.2".
-spec get_resource([node()], binary()) ->
emqx_rpc:erpc_multicall({ok, emqx_resource:resource_data()} | {error, not_found}).
get_resource(Nodes, ClusterName) ->
erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_resource_local_v1, [ClusterName], ?TIMEOUT).
-spec get_all_resources([node()]) ->
emqx_rpc:erpc_multicall(#{binary() => emqx_resource:resource_data()}).
get_all_resources(Nodes) ->
erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_all_resources_local_v1, [], ?TIMEOUT).

View File

@ -37,6 +37,8 @@
"-----END CERTIFICATE-----"
>>).
-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
@ -67,10 +69,36 @@ end_per_suite(Config) ->
auth_header() ->
emqx_mgmt_api_test_util:auth_header_().
init_per_testcase(t_status = TestCase, Config) ->
ok = emqx_cth_suite:stop_apps([emqx_dashboard]),
SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(TestCase, Config),
TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(TestCase, Config),
SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec),
TargetNodes = emqx_cth_cluster:start(TargetClusterSpec),
emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config),
erpc:call(SN1, emqx_cth_suite, start_apps, [
[emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()],
#{work_dir => emqx_cth_suite:work_dir(TestCase, Config)}
]),
[
{source_nodes, SourceNodes},
{target_nodes, TargetNodes}
| Config
];
init_per_testcase(_TC, Config) ->
{ok, _} = emqx_cluster_link_config:update([]),
Config.
end_per_testcase(t_status, Config) ->
SourceNodes = ?config(source_nodes, Config),
TargetNodes = ?config(target_nodes, Config),
ok = emqx_cth_cluster:stop(SourceNodes),
ok = emqx_cth_cluster:stop(TargetNodes),
_ = emqx_cth_suite:start_apps(
[emqx_mgmt_api_test_util:emqx_dashboard()],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
ok;
end_per_testcase(_TC, _Config) ->
ok.
@ -158,6 +186,8 @@ t_put_invalid(_Config) ->
update_link(Name, maps:remove(<<"server">>, link_params()))
).
%% Tests a sequence of CRUD operations and their expected responses, for common use cases
%% and configuration states.
t_crud(_Config) ->
%% No links initially.
?assertMatch({200, []}, list()),
@ -167,13 +197,43 @@ t_crud(_Config) ->
?assertMatch({404, _}, update_link(NameA, link_params())),
Params1 = link_params(),
?assertMatch({201, #{<<"name">> := NameA}}, create_link(NameA, Params1)),
?assertMatch(
{201, #{
<<"name">> := NameA,
<<"status">> := _,
<<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
}},
create_link(NameA, Params1)
),
?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)),
?assertMatch({200, [#{<<"name">> := NameA}]}, list()),
?assertMatch({200, #{<<"name">> := NameA}}, get_link(NameA)),
?assertMatch(
{200, [
#{
<<"name">> := NameA,
<<"status">> := _,
<<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
}
]},
list()
),
?assertMatch(
{200, #{
<<"name">> := NameA,
<<"status">> := _,
<<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
}},
get_link(NameA)
),
Params2 = Params1#{<<"pool_size">> := 2},
?assertMatch({200, #{<<"name">> := NameA}}, update_link(NameA, Params2)),
?assertMatch(
{200, #{
<<"name">> := NameA,
<<"status">> := _,
<<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _]
}},
update_link(NameA, Params2)
),
?assertMatch({204, _}, delete_link(NameA)),
?assertMatch({404, _}, delete_link(NameA)),
@ -182,3 +242,147 @@ t_crud(_Config) ->
?assertMatch({200, []}, list()),
ok.
%% Verifies the behavior of reported status under different conditions when listing all
%% links and when fetching a specific link.
t_status(Config) ->
[SN1 | _] = ?config(source_nodes, Config),
Name = <<"cl.target">>,
?assertMatch(
{200, [
#{
<<"status">> := <<"connected">>,
<<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"connected">>
}
]
}
]},
list()
),
?assertMatch(
{200, #{
<<"status">> := <<"connected">>,
<<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"connected">>
}
]
}},
get_link(Name)
),
%% If one of the nodes reports a different status, the cluster is inconsistent.
ProtoMod = emqx_cluster_link_proto_v1,
?ON(SN1, begin
ok = meck:new(ProtoMod, [no_link, passthrough, no_history]),
meck:expect(ProtoMod, get_all_resources, fun(Nodes) ->
[Res1, {ok, Res2A} | Rest] = meck:passthrough([Nodes]),
%% Res2A :: #{cluster_name() => emqx_resource:resource_data()}
Res2B = maps:map(fun(_, Data) -> Data#{status := disconnected} end, Res2A),
[Res1, {ok, Res2B} | Rest]
end),
meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
[Res1, {ok, {ok, Res2A}} | Rest] = meck:passthrough([Nodes, LinkName]),
Res2B = Res2A#{status := disconnected},
[Res1, {ok, {ok, Res2B}} | Rest]
end)
end),
?assertMatch(
{200, [
#{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"disconnected">>
}
]
}
]},
list()
),
?assertMatch(
{200, #{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"disconnected">>
}
]
}},
get_link(Name)
),
%% Simulating erpc failures
?ON(SN1, begin
meck:expect(ProtoMod, get_all_resources, fun(Nodes) ->
[Res1, _ | Rest] = meck:passthrough([Nodes]),
[Res1, {error, {erpc, noconnection}} | Rest]
end),
meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
[Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]),
[Res1, {error, {erpc, noconnection}} | Rest]
end)
end),
?assertMatch(
{200, [
#{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := []
}
]},
list()
),
?assertMatch(
{200, #{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := []
}},
get_link(Name)
),
%% Simulate another inconsistency
?ON(SN1, begin
meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) ->
[Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]),
[Res1, {ok, {error, not_found}} | Rest]
end)
end),
?assertMatch(
{200, #{
<<"status">> := <<"inconsistent">>,
<<"node_status">> := [
#{
<<"node">> := _,
<<"status">> := <<"connected">>
},
#{
<<"node">> := _,
<<"status">> := <<"disconnected">>
}
]
}},
get_link(Name)
),
ok.