diff --git a/Makefile b/Makefile index c2b33786b..79107cba9 100644 --- a/Makefile +++ b/Makefile @@ -88,6 +88,7 @@ define gen-app-ct-target $1-ct: $(REBAR) @$(SCRIPTS)/pre-compile.sh $(PROFILE) @ENABLE_COVER_COMPILE=1 $(REBAR) ct -c -v \ + --readable=$(CT_READABLE) \ --name $(CT_NODE_NAME) \ --cover_export_name $(CT_COVER_EXPORT_PREFIX)-$(subst /,-,$1) \ --suite $(shell $(SCRIPTS)/find-suites.sh $1) diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 61196a645..f440dfc5a 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -447,8 +447,11 @@ is_all_tcp_servers_available(Servers) -> is_tcp_server_available(Host, Port) end, case lists:partition(Fun, Servers) of - {_, []} -> true; - {_, Unavail} -> ct:print("Unavailable servers: ~p", [Unavail]) + {_, []} -> + true; + {_, Unavail} -> + ct:print("Unavailable servers: ~p", [Unavail]), + false end. -spec is_tcp_server_available( diff --git a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf index b62a2ee68..d575f09bc 100644 --- a/apps/emqx_bridge/i18n/emqx_bridge_schema.conf +++ b/apps/emqx_bridge/i18n/emqx_bridge_schema.conf @@ -78,17 +78,6 @@ emqx_bridge_schema { } } - metric_batching { - desc { - en: """Count of messages that are currently accumulated in memory waiting for sending in one batch.""" - zh: """当前积压在内存里,等待批量发送的消息个数""" - } - label: { - en: "Batched" - zh: "等待批量发送" - } - } - metric_dropped { desc { en: """Count of messages dropped.""" @@ -120,16 +109,6 @@ emqx_bridge_schema { zh: "队列已满被丢弃" } } - metric_dropped_queue_not_enabled { - desc { - en: """Count of messages dropped due to the queue is not enabled.""" - zh: """因为队列未启用被丢弃的消息个数。""" - } - label: { - en: "Dropped Queue Disabled" - zh: "队列未启用被丢弃" - } - } metric_dropped_resource_not_found { desc { en: """Count of messages dropped due to the resource is not found.""" @@ -193,7 +172,7 @@ emqx_bridge_schema { } } - metric_sent_inflight { + metric_inflight { desc { en: """Count of messages that were sent asynchronously but ACKs are not yet received.""" zh: """已异步地发送但没有收到 ACK 的消息个数。""" diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index ff639066f..d8229cc77 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -16,16 +16,14 @@ -define(EMPTY_METRICS, ?METRICS( - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ) ). -define(METRICS( - Batched, Dropped, DroppedOther, DroppedQueueFull, - DroppedQueueNotEnabled, DroppedResourceNotFound, DroppedResourceStopped, Matched, @@ -40,11 +38,9 @@ Rcvd ), #{ - 'batching' => Batched, 'dropped' => Dropped, 'dropped.other' => DroppedOther, 'dropped.queue_full' => DroppedQueueFull, - 'dropped.queue_not_enabled' => DroppedQueueNotEnabled, 'dropped.resource_not_found' => DroppedResourceNotFound, 'dropped.resource_stopped' => DroppedResourceStopped, 'matched' => Matched, @@ -61,11 +57,9 @@ ). -define(metrics( - Batched, Dropped, DroppedOther, DroppedQueueFull, - DroppedQueueNotEnabled, DroppedResourceNotFound, DroppedResourceStopped, Matched, @@ -80,11 +74,9 @@ Rcvd ), #{ - 'batching' := Batched, 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, - 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 1debc90c4..0d0d2e9ad 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -207,7 +207,6 @@ info_example_basic(webhook) -> auto_restart_interval => 15000, query_mode => async, async_inflight_window => 100, - enable_queue => false, max_queue_bytes => 100 * 1024 * 1024 } }; @@ -233,7 +232,6 @@ mqtt_main_example() -> health_check_interval => <<"15s">>, auto_restart_interval => <<"60s">>, query_mode => sync, - enable_queue => false, max_queue_bytes => 100 * 1024 * 1024 }, ssl => #{ @@ -634,11 +632,11 @@ aggregate_metrics(AllMetrics) -> fun( #{ metrics := ?metrics( - M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17 + M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15 ) }, ?metrics( - N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17 + N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15 ) ) -> ?METRICS( @@ -656,9 +654,7 @@ aggregate_metrics(AllMetrics) -> M12 + N12, M13 + N13, M14 + N14, - M15 + N15, - M16 + N16, - M17 + N17 + M15 + N15 ) end, InitMetrics, @@ -691,7 +687,6 @@ format_metrics(#{ 'dropped' := Dropped, 'dropped.other' := DroppedOther, 'dropped.queue_full' := DroppedQueueFull, - 'dropped.queue_not_enabled' := DroppedQueueNotEnabled, 'dropped.resource_not_found' := DroppedResourceNotFound, 'dropped.resource_stopped' := DroppedResourceStopped, 'matched' := Matched, @@ -705,15 +700,12 @@ format_metrics(#{ matched := #{current := Rate, last5m := Rate5m, max := RateMax} } }) -> - Batched = maps:get('batching', Gauges, 0), Queued = maps:get('queuing', Gauges, 0), SentInflight = maps:get('inflight', Gauges, 0), ?METRICS( - Batched, Dropped, DroppedOther, DroppedQueueFull, - DroppedQueueNotEnabled, DroppedResourceNotFound, DroppedResourceStopped, Matched, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl index 4e35e38aa..7bd83d139 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl @@ -82,7 +82,6 @@ default_resource_opts() -> #{ <<"async_inflight_window">> => 100, <<"auto_restart_interval">> => <<"60s">>, - <<"enable_queue">> => false, <<"health_check_interval">> => <<"15s">>, <<"max_queue_bytes">> => <<"1GB">>, <<"query_mode">> => <<"sync">>, diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index c2358da51..b4159e0a0 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -128,12 +128,9 @@ fields(bridges) -> ] ++ ee_fields_bridges(); fields("metrics") -> [ - {"batching", mk(integer(), #{desc => ?DESC("metric_batching")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped.other", mk(integer(), #{desc => ?DESC("metric_dropped_other")})}, {"dropped.queue_full", mk(integer(), #{desc => ?DESC("metric_dropped_queue_full")})}, - {"dropped.queue_not_enabled", - mk(integer(), #{desc => ?DESC("metric_dropped_queue_not_enabled")})}, {"dropped.resource_not_found", mk(integer(), #{desc => ?DESC("metric_dropped_resource_not_found")})}, {"dropped.resource_stopped", @@ -142,7 +139,7 @@ fields("metrics") -> {"queuing", mk(integer(), #{desc => ?DESC("metric_queuing")})}, {"retried", mk(integer(), #{desc => ?DESC("metric_retried")})}, {"failed", mk(integer(), #{desc => ?DESC("metric_sent_failed")})}, - {"inflight", mk(integer(), #{desc => ?DESC("metric_sent_inflight")})}, + {"inflight", mk(integer(), #{desc => ?DESC("metric_inflight")})}, {"success", mk(integer(), #{desc => ?DESC("metric_sent_success")})}, {"rate", mk(float(), #{desc => ?DESC("metric_rate")})}, {"rate_max", mk(float(), #{desc => ?DESC("metric_rate_max")})}, diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index 90d19eccf..ae4fc4692 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -662,7 +662,6 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"reconnect_interval">> => <<"1s">>, <<"resource_opts">> => #{ <<"worker_pool_size">> => 2, - <<"enable_queue">> => true, <<"query_mode">> => <<"sync">>, %% to make it check the healthy quickly <<"health_check_interval">> => <<"0.5s">> diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 332dfdd8c..52756f70d 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -126,8 +126,8 @@ When disabled the messages are buffered in RAM only.""" batch_size { desc { - en: """Maximum batch count.""" - zh: """批量请求大小。""" + en: """Maximum batch count. If equal to 1, there's effectively no batching.""" + zh: """批量请求大小。如果设为1,则无批处理。""" } label { en: """Batch size""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 6929cece9..d7b080ae8 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -64,10 +64,8 @@ %% If the resource disconnected, we can set to retry starting the resource %% periodically. auto_restart_interval => pos_integer(), - enable_batch => boolean(), batch_size => pos_integer(), batch_time => pos_integer(), - enable_queue => boolean(), max_queue_bytes => pos_integer(), query_mode => query_mode(), resume_interval => pos_integer(), @@ -90,7 +88,7 @@ -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). %% count --define(DEFAULT_BATCH_SIZE, 100). +-define(DEFAULT_BATCH_SIZE, 1). %% milliseconds -define(DEFAULT_BATCH_TIME, 20). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 26ce5d6f7..8ad3fdd80 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -136,7 +136,6 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> 'success', 'failed', 'dropped', - 'dropped.queue_not_enabled', 'dropped.queue_full', 'dropped.resource_not_found', 'dropped.resource_stopped', diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index 64f014918..455be9c22 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -24,9 +24,6 @@ ]). -export([ - batching_set/3, - batching_shift/3, - batching_get/1, inflight_set/3, inflight_get/1, queuing_set/3, @@ -40,9 +37,6 @@ dropped_queue_full_inc/1, dropped_queue_full_inc/2, dropped_queue_full_get/1, - dropped_queue_not_enabled_inc/1, - dropped_queue_not_enabled_inc/2, - dropped_queue_not_enabled_get/1, dropped_resource_not_found_inc/1, dropped_resource_not_found_inc/2, dropped_resource_not_found_get/1, @@ -80,10 +74,8 @@ events() -> [ [?TELEMETRY_PREFIX, Event] || Event <- [ - batching, dropped_other, dropped_queue_full, - dropped_queue_not_enabled, dropped_resource_not_found, dropped_resource_stopped, failed, @@ -125,9 +117,6 @@ handle_telemetry_event( dropped_queue_full -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_full', Val); - dropped_queue_not_enabled -> - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), - emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.queue_not_enabled', Val); dropped_resource_not_found -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped.resource_not_found', Val); @@ -160,8 +149,6 @@ handle_telemetry_event( _HandlerConfig ) -> case Event of - batching -> - emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); inflight -> emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); queuing -> @@ -169,45 +156,12 @@ handle_telemetry_event( _ -> ok end; -handle_telemetry_event( - [?TELEMETRY_PREFIX, Event], - _Measurements = #{gauge_shift := Val}, - _Metadata = #{resource_id := ID, worker_id := WorkerID}, - _HandlerConfig -) -> - case Event of - batching -> - emqx_metrics_worker:shift_gauge(?RES_METRICS, ID, WorkerID, 'batching', Val); - _ -> - ok - end; handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> ok. %% Gauges (value can go both up and down): %% -------------------------------------- -%% @doc Count of messages that are currently accumulated in memory waiting for -%% being sent in one batch -batching_set(ID, WorkerID, Val) -> - telemetry:execute( - [?TELEMETRY_PREFIX, batching], - #{gauge_set => Val}, - #{resource_id => ID, worker_id => WorkerID} - ). - -batching_shift(_ID, _WorkerID = undefined, _Val) -> - ok; -batching_shift(ID, WorkerID, Val) -> - telemetry:execute( - [?TELEMETRY_PREFIX, batching], - #{gauge_shift => Val}, - #{resource_id => ID, worker_id => WorkerID} - ). - -batching_get(ID) -> - emqx_metrics_worker:get_gauge(?RES_METRICS, ID, 'batching'). - %% @doc Count of batches of messages that are currently %% queuing. [Gauge] queuing_set(ID, WorkerID, Val) -> @@ -269,18 +223,6 @@ dropped_queue_full_inc(ID, Val) -> dropped_queue_full_get(ID) -> emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_full'). -%% @doc Count of messages dropped because the queue was not enabled -dropped_queue_not_enabled_inc(ID) -> - dropped_queue_not_enabled_inc(ID, 1). - -dropped_queue_not_enabled_inc(ID, Val) -> - telemetry:execute([?TELEMETRY_PREFIX, dropped_queue_not_enabled], #{counter_inc => Val}, #{ - resource_id => ID - }). - -dropped_queue_not_enabled_get(ID) -> - emqx_metrics_worker:get(?RES_METRICS, ID, 'dropped.queue_not_enabled'). - %% @doc Count of messages dropped because the resource was not found dropped_resource_not_found_inc(ID) -> dropped_resource_not_found_inc(ID, 1). diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 5f7cdf7e0..93bb22551 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -32,7 +32,6 @@ sync_query/3, async_query/3, block/1, - block/2, resume/1 ]). @@ -54,8 +53,12 @@ -export([reply_after_query/7, batch_reply_after_query/7]). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + -define(Q_ITEM(REQUEST), {q_item, REQUEST}). +-define(COLLECT_REQ_LIMIT, 1000). +-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT), {query, FROM, REQUEST, SENT}). -define(REPLY(FROM, REQUEST, SENT, RESULT), {reply, FROM, REQUEST, SENT, RESULT}). -define(EXPAND(RESULT, BATCH), [ @@ -64,12 +67,22 @@ ]). -type id() :: binary(). --type query() :: {query, from(), request()}. +-type index() :: pos_integer(). +-type queue_query() :: ?QUERY(from(), request(), HasBeenSent :: boolean()). -type request() :: term(). --type from() :: pid() | reply_fun(). - --callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> - {{from(), result()}, NewCbState :: term()}. +-type from() :: pid() | reply_fun() | request_from(). +-type request_from() :: undefined | gen_statem:from(). +-type state() :: blocked | running. +-type data() :: #{ + id => id(), + index => index(), + name => atom(), + batch_size => pos_integer(), + batch_time => timer:time(), + queue => replayq:q(), + resume_interval => timer:time(), + tref => undefined | timer:tref() +}. callback_mode() -> [state_functions, state_enter]. @@ -80,11 +93,13 @@ start_link(Id, Index, Opts) -> sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), + emqx_resource_metrics:matched_inc(Id), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). -spec async_query(id(), request(), query_opts()) -> Result :: term(). async_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), + emqx_resource_metrics:matched_inc(Id), pick_cast(Id, PickKey, {query, Request, Opts}). %% simple query the resource without batching and queuing messages. @@ -97,7 +112,9 @@ simple_sync_query(Id, Request) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), #{}), + QueryOpts = #{}, + emqx_resource_metrics:matched_inc(Id), + Result = call_query(sync, Id, Index, ?QUERY(self(), Request, false), QueryOpts), _ = handle_query_result(Id, Result, false, false), Result. @@ -110,7 +127,9 @@ simple_async_query(Id, Request, ReplyFun) -> %% would mess up the metrics anyway. `undefined' is ignored by %% `emqx_resource_metrics:*_shift/3'. Index = undefined, - Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), #{}), + QueryOpts = #{}, + emqx_resource_metrics:matched_inc(Id), + Result = call_query(async, Id, Index, ?QUERY(ReplyFun, Request, false), QueryOpts), _ = handle_query_result(Id, Result, false, false), Result. @@ -118,14 +137,11 @@ simple_async_query(Id, Request, ReplyFun) -> block(ServerRef) -> gen_statem:cast(ServerRef, block). --spec block(pid() | atom(), [query()]) -> ok. -block(ServerRef, Query) -> - gen_statem:cast(ServerRef, {block, Query}). - -spec resume(pid() | atom()) -> ok. resume(ServerRef) -> gen_statem:cast(ServerRef, resume). +-spec init({id(), pos_integer(), map()}) -> gen_statem:init_result(state(), data()). init({Id, Index, Opts}) -> process_flag(trap_exit, true), true = gproc_pool:connect_worker(Id, {Id, Index}), @@ -134,21 +150,19 @@ init({Id, Index, Opts}) -> SegBytes0 = maps:get(queue_seg_bytes, Opts, ?DEFAULT_QUEUE_SEG_SIZE), TotalBytes = maps:get(max_queue_bytes, Opts, ?DEFAULT_QUEUE_SIZE), SegBytes = min(SegBytes0, TotalBytes), - Queue = - case maps:get(enable_queue, Opts, false) of - true -> - replayq:open(#{ - dir => disk_queue_dir(Id, Index), - seg_bytes => SegBytes, - max_total_bytes => TotalBytes, - sizer => fun ?MODULE:estimate_size/1, - marshaller => fun ?MODULE:queue_item_marshaller/1 - }); - false -> - undefined - end, + QueueOpts = + #{ + dir => disk_queue_dir(Id, Index), + marshaller => fun ?MODULE:queue_item_marshaller/1, + max_total_bytes => TotalBytes, + %% we don't want to retain the queue after + %% resource restarts. + offload => true, + seg_bytes => SegBytes, + sizer => fun ?MODULE:estimate_size/1 + }, + Queue = replayq:open(QueueOpts), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Queue)), - emqx_resource_metrics:batching_set(Id, Index, 0), emqx_resource_metrics:inflight_set(Id, Index, 0), InfltWinSZ = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), ok = inflight_new(Name, InfltWinSZ, Id, Index), @@ -157,79 +171,67 @@ init({Id, Index, Opts}) -> id => Id, index => Index, name => Name, - enable_batch => maps:get(enable_batch, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, resume_interval => maps:get(resume_interval, Opts, HCItvl), - acc => [], - acc_left => BatchSize, tref => undefined }, {ok, blocked, St, {next_event, cast, resume}}. -running(enter, _, _St) -> - keep_state_and_data; +running(enter, _, St) -> + ?tp(resource_worker_enter_running, #{}), + maybe_flush(St); running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, blocked, St}; -running( - cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St -) when - is_list(Batch) --> - Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), - {next_state, blocked, St#{queue := Q1}}; -running({call, From}, {query, Request, _Opts}, St) -> - query_or_acc(From, Request, St); -running(cast, {query, Request, Opts}, St) -> - ReplyFun = maps:get(async_reply_fun, Opts, undefined), - query_or_acc(ReplyFun, Request, St); +running(info, ?SEND_REQ(_From, _Req) = Request0, Data) -> + handle_query_requests(Request0, Data); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); +running(internal, flush, St) -> + flush(St); running(info, {flush, _Ref}, _St) -> keep_state_and_data; running(info, Info, _St) -> - ?SLOG(error, #{msg => unexpected_msg, info => Info}), + ?SLOG(error, #{msg => unexpected_msg, state => running, info => Info}), keep_state_and_data. blocked(enter, _, #{resume_interval := ResumeT} = _St) -> + ?tp(resource_worker_enter_blocked, #{}), {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; -blocked( - cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, index := Index, queue := Q} = St -) when - is_list(Batch) --> - Q1 = maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query) || Query <- Batch]), - {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From}, {query, Request, _Opts}, #{id := Id, index := Index, queue := Q} = St) -> +blocked(info, ?SEND_REQ(ReqFrom, {query, Request, Opts}), Data0) -> + #{ + id := Id, + index := Index, + queue := Q + } = Data0, + From = + case ReqFrom of + undefined -> maps:get(async_reply_fun, Opts, undefined); + From1 -> From1 + end, Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), - {keep_state, St#{ - queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(From, Request, false))]) - }}; -blocked(cast, {query, Request, Opts}, #{id := Id, index := Index, queue := Q} = St) -> - ReplyFun = maps:get(async_reply_fun, Opts, undefined), - Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), - {keep_state, St#{ - queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) - }}. + HasBeenSent = false, + _ = reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Error)), + NewQ = append_queue(Id, Index, Q, [?QUERY(From, Request, HasBeenSent)]), + Data = Data0#{queue := NewQ}, + {keep_state, Data}; +blocked(info, {flush, _Ref}, _Data) -> + keep_state_and_data; +blocked(info, Info, _Data) -> + ?SLOG(error, #{msg => unexpected_msg, state => blocked, info => Info}), + keep_state_and_data. terminate(_Reason, #{id := Id, index := Index, queue := Q}) -> - GaugeFns = - [ - fun emqx_resource_metrics:batching_set/3, - fun emqx_resource_metrics:inflight_set/3 - ], - lists:foreach(fun(Fn) -> Fn(Id, Index, 0) end, GaugeFns), + emqx_resource_metrics:inflight_set(Id, Index, 0), emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q)), gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -252,43 +254,71 @@ code_change(_OldVsn, State, _Extra) -> ). pick_call(Id, Key, Query, Timeout) -> - ?PICK(Id, Key, gen_statem:call(Pid, Query, {clean_timeout, Timeout})). + ?PICK(Id, Key, begin + Caller = self(), + MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), + From = {Caller, MRef}, + erlang:send(Pid, ?SEND_REQ(From, Query)), + receive + {MRef, Response} -> + erlang:demonitor(MRef, [flush]), + Response; + {'DOWN', MRef, process, Pid, Reason} -> + error({worker_down, Reason}) + after Timeout -> + erlang:demonitor(MRef, [flush]), + receive + {MRef, Response} -> + Response + after 0 -> + error(timeout) + end + end + end). pick_cast(Id, Key, Query) -> - ?PICK(Id, Key, gen_statem:cast(Pid, Query)). + ?PICK(Id, Key, begin + From = undefined, + erlang:send(Pid, ?SEND_REQ(From, Query)), + ok + end). -do_resume(#{id := Id, name := Name} = St) -> +do_resume(#{id := Id, name := Name} = Data) -> case inflight_get_first(Name) of empty -> - retry_queue(St); + retry_queue(Data); {Ref, FirstQuery} -> %% We retry msgs in inflight window sync, as if we send them %% async, they will be appended to the end of inflight window again. - retry_inflight_sync(Id, Ref, FirstQuery, Name, St) + retry_inflight_sync(Id, Ref, FirstQuery, Name, Data) end. -retry_queue(#{queue := undefined} = St) -> - {next_state, running, St}; retry_queue( #{ - queue := Q, + queue := Q0, id := Id, index := Index, - enable_batch := false, + batch_size := 1, + name := Name, resume_interval := ResumeT - } = St + } = Data0 ) -> - case get_first_n_from_queue(Q, 1) of - [] -> - {next_state, running, St}; - [?QUERY(_, Request, HasSent) = Query] -> - QueryOpts = #{inflight_name => maps:get(name, St)}, + %% no batching + case get_first_n_from_queue(Q0, 1) of + empty -> + {next_state, running, Data0}; + {Q1, QAckRef, [?QUERY(_, Request, HasBeenSent) = Query]} -> + QueryOpts = #{inflight_name => Name}, Result = call_query(configured, Id, Index, Query, QueryOpts), - case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of + Reply = ?REPLY(undefined, Request, HasBeenSent, Result), + case reply_caller(Id, Reply) of true -> - {keep_state, St, {state_timeout, ResumeT, resume}}; + {keep_state, Data0, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_head(Q, Id, Index)}) + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + Data = Data0#{queue := Q1}, + retry_queue(Data) end end; retry_queue( @@ -296,101 +326,202 @@ retry_queue( queue := Q, id := Id, index := Index, - enable_batch := true, batch_size := BatchSize, + name := Name, resume_interval := ResumeT - } = St + } = Data0 ) -> + %% batching case get_first_n_from_queue(Q, BatchSize) of - [] -> - {next_state, running, St}; - Batch0 -> - QueryOpts = #{inflight_name => maps:get(name, St)}, + empty -> + {next_state, running, Data0}; + {Q1, QAckRef, Batch0} -> + QueryOpts = #{inflight_name => Name}, Result = call_query(configured, Id, Index, Batch0, QueryOpts), %% The caller has been replied with ?RESOURCE_ERROR(blocked, _) before saving into the queue, %% we now change the 'from' field to 'undefined' so it will not reply the caller again. - Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0], + Batch = [ + ?QUERY(undefined, Request, HasBeenSent0) + || ?QUERY(_, Request, HasBeenSent0) <- Batch0 + ], case batch_reply_caller(Id, Result, Batch) of true -> - {keep_state, St, {state_timeout, ResumeT, resume}}; + ?tp(resource_worker_retry_queue_batch_failed, #{batch => Batch}), + {keep_state, Data0, {state_timeout, ResumeT, resume}}; false -> - retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id, Index)}) + ?tp(resource_worker_retry_queue_batch_succeeded, #{batch => Batch}), + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + Data = Data0#{queue := Q1}, + retry_queue(Data) end end. retry_inflight_sync( Id, Ref, - ?QUERY(_, _, HasSent) = Query, + QueryOrBatch, Name, - #{index := Index, resume_interval := ResumeT} = St0 + #{index := Index, resume_interval := ResumeT} = Data0 ) -> - Result = call_query(sync, Id, Index, Query, #{}), - case handle_query_result(Id, Result, HasSent, false) of - %% Send failed because resource down + QueryOpts = #{}, + %% if we are retrying an inflight query, it has been sent + HasBeenSent = true, + Result = call_query(sync, Id, Index, QueryOrBatch, QueryOpts), + BlockWorker = false, + case handle_query_result(Id, Result, HasBeenSent, BlockWorker) of + %% Send failed because resource is down true -> - {keep_state, St0, {state_timeout, ResumeT, resume}}; + {keep_state, Data0, {state_timeout, ResumeT, resume}}; %% Send ok or failed but the resource is working false -> inflight_drop(Name, Ref, Id, Index), - do_resume(St0) + do_resume(Data0) end. -query_or_acc( - From, - Request, - #{ - enable_batch := true, - acc := Acc, - acc_left := Left, - index := Index, - id := Id - } = St0 -) -> - Acc1 = [?QUERY(From, Request, false) | Acc], - emqx_resource_metrics:batching_shift(Id, Index, 1), - St = St0#{acc := Acc1, acc_left := Left - 1}, - case Left =< 1 of - true -> flush(St); - false -> {keep_state, ensure_flush_timer(St)} - end; -query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, index := Index} = St) -> - QueryOpts = #{ - inflight_name => maps:get(name, St) - }, - Result = call_query(configured, Id, Index, ?QUERY(From, Request, false), QueryOpts), - case reply_caller(Id, ?REPLY(From, Request, false, Result)) of - true -> - Query = ?QUERY(From, Request, false), - {next_state, blocked, St#{queue := maybe_append_queue(Id, Index, Q, [?Q_ITEM(Query)])}}; - false -> - {keep_state, St} - end. - -flush(#{acc := []} = St) -> - {keep_state, St}; -flush( +%% Called during the `running' state only. +-spec handle_query_requests(?SEND_REQ(request_from(), request()), data()) -> data(). +handle_query_requests(Request0, Data0) -> #{ id := Id, index := Index, - acc := Batch0, - batch_size := Size, - queue := Q0 - } = St -) -> - Batch = lists:reverse(Batch0), - QueryOpts = #{ - inflight_name => maps:get(name, St) - }, - emqx_resource_metrics:batching_shift(Id, Index, -length(Batch)), - Result = call_query(configured, Id, Index, Batch, QueryOpts), - St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), - case batch_reply_caller(Id, Result, Batch) of + queue := Q + } = Data0, + Requests = collect_requests([Request0], ?COLLECT_REQ_LIMIT), + QueueItems = + lists:map( + fun + (?SEND_REQ(undefined = _From, {query, Req, Opts})) -> + ReplyFun = maps:get(async_reply_fun, Opts, undefined), + HasBeenSent = false, + ?QUERY(ReplyFun, Req, HasBeenSent); + (?SEND_REQ(From, {query, Req, _Opts})) -> + HasBeenSent = false, + ?QUERY(From, Req, HasBeenSent) + end, + Requests + ), + NewQ = append_queue(Id, Index, Q, QueueItems), + Data = Data0#{queue := NewQ}, + maybe_flush(Data). + +maybe_flush(Data) -> + #{ + batch_size := BatchSize, + queue := Q + } = Data, + QueueCount = queue_count(Q), + case QueueCount >= BatchSize of true -> - Q1 = maybe_append_queue(Id, Index, Q0, [?Q_ITEM(Query) || Query <- Batch]), - {next_state, blocked, St1#{queue := Q1}}; + flush(Data); false -> - {keep_state, St1} + {keep_state, ensure_flush_timer(Data)} + end. + +%% Called during the `running' state only. +-spec flush(data()) -> gen_statem:event_handler_result(state(), data()). +flush(Data0) -> + #{ + batch_size := BatchSize, + queue := Q0 + } = Data0, + case replayq:count(Q0) of + 0 -> + Data = cancel_flush_timer(Data0), + {keep_state, Data}; + _ -> + {Q1, QAckRef, Batch0} = replayq:pop(Q0, #{count_limit => BatchSize}), + Batch = [Item || ?Q_ITEM(Item) <- Batch0], + IsBatch = BatchSize =/= 1, + do_flush(Data0, #{ + new_queue => Q1, + is_batch => IsBatch, + batch => Batch, + ack_ref => QAckRef + }) + end. + +-spec do_flush(data(), #{ + is_batch := boolean(), + batch := [?QUERY(from(), request(), boolean())], + ack_ref := replayq:ack_ref() +}) -> + gen_statem:event_handler_result(state(), data()). +do_flush(Data0, #{is_batch := false, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> + #{ + id := Id, + index := Index, + name := Name + } = Data0, + %% unwrap when not batching (i.e., batch size == 1) + [?QUERY(From, CoreReq, HasBeenSent) = Request] = Batch, + QueryOpts = #{inflight_name => Name}, + Result = call_query(configured, Id, Index, Request, QueryOpts), + IsAsync = is_async(Id), + Data1 = cancel_flush_timer(Data0), + Reply = ?REPLY(From, CoreReq, HasBeenSent, Result), + case {reply_caller(Id, Reply), IsAsync} of + %% failed and is not async; keep the request in the queue to + %% be retried + {true, false} -> + {next_state, blocked, Data1}; + %% failed and is async; remove the request from the queue, as + %% it is already in inflight table + {true, true} -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + Data = Data1#{queue := Q1}, + {next_state, blocked, Data}; + %% success; just ack + {false, _} -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + Data2 = Data1#{queue := Q1}, + case replayq:count(Q1) > 0 of + true -> + {keep_state, Data2, [{next_event, internal, flush}]}; + false -> + {keep_state, Data2} + end + end; +do_flush(Data0, #{is_batch := true, batch := Batch, ack_ref := QAckRef, new_queue := Q1}) -> + #{ + id := Id, + index := Index, + batch_size := BatchSize, + name := Name + } = Data0, + QueryOpts = #{inflight_name => Name}, + Result = call_query(configured, Id, Index, Batch, QueryOpts), + IsAsync = is_async(Id), + Data1 = cancel_flush_timer(Data0), + case {batch_reply_caller(Id, Result, Batch), IsAsync} of + %% failed and is not async; keep the request in the queue to + %% be retried + {true, false} -> + {next_state, blocked, Data1}; + %% failed and is async; remove the request from the queue, as + %% it is already in inflight table + {true, true} -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + Data = Data1#{queue := Q1}, + {next_state, blocked, Data}; + %% success; just ack + {false, _} -> + ok = replayq:ack(Q1, QAckRef), + emqx_resource_metrics:queuing_set(Id, Index, queue_count(Q1)), + CurrentCount = replayq:count(Q1), + Data2 = Data1#{queue := Q1}, + case {CurrentCount > 0, CurrentCount >= BatchSize} of + {false, _} -> + {keep_state, Data2}; + {true, true} -> + {keep_state, Data2, [{next_event, internal, flush}]}; + {true, false} -> + Data3 = ensure_flush_timer(Data2), + {keep_state, Data3} + end end. batch_reply_caller(Id, BatchResult, Batch) -> @@ -405,11 +536,12 @@ batch_reply_caller(Id, BatchResult, Batch) -> ). reply_caller(Id, Reply) -> - reply_caller(Id, Reply, false). + BlockWorker = false, + reply_caller(Id, Reply, BlockWorker). -reply_caller(Id, ?REPLY(undefined, _, HasSent, Result), BlockWorker) -> - handle_query_result(Id, Result, HasSent, BlockWorker); -reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when +reply_caller(Id, ?REPLY(undefined, _, HasBeenSent, Result), BlockWorker) -> + handle_query_result(Id, Result, HasBeenSent, BlockWorker); +reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasBeenSent, Result), BlockWorker) when is_function(ReplyFun) -> _ = @@ -417,52 +549,52 @@ reply_caller(Id, ?REPLY({ReplyFun, Args}, _, HasSent, Result), BlockWorker) when {async_return, _} -> no_reply_for_now; _ -> apply(ReplyFun, Args ++ [Result]) end, - handle_query_result(Id, Result, HasSent, BlockWorker); -reply_caller(Id, ?REPLY(From, _, HasSent, Result), BlockWorker) -> + handle_query_result(Id, Result, HasBeenSent, BlockWorker); +reply_caller(Id, ?REPLY(From, _, HasBeenSent, Result), BlockWorker) -> gen_statem:reply(From, Result), - handle_query_result(Id, Result, HasSent, BlockWorker). + handle_query_result(Id, Result, HasBeenSent, BlockWorker). -handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasSent, BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(exception, Msg), HasBeenSent, BlockWorker) -> ?SLOG(error, #{msg => resource_exception, info => Msg}), - inc_sent_failed(Id, HasSent), + inc_sent_failed(Id, HasBeenSent), BlockWorker; -handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasSent, _) when +handle_query_result(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _) when NotWorking == not_connected; NotWorking == blocked -> true; -handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasSent, BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_not_found, info => Msg}), emqx_resource_metrics:dropped_resource_not_found_inc(Id), BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasSent, BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => resource_stopped, info => Msg}), emqx_resource_metrics:dropped_resource_stopped_inc(Id), BlockWorker; -handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasSent, BlockWorker) -> +handle_query_result(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => other_resource_error, reason => Reason}), emqx_resource_metrics:dropped_other_inc(Id), BlockWorker; -handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasSent, _BlockWorker) -> +handle_query_result(Id, {error, {recoverable_error, Reason}}, _HasBeenSent, _BlockWorker) -> %% the message will be queued in replayq or inflight window, %% i.e. the counter 'queuing' or 'dropped' will increase, so we pretend that we have not %% sent this message. ?SLOG(warning, #{id => Id, msg => recoverable_error, reason => Reason}), true; -handle_query_result(Id, {error, Reason}, HasSent, BlockWorker) -> +handle_query_result(Id, {error, Reason}, HasBeenSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => send_error, reason => Reason}), - inc_sent_failed(Id, HasSent), + inc_sent_failed(Id, HasBeenSent), BlockWorker; -handle_query_result(_Id, {async_return, inflight_full}, _HasSent, _BlockWorker) -> +handle_query_result(_Id, {async_return, inflight_full}, _HasBeenSent, _BlockWorker) -> true; -handle_query_result(Id, {async_return, {error, Msg}}, HasSent, BlockWorker) -> +handle_query_result(Id, {async_return, {error, Msg}}, HasBeenSent, BlockWorker) -> ?SLOG(error, #{id => Id, msg => async_send_error, info => Msg}), - inc_sent_failed(Id, HasSent), + inc_sent_failed(Id, HasBeenSent), BlockWorker; -handle_query_result(_Id, {async_return, ok}, _HasSent, BlockWorker) -> +handle_query_result(_Id, {async_return, ok}, _HasBeenSent, BlockWorker) -> BlockWorker; -handle_query_result(Id, Result, HasSent, BlockWorker) -> +handle_query_result(Id, Result, HasBeenSent, BlockWorker) -> assert_ok_result(Result), - inc_sent_success(Id, HasSent), + inc_sent_success(Id, HasBeenSent), BlockWorker. call_query(QM0, Id, Index, Query, QueryOpts) -> @@ -475,13 +607,10 @@ call_query(QM0, Id, Index, Query, QueryOpts) -> _ -> QM0 end, CM = maps:get(callback_mode, Data), - emqx_resource_metrics:matched_inc(Id), apply_query_fun(call_mode(QM, CM), Mod, Id, Index, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> - emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> - emqx_resource_metrics:matched_inc(Id), ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -513,7 +642,7 @@ apply_query_fun(async, Mod, Id, Index, ?QUERY(_, Request, _) = Query, ResSt, Que Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( call_query_async, - case inflight_is_full(Name) of + case is_inflight_full(Name) of true -> {async_return, inflight_full}; false -> @@ -535,26 +664,26 @@ apply_query_fun(async, Mod, Id, Index, [?QUERY(_, _, _) | _] = Batch, ResSt, Que Name = maps:get(inflight_name, QueryOpts, undefined), ?APPLY_RESOURCE( call_batch_query_async, - case inflight_is_full(Name) of + case is_inflight_full(Name) of true -> {async_return, inflight_full}; false -> ReplyFun = fun ?MODULE:batch_reply_after_query/7, Ref = make_message_ref(), - Args = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, + ReplyFunAndArgs = {ReplyFun, [self(), Id, Index, Name, Ref, Batch]}, Requests = [Request || ?QUERY(_From, Request, _) <- Batch], ok = inflight_append(Name, Ref, Batch, Id, Index), - Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), + Result = Mod:on_batch_query_async(Id, Requests, ReplyFunAndArgs, ResSt), {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasSent), Result) -> +reply_after_query(Pid, Id, Index, Name, Ref, ?QUERY(From, Request, HasBeenSent), Result) -> %% NOTE: 'inflight' is the count of messages that were sent async %% but received no ACK, NOT the number of messages queued in the %% inflight window. - case reply_caller(Id, ?REPLY(From, Request, HasSent, Result)) of + case reply_caller(Id, ?REPLY(From, Request, HasBeenSent, Result)) of true -> ?MODULE:block(Pid); false -> @@ -573,7 +702,7 @@ batch_reply_after_query(Pid, Id, Index, Name, Ref, Batch, Result) -> end. drop_inflight_and_resume(Pid, Name, Ref, Id, Index) -> - case inflight_is_full(Name) of + case is_inflight_full(Name) of true -> inflight_drop(Name, Ref, Id, Index), ?MODULE:resume(Pid); @@ -591,10 +720,8 @@ queue_item_marshaller(Bin) when is_binary(Bin) -> estimate_size(QItem) -> size(queue_item_marshaller(QItem)). -maybe_append_queue(Id, _Index, undefined, _Items) -> - emqx_resource_metrics:dropped_queue_not_enabled_inc(Id), - undefined; -maybe_append_queue(Id, Index, Q, Items) -> +-spec append_queue(id(), index(), replayq:q(), [queue_query()]) -> replayq:q(). +append_queue(Id, Index, Q, Queries) -> Q2 = case replayq:overflow(Q) of Overflow when Overflow =< 0 -> @@ -608,42 +735,38 @@ maybe_append_queue(Id, Index, Q, Items) -> ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}), Q1 end, + Items = [?Q_ITEM(X) || X <- Queries], Q3 = replayq:append(Q2, Items), emqx_resource_metrics:queuing_set(Id, Index, replayq:count(Q3)), + ?tp(resource_worker_appended_to_queue, #{id => Id, items => Queries}), Q3. +-spec get_first_n_from_queue(replayq:q(), pos_integer()) -> + empty | {replayq:q(), replayq:ack_ref(), [?Q_ITEM(?QUERY(_From, _Request, _HasBeenSent))]}. get_first_n_from_queue(Q, N) -> - get_first_n_from_queue(Q, N, []). - -get_first_n_from_queue(_Q, 0, Acc) -> - lists:reverse(Acc); -get_first_n_from_queue(Q, N, Acc) when N > 0 -> - case replayq:peek(Q) of - empty -> Acc; - ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc]) + case replayq:count(Q) of + 0 -> + empty; + _ -> + {NewQ, QAckRef, Items} = replayq:pop(Q, #{count_limit => N}), + Queries = [X || ?Q_ITEM(X) <- Items], + {NewQ, QAckRef, Queries} end. -drop_first_n_from_queue(Q, 0, _Id, _Index) -> - Q; -drop_first_n_from_queue(Q, N, Id, Index) when N > 0 -> - drop_first_n_from_queue(drop_head(Q, Id, Index), N - 1, Id, Index). - -drop_head(Q, Id, Index) -> - {NewQ, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), - ok = replayq:ack(NewQ, AckRef), - emqx_resource_metrics:queuing_set(Id, Index, replayq:count(NewQ)), - NewQ. - %%============================================================================== %% the inflight queue for async query --define(SIZE_REF, -1). +-define(MAX_SIZE_REF, -1). +-define(SIZE_REF, -2). inflight_new(Name, InfltWinSZ, Id, Index) -> _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), - inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}, Id, Index), + inflight_append(Name, ?MAX_SIZE_REF, {max_size, InfltWinSZ}, Id, Index), + %% we use this counter because we might deal with batches as + %% elements. + inflight_append(Name, ?SIZE_REF, 0, Id, Index), ok. inflight_get_first(Name) -> - case ets:next(Name, ?SIZE_REF) of + case ets:next(Name, ?MAX_SIZE_REF) of '$end_of_table' -> empty; Ref -> @@ -656,31 +779,42 @@ inflight_get_first(Name) -> end end. -inflight_is_full(undefined) -> +is_inflight_full(undefined) -> false; -inflight_is_full(Name) -> - [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF), - Size = inflight_size(Name), +is_inflight_full(Name) -> + [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?MAX_SIZE_REF), + %% we consider number of batches rather than number of messages + %% because one batch request may hold several messages. + Size = inflight_num_batches(Name), Size >= MaxSize. -inflight_size(Name) -> - %% Note: we subtract 1 because there's a metadata row that hold - %% the maximum size value. - MetadataRowCount = 1, +inflight_num_batches(Name) -> + %% Note: we subtract 2 because there're 2 metadata rows that hold + %% the maximum size value and the number of messages. + MetadataRowCount = 2, case ets:info(Name, size) of undefined -> 0; Size -> max(0, Size - MetadataRowCount) end. +inflight_num_msgs(Name) -> + [{_, Size}] = ets:lookup(Name, ?SIZE_REF), + Size. + inflight_append(undefined, _Ref, _Query, _Id, _Index) -> ok; -inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch, Id, Index) -> - ets:insert(Name, {Ref, [?QUERY(From, Req, true) || ?QUERY(From, Req, _) <- Batch]}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), +inflight_append(Name, Ref, [?QUERY(_, _, _) | _] = Batch0, Id, Index) -> + Batch = mark_as_sent(Batch0), + ets:insert(Name, {Ref, Batch}), + BatchSize = length(Batch), + ets:update_counter(Name, ?SIZE_REF, {2, BatchSize}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), ok; -inflight_append(Name, Ref, ?QUERY(From, Req, _), Id, Index) -> - ets:insert(Name, {Ref, ?QUERY(From, Req, true)}), - emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), +inflight_append(Name, Ref, ?QUERY(_From, _Req, _HasBeenSent) = Query0, Id, Index) -> + Query = mark_as_sent(Query0), + ets:insert(Name, {Ref, Query}), + ets:update_counter(Name, ?SIZE_REF, {2, 1}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), ok; inflight_append(Name, Ref, Data, _Id, _Index) -> ets:insert(Name, {Ref, Data}), @@ -691,20 +825,26 @@ inflight_append(Name, Ref, Data, _Id, _Index) -> inflight_drop(undefined, _, _Id, _Index) -> ok; inflight_drop(Name, Ref, Id, Index) -> - ets:delete(Name, Ref), - emqx_resource_metrics:inflight_set(Id, Index, inflight_size(Name)), + Count = + case ets:take(Name, Ref) of + [{Ref, ?QUERY(_, _, _)}] -> 1; + [{Ref, [?QUERY(_, _, _) | _] = Batch}] -> length(Batch); + _ -> 0 + end, + Count > 0 andalso ets:update_counter(Name, ?SIZE_REF, {2, -Count, 0, 0}), + emqx_resource_metrics:inflight_set(Id, Index, inflight_num_msgs(Name)), ok. %%============================================================================== -inc_sent_failed(Id, _HasSent = true) -> +inc_sent_failed(Id, _HasBeenSent = true) -> emqx_resource_metrics:retried_failed_inc(Id); -inc_sent_failed(Id, _HasSent) -> +inc_sent_failed(Id, _HasBeenSent) -> emqx_resource_metrics:failed_inc(Id). -inc_sent_success(Id, _HasSent = true) -> +inc_sent_success(Id, _HasBeenSent = true) -> emqx_resource_metrics:retried_success_inc(Id); -inc_sent_success(Id, _HasSent) -> +inc_sent_success(Id, _HasBeenSent) -> emqx_resource_metrics:success_inc(Id). call_mode(sync, _) -> sync; @@ -725,8 +865,6 @@ assert_ok_result(R) when is_tuple(R) -> assert_ok_result(R) -> error({not_ok_result, R}). -queue_count(undefined) -> - 0; queue_count(Q) -> replayq:count(Q). @@ -741,12 +879,12 @@ disk_queue_dir(Id, Index) -> QDir = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), filename:join([emqx:data_dir(), "resource_worker", node(), QDir]). -ensure_flush_timer(St = #{tref := undefined, batch_time := T}) -> +ensure_flush_timer(Data = #{tref := undefined, batch_time := T}) -> Ref = make_ref(), TRef = erlang:send_after(T, self(), {flush, Ref}), - St#{tref => {TRef, Ref}}; -ensure_flush_timer(St) -> - St. + Data#{tref => {TRef, Ref}}; +ensure_flush_timer(Data) -> + Data. cancel_flush_timer(St = #{tref := undefined}) -> St; @@ -756,3 +894,31 @@ cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> make_message_ref() -> erlang:unique_integer([monotonic, positive]). + +collect_requests(Acc, Limit) -> + Count = length(Acc), + do_collect_requests(Acc, Count, Limit). + +do_collect_requests(Acc, Count, Limit) when Count >= Limit -> + lists:reverse(Acc); +do_collect_requests(Acc, Count, Limit) -> + receive + ?SEND_REQ(_From, _Req) = Request -> + do_collect_requests([Request | Acc], Count + 1, Limit) + after 0 -> + lists:reverse(Acc) + end. + +mark_as_sent(Batch) when is_list(Batch) -> + lists:map(fun mark_as_sent/1, Batch); +mark_as_sent(?QUERY(From, Req, _)) -> + HasBeenSent = true, + ?QUERY(From, Req, HasBeenSent). + +is_async(ResourceId) -> + case emqx_resource_manager:ets_lookup(ResourceId) of + {ok, _Group, #{query_mode := QM, callback_mode := CM}} -> + call_mode(QM, CM) =:= async; + _ -> + false + end. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 0e25d01b8..d105b21ef 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -83,12 +83,14 @@ query_mode(_) -> undefined. enable_batch(type) -> boolean(); enable_batch(required) -> false; enable_batch(default) -> true; +enable_batch(deprecated) -> {since, "v5.0.14"}; enable_batch(desc) -> ?DESC("enable_batch"); enable_batch(_) -> undefined. enable_queue(type) -> boolean(); enable_queue(required) -> false; enable_queue(default) -> false; +enable_queue(deprecated) -> {since, "v5.0.14"}; enable_queue(desc) -> ?DESC("enable_queue"); enable_queue(_) -> undefined. diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 7af0607cb..692895548 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -17,6 +17,7 @@ -module(emqx_connector_demo). -include_lib("typerefl/include/types.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -28,6 +29,7 @@ on_query/3, on_query_async/4, on_batch_query/3, + on_batch_query_async/4, on_get_status/2 ]). @@ -36,6 +38,8 @@ %% callbacks for emqx_resource config schema -export([roots/0]). +-define(CM_KEY, {?MODULE, callback_mode}). + roots() -> [ {name, fun name/1}, @@ -51,7 +55,6 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. --define(CM_KEY, {?MODULE, callback_mode}). callback_mode() -> persistent_term:get(?CM_KEY). @@ -60,17 +63,12 @@ set_callback_mode(Mode) -> on_start(_InstId, #{create_error := true}) -> error("some error"); -on_start(InstId, #{name := Name, stop_error := true} = Opts) -> - Register = maps:get(register, Opts, false), - {ok, Opts#{ - id => InstId, - stop_error => true, - pid => spawn_counter_process(Name, Register) - }}; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), + StopError = maps:get(stop_error, Opts, false), {ok, Opts#{ id => InstId, + stop_error => StopError, pid => spawn_counter_process(Name, Register) }}. @@ -95,8 +93,11 @@ on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> From = {self(), ReqRef}, Pid ! {From, {inc, N}}, receive - {ReqRef, ok} -> ok; - {ReqRef, incorrect_status} -> {error, {recoverable_error, incorrect_status}} + {ReqRef, ok} -> + ?tp(connector_demo_inc_counter, #{n => N}), + ok; + {ReqRef, incorrect_status} -> + {error, {recoverable_error, incorrect_status}} after 1000 -> {error, timeout} end; @@ -127,18 +128,30 @@ on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> ok. on_batch_query(InstId, BatchReq, State) -> - %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. + %% Requests can be either 'get_counter' or 'inc_counter', but + %% cannot be mixed. case hd(BatchReq) of {inc_counter, _} -> - batch_inc_counter(InstId, BatchReq, State); + batch_inc_counter(sync, InstId, BatchReq, State); get_counter -> - batch_get_counter(InstId, State) + batch_get_counter(sync, InstId, State) end. -batch_inc_counter(InstId, BatchReq, State) -> +on_batch_query_async(InstId, BatchReq, ReplyFunAndArgs, State) -> + %% Requests can be either 'get_counter' or 'inc_counter', but + %% cannot be mixed. + case hd(BatchReq) of + {inc_counter, _} -> + batch_inc_counter({async, ReplyFunAndArgs}, InstId, BatchReq, State); + get_counter -> + batch_get_counter({async, ReplyFunAndArgs}, InstId, State) + end. + +batch_inc_counter(CallMode, InstId, BatchReq, State) -> TotalN = lists:foldl( fun ({inc_counter, N}, Total) -> + ?tp(connector_demo_batch_inc_individual, #{n => N}), Total + N; (Req, _Total) -> error({mixed_requests_not_allowed, {inc_counter, Req}}) @@ -146,10 +159,17 @@ batch_inc_counter(InstId, BatchReq, State) -> 0, BatchReq ), - on_query(InstId, {inc_counter, TotalN}, State). + case CallMode of + sync -> + on_query(InstId, {inc_counter, TotalN}, State); + {async, ReplyFunAndArgs} -> + on_query_async(InstId, {inc_counter, TotalN}, ReplyFunAndArgs, State) + end. -batch_get_counter(InstId, State) -> - on_query(InstId, get_counter, State). +batch_get_counter(sync, InstId, State) -> + on_query(InstId, get_counter, State); +batch_get_counter({async, ReplyFunAndArgs}, InstId, State) -> + on_query_async(InstId, get_counter, ReplyFunAndArgs, State). on_get_status(_InstId, #{health_check_error := true}) -> disconnected; @@ -187,6 +207,7 @@ counter_loop( {inc, N, ReplyFun} when Status == running -> %ct:pal("async counter recv: ~p", [{inc, N}]), apply_reply(ReplyFun, ok), + ?tp(connector_demo_inc_counter_async, #{n => N}), State#{counter => Num + N}; {{FromPid, ReqRef}, {inc, N}} when Status == running -> %ct:pal("sync counter recv: ~p", [{inc, N}]), diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 8a6b179d5..4bea0a1ee 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -30,6 +30,8 @@ -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). +-import(emqx_common_test_helpers, [on_exit/1]). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -37,11 +39,15 @@ groups() -> []. init_per_testcase(_, Config) -> + ct:timetrap({seconds, 30}), emqx_connector_demo:set_callback_mode(always_sync), Config. end_per_testcase(_, _Config) -> - _ = emqx_resource:remove(?ID). + snabbkaffe:stop(), + _ = emqx_resource:remove(?ID), + emqx_common_test_helpers:call_janitor(), + ok. init_per_suite(Config) -> code:ensure_loaded(?TEST_RESOURCE), @@ -140,6 +146,7 @@ t_create_remove_local(_) -> ?assertNot(is_process_alive(Pid)). t_do_not_start_after_created(_) -> + ct:pal("creating resource"), {ok, _} = emqx_resource:create_local( ?ID, ?DEFAULT_RESOURCE_GROUP, @@ -159,16 +166,19 @@ t_do_not_start_after_created(_) -> ), %% start the resource manually.. + ct:pal("starting resource manually"), ok = emqx_resource:start(?ID), {ok, #{pid := Pid}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid)), %% restart the resource + ct:pal("restarting resource"), ok = emqx_resource:restart(?ID), ?assertNot(is_process_alive(Pid)), {ok, #{pid := Pid2}} = emqx_resource:query(?ID, get_state), ?assert(is_process_alive(Pid2)), + ct:pal("removing resource"), ok = emqx_resource:remove_local(?ID), ?assertNot(is_process_alive(Pid2)). @@ -207,12 +217,13 @@ t_query_counter(_) -> ok = emqx_resource:remove_local(?ID). t_batch_query_counter(_) -> + BatchSize = 100, {ok, _} = emqx_resource:create_local( ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{enable_batch => true, query_mode => sync} + #{batch_size => BatchSize, query_mode => sync} ), ?check_trace( @@ -225,15 +236,26 @@ t_batch_query_counter(_) -> end ), + NMsgs = 1_000, ?check_trace( ?TRACE_OPTS, - inc_counter_in_parallel(1000), + begin + NEvents = round(math:ceil(NMsgs / BatchSize)), + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter}), + NEvents, + _Timeout = 10_000 + ), + inc_counter_in_parallel(NMsgs), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, fun(Trace) -> QueryTrace = ?of_kind(call_batch_query, Trace), ?assertMatch([#{batch := BatchReq} | _] when length(BatchReq) > 1, QueryTrace) end ), - {ok, 1000} = emqx_resource:query(?ID, get_counter), + {ok, NMsgs} = emqx_resource:query(?ID, get_counter), ok = emqx_resource:remove_local(?ID). @@ -243,20 +265,28 @@ t_query_counter_async_query(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async, enable_batch => false} + #{query_mode => async, batch_size => 1} ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + NMsgs = 1_000, ?check_trace( ?TRACE_OPTS, - inc_counter_in_parallel(1000), + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter}), + NMsgs, + _Timeout = 60_000 + ), + inc_counter_in_parallel(NMsgs), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, fun(Trace) -> - %% the callback_mode if 'emqx_connector_demo' is 'always_sync'. + %% the callback_mode of 'emqx_connector_demo' is 'always_sync'. QueryTrace = ?of_kind(call_query, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), - %% wait for 1s to make sure all the aysnc query is sent to the resource. - timer:sleep(1000), %% simple query ignores the query_mode and batching settings in the resource_worker ?check_trace( ?TRACE_OPTS, @@ -285,20 +315,32 @@ t_query_counter_async_callback(_) -> ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async, enable_batch => false, async_inflight_window => 1000000} + #{ + query_mode => async, + batch_size => 1, + async_inflight_window => 1000000 + } ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + NMsgs = 1_000, ?check_trace( ?TRACE_OPTS, - inc_counter_in_parallel(1000, ReqOpts), + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + NMsgs, + _Timeout = 60_000 + ), + inc_counter_in_parallel(NMsgs, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), - %% wait for 1s to make sure all the aysnc query is sent to the resource. - timer:sleep(1000), %% simple query ignores the query_mode and batching settings in the resource_worker ?check_trace( ?TRACE_OPTS, @@ -325,12 +367,29 @@ t_query_counter_async_callback(_) -> t_query_counter_async_inflight(_) -> emqx_connector_demo:set_callback_mode(async_if_possible), + MetricsTab = ets:new(metrics_tab, [ordered_set, public]), + ok = telemetry:attach_many( + ?FUNCTION_NAME, + emqx_resource_metrics:events(), + fun(Event, Measurements, Meta, _Config) -> + ets:insert( + MetricsTab, + {erlang:monotonic_time(), #{ + event => Event, measurements => Measurements, metadata => Meta + }} + ), + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end), Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), - Insert0 = fun(Tab, Result) -> - ets:insert(Tab, {make_ref(), Result}) + Insert0 = fun(Tab, Ref, Result) -> + ct:pal("inserting ~p", [{Ref, Result}]), + ets:insert(Tab, {Ref, Result}) end, - ReqOpts = #{async_reply_fun => {Insert0, [Tab0]}}, + ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end, WindowSize = 15, {ok, _} = emqx_resource:create_local( ?ID, @@ -339,11 +398,10 @@ t_query_counter_async_inflight(_) -> #{name => test_resource, register => true}, #{ query_mode => async, - enable_batch => false, + batch_size => 1, async_inflight_window => WindowSize, worker_pool_size => 1, - resume_interval => 300, - enable_queue => false + resume_interval => 300 } ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), @@ -360,40 +418,76 @@ t_query_counter_async_inflight(_) -> ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) end ), + tap_metrics(?LINE), %% this will block the resource_worker as the inflight window is full now - ok = emqx_resource:query(?ID, {inc_counter, 1}), + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 2}), + #{?snk_kind := resource_worker_enter_blocked}, + 1_000 + ), ?assertMatch(0, ets:info(Tab0, size)), - %% sleep to make the resource_worker resume some times - timer:sleep(2000), + tap_metrics(?LINE), %% send query now will fail because the resource is blocked. Insert = fun(Tab, Ref, Result) -> - ets:insert(Tab, {Ref, Result}) + ct:pal("inserting ~p", [{Ref, Result}]), + ets:insert(Tab, {Ref, Result}), + ?tp(tmp_query_inserted, #{}) end, - ok = emqx_resource:query(?ID, {inc_counter, 1}, #{ - async_reply_fun => {Insert, [Tab0, tmp_query]} - }), - timer:sleep(100), + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 3}, #{ + async_reply_fun => {Insert, [Tab0, tmp_query]} + }), + #{?snk_kind := tmp_query_inserted}, + 1_000 + ), + %% since this counts as a failure, it'll be enqueued and retried + %% later, when the resource is unblocked. ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), + tap_metrics(?LINE), - %% all response should be received after the resource is resumed. + %% all responses should be received after the resource is resumed. + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + %% +1 because the tmp_query above will be retried and succeed + %% this time. + WindowSize + 1, + _Timeout = 60_000 + ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), - timer:sleep(1000), + tap_metrics(?LINE), + {ok, _} = snabbkaffe:receive_events(SRef0), + %% since the previous tmp_query was enqueued to be retried, we + %% take it again from the table; this time, it should have + %% succeeded. + ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), ?assertEqual(WindowSize, ets:info(Tab0, size)), + tap_metrics(?LINE), %% send async query, this time everything should be ok. Num = 10, ?check_trace( ?TRACE_OPTS, - inc_counter_in_parallel(Num, ReqOpts), + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + Num, + _Timeout = 60_000 + ), + inc_counter_in_parallel(Num, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), - ?assertMatch([#{query := {query, _, {inc_counter, 1}, _}} | _], QueryTrace) + ?assertMatch([#{query := {query, _, {inc_counter, _}, _}} | _], QueryTrace) end ), - timer:sleep(1000), - ?assertEqual(WindowSize + Num, ets:info(Tab0, size)), + ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + tap_metrics(?LINE), %% block the resource ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), @@ -411,27 +505,253 @@ t_query_counter_async_inflight(_) -> ok = emqx_resource:query(?ID, {inc_counter, 1}), Sent = WindowSize + Num + WindowSize, + {ok, SRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + WindowSize, + _Timeout = 60_000 + ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), - timer:sleep(1000), + {ok, _} = snabbkaffe:receive_events(SRef1), ?assertEqual(Sent, ets:info(Tab0, size)), {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), ?assert(Sent =< Counter), - {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), - ct:pal("metrics: ~p", [C]), - {ok, IncorrectStatusCount} = emqx_resource:simple_sync_query(?ID, get_incorrect_status_count), - %% The `simple_sync_query' we just did also increases the matched - %% count, hence the + 1. - ExtraSimpleCallCount = IncorrectStatusCount + 1, + %% give the metrics some time to stabilize. + ct:sleep(1000), + #{counters := C, gauges := G} = tap_metrics(?LINE), ?assertMatch( - #{matched := M, success := Ss, dropped := Dp, 'retried.success' := Rs} when - M == Ss + Dp - Rs + ExtraSimpleCallCount, - C, #{ - metrics => C, - extra_simple_call_count => ExtraSimpleCallCount + counters := + #{matched := M, success := Ss, dropped := Dp}, + gauges := #{queuing := Qing, inflight := Infl} + } when + M == Ss + Dp + Qing + Infl, + #{counters => C, gauges => G}, + #{ + metrics => #{counters => C, gauges => G}, + results => ets:tab2list(Tab0), + metrics_trace => ets:tab2list(MetricsTab) + } + ), + ?assert( + lists:all( + fun + ({_, ok}) -> true; + (_) -> false + end, + ets:tab2list(Tab0) + ) + ), + ok = emqx_resource:remove_local(?ID). + +t_query_counter_async_inflight_batch(_) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + MetricsTab = ets:new(metrics_tab, [ordered_set, public]), + ok = telemetry:attach_many( + ?FUNCTION_NAME, + emqx_resource_metrics:events(), + fun(Event, Measurements, Meta, _Config) -> + ets:insert( + MetricsTab, + {erlang:monotonic_time(), #{ + event => Event, measurements => Measurements, metadata => Meta + }} + ), + ok + end, + unused_config + ), + on_exit(fun() -> telemetry:detach(?FUNCTION_NAME) end), + + Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), + Insert0 = fun(Tab, Ref, Result) -> + ct:pal("inserting ~p", [{Ref, Result}]), + ets:insert(Tab, {Ref, Result}) + end, + ReqOpts = fun() -> #{async_reply_fun => {Insert0, [Tab0, make_ref()]}} end, + BatchSize = 2, + WindowSize = 3, + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true}, + #{ + query_mode => async, + batch_size => BatchSize, + async_inflight_window => WindowSize, + worker_pool_size => 1, + resume_interval => 300 + } + ), + ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + + %% block the resource + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + + %% send async query to make the inflight window full + NumMsgs = BatchSize * WindowSize, + ?check_trace( + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := call_batch_query_async}), + WindowSize, + _Timeout = 60_000 + ), + inc_counter_in_parallel(NumMsgs, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, + fun(Trace) -> + QueryTrace = ?of_kind(call_batch_query_async, Trace), + ?assertMatch( + [ + #{ + batch := [ + {query, _, {inc_counter, 1}, _}, + {query, _, {inc_counter, 1}, _} + ] + } + | _ + ], + QueryTrace + ) + end + ), + tap_metrics(?LINE), + + ?check_trace( + begin + %% this will block the resource_worker as the inflight window is full now + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 2}), + #{?snk_kind := resource_worker_enter_blocked}, + 5_000 + ), + ?assertMatch(0, ets:info(Tab0, size)), + ok + end, + [] + ), + + tap_metrics(?LINE), + %% send query now will fail because the resource is blocked. + Insert = fun(Tab, Ref, Result) -> + ct:pal("inserting ~p", [{Ref, Result}]), + ets:insert(Tab, {Ref, Result}), + ?tp(tmp_query_inserted, #{}) + end, + {ok, {ok, _}} = + ?wait_async_action( + emqx_resource:query(?ID, {inc_counter, 3}, #{ + async_reply_fun => {Insert, [Tab0, tmp_query]} + }), + #{?snk_kind := tmp_query_inserted}, + 1_000 + ), + %% since this counts as a failure, it'll be enqueued and retried + %% later, when the resource is unblocked. + ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), + tap_metrics(?LINE), + + %% all responses should be received after the resource is resumed. + {ok, SRef0} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + %% +1 because the tmp_query above will be retried and succeed + %% this time. + WindowSize + 1, + _Timeout = 60_000 + ), + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), + tap_metrics(?LINE), + {ok, _} = snabbkaffe:receive_events(SRef0), + %% since the previous tmp_query was enqueued to be retried, we + %% take it again from the table; this time, it should have + %% succeeded. + ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), + ?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + tap_metrics(?LINE), + + %% send async query, this time everything should be ok. + NumBatches1 = 3, + NumMsgs1 = BatchSize * NumBatches1, + ?check_trace( + ?TRACE_OPTS, + begin + {ok, SRef} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + NumBatches1, + _Timeout = 60_000 + ), + inc_counter_in_parallel(NumMsgs1, ReqOpts), + {ok, _} = snabbkaffe:receive_events(SRef), + ok + end, + fun(Trace) -> + QueryTrace = ?of_kind(call_batch_query_async, Trace), + ?assertMatch( + [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + QueryTrace + ) + end + ), + ?assertEqual( + NumMsgs + NumMsgs1, + ets:info(Tab0, size), + #{tab => ets:tab2list(Tab0)} + ), + tap_metrics(?LINE), + + %% block the resource + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + %% again, send async query to make the inflight window full + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(WindowSize, ReqOpts), + fun(Trace) -> + QueryTrace = ?of_kind(call_batch_query_async, Trace), + ?assertMatch( + [#{batch := [{query, _, {inc_counter, _}, _} | _]} | _], + QueryTrace + ) + end + ), + + %% this will block the resource_worker + ok = emqx_resource:query(?ID, {inc_counter, 1}), + + Sent = NumMsgs + NumMsgs1 + WindowSize, + {ok, SRef1} = snabbkaffe:subscribe( + ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), + WindowSize, + _Timeout = 60_000 + ), + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), + {ok, _} = snabbkaffe:receive_events(SRef1), + ?assertEqual(Sent, ets:info(Tab0, size)), + + {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), + ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), + ?assert(Sent =< Counter), + + %% give the metrics some time to stabilize. + ct:sleep(1000), + #{counters := C, gauges := G} = tap_metrics(?LINE), + ?assertMatch( + #{ + counters := + #{matched := M, success := Ss, dropped := Dp}, + gauges := #{queuing := Qing, inflight := Infl} + } when + M == Ss + Dp + Qing + Infl, + #{counters => C, gauges => G}, + #{ + metrics => #{counters => C, gauges => G}, + results => ets:tab2list(Tab0), + metrics_trace => ets:tab2list(MetricsTab) } ), ?assert( @@ -506,9 +826,9 @@ t_stop_start(_) -> %% add some metrics to test their persistence WorkerID0 = <<"worker:0">>, WorkerID1 = <<"worker:1">>, - emqx_resource_metrics:batching_set(?ID, WorkerID0, 2), - emqx_resource_metrics:batching_set(?ID, WorkerID1, 3), - ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 2), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 3), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), {ok, _} = emqx_resource:check_and_recreate( ?ID, @@ -522,7 +842,7 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid0)), %% metrics are reset when recreating - ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok = emqx_resource:stop(?ID), @@ -541,11 +861,11 @@ t_stop_start(_) -> ?assert(is_process_alive(Pid1)), %% now stop while resetting the metrics - emqx_resource_metrics:batching_set(?ID, WorkerID0, 1), - emqx_resource_metrics:batching_set(?ID, WorkerID1, 4), - ?assertEqual(5, emqx_resource_metrics:batching_get(?ID)), + emqx_resource_metrics:inflight_set(?ID, WorkerID0, 1), + emqx_resource_metrics:inflight_set(?ID, WorkerID1, 4), + ?assertEqual(5, emqx_resource_metrics:inflight_get(?ID)), ok = emqx_resource:stop(?ID), - ?assertEqual(0, emqx_resource_metrics:batching_get(?ID)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(?ID)), ok. @@ -641,18 +961,22 @@ create_dry_run_local_succ() -> ?assertEqual(undefined, whereis(test_resource)). t_create_dry_run_local_failed(_) -> + ct:timetrap({seconds, 120}), + ct:pal("creating with creation error"), Res1 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{create_error => true} ), ?assertMatch({error, _}, Res1), + ct:pal("creating with health check error"), Res2 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, health_check_error => true} ), ?assertMatch({error, _}, Res2), + ct:pal("creating with stop error"), Res3 = emqx_resource:create_dry_run_local( ?TEST_RESOURCE, #{name => test_resource, stop_error => true} @@ -689,16 +1013,116 @@ t_auto_retry(_) -> ), ?assertEqual(ok, Res). +t_retry_batch(_Config) -> + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + batch_size => 5, + worker_pool_size => 1, + resume_interval => 1_000 + } + ), + + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + Matched0 = emqx_resource_metrics:matched_get(?ID), + ?assertEqual(1, Matched0), + + %% these requests will batch together and fail; the buffer worker + %% will enter the `blocked' state and they'll be retried later, + %% after it unblocks. + Payloads = lists:seq(1, 5), + NumPayloads = length(Payloads), + ExpectedCount = 15, + + ?check_trace( + begin + {ok, {ok, _}} = + ?wait_async_action( + lists:foreach( + fun(N) -> + ok = emqx_resource:query(?ID, {inc_counter, N}) + end, + Payloads + ), + #{?snk_kind := resource_worker_enter_blocked}, + 5_000 + ), + %% now the individual messages should have been counted + Matched1 = emqx_resource_metrics:matched_get(?ID), + ?assertEqual(Matched0 + NumPayloads, Matched1), + + %% wait for two more retries while the failure is enabled; the + %% batch shall remain enqueued. + {ok, _} = + snabbkaffe:block_until( + ?match_n_events(2, #{?snk_kind := resource_worker_retry_queue_batch_failed}), + 5_000 + ), + %% should not have increased the matched count with the retries + Matched2 = emqx_resource_metrics:matched_get(?ID), + ?assertEqual(Matched1, Matched2), + + %% now unblock the buffer worker so it may retry the batch, + %% but it'll still fail + {ok, {ok, _}} = + ?wait_async_action( + ok = emqx_resource:simple_sync_query(?ID, resume), + #{?snk_kind := resource_worker_retry_queue_batch_succeeded}, + 5_000 + ), + %% 1 more because of the `resume' call + Matched3 = emqx_resource_metrics:matched_get(?ID), + ?assertEqual(Matched2 + 1, Matched3), + + {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), + {Counter, Matched3} + end, + fun({Counter, Matched3}, Trace) -> + %% 1 original attempt + 2 failed retries + final + %% successful attempt. + %% each time should be the original batch (no duplicate + %% elements or reordering). + ExpectedSeenPayloads = lists:flatten(lists:duplicate(4, Payloads)), + ?assertEqual( + ExpectedSeenPayloads, + ?projection(n, ?of_kind(connector_demo_batch_inc_individual, Trace)) + ), + ?assertMatch( + [#{n := ExpectedCount}], + ?of_kind(connector_demo_inc_counter, Trace) + ), + ?assertEqual(ExpectedCount, Counter), + %% matched should count only the original requests, and not retries + %% + 1 for `resume' call + %% + 1 for `block' call + %% + 1 for `get_counter' call + %% and the message count (1 time) + Matched4 = emqx_resource_metrics:matched_get(?ID), + ?assertEqual(Matched3 + 1, Matched4), + ok + end + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ inc_counter_in_parallel(N) -> inc_counter_in_parallel(N, #{}). -inc_counter_in_parallel(N, Opts) -> +inc_counter_in_parallel(N, Opts0) -> Parent = self(), Pids = [ erlang:spawn(fun() -> + Opts = + case is_function(Opts0) of + true -> Opts0(); + false -> Opts0 + end, emqx_resource:query(?ID, {inc_counter, 1}, Opts), Parent ! {complete, self()} end) @@ -719,3 +1143,8 @@ bin_config() -> config() -> {ok, Config} = hocon:binary(bin_config()), Config. + +tap_metrics(Line) -> + {ok, _, #{metrics := #{counters := C, gauges := G}}} = emqx_resource:get_instance(?ID), + ct:pal("metrics (l. ~b): ~p", [Line, #{counters => C, gauges => G}]), + #{counters => C, gauges => G}. diff --git a/changes/v5.0.14/feat-9642.en.md b/changes/v5.0.14/feat-9642.en.md new file mode 100644 index 000000000..19de4d946 --- /dev/null +++ b/changes/v5.0.14/feat-9642.en.md @@ -0,0 +1 @@ +Deprecates `enable_batch` and `enable_queue` options for bridges/resources. After this change, queuing is always enabled for bridges, and batching is controlled by the `batch_size` option: `batch_size > 1` means batching will be enabled. diff --git a/changes/v5.0.14/feat-9642.zh.md b/changes/v5.0.14/feat-9642.zh.md new file mode 100644 index 000000000..e394abdbb --- /dev/null +++ b/changes/v5.0.14/feat-9642.zh.md @@ -0,0 +1 @@ +废弃了桥接的 `enable_batch` 和 `enable_queue` 配置项 。在这一改变之后,桥接的工作进程总是启用缓存队列,而批处理由 `batch_size` 选项控制:`batch_size > 1` 则意味着启用批处理。 diff --git a/changes/v5.0.14/fix-9642.en.md b/changes/v5.0.14/fix-9642.en.md new file mode 100644 index 000000000..12906299e --- /dev/null +++ b/changes/v5.0.14/fix-9642.en.md @@ -0,0 +1,3 @@ +Fix some issues that could lead to wrong bridge metrics. +Fix and issue that could lead to message loss and wrong metrics with Kafka Producer bridge when Kafka or the connection to it is down. +Fix some issues that could lead to the same message being delivered more than once when using batching for bridges and when the batch was retried. diff --git a/changes/v5.0.14/fix-9642.zh.md b/changes/v5.0.14/fix-9642.zh.md new file mode 100644 index 000000000..2565c422c --- /dev/null +++ b/changes/v5.0.14/fix-9642.zh.md @@ -0,0 +1,3 @@ +修复一些可能导致错误桥接指标的问题。 +修复当Kafka或其连接中断时,可能导致Kafka Producer桥的消息丢失和错误指标的问题。 +修复一些问题,这些问题可能导致在为桥接使用批处理时,同一消息被多次传递,以及批处理被重试时。 diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index c445e9a56..bf5d2e140 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -58,11 +58,9 @@ values(post) -> worker_pool_size => 1, health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, - enable_batch => true, batch_size => ?DEFAULT_BATCH_SIZE, batch_time => ?DEFAULT_BATCH_TIME, query_mode => async, - enable_queue => false, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } }; diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 42c4d12e8..727a6df4b 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -79,8 +79,7 @@ values(common, RedisType, SpecificOpts) -> auto_reconnect => true, command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], resource_opts => #{ - enable_batch => false, - batch_size => 100, + batch_size => 1, batch_time => <<"20ms">> }, ssl => #{enable => false} diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6b46de35e..a7de100d6 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -3,6 +3,8 @@ %%-------------------------------------------------------------------- -module(emqx_bridge_impl_kafka_producer). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + %% callbacks of behaviour emqx_resource -export([ callback_mode/0, @@ -36,7 +38,7 @@ on_start(InstId, Config) -> %% TODO: change this to `kafka_producer` after refactoring for kafka_consumer BridgeType = kafka, ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - _ = maybe_install_wolff_telemetry_handlers(InstId, ResourceID), + _ = maybe_install_wolff_telemetry_handlers(ResourceID), %% it's a bug if producer config is not found %% the caller should not try to start a producer if %% there is no producer config @@ -79,7 +81,8 @@ on_start(InstId, Config) -> {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, - producers => Producers + producers => Producers, + resource_id => ResourceID }}; {error, Reason2} -> ?SLOG(error, #{ @@ -105,7 +108,7 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_producer) end. -on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> +on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> _ = with_log_at_error( fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, #{ @@ -121,7 +124,7 @@ on_stop(InstanceID, #{client_id := ClientID, producers := Producers}) -> } ), with_log_at_error( - fun() -> uninstall_telemetry_handlers(InstanceID) end, + fun() -> uninstall_telemetry_handlers(ResourceID) end, #{ msg => "failed_to_uninstall_telemetry_handlers", client_id => ClientID @@ -176,15 +179,14 @@ on_kafka_ack(_Partition, _Offset, _Extra) -> %% Maybe need to bump some counters? ok. -on_get_status(_InstId, #{client_id := ClientID}) -> - case wolff:check_connectivity(ClientID) of - ok -> connected; - _ -> disconnected - end. +on_get_status(_InstId, _State) -> + connected. %% Parse comma separated host:port list into a [{Host,Port}] list -hosts(Hosts) -> - emqx_schema:parse_servers(Hosts, emqx_ee_bridge_kafka:host_opts()). +hosts(Hosts) when is_binary(Hosts) -> + hosts(binary_to_list(Hosts)); +hosts(Hosts) when is_list(Hosts) -> + kpro:parse_endpoints(Hosts). %% Extra socket options, such as sndbuf size etc. socket_opts(Opts) when is_map(Opts) -> @@ -240,14 +242,10 @@ producers_config(BridgeName, ClientId, Input) -> mode := BufferMode, per_partition_limit := PerPartitionLimit, segment_bytes := SegmentBytes, - memory_overload_protection := MemOLP0 + memory_overload_protection := MemOLP } } = Input, - MemOLP = - case os:type() of - {unix, linux} -> MemOLP0; - _ -> false - end, + {OffloadMode, ReplayqDir} = case BufferMode of memory -> {false, false}; @@ -384,20 +382,23 @@ handle_telemetry_event(_EventId, _Metrics, _MetaData, _HandlerConfig) -> %% Event that we do not handle ok. --spec telemetry_handler_id(emqx_resource:resource_id()) -> binary(). -telemetry_handler_id(InstanceID) -> - <<"emqx-bridge-kafka-producer-", InstanceID/binary>>. +%% Note: don't use the instance/manager ID, as that changes everytime +%% the bridge is recreated, and will lead to multiplication of +%% metrics. +-spec telemetry_handler_id(resource_id()) -> binary(). +telemetry_handler_id(ResourceID) -> + <<"emqx-bridge-kafka-producer-", ResourceID/binary>>. -uninstall_telemetry_handlers(InstanceID) -> - HandlerID = telemetry_handler_id(InstanceID), +uninstall_telemetry_handlers(ResourceID) -> + HandlerID = telemetry_handler_id(ResourceID), telemetry:detach(HandlerID). -maybe_install_wolff_telemetry_handlers(InstanceID, ResourceID) -> +maybe_install_wolff_telemetry_handlers(ResourceID) -> %% Attach event handlers for Kafka telemetry events. If a handler with the %% handler id already exists, the attach_many function does nothing telemetry:attach_many( %% unique handler id - telemetry_handler_id(InstanceID), + telemetry_handler_id(ResourceID), [ [wolff, dropped], [wolff, dropped_queue_full], diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index da4c1007a..bdde21c76 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -287,11 +287,9 @@ kafka_bridge_rest_api_helper(Config) -> ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), @@ -314,11 +312,9 @@ kafka_bridge_rest_api_helper(Config) -> ?assertEqual(0, emqx_resource_metrics:dropped_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:failed_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:inflight_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:batching_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:queuing_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_other_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_queue_full_get(ResourceId)), - ?assertEqual(0, emqx_resource_metrics:dropped_queue_not_enabled_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_resource_not_found_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:dropped_resource_stopped_get(ResourceId)), ?assertEqual(0, emqx_resource_metrics:retried_get(ResourceId)), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 70125596c..91fd2f399 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -38,18 +38,12 @@ groups() -> {group, sync_query}, {group, async_query} ], - QueueGroups = [ - {group, queue_enabled}, - {group, queue_disabled} - ], ResourceGroups = [{group, gcp_pubsub}], [ {with_batch, SynchronyGroups}, {without_batch, SynchronyGroups}, - {sync_query, QueueGroups}, - {async_query, QueueGroups}, - {queue_enabled, ResourceGroups}, - {queue_disabled, ResourceGroups}, + {sync_query, ResourceGroups}, + {async_query, ResourceGroups}, {gcp_pubsub, MatrixTCs} ]. @@ -99,13 +93,9 @@ init_per_group(sync_query, Config) -> init_per_group(async_query, Config) -> [{query_mode, async} | Config]; init_per_group(with_batch, Config) -> - [{enable_batch, true} | Config]; + [{batch_size, 100} | Config]; init_per_group(without_batch, Config) -> - [{enable_batch, false} | Config]; -init_per_group(queue_enabled, Config) -> - [{enable_queue, true} | Config]; -init_per_group(queue_disabled, Config) -> - [{enable_queue, false} | Config]; + [{batch_size, 1} | Config]; init_per_group(_Group, Config) -> Config. @@ -118,16 +108,16 @@ end_per_group(_Group, _Config) -> init_per_testcase(TestCase, Config0) when TestCase =:= t_publish_success_batch -> - case ?config(enable_batch, Config0) of - true -> + case ?config(batch_size, Config0) of + 1 -> + {skip, no_batching}; + _ -> {ok, _} = start_echo_http_server(), delete_all_bridges(), Tid = install_telemetry_handler(TestCase), Config = generate_config(Config0), put(telemetry_table, Tid), - [{telemetry_table, Tid} | Config]; - false -> - {skip, no_batching} + [{telemetry_table, Tid} | Config] end; init_per_testcase(TestCase, Config0) -> {ok, _} = start_echo_http_server(), @@ -271,9 +261,7 @@ certs() -> ]. gcp_pubsub_config(Config) -> - EnableBatch = proplists:get_value(enable_batch, Config, true), QueryMode = proplists:get_value(query_mode, Config, sync), - EnableQueue = proplists:get_value(enable_queue, Config, false), BatchSize = proplists:get_value(batch_size, Config, 100), BatchTime = proplists:get_value(batch_time, Config, <<"20ms">>), PayloadTemplate = proplists:get_value(payload_template, Config, ""), @@ -296,9 +284,7 @@ gcp_pubsub_config(Config) -> " pipelining = ~b\n" " resource_opts = {\n" " worker_pool_size = 1\n" - " enable_batch = ~p\n" " query_mode = ~s\n" - " enable_queue = ~p\n" " batch_size = ~b\n" " batch_time = \"~s\"\n" " }\n" @@ -309,9 +295,7 @@ gcp_pubsub_config(Config) -> PayloadTemplate, PubSubTopic, PipelineSize, - EnableBatch, QueryMode, - EnableQueue, BatchSize, BatchTime ] @@ -358,11 +342,9 @@ service_account_json(PrivateKeyPEM) -> metrics_mapping() -> #{ - batching => fun emqx_resource_metrics:batching_get/1, dropped => fun emqx_resource_metrics:dropped_get/1, dropped_other => fun emqx_resource_metrics:dropped_other_get/1, dropped_queue_full => fun emqx_resource_metrics:dropped_queue_full_get/1, - dropped_queue_not_enabled => fun emqx_resource_metrics:dropped_queue_not_enabled_get/1, dropped_resource_not_found => fun emqx_resource_metrics:dropped_resource_not_found_get/1, dropped_resource_stopped => fun emqx_resource_metrics:dropped_resource_stopped_get/1, failed => fun emqx_resource_metrics:failed_get/1, @@ -625,7 +607,6 @@ t_publish_success(Config) -> ), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, @@ -674,7 +655,6 @@ t_publish_success_local_topic(Config) -> ), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, @@ -761,7 +741,6 @@ t_publish_templated(Config) -> ), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, @@ -830,11 +809,10 @@ t_publish_success_batch(Config) -> wait_until_gauge_is(inflight, 0, _Timeout = 400), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, - matched => NumMessages div BatchSize, + matched => NumMessages, queuing => 0, retried => 0, success => NumMessages @@ -1013,7 +991,6 @@ t_publish_timeout(Config) -> do_econnrefused_or_timeout_test(Config, timeout). do_econnrefused_or_timeout_test(Config, Error) -> - EnableQueue = ?config(enable_queue, Config), QueryMode = ?config(query_mode, Config), ResourceId = ?config(resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), @@ -1089,39 +1066,22 @@ do_econnrefused_or_timeout_test(Config, Error) -> end ), - case {Error, QueryMode, EnableQueue} of - {_, sync, false} -> - wait_telemetry_event(TelemetryTable, dropped_queue_not_enabled, ResourceId, #{ - timeout => 10_000, - n_events => 1 - }), - assert_metrics( - #{ - batching => 0, - dropped => 1, - dropped_queue_not_enabled => 1, - failed => 0, - inflight => 0, - matched => 1, - queuing => 0, - retried => 0, - success => 0 - }, - ResourceId - ); + case {Error, QueryMode} of %% apparently, async with disabled queue doesn't mark the %% message as dropped; and since it never considers the %% response expired, this succeeds. - {econnrefused, async, _} -> + {econnrefused, async} -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ timeout => 10_000, n_events => 1 }), + %% even waiting, hard to avoid flakiness... simpler to just sleep + %% a bit until stabilization. + ct:sleep(200), CurrentMetrics = current_metrics(ResourceId), RecordedEvents = ets:tab2list(TelemetryTable), ct:pal("telemetry events: ~p", [RecordedEvents]), ?assertMatch( #{ - batching := 0, dropped := Dropped, failed := 0, inflight := Inflight, @@ -1132,7 +1092,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> } when Matched >= 1 andalso Inflight + Queueing + Dropped =< 2, CurrentMetrics ); - {timeout, async, _} -> + {timeout, async} -> wait_telemetry_event(TelemetryTable, success, ResourceId, #{ timeout => 10_000, n_events => 2 }), @@ -1140,7 +1100,6 @@ do_econnrefused_or_timeout_test(Config, Error) -> wait_until_gauge_is(queuing, 0, _Timeout = 400), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, @@ -1151,13 +1110,15 @@ do_econnrefused_or_timeout_test(Config, Error) -> }, ResourceId ); - {_, sync, true} -> + {_, sync} -> wait_telemetry_event(TelemetryTable, queuing, ResourceId, #{ timeout => 10_000, n_events => 2 }), + %% even waiting, hard to avoid flakiness... simpler to just sleep + %% a bit until stabilization. + ct:sleep(200), assert_metrics( #{ - batching => 0, dropped => 0, failed => 0, inflight => 0, @@ -1364,9 +1325,11 @@ t_unrecoverable_error(Config) -> ResourceId, #{n_events => ExpectedInflightEvents, timeout => 5_000} ), + %% even waiting, hard to avoid flakiness... simpler to just sleep + %% a bit until stabilization. + ct:sleep(200), assert_metrics( #{ - batching => 0, dropped => 0, failed => 1, inflight => 0, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl index f2037ba14..6331611d0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -204,9 +204,9 @@ init_per_group(sync_query, Config) -> init_per_group(async_query, Config) -> [{query_mode, async} | Config]; init_per_group(with_batch, Config) -> - [{enable_batch, true} | Config]; + [{batch_size, 100} | Config]; init_per_group(without_batch, Config) -> - [{enable_batch, false} | Config]; + [{batch_size, 1} | Config]; init_per_group(_Group, Config) -> Config. @@ -261,7 +261,6 @@ example_write_syntax() -> "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>. influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> - EnableBatch = proplists:get_value(enable_batch, Config, true), BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), @@ -278,7 +277,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " enable_batch = ~p\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -292,7 +290,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> InfluxDBHost, InfluxDBPort, WriteSyntax, - EnableBatch, QueryMode, BatchSize, UseTLS @@ -300,7 +297,6 @@ influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> ), {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}; influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> - EnableBatch = proplists:get_value(enable_batch, Config, true), BatchSize = proplists:get_value(batch_size, Config, 100), QueryMode = proplists:get_value(query_mode, Config, sync), UseTLS = proplists:get_value(use_tls, Config, false), @@ -317,7 +313,6 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> " precision = ns\n" " write_syntax = \"~s\"\n" " resource_opts = {\n" - " enable_batch = ~p\n" " query_mode = ~s\n" " batch_size = ~b\n" " }\n" @@ -331,7 +326,6 @@ influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> InfluxDBHost, InfluxDBPort, WriteSyntax, - EnableBatch, QueryMode, BatchSize, UseTLS @@ -723,7 +717,7 @@ t_bad_timestamp(Config) -> InfluxDBType = ?config(influxdb_type, Config), InfluxDBName = ?config(influxdb_name, Config), QueryMode = ?config(query_mode, Config), - EnableBatch = ?config(enable_batch, Config), + BatchSize = ?config(batch_size, Config), InfluxDBConfigString0 = ?config(influxdb_config_string, Config), InfluxDBTypeCfg = case InfluxDBType of @@ -774,7 +768,8 @@ t_bad_timestamp(Config) -> fun(Result, Trace) -> ?assertMatch({_, {ok, _}}, Result), {Return, {ok, _}} = Result, - case {QueryMode, EnableBatch} of + IsBatch = BatchSize > 1, + case {QueryMode, IsBatch} of {async, true} -> ?assertEqual(ok, Return), ?assertMatch( @@ -921,12 +916,13 @@ t_write_failure(Config) -> t_missing_field(Config) -> QueryMode = ?config(query_mode, Config), - EnableBatch = ?config(enable_batch, Config), + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, {ok, _} = create_bridge( Config, #{ - <<"resource_opts">> => #{<<"batch_size">> => 1}, + <<"resource_opts">> => #{<<"worker_pool_size">> => 1}, <<"write_syntax">> => <<"${clientid} foo=${foo}i">> } ), @@ -943,9 +939,14 @@ t_missing_field(Config) -> begin emqx:publish(Msg0), emqx:publish(Msg1), + NEvents = + case IsBatch of + true -> 1; + false -> 2 + end, {ok, _} = snabbkaffe:block_until( - ?match_n_events(2, #{ + ?match_n_events(NEvents, #{ ?snk_kind := influxdb_connector_send_query_error, mode := QueryMode }), @@ -956,10 +957,10 @@ t_missing_field(Config) -> fun(Trace) -> PersistedData0 = query_by_clientid(ClientId0, Config), PersistedData1 = query_by_clientid(ClientId1, Config), - case EnableBatch of + case IsBatch of true -> ?assertMatch( - [#{error := points_trans_failed}, #{error := points_trans_failed} | _], + [#{error := points_trans_failed} | _], ?of_kind(influxdb_connector_send_query_error, Trace) ); false -> diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index caace762a..812c4ee85 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -78,10 +78,10 @@ init_per_group(tls, Config) -> | Config ]; init_per_group(with_batch, Config0) -> - Config = [{enable_batch, true} | Config0], + Config = [{batch_size, 100} | Config0], common_init(Config); init_per_group(without_batch, Config0) -> - Config = [{enable_batch, false} | Config0], + Config = [{batch_size, 1} | Config0], common_init(Config); init_per_group(_Group, Config) -> Config. @@ -157,7 +157,7 @@ mysql_config(BridgeType, Config) -> MysqlPort = integer_to_list(?config(mysql_port, Config)), Server = ?config(mysql_host, Config) ++ ":" ++ MysqlPort, Name = atom_to_binary(?MODULE), - EnableBatch = ?config(enable_batch, Config), + BatchSize = ?config(batch_size, Config), QueryMode = ?config(query_mode, Config), TlsEnabled = ?config(enable_tls, Config), ConfigString = @@ -170,7 +170,7 @@ mysql_config(BridgeType, Config) -> " password = ~p\n" " sql = ~p\n" " resource_opts = {\n" - " enable_batch = ~p\n" + " batch_size = ~b\n" " query_mode = ~s\n" " }\n" " ssl = {\n" @@ -185,7 +185,7 @@ mysql_config(BridgeType, Config) -> ?MYSQL_USERNAME, ?MYSQL_PASSWORD, ?SQL_BRIDGE, - EnableBatch, + BatchSize, QueryMode, TlsEnabled ] @@ -440,7 +440,9 @@ t_simple_sql_query(Config) -> ), Request = {sql, <<"SELECT count(1) AS T">>}, Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + case IsBatch of true -> ?assertEqual({error, batch_select_not_implemented}, Result); false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result) end, @@ -452,7 +454,9 @@ t_missing_data(Config) -> create_bridge(Config) ), Result = send_message(Config, #{}), - case ?config(enable_batch, Config) of + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + case IsBatch of true -> ?assertMatch( {error, {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}, Result @@ -469,7 +473,9 @@ t_bad_sql_parameter(Config) -> ), Request = {sql, <<"">>, [bad_parameter]}, Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + case IsBatch of true -> ?assertEqual({error, invalid_request}, Result); false -> ?assertEqual({error, {invalid_params, [bad_parameter]}}, Result) end, @@ -482,7 +488,9 @@ t_unprepared_statement_query(Config) -> ), Request = {prepared_query, unprepared_query, []}, Result = query_resource(Config, Request), - case ?config(enable_batch, Config) of + BatchSize = ?config(batch_size, Config), + IsBatch = BatchSize > 1, + case IsBatch of true -> ?assertEqual({error, invalid_request}, Result); false -> ?assertEqual({error, prepared_statement_invalid}, Result) end, diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 9f597e101..f88bc42eb 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -220,16 +220,24 @@ t_check_replay(Config) -> begin ?wait_async_action( with_down_failure(Config, ProxyName, fun() -> - ct:sleep(100), - lists:foreach( - fun(_) -> - _ = publish_message(Topic, <<"test_payload">>) - end, - lists:seq(1, ?BATCH_SIZE) - ) + {_, {ok, _}} = + ?wait_async_action( + lists:foreach( + fun(_) -> + _ = publish_message(Topic, <<"test_payload">>) + end, + lists:seq(1, ?BATCH_SIZE) + ), + #{ + ?snk_kind := redis_ee_connector_send_done, + batch := true, + result := {error, _} + }, + 10_000 + ) end), #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, - 10000 + 10_000 ) end, fun(Trace) -> @@ -443,8 +451,6 @@ toxiproxy_redis_bridge_config() -> Conf0 = ?REDIS_TOXYPROXY_CONNECT_CONFIG#{ <<"resource_opts">> => #{ <<"query_mode">> => <<"async">>, - <<"enable_batch">> => <<"true">>, - <<"enable_queue">> => <<"true">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE), <<"health_check_interval">> => <<"1s">> @@ -457,8 +463,7 @@ invalid_command_bridge_config() -> Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), Conf1#{ <<"resource_opts">> => #{ - <<"enable_batch">> => <<"false">>, - <<"enable_queue">> => <<"false">>, + <<"batch_size">> => <<"1">>, <<"worker_pool_size">> => <<"1">> }, <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] @@ -468,13 +473,10 @@ resource_configs() -> #{ batch_off => #{ <<"query_mode">> => <<"sync">>, - <<"enable_batch">> => <<"false">>, - <<"enable_queue">> => <<"false">> + <<"batch_size">> => <<"1">> }, batch_on => #{ <<"query_mode">> => <<"async">>, - <<"enable_batch">> => <<"true">>, - <<"enable_queue">> => <<"true">>, <<"worker_pool_size">> => <<"1">>, <<"batch_size">> => integer_to_binary(?BATCH_SIZE) } diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index fc28c3a90..0a64794b6 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -69,6 +69,7 @@ done if [ "${WHICH_APP}" = 'novalue' ]; then echo "must provide --app arg" + help exit 1 fi