feat(resource): ask for metrics only when needed

This commit is contained in:
Andrew Mayorov 2023-04-10 17:59:33 +03:00
parent e186477531
commit e70deae1c3
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
11 changed files with 239 additions and 100 deletions

View File

@ -5,6 +5,7 @@
{emqx_bridge,1}. {emqx_bridge,1}.
{emqx_bridge,2}. {emqx_bridge,2}.
{emqx_bridge,3}. {emqx_bridge,3}.
{emqx_bridge,4}.
{emqx_broker,1}. {emqx_broker,1}.
{emqx_cm,1}. {emqx_cm,1}.
{emqx_conf,1}. {emqx_conf,1}.

View File

@ -872,8 +872,8 @@ lookup_from_local_node(ChainName, AuthenticatorID) ->
case emqx_resource:get_instance(ResourceId) of case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, not_found} ->
{error, {NodeId, not_found_resource}}; {error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := ResourceMetrics}} -> {ok, _, #{status := Status}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}} {ok, {NodeId, Status, Metrics, emqx_resource:get_metrics(ResourceId)}}
end end
end; end;
{error, Reason} -> {error, Reason} ->

View File

@ -344,8 +344,8 @@ lookup_from_local_node(Type) ->
case emqx_resource:get_instance(ResourceId) of case emqx_resource:get_instance(ResourceId) of
{error, not_found} -> {error, not_found} ->
{error, {NodeId, not_found_resource}}; {error, {NodeId, not_found_resource}};
{ok, _, #{status := Status, metrics := ResourceMetrics}} -> {ok, _, #{status := Status}} ->
{ok, {NodeId, Status, Metrics, ResourceMetrics}} {ok, {NodeId, Status, Metrics, emqx_resource:get_metrics(ResourceId)}}
end; end;
_ -> _ ->
Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type), Metrics = emqx_metrics_worker:get_metrics(authz_metrics, Type),

View File

@ -34,7 +34,7 @@
unload/0, unload/0,
lookup/1, lookup/1,
lookup/2, lookup/2,
lookup/3, get_metrics/2,
create/3, create/3,
disable_enable/3, disable_enable/3,
remove/2, remove/2,
@ -271,6 +271,9 @@ lookup(Type, Name, RawConf) ->
}} }}
end. end.
%% @doc Fetch the metrics of the resource that backs the bridge `Type'/`Name'.
%% The bridge is first mapped to its resource id with
%% `emqx_bridge_resource:resource_id/2'; that id is then handed to
%% `emqx_resource:get_metrics/1', whose result is returned unchanged.
get_metrics(Type, Name) ->
    ResourceId = emqx_bridge_resource:resource_id(Type, Name),
    emqx_resource:get_metrics(ResourceId).
maybe_upgrade(mqtt, Config) -> maybe_upgrade(mqtt, Config) ->
emqx_bridge_compatible_config:maybe_upgrade(Config); emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) -> maybe_upgrade(webhook, Config) ->

View File

