Merge remote-tracking branch 'origin/release-55' into sync-release-55

This commit is contained in:
Zaiming (Stone) Shi 2024-01-26 16:51:13 +01:00
commit 2a40152721
12 changed files with 191 additions and 143 deletions

View File

@ -21,7 +21,7 @@ endif
# Dashboard version # Dashboard version
# from https://github.com/emqx/emqx-dashboard5 # from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.6.1 export EMQX_DASHBOARD_VERSION ?= v1.6.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.8 export EMQX_EE_DASHBOARD_VERSION ?= e1.5.0-beta.10
PROFILE ?= emqx PROFILE ?= emqx
REL_PROFILES := emqx emqx-enterprise REL_PROFILES := emqx emqx-enterprise

View File

@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
end, end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}, Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.312"}}}. Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.313"}}}.
Dialyzer = fun(Config) -> Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config), {dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

View File

@ -182,11 +182,9 @@ current_conns(Type, Name, ListenOn) when Type == tcp; Type == ssl ->
esockd:get_current_connections({listener_id(Type, Name), ListenOn}); esockd:get_current_connections({listener_id(Type, Name), ListenOn});
current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss -> current_conns(Type, Name, _ListenOn) when Type =:= ws; Type =:= wss ->
proplists:get_value(all_connections, ranch:info(listener_id(Type, Name))); proplists:get_value(all_connections, ranch:info(listener_id(Type, Name)));
current_conns(quic, _Name, _ListenOn) -> current_conns(quic, Name, _ListenOn) ->
case quicer:perf_counters() of {ok, LPid} = quicer:listener(listener_id(quic, Name)),
{ok, PerfCnts} -> proplists:get_value(conn_active, PerfCnts); quicer_listener:count_conns(LPid);
_ -> 0
end;
current_conns(_, _, _) -> current_conns(_, _, _) ->
{error, not_support}. {error, not_support}.

View File

@ -94,7 +94,7 @@ fields(action_parameters) ->
array(ref(?MODULE, action_parameters_data)), array(ref(?MODULE, action_parameters_data)),
#{ #{
desc => ?DESC("action_parameters_data"), desc => ?DESC("action_parameters_data"),
required => true default => []
} }
)} )}
] ++ ] ++

View File

@ -1107,8 +1107,7 @@ tr_prometheus_collectors(Conf) ->
%% emqx collectors %% emqx collectors
emqx_prometheus, emqx_prometheus,
{'/prometheus/auth', emqx_prometheus_auth}, {'/prometheus/auth', emqx_prometheus_auth},
{'/prometheus/data_integration', emqx_prometheus_data_integration}, {'/prometheus/data_integration', emqx_prometheus_data_integration}
emqx_prometheus_mria
%% builtin vm collectors %% builtin vm collectors
| prometheus_collectors(Conf) | prometheus_collectors(Conf)
]. ].

View File

