diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index f0a2adca7..bc3e4f1a2 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -10,6 +10,7 @@ {emqx_bridge,5}. {emqx_bridge,6}. {emqx_broker,1}. +{emqx_cluster_link,1}. {emqx_cm,1}. {emqx_cm,2}. {emqx_cm,3}. diff --git a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl index 08dc7f4ad..32c675d8d 100644 --- a/apps/emqx_cluster_link/include/emqx_cluster_link.hrl +++ b/apps/emqx_cluster_link/include/emqx_cluster_link.hrl @@ -17,3 +17,7 @@ %% Fairly compact text encoding. -define(SHARED_ROUTE_ID(Topic, Group), <<"$s/", Group/binary, "/", Topic/binary>>). -define(PERSISTENT_ROUTE_ID(Topic, ID), <<"$p/", ID/binary, "/", Topic/binary>>). + +-define(METRIC_NAME, cluster_link). + +-define(route_metric, 'routes'). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl index 33634607e..77f613e45 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_api.erl @@ -7,84 +7,523 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/http_api.hrl"). +-include_lib("emqx_utils/include/emqx_utils_api.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include("emqx_cluster_link.hrl"). -export([ api_spec/0, paths/0, + namespace/0, + fields/1, schema/1 ]). --export([config/2]). +-export([ + '/cluster/links'/2, + '/cluster/links/link/:name'/2, + '/cluster/links/link/:name/metrics'/2 +]). -define(CONF_PATH, [cluster, links]). -define(TAGS, [<<"Cluster">>]). +-type cluster_name() :: binary(). + +namespace() -> "cluster_link". + api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}). paths() -> [ - "/cluster/links" + "/cluster/links", + "/cluster/links/link/:name", + "/cluster/links/link/:name/metrics" ]. schema("/cluster/links") -> #{ - 'operationId' => config, + 'operationId' => '/cluster/links', get => #{ description => "Get cluster links configuration", tags => ?TAGS, responses => - #{200 => links_config_schema()} + #{200 => links_config_schema_response()} + }, + post => + #{ + description => "Create a cluster link", + tags => ?TAGS, + 'requestBody' => link_config_schema(), + responses => + #{ + 200 => link_config_schema_response(), + 400 => + emqx_dashboard_swagger:error_codes( + [?BAD_REQUEST, ?ALREADY_EXISTS], + <<"Update Config Failed">> + ) + } + } + }; +schema("/cluster/links/link/:name") -> + #{ + 'operationId' => '/cluster/links/link/:name', + get => + #{ + description => "Get a cluster link configuration", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 200 => link_config_schema_response(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } + }, + delete => + #{ + description => "Delete a cluster link", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 204 => <<"Link deleted">>, + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } }, put => #{ - description => "Update cluster links configuration", + description => "Update a cluster link configuration", tags => ?TAGS, - 'requestBody' => links_config_schema(), + parameters => [param_path_name()], + 'requestBody' => update_link_config_schema(), responses => #{ - 200 => links_config_schema(), + 200 => link_config_schema_response(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ), 400 => emqx_dashboard_swagger:error_codes( [?BAD_REQUEST], <<"Update Config Failed">> ) } } + }; +schema("/cluster/links/link/:name/metrics") -> + #{ + 'operationId' => '/cluster/links/link/:name/metrics', + get => + #{ + description => "Get a cluster link metrics", + tags => ?TAGS, + parameters => [param_path_name()], + responses => + #{ + 200 => link_metrics_schema_response(), + 404 => emqx_dashboard_swagger:error_codes( + [?NOT_FOUND], <<"Cluster link not found">> + ) + } + } }. +fields(link_config_response) -> + [ + {node, hoconsc:mk(binary(), #{desc => ?DESC("node")})}, + {status, hoconsc:mk(status(), #{desc => ?DESC("status")})} + | emqx_cluster_link_schema:fields("link") + ]; +fields(metrics) -> + [ + {metrics, hoconsc:mk(map(), #{desc => ?DESC("metrics")})} + ]; +fields(link_metrics_response) -> + [ + {node_metrics, + hoconsc:mk( + hoconsc:array(hoconsc:ref(?MODULE, node_metrics)), + #{desc => ?DESC("node_metrics")} + )} + | fields(metrics) + ]; +fields(node_metrics) -> + [ + {node, hoconsc:mk(atom(), #{desc => ?DESC("node")})} + | fields(metrics) + ]. + %%-------------------------------------------------------------------- %% API Handler funcs %%-------------------------------------------------------------------- -config(get, _Params) -> - {200, get_raw()}; -config(put, #{body := Body}) -> - case emqx_cluster_link_config:update(Body) of - {ok, NewConfig} -> - {200, NewConfig}; - {error, Reason} -> - Message = list_to_binary(io_lib:format("Update config failed ~p", [Reason])), - {400, ?BAD_REQUEST, Message} - end. +'/cluster/links'(get, _Params) -> + handle_list(); +'/cluster/links'(post, #{body := Body = #{<<"name">> := Name}}) -> + with_link( + Name, + return(?BAD_REQUEST('ALREADY_EXISTS', <<"Cluster link already exists">>)), + fun() -> handle_create(Name, Body) end + ). + +'/cluster/links/link/:name'(get, #{bindings := #{name := Name}}) -> + with_link(Name, fun(Link) -> handle_lookup(Name, Link) end, not_found()); +'/cluster/links/link/:name'(put, #{bindings := #{name := Name}, body := Params0}) -> + with_link(Name, fun() -> handle_update(Name, Params0) end, not_found()); +'/cluster/links/link/:name'(delete, #{bindings := #{name := Name}}) -> + with_link( + Name, + fun() -> + case emqx_cluster_link_config:delete_link(Name) of + ok -> + ?NO_CONTENT; + {error, Reason} -> + Message = list_to_binary(io_lib:format("Delete link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end + end, + not_found() + ). + +'/cluster/links/link/:name/metrics'(get, #{bindings := #{name := Name}}) -> + with_link(Name, fun() -> handle_metrics(Name) end, not_found()). %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- +handle_list() -> + Links = get_raw(), + NodeRPCResults = emqx_cluster_link_mqtt:get_all_resources_cluster(), + {NameToStatus, Errors} = collect_all_status(NodeRPCResults), + NodeErrors = lists:map( + fun({Node, Error}) -> + #{node => Node, status => inconsistent, reason => Error} + end, + Errors + ), + EmptyStatus = #{status => inconsistent, node_status => NodeErrors}, + Response = + lists:map( + fun(#{<<"name">> := Name} = Link) -> + Status = maps:get(Name, NameToStatus, EmptyStatus), + maps:merge(Link, Status) + end, + Links + ), + ?OK(Response). + +handle_create(Name, Params) -> + case emqx_cluster_link_config:create_link(Params) of + {ok, Link} -> + ?CREATED(add_status(Name, Link)); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Create link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end. + +handle_lookup(Name, Link) -> + ?OK(add_status(Name, Link)). + +handle_metrics(Name) -> + Results = emqx_cluster_link_metrics:get_metrics(Name), + {NodeMetrics0, NodeErrors} = + lists:foldl( + fun({Node, RouterMetrics0, ResourceMetrics0}, {OkAccIn, ErrAccIn}) -> + {RouterMetrics, RouterError} = get_metrics_or_errors(RouterMetrics0), + {ResourceMetrics, ResourceError} = get_metrics_or_errors(ResourceMetrics0), + ErrAcc = append_errors(RouterError, ResourceError, Node, ErrAccIn), + {[format_metrics(Node, RouterMetrics, ResourceMetrics) | OkAccIn], ErrAcc} + end, + {[], []}, + Results + ), + case NodeErrors of + [] -> + ok; + [_ | _] -> + ?SLOG(warning, #{ + msg => "cluster_link_api_metrics_bad_erpc_results", + errors => maps:from_list(NodeErrors) + }) + end, + NodeMetrics1 = lists:map(fun({Node, _Error}) -> format_metrics(Node, #{}, #{}) end, NodeErrors), + NodeMetrics = NodeMetrics1 ++ NodeMetrics0, + AggregatedMetrics = aggregate_metrics(NodeMetrics), + Response = #{metrics => AggregatedMetrics, node_metrics => NodeMetrics}, + ?OK(Response). + +get_metrics_or_errors({ok, Metrics}) -> + {Metrics, undefined}; +get_metrics_or_errors(Error) -> + {#{}, Error}. + +append_errors(undefined, undefined, _Node, Acc) -> + Acc; +append_errors(RouterError, ResourceError, Node, Acc) -> + Err0 = emqx_utils_maps:put_if(#{}, router, RouterError, RouterError =/= undefined), + Err = emqx_utils_maps:put_if(Err0, resource, ResourceError, ResourceError =/= undefined), + [{Node, Err} | Acc]. + +aggregate_metrics(NodeMetrics) -> + ErrorLogger = fun(_) -> ok end, + #{metrics := #{router := EmptyRouterMetrics}} = format_metrics(node(), #{}, #{}), + {RouterMetrics, ResourceMetrics} = lists:foldl( + fun( + #{metrics := #{router := RMetrics, forwarding := FMetrics}}, + {RouterAccIn, ResourceAccIn} + ) -> + ResourceAcc = + emqx_utils_maps:best_effort_recursive_sum(FMetrics, ResourceAccIn, ErrorLogger), + RouterAcc = merge_cluster_wide_metrics(RMetrics, RouterAccIn), + {RouterAcc, ResourceAcc} + end, + {EmptyRouterMetrics, #{}}, + NodeMetrics + ), + #{router => RouterMetrics, forwarding => ResourceMetrics}. + +merge_cluster_wide_metrics(Metrics, Acc) -> + %% For cluster-wide metrics, all nodes should report the same values, except if the + %% RPC to fetch a node's metrics failed, in which case all values will be 0. + F = + fun(_Key, V1, V2) -> + case {erlang:is_map(V1), erlang:is_map(V2)} of + {true, true} -> + merge_cluster_wide_metrics(V1, V2); + {true, false} -> + merge_cluster_wide_metrics(V1, #{}); + {false, true} -> + merge_cluster_wide_metrics(V2, #{}); + {false, false} -> + true = is_number(V1), + true = is_number(V2), + max(V1, V2) + end + end, + maps:merge_with(F, Acc, Metrics). + +format_metrics(Node, RouterMetrics, ResourceMetrics) -> + Get = fun(Path, Map) -> emqx_utils_maps:deep_get(Path, Map, 0) end, + Routes = Get([gauges, ?route_metric], RouterMetrics), + #{ + node => Node, + metrics => #{ + router => #{ + ?route_metric => Routes + }, + forwarding => #{ + 'matched' => Get([counters, 'matched'], ResourceMetrics), + 'success' => Get([counters, 'success'], ResourceMetrics), + 'failed' => Get([counters, 'failed'], ResourceMetrics), + 'dropped' => Get([counters, 'dropped'], ResourceMetrics), + 'retried' => Get([counters, 'retried'], ResourceMetrics), + 'received' => Get([counters, 'received'], ResourceMetrics), + + 'queuing' => Get([gauges, 'queuing'], ResourceMetrics), + 'inflight' => Get([gauges, 'inflight'], ResourceMetrics), + + 'rate' => Get([rate, 'matched', current], ResourceMetrics), + 'rate_last5m' => Get([rate, 'matched', last5m], ResourceMetrics), + 'rate_max' => Get([rate, 'matched', max], ResourceMetrics) + } + } + }. + +add_status(Name, Link) -> + NodeRPCResults = emqx_cluster_link_mqtt:get_resource_cluster(Name), + Status = collect_single_status(NodeRPCResults), + maps:merge(Link, Status). + +handle_update(Name, Params0) -> + Params = Params0#{<<"name">> => Name}, + case emqx_cluster_link_config:update_link(Params) of + {ok, Link} -> + ?OK(add_status(Name, Link)); + {error, Reason} -> + Message = list_to_binary(io_lib:format("Update link failed ~p", [Reason])), + ?BAD_REQUEST(Message) + end. + get_raw() -> - #{<<"links">> := Conf} = + #{<<"cluster">> := #{<<"links">> := Links}} = emqx_config:fill_defaults( - #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}, + #{<<"cluster">> => #{<<"links">> => emqx_conf:get_raw(?CONF_PATH)}}, #{obfuscate_sensitive_values => true} ), - Conf. + Links. -links_config_schema() -> - emqx_cluster_link_schema:links_schema( - #{ - examples => #{<<"example">> => links_config_example()} +-spec collect_all_status([{node(), {ok, #{cluster_name() => _}} | _Error}]) -> + {ClusterToStatus, Errors} +when + ClusterToStatus :: #{ + cluster_name() => #{ + node := node(), + status := emqx_resource:resource_status() | inconsistent } + }, + Errors :: [{node(), term()}]. +collect_all_status(NodeResults) -> + {Reindexed, Errors} = lists:foldl( + fun + ({Node, {ok, AllLinkData}}, {OkAccIn, ErrAccIn}) -> + OkAcc = maps:fold( + fun(Name, Data, AccIn) -> + collect_all_status1(Node, Name, Data, AccIn) + end, + OkAccIn, + AllLinkData + ), + {OkAcc, ErrAccIn}; + ({Node, Error}, {OkAccIn, ErrAccIn}) -> + {OkAccIn, [{Node, Error} | ErrAccIn]} + end, + {#{}, []}, + NodeResults + ), + NoErrors = + case Errors of + [] -> + true; + [_ | _] -> + ?SLOG(warning, #{ + msg => "cluster_link_api_lookup_status_bad_erpc_results", + errors => Errors + }), + false + end, + ClusterToStatus = maps:fold( + fun(Name, NodeToData, Acc) -> + OnlyStatus = [S || #{status := S} <- maps:values(NodeToData)], + SummaryStatus = + case lists:usort(OnlyStatus) of + [SameStatus] when NoErrors -> SameStatus; + _ -> inconsistent + end, + NodeStatus = lists:map( + fun + ({Node, #{status := S}}) -> + #{node => Node, status => S}; + ({Node, Error0}) -> + Error = emqx_logger_jsonfmt:best_effort_json(Error0), + #{node => Node, status => inconsistent, reason => Error} + end, + maps:to_list(NodeToData) ++ Errors + ), + Acc#{ + Name => #{ + status => SummaryStatus, + node_status => NodeStatus + } + } + end, + #{}, + Reindexed + ), + {ClusterToStatus, Errors}. + +collect_all_status1(Node, Name, Data, Acc) -> + maps:update_with( + Name, + fun(Old) -> Old#{Node => Data} end, + #{Node => Data}, + Acc + ). + +collect_single_status(NodeResults) -> + NodeStatus = + lists:map( + fun + ({Node, {ok, {ok, #{status := S}}}}) -> + #{node => Node, status => S}; + ({Node, {ok, {error, _}}}) -> + #{node => Node, status => ?status_disconnected}; + ({Node, Error0}) -> + Error = emqx_logger_jsonfmt:best_effort_json(Error0), + #{node => Node, status => inconsistent, reason => Error} + end, + NodeResults + ), + OnlyStatus = [S || #{status := S} <- NodeStatus], + SummaryStatus = + case lists:usort(OnlyStatus) of + [SameStatus] -> SameStatus; + _ -> inconsistent + end, + #{ + status => SummaryStatus, + node_status => NodeStatus + }. + +links_config_schema_response() -> + hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, link_config_response)), #{ + examples => #{<<"example">> => links_config_response_example()} + }). + +link_config_schema() -> + hoconsc:mk(emqx_cluster_link_schema:link_schema(), #{ + examples => #{<<"example">> => hd(links_config_example())} + }). + +link_config_schema_response() -> + hoconsc:mk( + hoconsc:ref(?MODULE, link_config_response), + #{ + examples => #{ + <<"example">> => hd(links_config_response_example()) + } + } + ). + +link_metrics_schema_response() -> + hoconsc:mk( + hoconsc:ref(?MODULE, link_metrics_response), + #{ + examples => #{ + <<"example">> => link_metrics_response_example() + } + } + ). + +status() -> + hoconsc:enum([?status_connected, ?status_disconnected, ?status_connecting, inconsistent]). + +param_path_name() -> + {name, + hoconsc:mk( + binary(), + #{ + in => path, + required => true, + example => <<"my_link">>, + desc => ?DESC("param_path_name") + } + )}. + +update_link_config_schema() -> + proplists:delete(name, emqx_cluster_link_schema:fields("link")). + +links_config_response_example() -> + lists:map( + fun(LinkEx) -> + LinkEx#{ + <<"status">> => <<"connected">>, + <<"node_status">> => [ + #{ + <<"node">> => <<"emqx1@emqx.net">>, + <<"status">> => <<"connected">> + } + ] + } + end, + links_config_example() ). links_config_example() -> @@ -114,3 +553,39 @@ links_config_example() -> <<"name">> => <<"emqxcl_c">> } ]. + +link_metrics_response_example() -> + #{ + <<"metrics">> => #{<<"routes">> => 10240}, + <<"node_metrics">> => [ + #{ + <<"node">> => <<"emqx1@emqx.net">>, + <<"metrics">> => #{<<"routes">> => 10240} + } + ] + }. + +with_link(Name, FoundFn, NotFoundFn) -> + case emqx_cluster_link_config:link_raw(Name) of + undefined -> + NotFoundFn(); + Link0 = #{} when is_function(FoundFn, 1) -> + Link = fill_defaults_single(Link0), + FoundFn(Link); + _Link = #{} when is_function(FoundFn, 0) -> + FoundFn() + end. + +fill_defaults_single(Link0) -> + #{<<"cluster">> := #{<<"links">> := [Link]}} = + emqx_config:fill_defaults( + #{<<"cluster">> => #{<<"links">> => [Link0]}}, + #{obfuscate_sensitive_values => true} + ), + Link. + +return(Response) -> + fun() -> Response end. + +not_found() -> + return(?NOT_FOUND(<<"Cluster link not found">>)). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl index 41f1a0a77..9502ad1c3 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_app.erl @@ -22,7 +22,9 @@ start(_StartType, _StartArgs) -> _ -> ok end, - emqx_cluster_link_sup:start_link(LinksConf). + {ok, Sup} = emqx_cluster_link_sup:start_link(LinksConf), + ok = create_metrics(LinksConf), + {ok, Sup}. prep_stop(State) -> emqx_cluster_link_config:remove_handler(), @@ -53,3 +55,11 @@ remove_msg_fwd_resources(LinksConf) -> end, LinksConf ). + +create_metrics(LinksConf) -> + lists:foreach( + fun(#{name := ClusterName}) -> + ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName) + end, + LinksConf + ). diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl new file mode 100644 index 000000000..826d4f0db --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_bookkeeper.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_bookkeeper). + +%% API +-export([ + start_link/0 +]). + +%% `gen_server' API +-export([ + init/1, + handle_continue/2, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +%%------------------------------------------------------------------------------ +%% Type declarations +%%------------------------------------------------------------------------------ + +-ifdef(TEST). +%% ms +-define(TALLY_ROUTES_INTERVAL, 300). +-else. +%% ms +-define(TALLY_ROUTES_INTERVAL, 15_000). +-endif. + +%% call/cast/info events +-record(tally_routes, {}). + +%%------------------------------------------------------------------------------ +%% API +%%------------------------------------------------------------------------------ + +-spec start_link() -> gen_server:start_ret(). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, _InitOpts = #{}, _Opts = []). + +%%------------------------------------------------------------------------------ +%% `gen_server' API +%%------------------------------------------------------------------------------ + +init(_Opts) -> + State = #{}, + {ok, State, {continue, #tally_routes{}}}. + +handle_continue(#tally_routes{}, State) -> + handle_tally_routes(), + {noreply, State}. + +handle_call(_Call, _From, State) -> + {reply, {error, bad_call}, State}. + +handle_cast(_Cast, State) -> + {noreply, State}. + +handle_info(#tally_routes{}, State) -> + handle_tally_routes(), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%------------------------------------------------------------------------------ +%% Internal fns +%%------------------------------------------------------------------------------ + +cluster_names() -> + Links = emqx_cluster_link_config:links(), + lists:map(fun(#{name := Name}) -> Name end, Links). + +ensure_timer(Event, Timeout) -> + _ = erlang:send_after(Timeout, self(), Event), + ok. + +handle_tally_routes() -> + ClusterNames = cluster_names(), + tally_routes(ClusterNames), + ensure_timer(#tally_routes{}, ?TALLY_ROUTES_INTERVAL), + ok. + +tally_routes([ClusterName | ClusterNames]) -> + NumRoutes = emqx_cluster_link_extrouter:count(ClusterName), + emqx_cluster_link_metrics:routes_set(ClusterName, NumRoutes), + tally_routes(ClusterNames); +tally_routes([]) -> + ok. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl index f27c7702e..2a97f2d69 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_config.erl @@ -4,6 +4,8 @@ -module(emqx_cluster_link_config). +-feature(maybe_expr, enable). + -behaviour(emqx_config_handler). -include_lib("emqx/include/logger.hrl"). @@ -28,11 +30,15 @@ -export([ %% General + create_link/1, + delete_link/1, + update_link/1, update/1, cluster/0, enabled_links/0, links/0, link/1, + link_raw/1, topic_filters/1, %% Connections emqtt_options/1, @@ -55,6 +61,52 @@ %% +create_link(LinkConfig) -> + #{<<"name">> := Name} = LinkConfig, + case + emqx_conf:update( + ?LINKS_PATH, + {create, LinkConfig}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + NewLinkConfig = find_link(Name, NewConfigRows), + {ok, NewLinkConfig}; + {error, Reason} -> + {error, Reason} + end. + +delete_link(Name) -> + case + emqx_conf:update( + ?LINKS_PATH, + {delete, Name}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, _} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +update_link(LinkConfig) -> + #{<<"name">> := Name} = LinkConfig, + case + emqx_conf:update( + ?LINKS_PATH, + {update, LinkConfig}, + #{rawconf_with_defaults => true, override_to => cluster} + ) + of + {ok, #{raw_config := NewConfigRows}} -> + NewLinkConfig = find_link(Name, NewConfigRows), + {ok, NewLinkConfig}; + {error, Reason} -> + {error, Reason} + end. + update(Config) -> case emqx_conf:update( @@ -75,11 +127,20 @@ cluster() -> links() -> emqx:get_config(?LINKS_PATH, []). +links_raw() -> + emqx:get_raw_config(?LINKS_PATH, []). + enabled_links() -> [L || L = #{enable := true} <- links()]. link(Name) -> - case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, links()) of + find_link(Name, links()). + +link_raw(Name) -> + find_link(Name, links_raw()). + +find_link(Name, Links) -> + case lists:dropwhile(fun(L) -> Name =/= upstream_name(L) end, Links) of [LinkConf | _] -> LinkConf; [] -> undefined end. @@ -133,6 +194,37 @@ remove_handler() -> pre_config_update(?LINKS_PATH, RawConf, RawConf) -> {ok, RawConf}; +pre_config_update(?LINKS_PATH, {create, LinkRawConf}, OldRawConf) -> + #{<<"name">> := Name} = LinkRawConf, + maybe + undefined ?= find_link(Name, OldRawConf), + NewRawConf0 = OldRawConf ++ [LinkRawConf], + NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)), + {ok, NewRawConf} + else + _ -> + {error, already_exists} + end; +pre_config_update(?LINKS_PATH, {update, LinkRawConf}, OldRawConf) -> + #{<<"name">> := Name} = LinkRawConf, + maybe + {_Found, Front, Rear} ?= safe_take(Name, OldRawConf), + NewRawConf0 = Front ++ [LinkRawConf] ++ Rear, + NewRawConf = convert_certs(maybe_increment_ps_actor_incr(NewRawConf0, OldRawConf)), + {ok, NewRawConf} + else + not_found -> + {error, not_found} + end; +pre_config_update(?LINKS_PATH, {delete, Name}, OldRawConf) -> + maybe + {_Found, Front, Rear} ?= safe_take(Name, OldRawConf), + NewRawConf = Front ++ Rear, + {ok, NewRawConf} + else + _ -> + {error, not_found} + end; pre_config_update(?LINKS_PATH, NewRawConf, OldRawConf) -> {ok, convert_certs(maybe_increment_ps_actor_incr(NewRawConf, OldRawConf))}. @@ -185,9 +277,10 @@ all_ok(Results) -> add_links(LinksConf) -> [add_link(Link) || Link <- LinksConf]. -add_link(#{enable := true} = LinkConf) -> +add_link(#{name := ClusterName, enable := true} = LinkConf) -> {ok, _Pid} = emqx_cluster_link_sup:ensure_actor(LinkConf), {ok, _} = emqx_cluster_link_mqtt:ensure_msg_fwd_resource(LinkConf), + ok = emqx_cluster_link_metrics:maybe_create_metrics(ClusterName), ok; add_link(_DisabledLinkConf) -> ok. @@ -197,12 +290,13 @@ remove_links(LinksConf) -> remove_link(Name) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), - ensure_actor_stopped(Name). + _ = ensure_actor_stopped(Name), + emqx_cluster_link_metrics:drop_metrics(Name). update_links(LinksConf) -> - [update_link(Link) || Link <- LinksConf]. + [do_update_link(Link) || Link <- LinksConf]. -update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) -> +do_update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) -> case what_is_changed(OldLinkConf, NewLinkConf) of both -> _ = ensure_actor_stopped(Name), @@ -215,7 +309,7 @@ update_link({OldLinkConf, #{enable := true, name := Name} = NewLinkConf}) -> msg_resource -> ok = update_msg_fwd_resource(OldLinkConf, NewLinkConf) end; -update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) -> +do_update_link({_OldLinkConf, #{enable := false, name := Name} = _NewLinkConf}) -> _ = emqx_cluster_link_mqtt:remove_msg_fwd_resource(Name), ensure_actor_stopped(Name). @@ -320,3 +414,11 @@ do_convert_certs(LinkName, SSLOpts) -> ), throw({bad_ssl_config, Reason}) end. + +safe_take(Name, Transformations) -> + case lists:splitwith(fun(#{<<"name">> := N}) -> N =/= Name end, Transformations) of + {_Front, []} -> + not_found; + {Front, [Found | Rear]} -> + {Found, Front, Rear} + end. diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl index 79d96e207..44b147454 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_extrouter.erl @@ -34,6 +34,9 @@ apply_actor_operation/5 ]). +%% Internal export for bookkeeping +-export([count/1]). + %% Strictly monotonically increasing integer. -type smint() :: integer(). @@ -147,6 +150,16 @@ make_extroute_rec_pat(Entry) -> [{1, extroute}, {#extroute.entry, Entry}] ). +%% Internal exports for bookkeeping +count(ClusterName) -> + TopicPat = '_', + RouteIDPat = '_', + Pat = make_extroute_rec_pat( + emqx_trie_search:make_pat(TopicPat, ?ROUTE_ID(ClusterName, RouteIDPat)) + ), + MS = [{Pat, [], [true]}], + ets:select_count(?EXTROUTE_TAB, MS). + %% -record(state, { @@ -280,7 +293,9 @@ apply_operation(Entry, MCounter, OpName, Lane) -> Marker = 1 bsl Lane, case MCounter band Marker of 0 when OpName =:= add -> - mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker); + Res = mria:dirty_update_counter(?EXTROUTE_TAB, Entry, Marker), + ?tp("cluster_link_extrouter_route_added", #{}), + Res; Marker when OpName =:= add -> %% Already added. MCounter; @@ -289,6 +304,7 @@ apply_operation(Entry, MCounter, OpName, Lane) -> 0 -> Record = #extroute{entry = Entry, mcounter = 0}, ok = mria:dirty_delete_object(?EXTROUTE_TAB, Record), + ?tp("cluster_link_extrouter_route_deleted", #{}), 0; C -> C diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl new file mode 100644 index 000000000..36c5e791d --- /dev/null +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_metrics.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_cluster_link_metrics). + +-include("emqx_cluster_link.hrl"). + +%% API +-export([ + maybe_create_metrics/1, + drop_metrics/1, + + get_metrics/1, + routes_set/2 +]). + +%%-------------------------------------------------------------------- +%% Type definitions +%%-------------------------------------------------------------------- + +-define(METRICS, [ + ?route_metric +]). +-define(RATE_METRICS, []). + +%%-------------------------------------------------------------------- +%% metrics API +%%-------------------------------------------------------------------- + +get_metrics(ClusterName) -> + Nodes = emqx:running_nodes(), + Timeout = 15_000, + RouterResults = emqx_metrics_proto_v2:get_metrics(Nodes, ?METRIC_NAME, ClusterName, Timeout), + ResourceId = emqx_cluster_link_mqtt:resource_id(ClusterName), + ResourceResults = emqx_metrics_proto_v2:get_metrics( + Nodes, resource_metrics, ResourceId, Timeout + ), + lists:zip3(Nodes, RouterResults, ResourceResults). + +maybe_create_metrics(ClusterName) -> + case emqx_metrics_worker:has_metrics(?METRIC_NAME, ClusterName) of + true -> + ok = emqx_metrics_worker:reset_metrics(?METRIC_NAME, ClusterName); + false -> + ok = emqx_metrics_worker:create_metrics( + ?METRIC_NAME, ClusterName, ?METRICS, ?RATE_METRICS + ) + end. + +drop_metrics(ClusterName) -> + ok = emqx_metrics_worker:clear_metrics(?METRIC_NAME, ClusterName). + +routes_set(ClusterName, Val) -> + catch emqx_metrics_worker:set_gauge( + ?METRIC_NAME, ClusterName, <<"singleton">>, ?route_metric, Val + ). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl index 3a6411cbe..cb9955863 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_mqtt.erl @@ -9,6 +9,7 @@ -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). -include_lib("snabbkaffe/include/trace.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -behaviour(emqx_resource). -behaviour(ecpool_worker). @@ -28,6 +29,7 @@ ]). -export([ + resource_id/1, ensure_msg_fwd_resource/1, remove_msg_fwd_resource/1, decode_route_op/1, @@ -47,6 +49,16 @@ forward/2 ]). +-export([ + get_all_resources_cluster/0, + get_resource_cluster/1 +]). +%% BpAPI / RPC Targets +-export([ + get_resource_local_v1/1, + get_all_resources_local_v1/0 +]). + -define(MSG_CLIENTID_SUFFIX, ":msg:"). -define(MQTT_HOST_OPTS, #{default_port => 1883}). @@ -81,6 +93,12 @@ -define(PUB_TIMEOUT, 10_000). +-type cluster_name() :: binary(). + +-spec resource_id(cluster_name()) -> resource_id(). +resource_id(ClusterName) -> + ?MSG_RES_ID(ClusterName). + -spec ensure_msg_fwd_resource(map()) -> {ok, emqx_resource:resource_data() | already_started} | {error, Reason :: term()}. ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) -> @@ -90,10 +108,55 @@ ensure_msg_fwd_resource(#{name := Name, resource_opts := ResOpts} = ClusterConf) }, emqx_resource:create_local(?MSG_RES_ID(Name), ?RES_GROUP, ?MODULE, ClusterConf, ResOpts1). --spec remove_msg_fwd_resource(binary() | map()) -> ok | {error, Reason :: term()}. +-spec remove_msg_fwd_resource(cluster_name()) -> ok | {error, Reason :: term()}. remove_msg_fwd_resource(ClusterName) -> emqx_resource:remove_local(?MSG_RES_ID(ClusterName)). +-spec get_all_resources_cluster() -> + [{node(), emqx_rpc:erpc(#{cluster_name() => emqx_resource:resource_data()})}]. +get_all_resources_cluster() -> + Nodes = emqx:running_nodes(), + Results = emqx_cluster_link_proto_v1:get_all_resources(Nodes), + lists:zip(Nodes, Results). + +-spec get_resource_cluster(cluster_name()) -> + [{node(), {ok, {ok, emqx_resource:resource_data()} | {error, not_found}} | _Error}]. +get_resource_cluster(ClusterName) -> + Nodes = emqx:running_nodes(), + Results = emqx_cluster_link_proto_v1:get_resource(Nodes, ClusterName), + lists:zip(Nodes, Results). + +%% RPC Target in `emqx_cluster_link_proto_v1'. +-spec get_resource_local_v1(cluster_name()) -> + {ok, emqx_resource:resource_data()} | {error, not_found}. +get_resource_local_v1(ClusterName) -> + case emqx_resource:get_instance(?MSG_RES_ID(ClusterName)) of + {ok, _ResourceGroup, ResourceData} -> + {ok, ResourceData}; + {error, not_found} -> + {error, not_found} + end. + +%% RPC Target in `emqx_cluster_link_proto_v1'. +-spec get_all_resources_local_v1() -> #{cluster_name() => emqx_resource:resource_data()}. +get_all_resources_local_v1() -> + lists:foldl( + fun + (?MSG_RES_ID(Name) = Id, Acc) -> + case emqx_resource:get_instance(Id) of + {ok, ?RES_GROUP, ResourceData} -> + Acc#{Name => ResourceData}; + _ -> + Acc + end; + (_Id, Acc) -> + %% Doesn't follow the naming pattern; manually crafted? + Acc + end, + #{}, + emqx_resource:list_group_instances(?RES_GROUP) + ). + %%-------------------------------------------------------------------- %% emqx_resource callbacks (message forwarding) %%-------------------------------------------------------------------- diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl index f46249a4f..9b08510b9 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_schema.erl @@ -12,7 +12,7 @@ -export([injected_fields/0]). %% Used in emqx_cluster_link_api --export([links_schema/1]). +-export([links_schema/1, link_schema/0]). -export([ roots/0, @@ -30,13 +30,20 @@ namespace() -> "cluster". roots() -> []. injected_fields() -> - #{cluster => [{links, links_schema(#{})}]}. + #{ + cluster => [ + {links, links_schema(#{})} + ] + }. links_schema(Meta) -> ?HOCON(?ARRAY(?R_REF("link")), Meta#{ default => [], validator => fun links_validator/1, desc => ?DESC("links") }). +link_schema() -> + hoconsc:ref(?MODULE, "link"). + fields("link") -> [ {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, @@ -44,8 +51,8 @@ fields("link") -> {server, emqx_schema:servers_sc(#{required => true, desc => ?DESC(server)}, ?MQTT_HOST_OPTS)}, {clientid, ?HOCON(binary(), #{desc => ?DESC(clientid)})}, - {username, ?HOCON(binary(), #{desc => ?DESC(username)})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC(password)})}, + {username, ?HOCON(binary(), #{required => false, desc => ?DESC(username)})}, + {password, emqx_schema_secret:mk(#{required => false, desc => ?DESC(password)})}, {ssl, #{ type => ?R_REF(emqx_schema, "ssl_client_opts"), default => #{<<"enable">> => false}, diff --git a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl index 2025510fc..42f195cf7 100644 --- a/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl +++ b/apps/emqx_cluster_link/src/emqx_cluster_link_sup.erl @@ -6,6 +6,8 @@ -behaviour(supervisor). +-include("emqx_cluster_link.hrl"). + -export([start_link/1]). -export([ @@ -27,12 +29,14 @@ init(LinksConf) -> intensity => 10, period => 5 }, + Metrics = emqx_metrics_worker:child_spec(metrics, ?METRIC_NAME), + BookKeeper = bookkeeper_spec(), ExtrouterGC = extrouter_gc_spec(), RouteActors = [ sup_spec(Name, ?ACTOR_MODULE, [LinkConf]) || #{name := Name} = LinkConf <- LinksConf ], - {ok, {SupFlags, [ExtrouterGC | RouteActors]}}. + {ok, {SupFlags, [Metrics, BookKeeper, ExtrouterGC | RouteActors]}}. extrouter_gc_spec() -> %% NOTE: This one is currently global, not per-link. @@ -53,6 +57,15 @@ sup_spec(Id, Mod, Args) -> modules => [Mod] }. +bookkeeper_spec() -> + #{ + id => bookkeeper, + start => {emqx_cluster_link_bookkeeper, start_link, []}, + restart => permanent, + type => worker, + shutdown => 5_000 + }. + ensure_actor(#{name := Name} = LinkConf) -> case supervisor:start_child(?SERVER, sup_spec(Name, ?ACTOR_MODULE, [LinkConf])) of {ok, Pid} -> diff --git a/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl b/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl new file mode 100644 index 000000000..725bb8afc --- /dev/null +++ b/apps/emqx_cluster_link/src/proto/emqx_cluster_link_proto_v1.erl @@ -0,0 +1,31 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_cluster_link_proto_v1). + +-behaviour(emqx_bpapi). + +-export([ + introduced_in/0, + + get_resource/2, + get_all_resources/1 +]). + +-include_lib("emqx/include/bpapi.hrl"). + +-define(TIMEOUT, 15000). + +introduced_in() -> + "5.7.2". + +-spec get_resource([node()], binary()) -> + emqx_rpc:erpc_multicall({ok, emqx_resource:resource_data()} | {error, not_found}). +get_resource(Nodes, ClusterName) -> + erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_resource_local_v1, [ClusterName], ?TIMEOUT). + +-spec get_all_resources([node()]) -> + emqx_rpc:erpc_multicall(#{binary() => emqx_resource:resource_data()}). +get_all_resources(Nodes) -> + erpc:multicall(Nodes, emqx_cluster_link_mqtt, get_all_resources_local_v1, [], ?TIMEOUT). diff --git a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl index c5ec8da6c..8157c86d6 100644 --- a/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl +++ b/apps/emqx_cluster_link/test/emqx_cluster_link_api_SUITE.erl @@ -11,6 +11,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(API_PATH, emqx_mgmt_api_test_util:api_path(["cluster", "links"])). -define(CONF_PATH, [cluster, links]). @@ -37,8 +39,28 @@ "-----END CERTIFICATE-----" >>). +-define(ON(NODE, BODY), erpc:call(NODE, fun() -> BODY end)). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + all() -> - emqx_common_test_helpers:all(?MODULE). + AllTCs = emqx_common_test_helpers:all(?MODULE), + OtherTCs = AllTCs -- cluster_test_cases(), + [ + {group, cluster} + | OtherTCs + ]. + +groups() -> + [{cluster, cluster_test_cases()}]. + +cluster_test_cases() -> + [ + t_status, + t_metrics + ]. init_per_suite(Config) -> %% This is called by emqx_machine in EMQX release @@ -47,7 +69,7 @@ init_per_suite(Config) -> [ emqx_conf, emqx_management, - {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}, + emqx_mgmt_api_test_util:emqx_dashboard(), emqx_cluster_link ], #{work_dir => emqx_cth_suite:work_dir(Config)} @@ -60,73 +82,641 @@ end_per_suite(Config) -> emqx_config:delete_override_conf_files(), ok. +init_per_group(cluster = Group, Config) -> + ok = emqx_cth_suite:stop_apps([emqx_dashboard]), + SourceClusterSpec = emqx_cluster_link_SUITE:mk_source_cluster(Group, Config), + TargetClusterSpec = emqx_cluster_link_SUITE:mk_target_cluster(Group, Config), + SourceNodes = [SN1 | _] = emqx_cth_cluster:start(SourceClusterSpec), + TargetNodes = [TN1 | _] = emqx_cth_cluster:start(TargetClusterSpec), + emqx_cluster_link_SUITE:start_cluster_link(SourceNodes ++ TargetNodes, Config), + erpc:call(SN1, emqx_cth_suite, start_apps, [ + [emqx_management, emqx_mgmt_api_test_util:emqx_dashboard()], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ]), + erpc:call(TN1, emqx_cth_suite, start_apps, [ + [ + emqx_management, + emqx_mgmt_api_test_util:emqx_dashboard( + "dashboard.listeners.http { enable = true, bind = 28083 }" + ) + ], + #{work_dir => emqx_cth_suite:work_dir(Group, Config)} + ]), + [ + {source_nodes, SourceNodes}, + {target_nodes, TargetNodes} + | Config + ]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(cluster, Config) -> + SourceNodes = ?config(source_nodes, Config), + TargetNodes = ?config(target_nodes, Config), + ok = emqx_cth_cluster:stop(SourceNodes), + ok = emqx_cth_cluster:stop(TargetNodes), + _ = emqx_cth_suite:start_apps( + [emqx_mgmt_api_test_util:emqx_dashboard()], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + ok; +end_per_group(_Group, _Config) -> + ok. + auth_header() -> - {ok, API} = emqx_common_test_http:create_default_app(), - emqx_common_test_http:auth_header(API). + emqx_mgmt_api_test_util:auth_header_(). init_per_testcase(_TC, Config) -> {ok, _} = emqx_cluster_link_config:update([]), + snabbkaffe:start_trace(), Config. end_per_testcase(_TC, _Config) -> + snabbkaffe:stop(), + emqx_common_test_helpers:call_janitor(), ok. -t_put_get_valid(Config) -> - Auth = ?config(auth, Config), - Path = ?API_PATH, - {ok, Resp} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([], emqx_utils_json:decode(Resp)), +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ - Link1 = #{ +api_root() -> + <<"cluster/links">>. + +list() -> + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). + +get_link(Name) -> + get_link(source, Name). + +get_link(SourceOrTargetCluster, Name) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = ""). + +delete_link(Name) -> + Path = emqx_mgmt_api_test_util:api_path([api_root(), "link", Name]), + emqx_mgmt_api_test_util:simple_request(delete, Path, _Params = ""). + +update_link(Name, Params) -> + update_link(source, Name, Params). + +update_link(SourceOrTargetCluster, Name, Params) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name]), + emqx_mgmt_api_test_util:simple_request(put, Path, Params). + +create_link(Name, Params0) -> + Params = Params0#{<<"name">> => Name}, + Path = emqx_mgmt_api_test_util:api_path([api_root()]), + emqx_mgmt_api_test_util:simple_request(post, Path, Params). + +get_metrics(Name) -> + get_metrics(source, Name). + +get_metrics(SourceOrTargetCluster, Name) -> + Host = host(SourceOrTargetCluster), + Path = emqx_mgmt_api_test_util:api_path(Host, [api_root(), "link", Name, "metrics"]), + emqx_mgmt_api_test_util:simple_request(get, Path, _Params = []). + +host(source) -> "http://127.0.0.1:18083"; +host(target) -> "http://127.0.0.1:28083". + +link_params() -> + link_params(_Overrides = #{}). + +link_params(Overrides) -> + Default = #{ + <<"clientid">> => <<"linkclientid">>, <<"pool_size">> => 1, <<"server">> => <<"emqxcl_2.nohost:31883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_1">> + <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>] }, - Link2 = #{ - <<"pool_size">> => 1, - <<"server">> => <<"emqxcl_2.nohost:41883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_2">> - }, - ?assertMatch({ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link1, Link2])), + emqx_utils_maps:deep_merge(Default, Overrides). - {ok, Resp1} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([Link1, Link2], emqx_utils_json:decode(Resp1)), +%%------------------------------------------------------------------------------ +%% Test cases +%%------------------------------------------------------------------------------ + +t_put_get_valid(_Config) -> + ?assertMatch({200, []}, list()), + + Name1 = <<"emqcl_1">>, + Link1 = link_params(#{ + <<"server">> => <<"emqxcl_2.nohost:31883">>, + <<"name">> => Name1 + }), + Name2 = <<"emqcl_2">>, + Link2 = link_params(#{ + <<"server">> => <<"emqxcl_2.nohost:41883">>, + <<"name">> => Name2 + }), + ?assertMatch({201, _}, create_link(Name1, Link1)), + ?assertMatch({201, _}, create_link(Name2, Link2)), + ?assertMatch({200, [_, _]}, list()), DisabledLink1 = Link1#{<<"enable">> => false}, - ?assertMatch( - {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [DisabledLink1, Link2]) - ), - - {ok, Resp2} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), - ?assertMatch([DisabledLink1, Link2], emqx_utils_json:decode(Resp2)), + ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, DisabledLink1))), + ?assertMatch({200, #{<<"enable">> := false}}, get_link(Name1)), + ?assertMatch({200, #{<<"enable">> := true}}, get_link(Name2)), SSL = #{<<"enable">> => true, <<"cacertfile">> => ?CACERT}, SSLLink1 = Link1#{<<"ssl">> => SSL}, + ?assertMatch({200, _}, update_link(Name1, maps:remove(<<"name">>, SSLLink1))), ?assertMatch( - {ok, _}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link2, SSLLink1]) + {200, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}}, + get_link(Name1) ), - {ok, Resp3} = emqx_mgmt_api_test_util:request_api(get, Path, Auth), + ok. +t_put_invalid(_Config) -> + Name = <<"l1">>, + {201, _} = create_link(Name, link_params()), ?assertMatch( - [Link2, #{<<"ssl">> := #{<<"enable">> := true, <<"cacertfile">> := _Path}}], - emqx_utils_json:decode(Resp3) + {400, _}, + update_link(Name, maps:remove(<<"server">>, link_params())) ). -t_put_invalid(Config) -> - Auth = ?config(auth, Config), - Path = ?API_PATH, - Link = #{ - <<"pool_size">> => 1, - <<"server">> => <<"emqxcl_2.nohost:31883">>, - <<"topics">> => [<<"t/test-topic">>, <<"t/test/#">>], - <<"name">> => <<"emqcl_1">> - }, +%% Tests a sequence of CRUD operations and their expected responses, for common use cases +%% and configuration states. +t_crud(_Config) -> + %% No links initially. + ?assertMatch({200, []}, list()), + NameA = <<"a">>, + ?assertMatch({404, _}, get_link(NameA)), + ?assertMatch({404, _}, delete_link(NameA)), + ?assertMatch({404, _}, update_link(NameA, link_params())), + ?assertMatch({404, _}, get_metrics(NameA)), + + Params1 = link_params(), ?assertMatch( - {error, {_, 400, _}}, emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [Link, Link]) + {201, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + create_link(NameA, Params1) + ), + ?assertMatch({400, #{<<"code">> := <<"ALREADY_EXISTS">>}}, create_link(NameA, Params1)), + ?assertMatch( + {200, [ + #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + } + ]}, + list() ), ?assertMatch( - {error, {_, 400, _}}, - emqx_mgmt_api_test_util:request_api(put, Path, "", Auth, [maps:remove(<<"name">>, Link)]) - ). + {200, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + get_link(NameA) + ), + ?assertMatch({200, _}, get_metrics(NameA)), + + Params2 = Params1#{<<"pool_size">> := 2}, + ?assertMatch( + {200, #{ + <<"name">> := NameA, + <<"status">> := _, + <<"node_status">> := [#{<<"node">> := _, <<"status">> := _} | _] + }}, + update_link(NameA, Params2) + ), + + ?assertMatch({204, _}, delete_link(NameA)), + ?assertMatch({404, _}, delete_link(NameA)), + ?assertMatch({404, _}, get_link(NameA)), + ?assertMatch({404, _}, update_link(NameA, Params1)), + ?assertMatch({404, _}, get_metrics(NameA)), + ?assertMatch({200, []}, list()), + + ok. + +%% Verifies the behavior of reported status under different conditions when listing all +%% links and when fetching a specific link. +t_status(Config) -> + [SN1 | _] = ?config(source_nodes, Config), + Name = <<"cl.target">>, + ?retry( + 100, + 10, + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + } + ] + } + ]}, + list() + ) + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"connected">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + } + ] + }}, + get_link(Name) + ), + + %% If one of the nodes reports a different status, the cluster is inconsistent. + ProtoMod = emqx_cluster_link_proto_v1, + ?ON(SN1, begin + ok = meck:new(ProtoMod, [no_link, passthrough, no_history]), + meck:expect(ProtoMod, get_all_resources, fun(Nodes) -> + [Res1, {ok, Res2A} | Rest] = meck:passthrough([Nodes]), + %% Res2A :: #{cluster_name() => emqx_resource:resource_data()} + Res2B = maps:map(fun(_, Data) -> Data#{status := disconnected} end, Res2A), + [Res1, {ok, Res2B} | Rest] + end), + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, {ok, {ok, Res2A}} | Rest] = meck:passthrough([Nodes, LinkName]), + Res2B = Res2A#{status := disconnected}, + [Res1, {ok, {ok, Res2B}} | Rest] + end) + end), + on_exit(fun() -> catch ?ON(SN1, meck:unload()) end), + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + }}, + get_link(Name) + ), + + %% Simulating erpc failures + ?ON(SN1, begin + meck:expect(ProtoMod, get_all_resources, fun(Nodes) -> + [Res1, _ | Rest] = meck:passthrough([Nodes]), + [Res1, {error, {erpc, noconnection}} | Rest] + end), + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]), + [Res1, {error, {erpc, noconnection}} | Rest] + end) + end), + ?assertMatch( + {200, [ + #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"inconsistent">>, + <<"reason">> := _ + } + ] + } + ]}, + list() + ), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"inconsistent">>, + <<"reason">> := _ + } + ] + }}, + get_link(Name) + ), + %% Simulate another inconsistency + ?ON(SN1, begin + meck:expect(ProtoMod, get_resource, fun(Nodes, LinkName) -> + [Res1, _ | Rest] = meck:passthrough([Nodes, LinkName]), + [Res1, {ok, {error, not_found}} | Rest] + end) + end), + ?assertMatch( + {200, #{ + <<"status">> := <<"inconsistent">>, + <<"node_status">> := [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">> + }, + #{ + <<"node">> := _, + <<"status">> := <<"disconnected">> + } + ] + }}, + get_link(Name) + ), + + ok. + +t_metrics(Config) -> + ct:timetrap({seconds, 10}), + [SN1, SN2] = ?config(source_nodes, Config), + [TN1, TN2] = ?config(target_nodes, Config), + %% N.B. Link names on each cluster, so they are switched. + SourceName = <<"cl.target">>, + TargetName = <<"cl.source">>, + + ?assertMatch( + {200, #{ + <<"metrics">> := #{ + <<"router">> := #{ + <<"routes">> := 0 + }, + <<"forwarding">> := #{ + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } + }, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"router">> := #{ + <<"routes">> := 0 + }, + <<"forwarding">> := #{ + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } + } + }, + #{ + <<"node">> := _, + <<"metrics">> := #{ + <<"router">> := #{ + <<"routes">> := 0 + }, + <<"forwarding">> := #{ + <<"matched">> := _, + <<"success">> := _, + <<"failed">> := _, + <<"dropped">> := _, + <<"retried">> := _, + <<"received">> := _, + <<"queuing">> := _, + <<"inflight">> := _, + <<"rate">> := _, + <<"rate_last5m">> := _, + <<"rate_max">> := _ + } + } + } + ] + }}, + get_metrics(source, SourceName) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + } + ] + }}, + get_metrics(target, TargetName) + ), + + SourceC1 = emqx_cluster_link_SUITE:start_client(<<"sc1">>, SN1), + SourceC2 = emqx_cluster_link_SUITE:start_client(<<"sc2">>, SN2), + {ok, _, _} = emqtt:subscribe(SourceC1, <<"t/sc1">>), + {ok, _, _} = emqtt:subscribe(SourceC2, <<"t/sc2">>), + + %% Still no routes, as routes in the source cluster are replicated to the target + %% cluster. + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + } + ] + }}, + get_metrics(source, SourceName) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}, + <<"node_metrics">> := [ + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + }, + #{ + <<"node">> := _, + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}} + } + ] + }}, + get_metrics(target, TargetName) + ), + + TargetC1 = emqx_cluster_link_SUITE:start_client(<<"tc1">>, TN1), + TargetC2 = emqx_cluster_link_SUITE:start_client(<<"tc2">>, TN2), + {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>), + {_, {ok, _}} = + ?wait_async_action( + begin + {ok, _, _} = emqtt:subscribe(TargetC1, <<"t/tc1">>), + {ok, _, _} = emqtt:subscribe(TargetC2, <<"t/tc2">>) + end, + #{?snk_kind := clink_route_sync_complete} + ), + + %% Routes = 2 in source cluster, because the target cluster has some topic filters + %% configured and subscribers to them, which were replicated to the source cluster. + %% This metric is global (cluster-wide). + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}, + <<"node_metrics">> := [ + #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}}, + #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 2}}} + ] + }}, + get_metrics(source, SourceName) + ) + ), + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}, + <<"node_metrics">> := _ + }}, + get_metrics(target, TargetName) + ), + + %% Unsubscribe and remove route. + ct:pal("unsubscribing"), + {_, {ok, _}} = + ?wait_async_action( + begin + {ok, _, _} = emqtt:unsubscribe(TargetC1, <<"t/tc1">>) + end, + #{?snk_kind := clink_route_sync_complete} + ), + + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}, + <<"node_metrics">> := [ + #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}}, + #{<<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}} + ] + }}, + get_metrics(source, SourceName) + ) + ), + + %% Disabling the link should remove the routes. + ct:pal("disabling"), + {200, TargetLink0} = get_link(target, TargetName), + TargetLink1 = maps:without([<<"status">>, <<"node_status">>], TargetLink0), + TargetLink2 = TargetLink1#{<<"enable">> := false}, + {_, {ok, _}} = + ?wait_async_action( + begin + {200, _} = update_link(target, TargetName, TargetLink2), + %% Note that only when the GC runs and collects the stopped actor it'll actually + %% remove the routes + NowMS = erlang:system_time(millisecond), + TTL = emqx_cluster_link_config:actor_ttl(), + ct:pal("gc"), + %% 2 Actors: one for normal routes, one for PS routes + 1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})), + 1 = ?ON(SN1, emqx_cluster_link_extrouter:actor_gc(#{timestamp => NowMS + TTL * 3})), + ct:pal("gc done"), + ok + end, + #{?snk_kind := "cluster_link_extrouter_route_deleted"} + ), + + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 0}}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ) + ), + + %% Enabling again + TargetLink3 = TargetLink2#{<<"enable">> := true}, + {_, {ok, _}} = + ?wait_async_action( + begin + {200, _} = update_link(target, TargetName, TargetLink3) + end, + #{?snk_kind := "cluster_link_extrouter_route_added"} + ), + + ?retry( + 300, + 10, + ?assertMatch( + {200, #{ + <<"metrics">> := #{<<"router">> := #{<<"routes">> := 1}}, + <<"node_metrics">> := _ + }}, + get_metrics(source, SourceName) + ) + ), + + ok. diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 106a65a9c..4b1d40651 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -293,3 +293,30 @@ format_multipart_formdata(Data, Params, Name, FileNames, MimeType, Boundary) -> FileNames ), erlang:iolist_to_binary([WithPaths, StartBoundary, <<"--">>, LineSeparator]). + +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +simple_request(Method, Path, Params) -> + AuthHeader = auth_header_(), + Opts = #{return_all => true}, + case request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {{_, Status, _}, _Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {Status, Body}; + {error, {{_, Status, _}, _Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {Status, Body} + end. diff --git a/apps/emqx_utils/include/emqx_utils_api.hrl b/apps/emqx_utils/include/emqx_utils_api.hrl index ba2941a4f..0876b9829 100644 --- a/apps/emqx_utils/include/emqx_utils_api.hrl +++ b/apps/emqx_utils/include/emqx_utils_api.hrl @@ -21,6 +21,8 @@ -define(OK(CONTENT), {200, CONTENT}). +-define(CREATED(CONTENT), {201, CONTENT}). + -define(NO_CONTENT, 204). -define(BAD_REQUEST(CODE, REASON), {400, ?ERROR_MSG(CODE, REASON)}).