feat(sessds): Add metrics for the number of persisted messages
This commit is contained in:
parent
75b092bf0e
commit
b9ad241658
|
@ -222,7 +222,9 @@
|
||||||
% Messages delivered
|
% Messages delivered
|
||||||
{counter, 'messages.delivered'},
|
{counter, 'messages.delivered'},
|
||||||
% Messages acked
|
% Messages acked
|
||||||
{counter, 'messages.acked'}
|
{counter, 'messages.acked'},
|
||||||
|
% Messages persistently stored
|
||||||
|
{counter, 'messages.persisted'}
|
||||||
]
|
]
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -718,4 +720,5 @@ reserved_idx('overload_protection.gc') -> 403;
|
||||||
reserved_idx('overload_protection.new_conn') -> 404;
|
reserved_idx('overload_protection.new_conn') -> 404;
|
||||||
reserved_idx('messages.validation_succeeded') -> 405;
|
reserved_idx('messages.validation_succeeded') -> 405;
|
||||||
reserved_idx('messages.validation_failed') -> 406;
|
reserved_idx('messages.validation_failed') -> 406;
|
||||||
|
reserved_idx('messages.persisted') -> 407;
|
||||||
reserved_idx(_) -> undefined.
|
reserved_idx(_) -> undefined.
|
||||||
|
|
|
@ -114,6 +114,7 @@ needs_persistence(Msg) ->
|
||||||
|
|
||||||
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
|
-spec store_message(emqx_types:message()) -> emqx_ds:store_batch_result().
|
||||||
store_message(Msg) ->
|
store_message(Msg) ->
|
||||||
|
emqx_metrics:inc('messages.persisted'),
|
||||||
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
|
emqx_ds:store_batch(?PERSISTENT_MESSAGE_DB, [Msg], #{sync => false}).
|
||||||
|
|
||||||
has_subscribers(#message{topic = Topic}) ->
|
has_subscribers(#message{topic = Topic}) ->
|
||||||
|
|
|
@ -67,7 +67,8 @@
|
||||||
%, sent_bytes
|
%, sent_bytes
|
||||||
validation_succeeded,
|
validation_succeeded,
|
||||||
validation_failed,
|
validation_failed,
|
||||||
dropped
|
dropped,
|
||||||
|
persisted
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(GAUGE_SAMPLER_LIST, [
|
-define(GAUGE_SAMPLER_LIST, [
|
||||||
|
@ -87,7 +88,8 @@
|
||||||
sent => sent_msg_rate,
|
sent => sent_msg_rate,
|
||||||
validation_succeeded => validation_succeeded_rate,
|
validation_succeeded => validation_succeeded_rate,
|
||||||
validation_failed => validation_failed_rate,
|
validation_failed => validation_failed_rate,
|
||||||
dropped => dropped_msg_rate
|
dropped => dropped_msg_rate,
|
||||||
|
persisted => persisted_rate
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-define(CURRENT_SAMPLE_NON_RATE,
|
-define(CURRENT_SAMPLE_NON_RATE,
|
||||||
|
|
|
@ -428,7 +428,8 @@ stats(sent) -> emqx_metrics:val('messages.sent');
|
||||||
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
|
stats(sent_bytes) -> emqx_metrics:val('bytes.sent');
|
||||||
stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded');
|
stats(validation_succeeded) -> emqx_metrics:val('messages.validation_succeeded');
|
||||||
stats(validation_failed) -> emqx_metrics:val('messages.validation_failed');
|
stats(validation_failed) -> emqx_metrics:val('messages.validation_failed');
|
||||||
stats(dropped) -> emqx_metrics:val('messages.dropped').
|
stats(dropped) -> emqx_metrics:val('messages.dropped');
|
||||||
|
stats(persisted) -> emqx_metrics:val('messages.persisted').
|
||||||
|
|
||||||
%% -------------------------------------------------------------------------------------------------
|
%% -------------------------------------------------------------------------------------------------
|
||||||
%% Retained && License Quota
|
%% Retained && License Quota
|
||||||
|
|
|
@ -192,6 +192,8 @@ swagger_desc(validation_succeeded) ->
|
||||||
swagger_desc_format("Message validations succeeded ");
|
swagger_desc_format("Message validations succeeded ");
|
||||||
swagger_desc(validation_failed) ->
|
swagger_desc(validation_failed) ->
|
||||||
swagger_desc_format("Message validations failed ");
|
swagger_desc_format("Message validations failed ");
|
||||||
|
swagger_desc(persisted) ->
|
||||||
|
swagger_desc_format("Messages saved to the durable storage ");
|
||||||
swagger_desc(subscriptions) ->
|
swagger_desc(subscriptions) ->
|
||||||
<<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
<<"Subscriptions at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(topics) ->
|
swagger_desc(topics) ->
|
||||||
|
@ -218,6 +220,8 @@ swagger_desc(validation_succeeded_rate) ->
|
||||||
swagger_desc_format("Message validations succeeded ", per);
|
swagger_desc_format("Message validations succeeded ", per);
|
||||||
swagger_desc(validation_failed_rate) ->
|
swagger_desc(validation_failed_rate) ->
|
||||||
swagger_desc_format("Message validations failed ", per);
|
swagger_desc_format("Message validations failed ", per);
|
||||||
|
swagger_desc(persisted_rate) ->
|
||||||
|
swagger_desc_format("Messages saved to the durable storage ", per);
|
||||||
swagger_desc(retained_msg_count) ->
|
swagger_desc(retained_msg_count) ->
|
||||||
<<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
|
<<"Retained messages count at the time of sampling.", ?APPROXIMATE_DESC>>;
|
||||||
swagger_desc(shared_subscriptions) ->
|
swagger_desc(shared_subscriptions) ->
|
||||||
|
|
|
@ -89,7 +89,9 @@ init_per_group(new_config, Config) ->
|
||||||
Apps = emqx_cth_suite:start(
|
Apps = emqx_cth_suite:start(
|
||||||
[
|
[
|
||||||
%% coverage olp metrics
|
%% coverage olp metrics
|
||||||
{emqx, "overload_protection.enable = true"},
|
{emqx,
|
||||||
|
"overload_protection.enable = true\n"
|
||||||
|
"session_persistence.enable = true"},
|
||||||
{emqx_license, "license.key = default"},
|
{emqx_license, "license.key = default"},
|
||||||
{emqx_prometheus, #{config => config(default)}}
|
{emqx_prometheus, #{config => config(default)}}
|
||||||
],
|
],
|
||||||
|
|
Loading…
Reference in New Issue