diff --git a/apps/emqx/src/emqx_metrics_worker.erl b/apps/emqx/src/emqx_metrics_worker.erl index 21e73ff51..ab6a0b1a6 100644 --- a/apps/emqx/src/emqx_metrics_worker.erl +++ b/apps/emqx/src/emqx_metrics_worker.erl @@ -173,7 +173,7 @@ get_metrics(Name, Id) -> inc(Name, Id, Metric) -> inc(Name, Id, Metric, 1). --spec inc(handler_name(), metric_id(), atom(), pos_integer()) -> ok. +-spec inc(handler_name(), metric_id(), atom(), integer()) -> ok. inc(Name, Id, Metric, Val) -> counters:add(get_ref(Name, Id), idx_metric(Name, Id, Metric), Val). diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 0d53b813e..d7953ac3b 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -143,7 +143,7 @@ emqx_resource_schema { } } - queue_max_bytes { + max_queue_bytes { desc { en: """Maximum queue storage.""" zh: """消息队列的最大长度。""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 4d1c45eb4..2409a7069 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -68,7 +68,7 @@ batch_size => pos_integer(), batch_time => pos_integer(), enable_queue => boolean(), - queue_max_bytes => pos_integer(), + max_queue_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), async_inflight_window => pos_integer() @@ -81,8 +81,11 @@ -define(WORKER_POOL_SIZE, 16). --define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). --define(DEFAULT_QUEUE_SIZE_RAW, <<"1GB">>). +-define(DEFAULT_QUEUE_SEG_SIZE, 10 * 1024 * 1024). +-define(DEFAULT_QUEUE_SEG_SIZE_RAW, <<"10MB">>). + +-define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024 * 1024). +-define(DEFAULT_QUEUE_SIZE_RAW, <<"100GB">>). %% count -define(DEFAULT_BATCH_SIZE, 100). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 261863d4c..eaf0a9a5f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -577,8 +577,12 @@ maybe_alarm(_Status, ResId) -> ). maybe_resume_resource_workers(connected) -> - {_, Pid, _, _} = supervisor:which_children(emqx_resource_worker_sup), - emqx_resource_worker:resume(Pid); + lists:foreach( + fun({_, Pid, _, _}) -> + emqx_resource_worker:resume(Pid) + end, + supervisor:which_children(emqx_resource_worker_sup) + ); maybe_resume_resource_workers(_) -> ok. diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index dabaf037c..013f430b1 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -123,13 +123,15 @@ init({Id, Index, Opts}) -> true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), - seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE), + seg_bytes => maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), + max_total_bytes => maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), sizer => fun ?MODULE:estimate_size/1, marshaller => fun ?MODULE:queue_item_marshaller/1 }); false -> undefined end, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', replayq:count(Queue)), ok = inflight_new(Name), St = #{ id => Id, @@ -323,23 +325,27 @@ flush( end. maybe_append_queue(Id, undefined, _Items) -> + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'), undefined; maybe_append_queue(Id, Q, Items) -> - case replayq:overflow(Q) of - Overflow when Overflow =< 0 -> - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), - replayq:append(Q, Items); - Overflow -> - PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, - {Q1, QAckRef, Items} = replayq:pop(Q, PopOpts), - ok = replayq:ack(Q1, QAckRef), - Dropped = length(Items), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), - emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), - ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), - Q1 - end. + Q2 = + case replayq:overflow(Q) of + Overflow when Overflow =< 0 -> + Q; + Overflow -> + PopOpts = #{bytes_limit => Overflow, count_limit => 999999999}, + {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts), + ok = replayq:ack(Q1, QAckRef), + Dropped = length(Items2), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued', -Dropped), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'), + emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'), + ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), + Q1 + end, + emqx_metrics_worker:inc(?RES_METRICS, Id, 'queued'), + replayq:append(Q2, Items). batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fe8564a41..9e54c8a7b 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -53,7 +53,7 @@ fields("creation_opts") -> {batch_size, fun batch_size/1}, {batch_time, fun batch_time/1}, {enable_queue, fun enable_queue/1}, - {max_queue_bytes, fun queue_max_bytes/1} + {max_queue_bytes, fun max_queue_bytes/1} ]. worker_pool_size(type) -> pos_integer(); @@ -110,11 +110,11 @@ batch_time(default) -> ?DEFAULT_BATCH_TIME_RAW; batch_time(required) -> false; batch_time(_) -> undefined. -queue_max_bytes(type) -> emqx_schema:bytesize(); -queue_max_bytes(desc) -> ?DESC("queue_max_bytes"); -queue_max_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; -queue_max_bytes(required) -> false; -queue_max_bytes(_) -> undefined. +max_queue_bytes(type) -> emqx_schema:bytesize(); +max_queue_bytes(desc) -> ?DESC("max_queue_bytes"); +max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW; +max_queue_bytes(required) -> false; +max_queue_bytes(_) -> undefined. desc("creation_opts") -> ?DESC("creation_opts"). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index aafce137c..847c4ff00 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -522,6 +522,6 @@ inc_action_metrics(R, RuleId) -> is_ok_result(ok) -> true; is_ok_result(R) when is_tuple(R) -> - ok = erlang:element(1, R); + ok == erlang:element(1, R); is_ok_result(ok) -> false.