From b32563339032f1cc00a7fb61082e264bfa212ba7 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Mon, 19 Sep 2022 14:26:46 +0800
Subject: [PATCH] refactor(resource): resume from queue/inflight-window with
 async-sending and batching

---
 .../src/emqx_resource_worker.erl | 167 +++++++++++-------
 1 file changed, 105 insertions(+), 62 deletions(-)

diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
index 4f7ac63d8..288edcf4f 100644
--- a/apps/emqx_resource/src/emqx_resource_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -208,14 +208,6 @@ terminate(_Reason, #{id := Id, index := Index}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-queue_item_marshaller(?Q_ITEM(_) = I) ->
-    term_to_binary(I);
-queue_item_marshaller(Bin) when is_binary(Bin) ->
-    binary_to_term(Bin).
-
-estimate_size(QItem) ->
-    size(queue_item_marshaller(QItem)).
-
 %%==============================================================================
 -define(PICK(ID, KEY, EXPR),
     try gproc_pool:pick_worker(ID, KEY) of
@@ -237,26 +229,60 @@ pick_call(Id, Key, Query, Timeout) ->
 pick_cast(Id, Key, Query) ->
     ?PICK(Id, Key, gen_statem:cast(Pid, Query)).
 
-do_resume(#{queue := Q, id := Id, name := Name} = St) ->
+do_resume(#{id := Id, name := Name} = St) ->
     case inflight_get_first(Name) of
         empty ->
-            retry_first_from_queue(Q, Id, St);
+            retry_queue(St);
         {Ref, FirstQuery} ->
-            retry_first_sync(Id, FirstQuery, Name, Ref, undefined, St)
+            %% Retry msgs in the inflight window synchronously: if we sent them
+            %% async, they would only be re-appended to the end of the inflight window.
+            retry_inflight_sync(Id, Ref, FirstQuery, Name, St)
     end.
 
-retry_first_from_queue(undefined, _Id, St) ->
+retry_queue(#{queue := undefined} = St) ->
     {next_state, running, St};
-retry_first_from_queue(Q, Id, St) ->
-    case replayq:peek(Q) of
-        empty ->
+retry_queue(#{queue := Q, id := Id, enable_batch := false, resume_interval := ResumeT} = St) ->
+    case get_first_n_from_queue(Q, 1) of
+        [] ->
             {next_state, running, St};
-        ?Q_ITEM(FirstQuery) ->
-            retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St)
+        [?QUERY(_, Request, HasSent) = Query] ->
+            QueryOpts = #{inflight_name => maps:get(name, St)},
+            Result = call_query(configured, Id, Query, QueryOpts),
+            case reply_caller(Id, ?REPLY(undefined, Request, HasSent, Result)) of
+                true ->
+                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                false ->
+                    retry_queue(St#{queue := drop_head(Q, Id)})
+            end
+    end;
+retry_queue(
+    #{
+        queue := Q,
+        id := Id,
+        enable_batch := true,
+        batch_size := BatchSize,
+        resume_interval := ResumeT
+    } = St
+) ->
+    case get_first_n_from_queue(Q, BatchSize) of
+        [] ->
+            {next_state, running, St};
+        Batch0 ->
+            QueryOpts = #{inflight_name => maps:get(name, St)},
+            Result = call_query(configured, Id, Batch0, QueryOpts),
+            %% The caller was already replied to with ?RESOURCE_ERROR(blocked, _) before the
+            %% query was saved into the queue, so set 'from' to 'undefined' to avoid replying again.
+            Batch = [?QUERY(undefined, Request, HasSent) || ?QUERY(_, Request, HasSent) <- Batch0],
+            case batch_reply_caller(Id, Result, Batch) of
+                true ->
+                    {keep_state, St, {state_timeout, ResumeT, resume}};
+                false ->
+                    retry_queue(St#{queue := drop_first_n_from_queue(Q, length(Batch), Id)})
+            end
     end.
 
-retry_first_sync(
-    Id, ?QUERY(_, _, HasSent) = Query, Name, Ref, Q, #{resume_interval := ResumeT} = St0
+retry_inflight_sync(
+    Id, Ref, ?QUERY(_, _, HasSent) = Query, Name, #{resume_interval := ResumeT} = St0
 ) ->
     Result = call_query(sync, Id, Query, #{}),
     case handle_query_result(Id, Result, HasSent, false) of
@@ -265,25 +291,10 @@ retry_first_sync(
             {keep_state, St0, {state_timeout, ResumeT, resume}};
         %% Send ok or failed but the resource is working
         false ->
-            %% We Send 'resume' to the end of the mailbox to give the worker
-            %% a chance to process 'query' requests.
-            St =
-                case Q of
-                    undefined ->
-                        inflight_drop(Name, Ref),
-                        St0;
-                    _ ->
-                        St0#{queue => drop_head(Id, Q)}
-                end,
-            {keep_state, St, {state_timeout, 0, resume}}
+            inflight_drop(Name, Ref),
+            do_resume(St0)
     end.
 
-drop_head(Id, Q) ->
-    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
-    ok = replayq:ack(Q1, AckRef),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
-    Q1.
-
 query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left, id := Id} = St0) ->
     Acc1 = [?QUERY(From, Request, false) | Acc],
     emqx_metrics_worker:inc(?RES_METRICS, Id, 'batching'),
@@ -299,7 +310,7 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St)
     Result = call_query(configured, Id, ?QUERY(From, Request, false), QueryOpts),
     case reply_caller(Id, ?REPLY(From, Request, false, Result)) of
         true ->
-            Query = ?QUERY(From, Request, 1),
+            Query = ?QUERY(From, Request, false),
             {next_state, blocked, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(Query)])}};
         false ->
             {keep_state, St}
@@ -330,29 +341,6 @@ flush(
         {keep_state, St1}
     end.
 
-maybe_append_queue(Id, undefined, _Items) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
-    undefined;
-maybe_append_queue(Id, Q, Items) ->
-    Q2 =
-        case replayq:overflow(Q) of
-            Overflow when Overflow =< 0 ->
-                Q;
-            Overflow ->
-                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
-                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
-                ok = replayq:ack(Q1, QAckRef),
-                Dropped = length(Items2),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
-                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
-                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
-                Q1
-        end,
-    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
-    replayq:append(Q2, Items).
-
 batch_reply_caller(Id, BatchResult, Batch) ->
     lists:foldl(
         fun(Reply, BlockWorker) ->
@@ -550,12 +538,67 @@ drop_inflight_and_resume(Pid, Name, Ref) ->
             inflight_drop(Name, Ref)
     end.
 
+%%==============================================================================
+%% operations for queue
+queue_item_marshaller(?Q_ITEM(_) = I) ->
+    term_to_binary(I);
+queue_item_marshaller(Bin) when is_binary(Bin) ->
+    binary_to_term(Bin).
+
+estimate_size(QItem) ->
+    size(queue_item_marshaller(QItem)).
+
+maybe_append_queue(Id, undefined, _Items) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_not_enabled'),
+    undefined;
+maybe_append_queue(Id, Q, Items) ->
+    Q2 =
+        case replayq:overflow(Q) of
+            Overflow when Overflow =< 0 ->
+                Q;
+            Overflow ->
+                PopOpts = #{bytes_limit => Overflow, count_limit => 999999999},
+                {Q1, QAckRef, Items2} = replayq:pop(Q, PopOpts),
+                ok = replayq:ack(Q1, QAckRef),
+                Dropped = length(Items2),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -Dropped),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped'),
+                emqx_metrics_worker:inc(?RES_METRICS, Id, 'dropped.queue_full'),
+                ?SLOG(error, #{msg => drop_query, reason => queue_full, dropped => Dropped}),
+                Q1
+        end,
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing'),
+    replayq:append(Q2, Items).
+
+get_first_n_from_queue(Q, N) ->
+    get_first_n_from_queue(Q, N, []).
+
+get_first_n_from_queue(_Q, 0, Acc) ->
+    lists:reverse(Acc);
+get_first_n_from_queue(Q, N, Acc) when N > 0 ->
+    case replayq:peek(Q) of
+        empty -> Acc;
+        ?Q_ITEM(Query) -> get_first_n_from_queue(Q, N - 1, [Query | Acc])
+    end.
+
+drop_first_n_from_queue(Q, 0, _Id) ->
+    Q;
+drop_first_n_from_queue(Q, N, Id) when N > 0 ->
+    drop_first_n_from_queue(drop_head(Q, Id), N - 1, Id).
+
+drop_head(Q, Id) ->
+    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
+    ok = replayq:ack(Q1, AckRef),
+    emqx_metrics_worker:inc(?RES_METRICS, Id, 'queuing', -1),
+    Q1.
+
 %%==============================================================================
 %% the inflight queue for async query
 -define(SIZE_REF, -1).
 inflight_new(Name, InfltWinSZ) ->
     _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]),
-    inflight_append(Name, ?SIZE_REF, {size, InfltWinSZ}),
+    inflight_append(Name, ?SIZE_REF, {max_size, InfltWinSZ}),
     ok.
 
 inflight_get_first(Name) ->
@@ -575,7 +618,7 @@ inflight_get_first(Name) ->
 inflight_is_full(undefined) ->
     false;
 inflight_is_full(Name) ->
-    [{_, {size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
+    [{_, {max_size, MaxSize}}] = ets:lookup(Name, ?SIZE_REF),
     case ets:info(Name, size) of
         Size when Size > MaxSize -> true;
         _ -> false
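For orientation (not part of the patch): a minimal, self-contained Erlang sketch of the resume flow that the new retry_inflight_sync/5 and retry_queue/1 implement above. Queries stuck in the inflight window are retried synchronously first, then queued queries are retried in batches. Plain lists stand in for the ETS inflight table and the replayq queue, and send_sync/1 is a hypothetical stub for call_query/4; the real worker retries one window entry or one batch per state_timeout tick instead of looping as this sketch does.

%% resume_sketch.erl -- illustrative sketch only, simplified from the patch above.
-module(resume_sketch).
-export([resume/1]).

%% St is a map with:
%%   inflight   := queries sent async but not yet acknowledged (oldest first)
%%   queue      := queries accepted while the resource was blocked
%%   batch_size := how many queued queries to retry per attempt
resume(#{inflight := [Oldest | Rest]} = St) ->
    %% Retry the inflight window synchronously; re-sending async would only
    %% append the query to the end of the window again.
    case send_sync(Oldest) of
        ok -> resume(St#{inflight := Rest});
        {error, blocked} -> {still_blocked, St}
    end;
resume(#{inflight := [], queue := []} = St) ->
    {running, St};
resume(#{inflight := [], queue := Queue, batch_size := BatchSize} = St) ->
    %% Then drain the queue, at most BatchSize queries at a time.
    {Batch, Rest} = take(BatchSize, Queue),
    case send_sync(Batch) of
        ok -> resume(St#{queue := Rest});
        {error, blocked} -> {still_blocked, St}
    end.

%% Split off the first N elements of a list (fewer if the list is shorter).
take(N, List) when N >= length(List) -> {List, []};
take(N, List) -> {lists:sublist(List, N), lists:nthtail(N, List)}.

%% Stub standing in for call_query/4; a real resource may also return {error, blocked}.
send_sync(_QueryOrBatch) -> ok.

For example, resume_sketch:resume(#{inflight => [q1], queue => [q2, q3, q4], batch_size => 2}) walks the inflight window first and then the queue in two batches, returning {running, ...} once everything has been sent.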