@ -82,6 +82,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(MG(K, MAP), maps:get(K, MAP)). -define(MG(K, MAP), maps:get(K, MAP)).
-define(MG(K, MAP, DEFAULT), maps:get(K, MAP, DEFAULT)).
-define(MG0(K, MAP), maps:get(K, MAP, 0)). -define(MG0(K, MAP), maps:get(K, MAP, 0)).
-define(C(K, L), proplists:get_value(K, L, 0)). -define(C(K, L), proplists:get_value(K, L, 0)).
@ -210,6 +211,7 @@ collect_mf(?PROMETHEUS_DEFAULT_REGISTRY, Callback) ->
ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)), ok = add_collect_family(Callback, authn_metric_meta(), ?MG(emqx_authn_data, RawData)),
ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)), ok = add_collect_family(Callback, cert_metric_meta(), ?MG(cert_data, RawData)),
ok = add_collect_family(Callback, mria_metric_meta(), ?MG(mria_data, RawData)),
ok = maybe_license_add_collect_family(Callback, RawData), ok = maybe_license_add_collect_family(Callback, RawData),
ok; ok;
collect_mf(_Registry, _Callback) -> collect_mf(_Registry, _Callback) ->
@ -219,7 +221,9 @@ collect_mf(_Registry, _Callback) ->
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
(maybe_license_collect_json_data(RawData))#{ (maybe_license_collect_json_data(RawData))#{
stats => collect_stats_json_data(RawData), stats => collect_stats_json_data(
?MG(stats_data, RawData), ?MG(stats_data_cluster_consistented, RawData)
),
metrics => collect_vm_json_data(?MG(vm_data, RawData)), metrics => collect_vm_json_data(?MG(vm_data, RawData)),
packets => collect_json_data(?MG(emqx_packet_data, RawData)), packets => collect_json_data(?MG(emqx_packet_data, RawData)),
messages => collect_json_data(?MG(emqx_message_data, RawData)), messages => collect_json_data(?MG(emqx_message_data, RawData)),
@ -259,7 +263,8 @@ fetch_from_local_node(Mode) ->
emqx_session_data => emqx_metric_data(session_metric_meta(), Mode), emqx_session_data => emqx_metric_data(session_metric_meta(), Mode),
emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode), emqx_olp_data => emqx_metric_data(olp_metric_meta(), Mode),
emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode), emqx_acl_data => emqx_metric_data(acl_metric_meta(), Mode),
emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode) emqx_authn_data => emqx_metric_data(authn_metric_meta(), Mode),
mria_data => mria_data(Mode)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
@ -280,7 +285,8 @@ aggre_or_zip_init_acc() ->
emqx_session_data => maps:from_keys(metrics_name(session_metric_meta()), []), emqx_session_data => maps:from_keys(metrics_name(session_metric_meta()), []),
emqx_olp_data => maps:from_keys(metrics_name(olp_metric_meta()), []), emqx_olp_data => maps:from_keys(metrics_name(olp_metric_meta()), []),
emqx_acl_data => maps:from_keys(metrics_name(acl_metric_meta()), []), emqx_acl_data => maps:from_keys(metrics_name(acl_metric_meta()), []),
emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []) emqx_authn_data => maps:from_keys(metrics_name(authn_metric_meta()), []),
mria_data => maps:from_keys(metrics_name(mria_metric_meta()), [])
}. }.
logic_sum_metrics() -> logic_sum_metrics() ->
@ -460,7 +466,19 @@ emqx_collect(K = emqx_authentication_failure, D) -> counter_metrics(?MG(K, D));
emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D)); emqx_collect(K = emqx_license_expiry_at, D) -> gauge_metric(?MG(K, D));
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Certs %% Certs
emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D)). emqx_collect(K = emqx_cert_expiry_at, D) -> gauge_metrics(?MG(K, D));
%% Mria
%% ========== core
emqx_collect(K = emqx_mria_last_intercepted_trans, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_weight, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_replicants, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_server_mql, D) -> gauge_metrics(?MG(K, D, []));
%% ========== replicant
emqx_collect(K = emqx_mria_lag, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_bootstrap_time, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_bootstrap_num_keys, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_message_queue_len, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_mria_replayq_len, D) -> gauge_metrics(?MG(K, D, [])).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Indicators %% Indicators
@ -889,6 +907,80 @@ utc_time_to_datetime(Str) ->
date_to_expiry_epoch(DateTime) -> date_to_expiry_epoch(DateTime) ->
calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START. calendar:datetime_to_gregorian_seconds(DateTime) - ?EPOCH_START.
%%========================================
%% Mria
%%========================================
mria_metric_meta() ->
mria_metric_meta(core) ++ mria_metric_meta(replicant).
mria_metric_meta(core) ->
[
{emqx_mria_last_intercepted_trans, gauge, last_intercepted_trans},
{emqx_mria_weight, gauge, weight},
{emqx_mria_replicants, gauge, replicants},
{emqx_mria_server_mql, gauge, server_mql}
];
mria_metric_meta(replicant) ->
[
{emqx_mria_lag, gauge, lag},
{emqx_mria_bootstrap_time, gauge, bootstrap_time},
{emqx_mria_bootstrap_num_keys, gauge, bootstrap_num_keys},
{emqx_mria_message_queue_len, gauge, message_queue_len},
{emqx_mria_replayq_len, gauge, replayq_len}
].
mria_data(Mode) ->
case mria_rlog:backend() of
rlog ->
mria_data(mria_rlog:role(), Mode);
mnesia ->
#{}
end.
mria_data(Role, Mode) ->
lists:foldl(
fun({Name, _Type, MetricK}, AccIn) ->
%% TODO: only report shards that are up
DataFun = fun() -> get_shard_metrics(Mode, MetricK) end,
AccIn#{
Name => catch_all(DataFun)
}
end,
#{},
mria_metric_meta(Role)
).
get_shard_metrics(Mode, MetricK) ->
Labels =
case Mode of
node ->
[];
_ ->
[{node, node(self())}]
end,
[
{[{shard, Shard} | Labels], get_shard_metric(MetricK, Shard)}
|| Shard <- mria_schema:shards(), Shard =/= undefined
].
get_shard_metric(replicants, Shard) ->
length(mria_status:agents(Shard));
get_shard_metric(Metric, Shard) ->
case mria_status:get_shard_stats(Shard) of
#{Metric := Value} when is_number(Value) ->
Value;
_ ->
undefined
end.
catch_all(DataFun) ->
try
DataFun()
catch
_:_ -> undefined
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Collect functions %% Collect functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -896,11 +988,15 @@ date_to_expiry_epoch(DateTime) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json` %% merge / zip formatting funcs for type `application/json`
collect_stats_json_data(RawData) -> collect_stats_json_data(StatsData, StatsClData) ->
StatsData = ?MG(stats_data, RawData), StatsDatas = collect_json_data_(StatsData),
StatsClData = ?MG(stats_data_cluster_consistented, RawData), CLData = hd(collect_json_data_(StatsClData)),
D = maps:merge(StatsData, StatsClData), lists:map(
collect_json_data(D). fun(NodeData) ->
maps:merge(NodeData, CLData)
end,
StatsDatas
).
%% always return json array %% always return json array
collect_cert_json_data(Data) -> collect_cert_json_data(Data) ->

View File

@ -24,7 +24,6 @@
collect_json_data/2, collect_json_data/2,
aggre_cluster/3, aggre_cluster/3,
%% with_node_name_label/2,
point_to_map_fun/1, point_to_map_fun/1,

View File

@ -73,20 +73,23 @@
fetch_from_local_node(Mode) -> fetch_from_local_node(Mode) ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS), BridgesV1 = emqx:get_config([bridges], #{}),
BridgeV2Actions = emqx_bridge_v2:list(?ROOT_KEY_ACTIONS),
Connectors = emqx_connector:list(), Connectors = emqx_connector:list(),
{node(self()), #{ {node(self()), #{
rule_metric_data => rule_metric_data(Mode, Rules), rule_metric_data => rule_metric_data(Mode, Rules),
action_metric_data => action_metric_data(Mode, Bridges), action_metric_data => action_metric_data(Mode, BridgeV2Actions),
connector_metric_data => connector_metric_data(Mode, Connectors) connector_metric_data => connector_metric_data(Mode, BridgesV1, Connectors)
}}. }}.
fetch_cluster_consistented_data() -> fetch_cluster_consistented_data() ->
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
%% for bridge v1
BridgesV1 = emqx:get_config([bridges], #{}),
Connectors = emqx_connector:list(), Connectors = emqx_connector:list(),
(maybe_collect_schema_registry())#{ (maybe_collect_schema_registry())#{
rules_ov_data => rules_ov_data(Rules), rules_ov_data => rules_ov_data(Rules),
connectors_ov_data => connectors_ov_data(Connectors) connectors_ov_data => connectors_ov_data(BridgesV1, Connectors)
}. }.
aggre_or_zip_init_acc() -> aggre_or_zip_init_acc() ->
@ -144,9 +147,13 @@ collect_mf(_, _) ->
collect(<<"json">>) -> collect(<<"json">>) ->
RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()), RawData = emqx_prometheus_cluster:raw_data(?MODULE, ?GET_PROM_DATA_MODE()),
Rules = emqx_rule_engine:get_rules(), Rules = emqx_rule_engine:get_rules(),
Bridges = emqx_bridge:list(), Connectors = emqx_connector:list(),
%% for bridge v1
BridgesV1 = emqx:get_config([bridges], #{}),
#{ #{
data_integration_overview => collect_data_integration_overview(Rules, Bridges), data_integration_overview => collect_data_integration_overview(
Rules, BridgesV1, Connectors
),
rules => collect_json_data(?MG(rule_metric_data, RawData)), rules => collect_json_data(?MG(rule_metric_data, RawData)),
actions => collect_json_data(?MG(action_metric_data, RawData)), actions => collect_json_data(?MG(action_metric_data, RawData)),
connectors => collect_json_data(?MG(connector_metric_data, RawData)) connectors => collect_json_data(?MG(connector_metric_data, RawData))
@ -205,6 +212,8 @@ collect_di(K = emqx_rule_actions_failed_out_of_service, Data) -> counter_metrics
collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_rule_actions_failed_unknown, Data) -> counter_metrics(?MG(K, Data));
%%==================== %%====================
%% Action Metric %% Action Metric
collect_di(K = emqx_action_enable, Data) -> gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_status, Data) -> gauge_metrics(?MG(K, Data));
collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_matched, Data) -> counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_dropped, Data) -> counter_metrics(?MG(K, Data));
collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data)); collect_di(K = emqx_action_success, Data) -> counter_metrics(?MG(K, Data));
@ -296,10 +305,17 @@ connectors_ov_metric_meta() ->
connectors_ov_metric(names) -> connectors_ov_metric(names) ->
emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()). emqx_prometheus_cluster:metric_names(connectors_ov_metric_meta()).
connectors_ov_data(Connectors) -> connectors_ov_data(BridgesV1, Connectors) ->
#{
%% Both Bridge V1 and V2 %% Both Bridge V1 and V2
emqx_connectors_count => erlang:length(Connectors) V1ConnectorsCnt = maps:fold(
fun(_Type, NameAndConf, AccIn) ->
AccIn + maps:size(NameAndConf)
end,
0,
BridgesV1
),
#{
emqx_connectors_count => erlang:length(Connectors) + V1ConnectorsCnt
}. }.
%%======================================== %%========================================
@ -375,6 +391,8 @@ get_metric(#{id := Id, enable := Bool} = _Rule) ->
action_metric_meta() -> action_metric_meta() ->
[ [
{emqx_action_enable, gauge},
{emqx_action_status, gauge},
{emqx_action_matched, counter}, {emqx_action_matched, counter},
{emqx_action_dropped, counter}, {emqx_action_dropped, counter},
{emqx_action_success, counter}, {emqx_action_success, counter},
@ -398,9 +416,11 @@ action_metric(names) ->
action_metric_data(Mode, Bridges) -> action_metric_data(Mode, Bridges) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = _Bridge, AccIn) -> fun(#{type := Type, name := Name} = Action, AccIn) ->
Id = emqx_bridge_resource:bridge_id(Type, Name), Id = emqx_bridge_resource:bridge_id(Type, Name),
merge_acc_with_bridges(Mode, Id, get_bridge_metric(Type, Name), AccIn) Status = get_action_status(Action),
Metrics = get_action_metric(Type, Name),
merge_acc_with_bridges(Mode, Id, maps:merge(Status, Metrics), AccIn)
end, end,
maps:from_keys(action_metric(names), []), maps:from_keys(action_metric(names), []),
Bridges Bridges
@ -415,10 +435,18 @@ merge_acc_with_bridges(Mode, Id, BridgeMetrics, PointsAcc) ->
BridgeMetrics BridgeMetrics
). ).
get_action_status(#{resource_data := ResourceData} = _Action) ->
Enable = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData),
#{
emqx_action_enable => emqx_prometheus_cluster:boolean_to_number(Enable),
emqx_action_status => emqx_prometheus_cluster:status_to_number(Status)
}.
action_point(Mode, Id, V) -> action_point(Mode, Id, V) ->
{with_node_label(Mode, [{id, Id}]), V}. {with_node_label(Mode, [{id, Id}]), V}.
get_bridge_metric(Type, Name) -> get_action_metric(Type, Name) ->
#{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name), #{counters := Counters, gauges := Gauges} = emqx_bridge_v2:get_metrics(Type, Name),
#{ #{
emqx_action_matched => ?MG0(matched, Counters), emqx_action_matched => ?MG0(matched, Counters),
@ -452,16 +480,44 @@ connector_metric_meta() ->
connectr_metric(names) -> connectr_metric(names) ->
emqx_prometheus_cluster:metric_names(connector_metric_meta()). emqx_prometheus_cluster:metric_names(connector_metric_meta()).
connector_metric_data(Mode, Connectors) -> connector_metric_data(Mode, BridgesV1, Connectors) ->
AccIn = maps:from_keys(connectr_metric(names), []),
Acc0 = connector_metric_data_v1(Mode, BridgesV1, AccIn),
_AccOut = connector_metric_data_v2(Mode, Connectors, Acc0).
connector_metric_data_v2(Mode, Connectors, InitAcc) ->
lists:foldl( lists:foldl(
fun(#{type := Type, name := Name} = Connector, AccIn) -> fun(#{type := Type, name := Name, resource_data := ResourceData} = _Connector, AccIn) ->
Id = emqx_connector_resource:connector_id(Type, Name), Id = emqx_connector_resource:connector_id(Type, Name),
merge_acc_with_connectors(Mode, Id, get_connector_status(Connector), AccIn) merge_acc_with_connectors(Mode, Id, get_connector_status(ResourceData), AccIn)
end, end,
maps:from_keys(connectr_metric(names), []), InitAcc,
Connectors Connectors
). ).
connector_metric_data_v1(Mode, BridgesV1, InitAcc) ->
maps:fold(
fun(Type, NameAndConfMap, Acc0) ->
maps:fold(
fun(Name, _Conf, Acc1) ->
BridgeV1Id = emqx_bridge_resource:resource_id(Type, Name),
case emqx_resource:get_instance(BridgeV1Id) of
{error, not_found} ->
Acc1;
{ok, _, ResourceData} ->
merge_acc_with_connectors(
Mode, BridgeV1Id, get_connector_status(ResourceData), Acc1
)
end
end,
Acc0,
NameAndConfMap
)
end,
InitAcc,
BridgesV1
).
merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) -> merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
maps:fold( maps:fold(
fun(K, V, AccIn) -> fun(K, V, AccIn) ->
@ -474,7 +530,7 @@ merge_acc_with_connectors(Mode, Id, ConnectorMetrics, PointsAcc) ->
connector_point(Mode, Id, V) -> connector_point(Mode, Id, V) ->
{with_node_label(Mode, [{id, Id}]), V}. {with_node_label(Mode, [{id, Id}]), V}.
get_connector_status(#{resource_data := ResourceData} = _Connector) -> get_connector_status(ResourceData) ->
Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData), Enabled = emqx_utils_maps:deep_get([config, enable], ResourceData),
Status = ?MG(status, ResourceData), Status = ?MG(status, ResourceData),
#{ #{
@ -488,9 +544,9 @@ get_connector_status(#{resource_data := ResourceData} = _Connector) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% merge / zip formatting funcs for type `application/json` %% merge / zip formatting funcs for type `application/json`
collect_data_integration_overview(Rules, Bridges) -> collect_data_integration_overview(Rules, BridgesV1, Connectors) ->
RulesD = rules_ov_data(Rules), RulesD = rules_ov_data(Rules),
ConnectorsD = connectors_ov_data(Bridges), ConnectorsD = connectors_ov_data(BridgesV1, Connectors),
M1 = lists:foldl( M1 = lists:foldl(
fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end, fun(K, AccIn) -> AccIn#{K => ?MG(K, RulesD)} end,

View File

@ -1,102 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_prometheus_mria).
-export([
deregister_cleanup/1,
collect_mf/2
]).
-include_lib("prometheus/include/prometheus.hrl").
%% Please don't remove this attribute, prometheus uses it to
%% automatically register collectors.
-behaviour(prometheus_collector).
%%====================================================================
%% Macros
%%====================================================================
-define(METRIC_NAME_PREFIX, "emqx_mria_").
%%====================================================================
%% Collector API
%%====================================================================
%% @private
deregister_cleanup(_) -> ok.
%% @private
-spec collect_mf(_Registry, Callback) -> ok when
_Registry :: prometheus_registry:registry(),
Callback :: prometheus_collector:collect_mf_callback().
collect_mf(_Registry, Callback) ->
case mria_rlog:backend() of
rlog ->
Metrics = metrics(),
_ = [add_metric_family(Metric, Callback) || Metric <- Metrics],
ok;
mnesia ->
ok
end.
add_metric_family({Name, Metrics}, Callback) ->
Callback(
prometheus_model_helpers:create_mf(
?METRIC_NAME(Name),
<<"">>,
gauge,
catch_all(Metrics)
)
).
%%====================================================================
%% Internal functions
%%====================================================================
metrics() ->
Metrics =
case mria_rlog:role() of
replicant ->
[lag, bootstrap_time, bootstrap_num_keys, message_queue_len, replayq_len];
core ->
[last_intercepted_trans, weight, replicants, server_mql]
end,
[{MetricId, fun() -> get_shard_metric(MetricId) end} || MetricId <- Metrics].
get_shard_metric(Metric) ->
%% TODO: only report shards that are up
[
{[{shard, Shard}], get_shard_metric(Metric, Shard)}
|| Shard <- mria_schema:shards(), Shard =/= undefined
].
get_shard_metric(replicants, Shard) ->
length(mria_status:agents(Shard));
get_shard_metric(Metric, Shard) ->
case mria_status:get_shard_stats(Shard) of
#{Metric := Value} when is_number(Value) ->
Value;
_ ->
undefined
end.
catch_all(DataFun) ->
try
DataFun()
catch
_:_ -> undefined
end.

2
changes/feat-12388.en.md Normal file
View File

@ -0,0 +1,2 @@
QUIC listener now shows per listener connection count instead of global one.

View File

@ -795,7 +795,7 @@ defmodule EMQXUmbrella.MixProject do
defp quicer_dep() do defp quicer_dep() do
if enable_quicer?(), if enable_quicer?(),
# in conflict with emqx and emqtt # in conflict with emqx and emqtt
do: [{:quicer, github: "emqx/quic", tag: "0.0.312", override: true}], do: [{:quicer, github: "emqx/quic", tag: "0.0.313", override: true}],
else: [] else: []
end end

View File

@ -36,7 +36,7 @@ assert_otp() ->
end. end.
quicer() -> quicer() ->
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.312"}}}. {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.313"}}}.
jq() -> jq() ->
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}. {jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.12"}}}.