From 6203a01320251d5fd93c938ccfab64cb01988b86 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Thu, 11 Aug 2022 00:29:50 +0800 Subject: [PATCH] feat: add inflight window to emqx_resource --- apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_resource/include/emqx_resource.hrl | 3 +- apps/emqx_resource/src/emqx_resource.erl | 1 + .../src/emqx_resource_worker.erl | 224 +++++++++++++----- .../test/emqx_connector_demo.erl | 56 ++++- .../test/emqx_resource_SUITE.erl | 136 ++++++++++- 6 files changed, 341 insertions(+), 81 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index fe19ed066..3be8d38fd 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "An OTP application"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 75cba14ad..5c561a8d3 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -27,7 +27,8 @@ -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type query_opts() :: #{ %% The key used for picking a resource worker - pick_key => term() + pick_key => term(), + async_reply_fun => reply_fun() }. -type resource_data() :: #{ id := resource_id(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f3f2d5fb9..0d2289696 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -76,6 +76,7 @@ stop/1, %% query the instance query/2, + query/3, %% query the instance without batching and queuing messages. simple_sync_query/2, simple_async_query/3 diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index d19353a29..716842bf9 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -50,7 +50,7 @@ -export([queue_item_marshaller/1, estimate_size/1]). --export([reply_after_query/4, batch_reply_after_query/4]). +-export([reply_after_query/6, batch_reply_after_query/6]). -define(RESUME_INTERVAL, 15000). @@ -69,18 +69,18 @@ {error, {resource_error, #{reason => Reason, msg => iolist_to_binary(Msg)}}} ). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). +-define(DEFAULT_QUEUE_SIZE, 1024 * 1024 * 1024). +-define(DEFAULT_INFLIGHT, 100). -type id() :: binary(). -type query() :: {query, from(), request()}. -type request() :: term(). -type from() :: pid() | reply_fun(). --export_type([query_opts/0]). - -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> {{from(), result()}, NewCbState :: term()}. -callback_mode() -> [state_functions]. +callback_mode() -> [state_functions, state_enter]. start_link(Id, Index, Opts) -> gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). @@ -89,18 +89,18 @@ start_link(Id, Index, Opts) -> query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), - pick_call(Id, PickKey, {query, Request}, Timeout). + pick_call(Id, PickKey, {query, Request, Opts}, Timeout). %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> - Result = call_query(sync, Id, ?QUERY(self(), Request), 1), + Result = call_query(sync, Id, ?QUERY(self(), Request), #{}), _ = handle_query_result(Id, Result, false), Result. -spec simple_async_query(id(), request(), reply_fun()) -> Result :: term(). simple_async_query(Id, Request, ReplyFun) -> - Result = call_query(async, Id, ?QUERY(ReplyFun, Request), 1), + Result = call_query(async, Id, ?QUERY(ReplyFun, Request), #{}), _ = handle_query_result(Id, Result, false), Result. @@ -119,38 +119,44 @@ resume(ServerRef) -> init({Id, Index, Opts}) -> process_flag(trap_exit, true), true = gproc_pool:connect_worker(Id, {Id, Index}), + Name = name(Id, Index), BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE), Queue = case maps:get(queue_enabled, Opts, false) of true -> replayq:open(#{ dir => disk_queue_dir(Id, Index), - seg_bytes => 10000000, + seg_bytes => maps:get(queue_max_bytes, Opts, ?DEFAULT_QUEUE_SIZE), sizer => fun ?MODULE:estimate_size/1, marshaller => fun ?MODULE:queue_item_marshaller/1 }); false -> undefined end, + ok = inflight_new(Name), St = #{ id => Id, index => Index, + name => Name, %% query_mode = dynamic | sync | async %% TODO: %% dynamic mode is async mode when things are going well, but becomes sync mode %% if the resource worker is overloaded query_mode => maps:get(query_mode, Opts, sync), - async_reply_fun => maps:get(async_reply_fun, Opts, undefined), + async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), batch_enabled => maps:get(batch_enabled, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), queue => Queue, + resume_interval => maps:get(resume_interval, Opts, ?RESUME_INTERVAL), acc => [], acc_left => BatchSize, tref => undefined }, {ok, blocked, St, {next_event, cast, resume}}. +running(enter, _, _St) -> + keep_state_and_data; running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> @@ -158,8 +164,8 @@ running(cast, block, St) -> running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, block, St#{queue := Q1}}; -running({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) -> - From = maybe_quick_return(QM, From0, ReplyFun), +running({call, From0}, {query, Request, Opts}, #{query_mode := QM} = St) -> + From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)), query_or_acc(From, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); @@ -169,6 +175,8 @@ running(info, Info, _St) -> ?SLOG(error, #{msg => unexpected_msg, info => Info}), keep_state_and_data. +blocked(enter, _, #{resume_interval := ResumeT} = _St) -> + {keep_state_and_data, {state_timeout, ResumeT, resume}}; blocked(cast, block, _St) -> keep_state_and_data; blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> @@ -178,9 +186,11 @@ blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) -> - From = maybe_quick_return(QM, From0, ReplyFun), - handle_blocked(From, Request, St). +blocked({call, From0}, {query, Request, Opts}, #{id := Id, queue := Q, query_mode := QM} = St) -> + From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)), + Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), + _ = reply_caller(Id, ?REPLY(From, Request, Error)), + {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}. terminate(_Reason, #{id := Id, index := Index}) -> gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -216,30 +226,44 @@ pick_call(Id, Key, Query, Timeout) -> ?RESOURCE_ERROR(timeout, "call resource timeout") end. -do_resume(#{queue := undefined} = St) -> +do_resume(#{queue := Q, id := Id, name := Name} = St) -> + case inflight_get_first(Name) of + empty -> + retry_first_from_queue(Q, Id, St); + {Ref, FirstQuery} -> + retry_first_sync(Id, FirstQuery, Name, Ref, undefined, St) + end. + +retry_first_from_queue(undefined, _Id, St) -> {next_state, running, St}; -do_resume(#{queue := Q, id := Id} = St) -> +retry_first_from_queue(Q, Id, St) -> case replayq:peek(Q) of empty -> {next_state, running, St}; ?Q_ITEM(FirstQuery) -> - Result = call_query(sync, Id, FirstQuery, 1), - case handle_query_result(Id, Result, false) of - %% Send failed because resource down - true -> - {keep_state, St, {state_timeout, ?RESUME_INTERVAL, resume}}; - %% Send ok or failed but the resource is working - false -> - %% We Send 'resume' to the end of the mailbox to give the worker - %% a chance to process 'query' requests. - {keep_state, St#{queue => drop_head(Q)}, {state_timeout, 0, resume}} - end + retry_first_sync(Id, FirstQuery, undefined, undefined, Q, St) end. -handle_blocked(From, Request, #{id := Id, queue := Q} = St) -> - Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}. +retry_first_sync(Id, FirstQuery, Name, Ref, Q, #{resume_interval := ResumeT} = St0) -> + Result = call_query(sync, Id, FirstQuery, #{}), + case handle_query_result(Id, Result, false) of + %% Send failed because resource down + true -> + {keep_state, St0, {state_timeout, ResumeT, resume}}; + %% Send ok or failed but the resource is working + false -> + %% We Send 'resume' to the end of the mailbox to give the worker + %% a chance to process 'query' requests. + St = + case Q of + undefined -> + inflight_drop(Name, Ref), + St0; + _ -> + St0#{queue => drop_head(Q)} + end, + {keep_state, St, {state_timeout, 0, resume}} + end. drop_head(Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), @@ -254,7 +278,11 @@ query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Lef false -> {keep_state, ensure_flush_timer(St)} end; query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) -> - case send_query(QM, From, Request, Id) of + QueryOpts = #{ + inflight_name => maps:get(name, St), + inflight_window => maps:get(async_inflight_window, St) + }, + case send_query(QM, From, Request, Id, QueryOpts) of true -> Query = ?QUERY(From, Request), {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}}; @@ -262,8 +290,8 @@ query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, quer {keep_state, St} end. -send_query(QM, From, Request, Id) -> - Result = call_query(QM, Id, ?QUERY(From, Request), 1), +send_query(QM, From, Request, Id, QueryOpts) -> + Result = call_query(QM, Id, ?QUERY(From, Request), QueryOpts), reply_caller(Id, ?REPLY(From, Request, Result)). flush(#{acc := []} = St) -> @@ -277,7 +305,11 @@ flush( query_mode := QM } = St ) -> - Result = call_query(QM, Id, Batch, length(Batch)), + QueryOpts = #{ + inflight_name => maps:get(name, St), + inflight_window => maps:get(async_inflight_window, St) + }, + Result = call_query(QM, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> @@ -332,21 +364,21 @@ handle_query_result(Id, {error, _}, BlockWorker) -> handle_query_result(Id, {resource_down, _}, _BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down), true; +handle_query_result(_Id, {async_return, inflight_full}, _BlockWorker) -> + true; handle_query_result(_Id, {async_return, {resource_down, _}}, _BlockWorker) -> true; handle_query_result(_Id, {async_return, ok}, BlockWorker) -> BlockWorker; handle_query_result(Id, Result, BlockWorker) -> - %% assert - true = is_ok_result(Result), + assert_ok_result(Result), emqx_metrics_worker:inc(?RES_METRICS, Id, success), BlockWorker. -call_query(QM, Id, Query, QueryLen) -> +call_query(QM, Id, Query, QueryOpts) -> case emqx_resource_manager:ets_lookup(Id) of {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, QueryLen), - apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt); + apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> @@ -372,58 +404,119 @@ call_query(QM, Id, Query, QueryLen) -> end ). -apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt) -> +apply_query_fun(sync, Mod, Id, ?QUERY(_, Request) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt}), + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); -apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt) -> +apply_query_fun(async, Mod, Id, ?QUERY(_, Request) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{id => Id, mod => Mod, query => Query, res_st => ResSt}), - ReplyFun = fun ?MODULE:reply_after_query/4, + Name = maps:get(inflight_name, QueryOpts, undefined), + WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( - begin - Result = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt), - {async_return, Result} + case inflight_is_full(Name, WinSize) of + true -> + ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + {async_return, inflight_full}; + false -> + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched), + ReplyFun = fun ?MODULE:reply_after_query/6, + Ref = make_message_ref(), + Args = [self(), Id, Name, Ref, Query], + ok = inflight_append(Name, Ref, Query), + Result = Mod:on_query_async(Id, Request, {ReplyFun, Args}, ResSt), + {async_return, Result} end, Request ); -apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> +apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, _QueryOpts) -> ?tp(call_batch_query, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), Requests = [Request || ?QUERY(_From, Request) <- Batch], + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); -apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> +apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{id => Id, mod => Mod, batch => Batch, res_st => ResSt}), - Requests = [Request || ?QUERY(_From, Request) <- Batch], - ReplyFun = fun ?MODULE:batch_reply_after_query/4, + Name = maps:get(inflight_name, QueryOpts, undefined), + WinSize = maps:get(inflight_window, QueryOpts, undefined), ?APPLY_RESOURCE( - begin - Result = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt), - {async_return, Result} + case inflight_is_full(Name, WinSize) of + true -> + ?tp(inflight_full, #{id => Id, wind_size => WinSize}), + {async_return, inflight_full}; + false -> + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)), + ReplyFun = fun ?MODULE:batch_reply_after_query/6, + Ref = make_message_ref(), + Args = {ReplyFun, [self(), Id, Name, Ref, Batch]}, + Requests = [Request || ?QUERY(_From, Request) <- Batch], + ok = inflight_append(Name, Ref, Batch), + Result = Mod:on_batch_query_async(Id, Requests, Args, ResSt), + {async_return, Result} end, Batch ). -reply_after_query(Pid, Id, ?QUERY(From, Request) = Query, Result) -> +reply_after_query(Pid, Id, Name, Ref, ?QUERY(From, Request), Result) -> case reply_caller(Id, ?REPLY(From, Request, Result)) of - true -> ?MODULE:block(Pid, [Query]); - false -> ok + true -> ?MODULE:block(Pid); + false -> inflight_drop(Name, Ref) end. -batch_reply_after_query(Pid, Id, Batch, Result) -> +batch_reply_after_query(Pid, Id, Name, Ref, Batch, Result) -> case batch_reply_caller(Id, Result, Batch) of - true -> ?MODULE:block(Pid, Batch); - false -> ok + true -> ?MODULE:block(Pid); + false -> inflight_drop(Name, Ref) end. +%%============================================================================== +%% the inflight queue for async query + +inflight_new(Name) -> + _ = ets:new(Name, [named_table, ordered_set, public, {write_concurrency, true}]), + ok. + +inflight_get_first(Name) -> + case ets:first(Name) of + '$end_of_table' -> + empty; + Ref -> + case ets:lookup(Name, Ref) of + [Object] -> Object; + [] -> inflight_get_first(Name) + end + end. + +inflight_is_full(undefined, _) -> + false; +inflight_is_full(Name, MaxSize) -> + case ets:info(Name, size) of + Size when Size >= MaxSize -> true; + _ -> false + end. + +inflight_append(undefined, _Ref, _Query) -> + ok; +inflight_append(Name, Ref, Query) -> + ets:insert(Name, {Ref, Query}), + ok. + +inflight_drop(undefined, _) -> + ok; +inflight_drop(Name, Ref) -> + ets:delete(Name, Ref), + ok. %%============================================================================== call_mode(sync, _) -> sync; call_mode(async, always_sync) -> sync; call_mode(async, async_if_possible) -> async. -is_ok_result(ok) -> +assert_ok_result(ok) -> true; -is_ok_result(R) when is_tuple(R) -> - erlang:element(1, R) == ok; -is_ok_result(_) -> - false. +assert_ok_result({async_return, R}) -> + assert_ok_result(R); +assert_ok_result(R) when is_tuple(R) -> + ok = erlang:element(1, R); +assert_ok_result(R) -> + error({not_ok_result, R}). -spec name(id(), integer()) -> atom(). name(Id, Index) -> @@ -447,3 +540,6 @@ cancel_flush_timer(St = #{tref := undefined}) -> cancel_flush_timer(St = #{tref := {TRef, _Ref}}) -> _ = erlang:cancel_timer(TRef), St#{tref => undefined}. + +make_message_ref() -> + erlang:unique_integer([monotonic, positive]). diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index 3bea71993..6e7bca18a 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -31,7 +31,7 @@ on_get_status/2 ]). --export([counter_loop/1, set_callback_mode/1]). +-export([counter_loop/0, set_callback_mode/1]). %% callbacks for emqx_resource config schema -export([roots/0]). @@ -84,9 +84,22 @@ on_query(_InstId, get_state, State) -> {ok, State}; on_query(_InstId, get_state_failed, State) -> {error, State}; -on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> - Pid ! {inc, N}, +on_query(_InstId, block, #{pid := Pid}) -> + Pid ! block, ok; +on_query(_InstId, resume, #{pid := Pid}) -> + Pid ! resume, + ok; +on_query(_InstId, {inc_counter, N}, #{pid := Pid}) -> + ReqRef = make_ref(), + From = {self(), ReqRef}, + Pid ! {From, {inc, N}}, + receive + {ReqRef, ok} -> ok; + {ReqRef, incorrect_status} -> {resource_down, incorrect_status} + after 1000 -> + {error, timeout} + end; on_query(_InstId, get_counter, #{pid := Pid}) -> ReqRef = make_ref(), From = {self(), ReqRef}, @@ -97,9 +110,12 @@ on_query(_InstId, get_counter, #{pid := Pid}) -> {error, timeout} end. -on_query_async(_InstId, Query, ReplyFun, State) -> - Result = on_query(_InstId, Query, State), - apply_reply(ReplyFun, Result). +on_query_async(_InstId, {inc_counter, N}, ReplyFun, #{pid := Pid}) -> + Pid ! {inc, N, ReplyFun}, + ok; +on_query_async(_InstId, get_counter, ReplyFun, #{pid := Pid}) -> + Pid ! {get, ReplyFun}, + ok. on_batch_query(InstId, BatchReq, State) -> %% Requests can be either 'get_counter' or 'inc_counter', but cannot be mixed. @@ -136,15 +152,35 @@ on_get_status(_InstId, #{pid := Pid}) -> end. spawn_counter_process(Name, Register) -> - Pid = spawn_link(?MODULE, counter_loop, [#{counter => 0}]), + Pid = spawn_link(?MODULE, counter_loop, []), true = maybe_register(Name, Pid, Register), Pid. -counter_loop(#{counter := Num} = State) -> +counter_loop() -> + counter_loop(#{counter => 0, status => running}). + +counter_loop(#{counter := Num, status := Status} = State) -> NewState = receive - {inc, N} -> - #{counter => Num + N}; + block -> + ct:pal("counter recv: ~p", [block]), + State#{status => blocked}; + resume -> + {messages, Msgs} = erlang:process_info(self(), messages), + ct:pal("counter recv: ~p, buffered msgs: ~p", [resume, length(Msgs)]), + State#{status => running}; + {inc, N, ReplyFun} when Status == running -> + apply_reply(ReplyFun, ok), + State#{counter => Num + N}; + {{FromPid, ReqRef}, {inc, N}} when Status == running -> + FromPid ! {ReqRef, ok}, + State#{counter => Num + N}; + {{FromPid, ReqRef}, {inc, _N}} when Status == blocked -> + FromPid ! {ReqRef, incorrect_status}, + State; + {get, ReplyFun} -> + apply_reply(ReplyFun, Num), + State; {{FromPid, ReqRef}, get} -> FromPid ! {ReqRef, Num}, State diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 5177e792c..5c62e22c2 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -236,7 +236,7 @@ t_batch_query_counter(_) -> ok = emqx_resource:remove_local(?ID). -t_query_counter_async(_) -> +t_query_counter_async_query(_) -> {ok, _} = emqx_resource:create_local( ?ID, ?DEFAULT_RESOURCE_GROUP, @@ -271,24 +271,25 @@ t_query_counter_async(_) -> ?assertMatch(#{matched := 1002, success := 1002, failed := 0}, C), ok = emqx_resource:remove_local(?ID). -t_query_counter_async_2(_) -> +t_query_counter_async_callback(_) -> emqx_connector_demo:set_callback_mode(async_if_possible), Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), Insert = fun(Tab, Result) -> ets:insert(Tab, {make_ref(), Result}) end, + ReqOpts = #{async_reply_fun => {Insert, [Tab0]}}, {ok, _} = emqx_resource:create_local( ?ID, ?DEFAULT_RESOURCE_GROUP, ?TEST_RESOURCE, #{name => test_resource, register => true}, - #{query_mode => async, async_reply_fun => {Insert, [Tab0]}} + #{query_mode => async, async_inflight_window => 1000000} ), ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), ?check_trace( ?TRACE_OPTS, - inc_counter_in_parallel(1000), + inc_counter_in_parallel(1000, ReqOpts), fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) @@ -321,6 +322,117 @@ t_query_counter_async_2(_) -> ), ok = emqx_resource:remove_local(?ID). +t_query_counter_async_inflight(_) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + + Tab0 = ets:new(?FUNCTION_NAME, [bag, public]), + Insert0 = fun(Tab, Result) -> + ets:insert(Tab, {make_ref(), Result}) + end, + ReqOpts = #{async_reply_fun => {Insert0, [Tab0]}}, + WindowSize = 15, + {ok, _} = emqx_resource:create_local( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource, register => true}, + #{ + query_mode => async, + async_inflight_window => WindowSize, + worker_pool_size => 1, + resume_interval => 300 + } + ), + ?assertMatch({ok, 0}, emqx_resource:simple_sync_query(?ID, get_counter)), + + %% block the resource + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + + %% send async query to make the inflight window full + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(WindowSize, ReqOpts), + fun(Trace) -> + QueryTrace = ?of_kind(call_query_async, Trace), + ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + end + ), + + %% this will block the resource_worker + ok = emqx_resource:query(?ID, {inc_counter, 1}), + ?assertMatch(0, ets:info(Tab0, size)), + %% sleep to make the resource_worker resume some times + timer:sleep(2000), + + %% send query now will fail because the resource is blocked. + Insert = fun(Tab, Ref, Result) -> + ets:insert(Tab, {Ref, Result}) + end, + ok = emqx_resource:query(?ID, {inc_counter, 1}, #{ + async_reply_fun => {Insert, [Tab0, tmp_query]} + }), + ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), + + %% all response should be received after the resource is resumed. + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), + timer:sleep(1000), + ?assertEqual(WindowSize, ets:info(Tab0, size)), + + %% send async query, this time everything should be ok. + Num = 10, + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(Num, ReqOpts), + fun(Trace) -> + QueryTrace = ?of_kind(call_query_async, Trace), + ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + end + ), + timer:sleep(1000), + ?assertEqual(WindowSize + Num, ets:info(Tab0, size)), + + %% block the resource + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, block)), + %% again, send async query to make the inflight window full + ?check_trace( + ?TRACE_OPTS, + inc_counter_in_parallel(WindowSize, ReqOpts), + fun(Trace) -> + QueryTrace = ?of_kind(call_query_async, Trace), + ?assertMatch([#{query := {query, _, {inc_counter, 1}}} | _], QueryTrace) + end + ), + + %% this will block the resource_worker + ok = emqx_resource:query(?ID, {inc_counter, 1}), + + Sent = WindowSize + Num + WindowSize, + ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), + timer:sleep(1000), + ?assertEqual(Sent, ets:info(Tab0, size)), + + {ok, Counter} = emqx_resource:simple_sync_query(?ID, get_counter), + ct:pal("get_counter: ~p, sent: ~p", [Counter, Sent]), + ?assert(Sent == Counter), + + {ok, _, #{metrics := #{counters := C}}} = emqx_resource:get_instance(?ID), + ct:pal("metrics: ~p", [C]), + ?assertMatch( + #{matched := M, success := S, exception := E, failed := F, resource_down := RD} when + M >= Sent andalso M == S + E + F + RD, + C + ), + ?assert( + lists:all( + fun + ({_, ok}) -> true; + (_) -> false + end, + ets:tab2list(Tab0) + ) + ), + ok = emqx_resource:remove_local(?ID). + t_healthy_timeout(_) -> {ok, _} = emqx_resource:create_local( ?ID, @@ -550,10 +662,13 @@ t_auto_retry(_) -> %% Helpers %%------------------------------------------------------------------------------ inc_counter_in_parallel(N) -> + inc_counter_in_parallel(N, #{}). + +inc_counter_in_parallel(N, Opts) -> Parent = self(), Pids = [ erlang:spawn(fun() -> - emqx_resource:query(?ID, {inc_counter, 1}), + emqx_resource:query(?ID, {inc_counter, 1}, Opts), Parent ! {complete, self()} end) || _ <- lists:seq(1, N) @@ -567,6 +682,17 @@ inc_counter_in_parallel(N) -> || Pid <- Pids ]. +% verify_inflight_full(WindowSize) -> +% ?check_trace( +% ?TRACE_OPTS, +% emqx_resource:query(?ID, {inc_counter, 1}), +% fun(Return, Trace) -> +% QueryTrace = ?of_kind(inflight_full, Trace), +% ?assertMatch([#{wind_size := WindowSize} | _], QueryTrace), +% ?assertMatch(ok, Return) +% end +% ). + bin_config() -> <<"\"name\": \"test_resource\"">>.