@ -46,6 +46,7 @@
]). ]).
-export([lookup_from_local_node/2]). -export([lookup_from_local_node/2]).
-export([get_metrics_from_local_node/2]).
-define(BRIDGE_NOT_ENABLED, -define(BRIDGE_NOT_ENABLED,
?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>) ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
@ -467,7 +468,7 @@ schema("/bridges_probe") ->
end; end;
'/bridges'(get, _Params) -> '/bridges'(get, _Params) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
NodeReplies = emqx_bridge_proto_v3:list_bridges_on_nodes(Nodes), NodeReplies = emqx_bridge_proto_v4:list_bridges_on_nodes(Nodes),
case is_ok(NodeReplies) of case is_ok(NodeReplies) of
{ok, NodeBridges} -> {ok, NodeBridges} ->
AllBridges = [ AllBridges = [
@ -524,7 +525,7 @@ schema("/bridges_probe") ->
). ).
'/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) -> '/bridges/:id/metrics'(get, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID(Id, lookup_from_all_nodes_metrics(BridgeType, BridgeName, 200)). ?TRY_PARSE_ID(Id, get_metrics_from_all_nodes(BridgeType, BridgeName)).
'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) -> '/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
@ -564,19 +565,21 @@ maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeN
maybe_deobfuscate_bridge_probe(Params) -> maybe_deobfuscate_bridge_probe(Params) ->
Params. Params.
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> get_metrics_from_all_nodes(BridgeType, BridgeName) ->
FormatFun = fun format_bridge_info/1,
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) ->
FormatFun = fun format_bridge_metrics/1,
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun) ->
Nodes = mria:running_nodes(), Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v3:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of Result = do_bpapi_call(all, get_metrics_from_all_nodes, [Nodes, BridgeType, BridgeName]),
case Result of
Metrics when is_list(Metrics) ->
{200, format_bridge_metrics(lists:zip(Nodes, Metrics))};
{error, Reason} ->
?INTERNAL_ERROR(Reason)
end.
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
Nodes = mria:running_nodes(),
case is_ok(emqx_bridge_proto_v4:lookup_from_all_nodes(Nodes, BridgeType, BridgeName)) of
{ok, [{ok, _} | _] = Results} -> {ok, [{ok, _} | _] = Results} ->
{SuccCode, FormatFun([R || {ok, R} <- Results])}; {SuccCode, format_bridge_info([R || {ok, R} <- Results])};
{ok, [{error, not_found} | _]} -> {ok, [{error, not_found} | _]} ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName); ?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
{error, Reason} -> {error, Reason} ->
@ -603,6 +606,9 @@ create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
?BAD_REQUEST(map_to_json(Reason)) ?BAD_REQUEST(map_to_json(Reason))
end. end.
%% @doc Metrics of a single bridge as seen by the node running this call.
%% Reads the raw metrics through `emqx_bridge:get_metrics/2' and passes them
%% through `format_metrics/1' before returning them. This is the function the
%% v4 backplane call `get_metrics_from_all_nodes/3' executes on each node.
get_metrics_from_local_node(BridgeType, BridgeName) ->
    RawMetrics = emqx_bridge:get_metrics(BridgeType, BridgeName),
    format_metrics(RawMetrics).
'/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) -> '/bridges/:id/enable/:enable'(put, #{bindings := #{id := Id, enable := Enable}}) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
@ -739,7 +745,7 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) ->
). ).
format_bridge_info([FirstBridge | _] = Bridges) -> format_bridge_info([FirstBridge | _] = Bridges) ->
Res = maps:without([node, metrics], FirstBridge), Res = maps:remove(node, FirstBridge),
NodeStatus = node_status(Bridges), NodeStatus = node_status(Bridges),
redact(Res#{ redact(Res#{
status => aggregate_status(NodeStatus), status => aggregate_status(NodeStatus),
@ -766,7 +772,7 @@ aggregate_status(AllStatus) ->
end. end.
collect_metrics(Bridges) -> collect_metrics(Bridges) ->
[maps:with([node, metrics], B) || B <- Bridges]. [#{node => Node, metrics => Metrics} || {Node, Metrics} <- Bridges].
aggregate_metrics(AllMetrics) -> aggregate_metrics(AllMetrics) ->
InitMetrics = ?EMPTY_METRICS, InitMetrics = ?EMPTY_METRICS,
@ -800,9 +806,7 @@ aggregate_metrics(
M15 + N15, M15 + N15,
M16 + N16, M16 + N16,
M17 + N17 M17 + N17
); ).
aggregate_metrics(#{}, Metrics) ->
Metrics.
format_resource( format_resource(
#{ #{
@ -826,15 +830,16 @@ format_resource(
). ).
format_resource_data(ResData) -> format_resource_data(ResData) ->
maps:fold(fun format_resource_data/3, #{}, maps:with([status, metrics, error], ResData)). maps:fold(fun format_resource_data/3, #{}, maps:with([status, error], ResData)).
format_resource_data(error, undefined, Result) -> format_resource_data(error, undefined, Result) ->
Result; Result;
format_resource_data(error, Error, Result) -> format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_misc:readable_error_msg(Error)}; Result#{status_reason => emqx_misc:readable_error_msg(Error)};
format_resource_data( format_resource_data(K, V, Result) ->
metrics, Result#{K => V}.
#{
format_metrics(#{
counters := #{ counters := #{
'dropped' := Dropped, 'dropped' := Dropped,
'dropped.other' := DroppedOther, 'dropped.other' := DroppedOther,
@ -853,13 +858,9 @@ format_resource_data(
rate := #{ rate := #{
matched := #{current := Rate, last5m := Rate5m, max := RateMax} matched := #{current := Rate, last5m := Rate5m, max := RateMax}
} }
}, }) ->
Result
) ->
Queued = maps:get('queuing', Gauges, 0), Queued = maps:get('queuing', Gauges, 0),
SentInflight = maps:get('inflight', Gauges, 0), SentInflight = maps:get('inflight', Gauges, 0),
Result#{
metrics =>
?METRICS( ?METRICS(
Dropped, Dropped,
DroppedOther, DroppedOther,
@ -878,10 +879,7 @@ format_resource_data(
Rate5m, Rate5m,
RateMax, RateMax,
Rcvd Rcvd
) ).
};
format_resource_data(K, V, Result) ->
Result#{K => V}.
fill_defaults(Type, RawConf) -> fill_defaults(Type, RawConf) ->
PackedConf = pack_bridge_conf(Type, RawConf), PackedConf = pack_bridge_conf(Type, RawConf),
@ -990,7 +988,7 @@ do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(SupportedVersion, Call, Args) -> do_bpapi_call_vsn(SupportedVersion, Call, Args) ->
case lists:member(SupportedVersion, supported_versions(Call)) of case lists:member(SupportedVersion, supported_versions(Call)) of
true -> true ->
apply(emqx_bridge_proto_v3, Call, Args); apply(emqx_bridge_proto_v4, Call, Args);
false -> false ->
{error, not_implemented} {error, not_implemented}
end. end.
@ -1000,9 +998,10 @@ maybe_unwrap({error, not_implemented}) ->
maybe_unwrap(RpcMulticallResult) -> maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult). emqx_rpc:unwrap_erpc(RpcMulticallResult).
supported_versions(start_bridge_to_node) -> [2, 3]; supported_versions(start_bridge_to_node) -> [2, 3, 4];
supported_versions(start_bridges_to_all_nodes) -> [2, 3]; supported_versions(start_bridges_to_all_nodes) -> [2, 3, 4];
supported_versions(_Call) -> [1, 2, 3]. supported_versions(get_metrics_from_all_nodes) -> [4];
supported_versions(_Call) -> [1, 2, 3, 4].
redact(Term) -> redact(Term) ->
emqx_misc:redact(Term). emqx_misc:redact(Term).

View File

@ -20,6 +20,7 @@
-export([ -export([
introduced_in/0, introduced_in/0,
deprecated_since/0,
list_bridges/1, list_bridges/1,
list_bridges_on_nodes/1, list_bridges_on_nodes/1,
@ -39,6 +40,9 @@
introduced_in() -> introduced_in() ->
"5.0.21". "5.0.21".
%% @doc Release string from which this protocol version counts as deprecated.
%% NOTE(review): the value is the same release that `emqx_bridge_proto_v4'
%% reports from its `introduced_in/0' ("5.0.22") -- presumably the two are
%% meant to line up; confirm when bumping either one.
deprecated_since() ->
"5.0.22".
-spec list_bridges(node()) -> list() | emqx_rpc:badrpc(). -spec list_bridges(node()) -> list() | emqx_rpc:badrpc().
list_bridges(Node) -> list_bridges(Node) ->
rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT). rpc:call(Node, emqx_bridge, list, [], ?TIMEOUT).

View File

@ -0,0 +1,135 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_proto_v4).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
list_bridges_on_nodes/1,
restart_bridge_to_node/3,
start_bridge_to_node/3,
stop_bridge_to_node/3,
lookup_from_all_nodes/3,
get_metrics_from_all_nodes/3,
restart_bridges_to_all_nodes/3,
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3
]).
-include_lib("emqx/include/bpapi.hrl").
-define(TIMEOUT, 15000).
%% @doc Release string in which this protocol version is introduced.
%% Exported alongside the RPC wrappers of this `emqx_bpapi' module;
%% presumably consumed by the bpapi machinery -- verify against `emqx_bpapi'.
introduced_in() ->
"5.0.22".
%% @doc Ask every node in `Nodes' for its list of bridges.
%% Runs `emqx_bridge:list()' on each node with one `erpc:multicall/5',
%% bounded by `?TIMEOUT'. The multicall result is returned as is: one entry
%% per node, in the order of `Nodes'.
-spec list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
list_bridges_on_nodes(Nodes) ->
erpc:multicall(Nodes, emqx_bridge, list, [], ?TIMEOUT).
-type key() :: atom() | binary() | [byte()].
%% @doc Restart one bridge on a single node.
%% Invokes `emqx_bridge_resource:restart(BridgeType, BridgeName)' on `Node'
%% through `rpc:call/5', waiting at most `?TIMEOUT'. The RPC result is
%% returned untouched, so a failed call surfaces as `{badrpc, Reason}'.
-spec restart_bridge_to_node(node(), key(), key()) ->
term().
restart_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Start one bridge on a single node.
%% Invokes `emqx_bridge_resource:start(BridgeType, BridgeName)' on `Node'
%% through `rpc:call/5', waiting at most `?TIMEOUT'. The RPC result is
%% returned untouched, so a failed call surfaces as `{badrpc, Reason}'.
-spec start_bridge_to_node(node(), key(), key()) ->
term().
start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Stop one bridge on a single node.
%% Invokes `emqx_bridge_resource:stop(BridgeType, BridgeName)' on `Node'
%% through `rpc:call/5', waiting at most `?TIMEOUT'. The RPC result is
%% returned untouched, so a failed call surfaces as `{badrpc, Reason}'.
-spec stop_bridge_to_node(node(), key(), key()) ->
term().
stop_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Restart the same bridge on every node in `Nodes'.
%% Runs `emqx_bridge_resource:restart(BridgeType, BridgeName)' on each node
%% with one `erpc:multicall/5', bounded by `?TIMEOUT'. The multicall result is
%% returned as is: one entry per node, in the order of `Nodes'.
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
restart,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Start the same bridge on every node in `Nodes'.
%% Runs `emqx_bridge_resource:start(BridgeType, BridgeName)' on each node
%% with one `erpc:multicall/5', bounded by `?TIMEOUT'. The multicall result is
%% returned as is: one entry per node, in the order of `Nodes'.
-spec start_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
start_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Stop the same bridge on every node in `Nodes'.
%% Runs `emqx_bridge_resource:stop(BridgeType, BridgeName)' on each node
%% with one `erpc:multicall/5', bounded by `?TIMEOUT'. The multicall result is
%% returned as is: one entry per node, in the order of `Nodes'.
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_resource,
stop,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Look one bridge up on every node in `Nodes'.
%% Runs `emqx_bridge_api:lookup_from_local_node(BridgeType, BridgeName)' on
%% each node with one `erpc:multicall/5', bounded by `?TIMEOUT'. The multicall
%% result is returned as is: one entry per node, in the order of `Nodes'.
-spec lookup_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
lookup_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).
%% @doc Collect the metrics of one bridge from every node in `Nodes'.
%% Runs `emqx_bridge_api:get_metrics_from_local_node(BridgeType, BridgeName)'
%% on each node with one `erpc:multicall/5', bounded by `?TIMEOUT'. The
%% multicall result is returned as is: one entry per node, in the order of
%% `Nodes', so callers can pair entries with the node list they passed in.
-spec get_metrics_from_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall(emqx_metrics_worker:metrics()).
get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_api,
get_metrics_from_local_node,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -1201,13 +1201,16 @@ t_metrics(Config) ->
request_json(get, uri(["bridges", BridgeID, "metrics"]), Config) request_json(get, uri(["bridges", BridgeID, "metrics"]), Config)
), ),
%% check that metrics isn't returned when listing all bridges %% check for absence of metrics when listing all bridges
{ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config), {ok, 200, Bridges} = request_json(get, uri(["bridges"]), Config),
?assert( ?assertNotMatch(
lists:all( [
fun(E) -> not maps:is_key(<<"metrics">>, E) end, #{
<<"metrics">> := #{},
<<"node_metrics">> := [_ | _]
}
],
Bridges Bridges
)
), ),
ok. ok.

View File

@ -103,6 +103,7 @@
list_instances_verbose/0, list_instances_verbose/0,
%% return the data of the instance %% return the data of the instance
get_instance/1, get_instance/1,
get_metrics/1,
fetch_creation_opts/1, fetch_creation_opts/1,
%% return all the instances of the same resource type %% return all the instances of the same resource type
list_instances_by_type/1, list_instances_by_type/1,
@ -311,7 +312,12 @@ set_resource_status_connecting(ResId) ->
-spec get_instance(resource_id()) -> -spec get_instance(resource_id()) ->
{ok, resource_group(), resource_data()} | {error, Reason :: term()}. {ok, resource_group(), resource_data()} | {error, Reason :: term()}.
get_instance(ResId) -> get_instance(ResId) ->
emqx_resource_manager:lookup_cached(ResId, [metrics]). emqx_resource_manager:lookup_cached(ResId).
%% @doc Return the metrics recorded for the resource `ResourceId'.
%% Plain delegation to `emqx_resource_manager:get_metrics/1'; metrics are
%% requested through this call rather than being bundled into the data
%% returned by `get_instance/1'.
-spec get_metrics(resource_id()) -> emqx_metrics_worker:metrics().
get_metrics(ResourceId) ->
    emqx_resource_manager:get_metrics(ResourceId).
