fix(sessds): Handle errors when storing messages

This commit is contained in:
ieQu1 2024-03-31 18:00:48 +02:00
parent f41e538526
commit b379f331de
No known key found for this signature in database
GPG Key ID: 488654DF3FED6FDE
6 changed files with 95 additions and 29 deletions

View File

@ -253,8 +253,12 @@ persist_publish(Msg) ->
case emqx_persistent_message:persist(Msg) of
ok ->
[persisted];
{_SkipOrError, _Reason} ->
% TODO: log errors?
{skipped, _} ->
[];
{error, Recoverable, Reason} ->
?SLOG(debug, #{
msg => "failed_to_persist_message", is_recoverable => Recoverable, reason => Reason
}),
[]
end.

View File

@ -98,7 +98,7 @@ pre_config_update(_Root, _NewConf, _OldConf) ->
%%--------------------------------------------------------------------
-spec persist(emqx_types:message()) ->
ok | {skipped, _Reason} | {error, _TODO}.
emqx_ds:store_batch_result() | {skipped, needs_no_persistence}.
persist(Msg) ->
?WHEN_ENABLED(
case needs_persistence(Msg) andalso has_subscribers(Msg) of

View File

@ -29,7 +29,10 @@
inc_egress_bytes/2,
observe_egress_flush_time/2,
observe_next_time/3
observe_store_batch_time/2,
observe_next_time/2
]).
%% behavior callbacks:
@ -46,7 +49,15 @@
-define(WORKER, ?MODULE).
-define(DB_METRICS, []).
-define(STORAGE_LAYER_METRICS, [
{slide, 'emqx_ds_store_batch_time'}
]).
-define(FETCH_METRICS, [
{slide, 'emqx_ds_builtin_next_time'}
]).
-define(DB_METRICS, ?STORAGE_LAYER_METRICS ++ ?FETCH_METRICS).
-define(EGRESS_METRICS, [
{counter, 'emqx_ds_egress_batches'},
@ -57,14 +68,12 @@
{slide, 'emqx_ds_egress_flush_time'}
]).
-define(INGRESS_METRICS, [
{slide, 'emqx_ds_builtin_next_time'}
]).
-define(SHARD_METRICS, ?EGRESS_METRICS ++ ?INGRESS_METRICS).
-define(SHARD_METRICS, ?EGRESS_METRICS).
-type shard_metrics_id() :: binary().
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
%%================================================================================
%% API functions
%%================================================================================
@ -75,8 +84,8 @@ child_spec() ->
%% @doc Initialize metrics that are global for a DS database
-spec init_for_db(emqx_ds:db()) -> ok.
init_for_db(_DB) ->
ok.
init_for_db(DB) ->
emqx_metrics_worker:create_metrics(?WORKER, DB, ?DB_METRICS, []).
-spec shard_metric_id(emqx_ds:db(), emqx_ds_replication_layer:shard_id()) -> shard_metrics_id().
shard_metric_id(DB, ShardId) ->
@ -90,43 +99,45 @@ init_for_shard(ShardId) ->
%% @doc Increase the number of successfully flushed batches
-spec inc_egress_batches(shard_metrics_id()) -> ok.
inc_egress_batches(Id) ->
emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches').
catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches').
%% @doc Increase the number of time the egress worker had to retry
%% flushing the batch
-spec inc_egress_batches_retry(shard_metrics_id()) -> ok.
inc_egress_batches_retry(Id) ->
emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry').
catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_retry').
%% @doc Increase the number of time the egress worker encountered an
%% unrecoverable error while trying to flush the batch
-spec inc_egress_batches_failed(shard_metrics_id()) -> ok.
inc_egress_batches_failed(Id) ->
emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed').
catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_batches_failed').
%% @doc Increase the number of messages successfully saved to the shard
-spec inc_egress_messages(shard_metrics_id(), non_neg_integer()) -> ok.
inc_egress_messages(Id, NMessages) ->
emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages).
catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_messages', NMessages).
%% @doc Increase the number of messages successfully saved to the shard
-spec inc_egress_bytes(shard_metrics_id(), non_neg_integer()) -> ok.
inc_egress_bytes(Id, NMessages) ->
emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages).
catch emqx_metrics_worker:inc(?WORKER, Id, 'emqx_ds_egress_bytes', NMessages).
%% @doc Add a sample of elapsed time spent flushing the egress to the
%% Raft log (in microseconds)
-spec observe_egress_flush_time(shard_metrics_id(), non_neg_integer()) -> ok.
observe_egress_flush_time(Id, FlushTime) ->
emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
catch emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_egress_flush_time', FlushTime).
%% @doc Add a sample of elapsed time spent waiting for a
-spec observe_store_batch_time(emqx_ds_storage_layer:shard_id(), non_neg_integer()) -> ok.
observe_store_batch_time({DB, _}, StoreTime) ->
catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_store_batch_time', StoreTime).
%% @doc Add a sample of elapsed time spent waiting for a batch
%% `emqx_ds_replication_layer:next'
-spec observe_next_time(emqx_ds:db(), emqx_ds_replication_layer:shard_id(), non_neg_integer()) ->
ok.
observe_next_time(DB, Shard, NextTime) ->
Id = shard_metric_id(DB, Shard),
emqx_metrics_worker:observe(?WORKER, Id, 'emqx_ds_builtin_next_time', NextTime).
-spec observe_next_time(emqx_ds:db(), non_neg_integer()) -> ok.
observe_next_time(DB, NextTime) ->
catch emqx_metrics_worker:observe(?WORKER, DB, 'emqx_ds_builtin_next_time', NextTime).
prometheus_meta() ->
lists:map(
@ -136,11 +147,56 @@ prometheus_meta() ->
({slide, A}) ->
{A, counter, A}
end,
?SHARD_METRICS
?DB_METRICS ++ ?SHARD_METRICS
).
prometheus_collect(NodeOrAggr) ->
prometheus_per_shard(NodeOrAggr).
maps:merge(prometheus_per_db(NodeOrAggr), prometheus_per_shard(NodeOrAggr)).
prometheus_per_db(NodeOrAggr) ->
lists:foldl(
fun(DB, Acc) ->
prometheus_per_db(NodeOrAggr, DB, Acc)
end,
#{},
emqx_ds_builtin_db_sup:which_dbs()
).
%% This function returns the data in the following format:
%% ```
%% #{emqx_ds_store_batch_time =>
%% [{[{db, emqx_persistent_message}], 42}],
%% ...
%% '''
%%
%% If `NodeOrAggr' = `node' then node name is appended to the list of
%% labels.
prometheus_per_db(NodeOrAggr, DB, Acc0) ->
Labels = [
{db, DB}
| case NodeOrAggr of
node -> [];
_ -> [{node, node()}]
end
],
#{counters := CC, slides := SS} = emqx_metrics_worker:get_metrics(?WORKER, DB),
%% Collect counters:
Acc1 = maps:fold(
fun(MetricId, Value, Acc1) ->
append_to_key(MetricId, {Labels, Value}, Acc1)
end,
Acc0,
CC
),
%% Collect slides:
maps:fold(
fun(MetricId, Value, Acc2) ->
Acc3 = append_to_key(MetricId, slide_value(current, Value, Labels), Acc2),
append_to_key(MetricId, slide_value(last5m, Value, Labels), Acc3)
end,
Acc1,
SS
).
%% This function returns the data in the following format:
%% ```

View File

@ -332,7 +332,7 @@ next(DB, Iter0, BatchSize) ->
T0 = erlang:monotonic_time(microsecond),
Result = ra_next(DB, Shard, StorageIter0, BatchSize),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_next_time(DB, Shard, T1 - T0),
emqx_ds_builtin_metrics:observe_next_time(DB, T1 - T0),
case Result of
{ok, StorageIter, Batch} ->
Iter = Iter0#{?enc := StorageIter},

View File

@ -258,7 +258,11 @@ store_batch(Shard, Messages = [{Time, _Msg} | _], Options) ->
shard => Shard, messages => Messages, options => Options
}),
#{module := Mod, data := GenData} = generation_at(Shard, Time),
Mod:store_batch(Shard, GenData, Messages, Options);
T0 = erlang:monotonic_time(microsecond),
Result = Mod:store_batch(Shard, GenData, Messages, Options),
T1 = erlang:monotonic_time(microsecond),
emqx_ds_builtin_metrics:observe_store_batch_time(Shard, T1 - T0),
Result;
store_batch(_Shard, [], _Options) ->
ok.

View File

@ -491,7 +491,9 @@ emqx_collect(K = emqx_ds_egress_batches_retry, D) -> counter_metrics(?MG(K, D, [
emqx_collect(K = emqx_ds_egress_batches_failed, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = emqx_ds_egress_messages, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = emqx_ds_egress_bytes, D) -> counter_metrics(?MG(K, D, []));
emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, [])).
emqx_collect(K = emqx_ds_egress_flush_time, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_ds_store_batch_time, D) -> gauge_metrics(?MG(K, D, []));
emqx_collect(K = emqx_ds_builtin_next_time, D) -> gauge_metrics(?MG(K, D, [])).
%%--------------------------------------------------------------------
%% Indicators