-spec fetch_creation_opts(map()) -> creation_opts(). -spec fetch_creation_opts(map()) -> creation_opts().
fetch_creation_opts(Opts) -> fetch_creation_opts(Opts) ->

View File

@ -37,7 +37,6 @@
list_all/0, list_all/0,
list_group/1, list_group/1,
lookup_cached/1, lookup_cached/1,
lookup_cached/2,
get_metrics/1, get_metrics/1,
reset_metrics/1 reset_metrics/1
]). ]).
@ -231,25 +230,14 @@ set_resource_status_connecting(ResId) ->
-spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup(ResId) -> lookup(ResId) ->
case safe_call(ResId, lookup, ?T_LOOKUP) of case safe_call(ResId, lookup, ?T_LOOKUP) of
{error, timeout} -> lookup_cached(ResId, [metrics]); {error, timeout} -> lookup_cached(ResId);
Result -> Result Result -> Result
end. end.
%% @doc Lookup the group and data of a resource from the cache %% @doc Lookup the group and data of a resource from the cache
-spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}. -spec lookup_cached(resource_id()) -> {ok, resource_group(), resource_data()} | {error, not_found}.
lookup_cached(ResId) -> lookup_cached(ResId) ->
lookup_cached(ResId, []).
%% @doc Lookup the group and data of a resource from the cache
-spec lookup_cached(resource_id(), [Option]) ->
{ok, resource_group(), resource_data()} | {error, not_found}
when
Option :: metrics.
lookup_cached(ResId, Options) ->
NeedMetrics = lists:member(metrics, Options),
case read_cache(ResId) of case read_cache(ResId) of
{Group, Data} when NeedMetrics ->
{ok, Group, data_record_to_external_map_with_metrics(Data)};
{Group, Data} -> {Group, Data} ->
{ok, Group, data_record_to_external_map(Data)}; {ok, Group, data_record_to_external_map(Data)};
not_found -> not_found ->
@ -366,7 +354,7 @@ handle_event({call, From}, {remove, ClearMetrics}, _State, Data) ->
handle_remove_event(From, ClearMetrics, Data); handle_remove_event(From, ClearMetrics, Data);
% Called when the state-data of the resource is being looked up. % Called when the state-data of the resource is being looked up.
handle_event({call, From}, lookup, _State, #data{group = Group} = Data) -> handle_event({call, From}, lookup, _State, #data{group = Group} = Data) ->
Reply = {ok, Group, data_record_to_external_map_with_metrics(Data)}, Reply = {ok, Group, data_record_to_external_map(Data)},
{keep_state_and_data, [{reply, From, Reply}]}; {keep_state_and_data, [{reply, From, Reply}]};
% Called when doing a manually health check. % Called when doing a manually health check.
handle_event({call, From}, health_check, stopped, _Data) -> handle_event({call, From}, health_check, stopped, _Data) ->

View File

@ -349,7 +349,7 @@ t_query_counter_async_query(_) ->
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), #{counters := C} = emqx_resource:get_metrics(?ID),
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C), ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
ok = emqx_resource:remove_local(?ID). ok = emqx_resource:remove_local(?ID).
@ -402,7 +402,7 @@ t_query_counter_async_callback(_) ->
?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace) ?assertMatch([#{query := {query, _, get_counter, _, _}}], QueryTrace)
end end
), ),
{ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), #{counters := C} = emqx_resource:get_metrics(?ID),
?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C), ?assertMatch(#{matched := 1002, 'success' := 1002, 'failed' := 0}, C),
?assertMatch(1000, ets:info(Tab0, size)), ?assertMatch(1000, ets:info(Tab0, size)),
?assert( ?assert(
@ -2702,7 +2702,7 @@ config() ->
Config. Config.
tap_metrics(Line) -> tap_metrics(Line) ->
{ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID), #{counters := C, gauges := G} = emqx_resource:get_metrics(?ID),
ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]),
#{counters => C, gauges => G}. #{counters => C, gauges => G}.