From d8d8d674e4ad7314e87926a9a488b5db82008583 Mon Sep 17 00:00:00 2001
From: Shawn <506895667@qq.com>
Date: Wed, 27 Jul 2022 00:37:17 +0800
Subject: [PATCH] feat(resource): start emqx_resource_worker in pools

---
 .../src/emqx_resource_manager.erl            |   9 +-
 apps/emqx_resource/src/emqx_resource_sup.erl |  80 ++++-
 .../src/emqx_resource_worker.erl             | 316 +++++++++++-------
 3 files changed, 278 insertions(+), 127 deletions(-)

diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl
index f1fa36173..ee18a6836 100644
--- a/apps/emqx_resource/src/emqx_resource_manager.erl
+++ b/apps/emqx_resource/src/emqx_resource_manager.erl
@@ -111,12 +111,15 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
     ok = emqx_metrics_worker:create_metrics(
         ?RES_METRICS,
         ResId,
-        [matched, success, failed, exception, resource_error],
+        [matched, success, failed, exception, resource_down],
         [matched]
     ),
     case maps:get(start_after_created, Opts, true) of
-        true -> wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
-        false -> ok
+        true ->
+            ok = emqx_resource_sup:start_workers(ResId, Opts),
+            wait_for_resource_ready(ResId, maps:get(wait_for_resource_ready, Opts, 5000));
+        false ->
+            ok
     end,
     ok.
 
diff --git a/apps/emqx_resource/src/emqx_resource_sup.erl b/apps/emqx_resource/src/emqx_resource_sup.erl
index 84458f0d5..14db50d01 100644
--- a/apps/emqx_resource/src/emqx_resource_sup.erl
+++ b/apps/emqx_resource/src/emqx_resource_sup.erl
@@ -19,13 +19,10 @@
 
 -behaviour(supervisor).
 
--export([start_link/0]).
+-export([start_link/0, start_workers/2, stop_workers/2]).
 
 -export([init/1]).
 
-%% set a very large pool size in case all the workers busy
--define(POOL_SIZE, 64).
-
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
@@ -43,3 +40,78 @@ init([]) ->
         modules => [emqx_resource_manager_sup]
     },
     {ok, {SupFlags, [Metrics, ResourceManager]}}.
+
+start_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    _ = ensure_worker_pool(ResId, hash, [{size, PoolSize}]),
+    lists:foreach(
+        fun(Idx) ->
+            _ = ensure_worker_added(ResId, {ResId, Idx}, Idx),
+            ok = ensure_worker_started(ResId, Idx, Opts)
+        end,
+        lists:seq(1, PoolSize)
+    ).
+
+stop_workers(ResId, Opts) ->
+    PoolSize = pool_size(Opts),
+    lists:foreach(
+        fun(Idx) ->
+            ok = ensure_worker_stopped(ResId, Idx),
+            ok = ensure_worker_removed(ResId, {ResId, Idx})
+        end,
+        lists:seq(1, PoolSize)
+    ),
+    _ = gproc_pool:delete(ResId),
+    ok.
+
+pool_size(Opts) ->
+    maps:get(worker_pool_size, Opts, erlang:system_info(schedulers_online)).
+
+ensure_worker_pool(Pool, Type, Opts) ->
+    try
+        gproc_pool:new(Pool, Type, Opts)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+ensure_worker_added(Pool, Name, Slot) ->
+    try
+        gproc_pool:add_worker(Pool, Name, Slot)
+    catch
+        error:exists -> ok
+    end,
+    ok.
+
+ensure_worker_removed(Pool, Name) ->
+    _ = gproc_pool:remove_worker(Pool, Name),
+    ok.
+
+-define(CHILD_ID(MOD, RESID, INDEX), {MOD, RESID, INDEX}).
+ensure_worker_started(ResId, Idx, Opts) ->
+    Mod = emqx_resource_worker,
+    Spec = #{
+        id => ?CHILD_ID(Mod, ResId, Idx),
+        start => {Mod, start_link, [ResId, Idx, Opts]},
+        restart => transient,
+        shutdown => 5000,
+        type => worker,
+        modules => [Mod]
+    },
+    case supervisor:start_child(emqx_resource_sup, Spec) of
+        {ok, _Pid} -> ok;
+        {error, {already_started, _}} -> ok;
+        {error, already_present} -> ok;
+        {error, _} = Err -> Err
+    end.
+
+ensure_worker_stopped(ResId, Idx) ->
+    ChildId = ?CHILD_ID(emqx_resource_worker, ResId, Idx),
+    case supervisor:terminate_child(emqx_resource_sup, ChildId) of
+        ok ->
+            supervisor:delete_child(emqx_resource_sup, ChildId);
+        {error, not_found} ->
+            ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl
index 6c3b05830..3c5c7eefe 100644
--- a/apps/emqx_resource/src/emqx_resource_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_worker.erl
@@ -14,7 +14,8 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% An FIFO queue using ETS-ReplayQ as backend.
+%% This module implements async message sending, disk message queuing,
+%% and message batching using ReplayQ.
 
 -module(emqx_resource_worker).
 
@@ -25,18 +26,23 @@
 -behaviour(gen_statem).
 
 -export([
-    start_link/2,
-    query/2,
-    query_async/3,
-    query_mfa/3
+    start_link/3,
+    query/3,
+    query_async/4,
+    block/1,
+    resume/1
 ]).
 
 -export([
     callback_mode/0,
-    init/1
+    init/1,
+    terminate/2,
+    code_change/3
 ]).
 
--export([do/3]).
+-export([running/3, blocked/3]).
+
+-define(RESUME_INTERVAL, 15000).
 
 %% count
 -define(DEFAULT_BATCH_SIZE, 100).
@@ -47,72 +53,136 @@
 -define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}).
 -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
 
+-define(RESOURCE_ERROR(Reason, Msg), {error, {resource_error, #{reason => Reason, msg => Msg}}}).
+-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
+
 -type id() :: binary().
 -type request() :: term().
 -type result() :: term().
--type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()}.
+-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
 -type from() :: pid() | reply_fun().
 
 -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
     {{from(), result()}, NewCbState :: term()}.
 
-callback_mode() -> [state_functions].
+callback_mode() -> [state_functions, state_enter].
 
-start_link(Id, Opts) ->
-    gen_statem:start_link({local, name(Id)}, ?MODULE, {Id, Opts}, []).
+start_link(Id, Index, Opts) ->
+    gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
 
--spec query(id(), request()) -> ok.
-query(Id, Request) ->
-    gen_statem:call(name(Id), {query, Request}).
+-spec query(id(), term(), request()) -> ok.
+query(Id, Key, Request) ->
+    gen_statem:call(pick(Id, Key), {query, Request}).
 
--spec query_async(id(), request(), reply_fun()) -> ok.
-query_async(Id, Request, ReplyFun) ->
-    gen_statem:cast(name(Id), {query, Request, ReplyFun}).
+-spec query_async(id(), term(), request(), reply_fun()) -> ok.
+query_async(Id, Key, Request, ReplyFun) ->
+    gen_statem:cast(pick(Id, Key), {query, Request, ReplyFun}).
 
--spec name(id()) -> atom().
-name(Id) ->
-    Mod = atom_to_binary(?MODULE, utf8),
-    <<Mod/binary, "_", Id/binary>>.
+-spec block(pid() | atom()) -> ok.
+block(ServerRef) ->
+    gen_statem:cast(ServerRef, block).
 
-disk_cache_dir(Id) ->
-    filename:join([emqx:data_dir(), Id, cache]).
+-spec resume(pid() | atom()) -> ok.
+resume(ServerRef) ->
+    gen_statem:cast(ServerRef, resume).
 
-init({Id, Opts}) ->
+init({Id, Index, Opts}) ->
+    true = gproc_pool:connect_worker(Id, {Id, Index}),
     BatchSize = maps:get(batch_size, Opts, ?DEFAULT_BATCH_SIZE),
     Queue =
-        case maps:get(cache_enabled, Opts, true) of
-            true -> replayq:open(#{dir => disk_cache_dir(Id), seg_bytes => 10000000});
+        case maps:get(queue_enabled, Opts, true) of
+            true -> replayq:open(#{dir => disk_queue_dir(Id), seg_bytes => 10000000});
             false -> undefined
         end,
     St = #{
         id => Id,
+        index => Index,
         batch_enabled => maps:get(batch_enabled, Opts, true),
         batch_size => BatchSize,
        batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
-        cache_queue => Queue,
+        queue => Queue,
        acc => [],
        acc_left => BatchSize,
        tref => undefined
     },
-    {ok, do, St}.
+    {ok, blocked, St, {next_event, cast, resume}}.
 
-do(cast, {query, Request, ReplyFun}, #{batch_enabled := true} = State) ->
-    do_acc(ReplyFun, Request, State);
-do(cast, {query, Request, ReplyFun}, #{batch_enabled := false} = State) ->
-    do_query(ReplyFun, Request, State);
-do({call, From}, {query, Request}, #{batch_enabled := true} = State) ->
-    do_acc(From, Request, State);
-do({call, From}, {query, Request}, #{batch_enabled := false} = State) ->
-    do_query(From, Request, State);
-do(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
-    {keep_state, flush(St#{tref := undefined})};
-do(info, {flush, _Ref}, _St) ->
+running(enter, _, _St) ->
     keep_state_and_data;
-do(info, Info, _St) ->
+running(cast, resume, _St) ->
+    keep_state_and_data;
+running(cast, block, St) ->
+    {next_state, blocked, St};
+running(cast, {query, Request, ReplyFun}, St) ->
+    query_or_acc(ReplyFun, Request, St);
+running({call, From}, {query, Request}, St) ->
+    query_or_acc(From, Request, St);
+running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
+    flush(St#{tref := undefined});
+running(info, {flush, _Ref}, _St) ->
+    keep_state_and_data;
+running(info, Info, _St) ->
     ?SLOG(error, #{msg => unexpected_msg, info => Info}),
     keep_state_and_data.
 
+blocked(enter, _, _St) ->
+    keep_state_and_data;
+blocked(cast, block, _St) ->
+    keep_state_and_data;
+blocked(cast, resume, St) ->
+    do_resume(St);
+blocked(state_timeout, resume, St) ->
+    do_resume(St);
+blocked(cast, {query, Request, ReplyFun}, St) ->
+    handle_blocked(ReplyFun, Request, St);
+blocked({call, From}, {query, Request}, St) ->
+    handle_blocked(From, Request, St).
+
+terminate(_Reason, #{id := Id, index := Index}) ->
+    gproc_pool:disconnect_worker(Id, {Id, Index}).
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%==============================================================================
+pick(Id, Key) ->
+    Pid = gproc_pool:pick_worker(Id, Key),
+    case is_pid(Pid) of
+        true -> Pid;
+        false -> error({failed_to_pick_worker, {Id, Key}})
+    end.
+
+do_resume(#{queue := undefined} = St) ->
+    {next_state, running, St};
+do_resume(#{queue := Q, id := Id} = St) ->
+    case replayq:peek(Q) of
+        empty ->
+            {next_state, running, St, {state_timeout, ?RESUME_INTERVAL, resume}};
+        First ->
+            Result = call_query(Id, First),
+            case handle_query_result(Id, false, Result) of
+                %% The send failed because the resource is down
+                true ->
+                    {keep_state, St};
+                %% The send succeeded, or it failed but the resource is still working
+                false ->
+                    %% We send 'resume' to the end of the mailbox to give the worker
+                    %% a chance to process 'query' requests first.
+                    {keep_state, St#{queue => drop_head(Q)}, {state_timeout, 0, resume}}
+            end
+    end.
+
+drop_head(Q) ->
+    {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
+    ok = replayq:ack(Q1, AckRef),
+    Q1.
+
+query_or_acc(From, Request, #{batch_enabled := true} = St) ->
+    acc_query(From, Request, St);
+query_or_acc(From, Request, #{batch_enabled := false} = St) ->
+    send_query(From, Request, St).
+
-do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
+acc_query(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
     Acc1 = [?QUERY(From, Request) | Acc],
     St = St0#{acc := Acc1, acc_left := Left - 1},
     case Left =< 1 of
@@ -120,10 +190,19 @@ do_acc(From, Request, #{acc := Acc, acc_left := Left} = St0) ->
         false -> {keep_state, ensure_flush_timer(St)}
     end.
 
-do_query(From, Request, #{id := Id, cache_queue := Q0} = St0) ->
+send_query(From, Request, #{id := Id, queue := Q} = St) ->
     Result = call_query(Id, Request),
-    Q1 = reply_caller(Id, Q0, ?REPLY(From, Request, Result)),
-    {keep_state, St0#{cache_queue := Q1}}.
+    case reply_caller(Id, false, ?REPLY(From, Request, Result)) of
+        true ->
+            {next_state, blocked, St#{queue := maybe_append_queue(Q, [Request])}};
+        false ->
+            {keep_state, St}
+    end.
+
+handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
+    Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
+    _ = reply_caller(Id, false, ?REPLY(From, Request, Error)),
+    {keep_state, St#{queue := maybe_append_queue(Q, [Request])}}.
 
 flush(#{acc := []} = St) ->
     St;
 flush(
@@ -132,101 +211,109 @@ flush(
         id := Id,
         acc := Batch,
         batch_size := Size,
-        cache_queue := Q0
+        queue := Q0
     } = St
 ) ->
-    BatchResults = call_batch_query(Id, Batch),
-    Q1 = batch_reply_caller(Id, Q0, BatchResults),
-    cancel_flush_timer(
-        St#{
-            acc_left := Size,
-            acc := [],
-            cache_queue := Q1
-        }
-    ).
+    BatchResults = maybe_expand_batch_result(call_batch_query(Id, Batch), Batch),
+    St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
+    case batch_reply_caller(Id, BatchResults) of
+        true ->
+            Q1 = maybe_append_queue(Q0, [Request || ?QUERY(_, Request) <- Batch]),
+            {next_state, blocked, St1#{queue := Q1}};
+        false ->
+            {keep_state, St1}
+    end.
 
-maybe_append_cache(undefined, _Request) -> undefined;
-maybe_append_cache(Q, Request) -> replayq:append(Q, Request).
+maybe_append_queue(undefined, _Query) -> undefined;
+maybe_append_queue(Q, Query) -> replayq:append(Q, Query).
 
-batch_reply_caller(Id, Q, BatchResults) ->
+batch_reply_caller(Id, BatchResults) ->
     lists:foldl(
-        fun(Reply, Q1) ->
-            reply_caller(Id, Q1, Reply)
+        fun(Reply, BlockWorker) ->
+            reply_caller(Id, BlockWorker, Reply)
         end,
-        Q,
+        false,
         BatchResults
     ).
 
-reply_caller(Id, Q, ?REPLY({ReplyFun, Args}, Request, Result)) when is_function(ReplyFun) ->
+reply_caller(Id, BlockWorker, ?REPLY(undefined, _, Result)) ->
+    handle_query_result(Id, BlockWorker, Result);
+reply_caller(Id, BlockWorker, ?REPLY({ReplyFun, Args}, _, Result)) ->
     ?SAFE_CALL(ReplyFun(Result, Args)),
-    handle_query_result(Id, Q, Request, Result);
-reply_caller(Id, Q, ?REPLY(From, Request, Result)) ->
+    handle_query_result(Id, BlockWorker, Result);
+reply_caller(Id, BlockWorker, ?REPLY(From, _, Result)) ->
     gen_statem:reply(From, Result),
-    handle_query_result(Id, Q, Request, Result).
+    handle_query_result(Id, BlockWorker, Result).
 
-handle_query_result(Id, Q, _Request, ok) ->
+handle_query_result(Id, BlockWorker, ok) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    Q;
-handle_query_result(Id, Q, _Request, {ok, _}) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, {ok, _}) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, success),
-    Q;
-handle_query_result(Id, Q, _Request, {error, _}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
-    Q;
-handle_query_result(Id, Q, Request, {error, {resource_error, #{reason := not_connected}}}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
-    maybe_append_cache(Q, Request);
-handle_query_result(Id, Q, _Request, {error, {resource_error, #{}}}) ->
-    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_error),
-    Q;
-handle_query_result(Id, Q, Request, {error, {exception, _}}) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, ?RESOURCE_ERROR_M(exception, _)) ->
     emqx_metrics_worker:inc(?RES_METRICS, Id, exception),
-    maybe_append_cache(Q, Request).
+    BlockWorker;
+handle_query_result(_Id, _, ?RESOURCE_ERROR_M(NotWorking, _)) when
+    NotWorking == not_connected; NotWorking == blocked
+->
+    true;
+handle_query_result(_Id, BlockWorker, ?RESOURCE_ERROR_M(_, _)) ->
+    BlockWorker;
+handle_query_result(Id, BlockWorker, {error, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, failed),
+    BlockWorker;
+handle_query_result(Id, _BlockWorker, {resource_down, _}) ->
+    emqx_metrics_worker:inc(?RES_METRICS, Id, resource_down),
+    true.
 
 call_query(Id, Request) ->
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched),
+    do_call_query(on_query, Id, Request).
+
+call_batch_query(Id, Batch) ->
+    do_call_query(on_batch_query, Id, Batch).
+
+do_call_query(Fun, Id, Data) ->
     case emqx_resource_manager:ets_lookup(Id) of
         {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            try Mod:on_query(Id, Request, ResourceState) of
+            ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, if is_list(Data) -> length(Data); true -> 1 end),
+            try Mod:Fun(Id, Data, ResourceState) of
+                %% If the callback module (connector) wants to return an error that
+                %% makes the current resource go into the `error` state, it should
+                %% return `{resource_down, Reason}`.
                 Result -> Result
             catch
                 Err:Reason:ST ->
-                    ModB = atom_to_binary(Mod, utf8),
-                    Msg = <<"call failed, func: ", ModB/binary, ":on_query/3">>,
-                    exception_error(Reason, Msg, {Err, Reason, ST})
+                    Msg = io_lib:format(
+                        "call query failed, func: ~s:~s/3, error: ~0p",
+                        [Mod, Fun, {Err, Reason, ST}]
+                    ),
+                    ?RESOURCE_ERROR(exception, Msg)
            end;
         {ok, _Group, #{status := stopped}} ->
-            resource_error(stopped, <<"resource stopped or disabled">>);
+            ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
         {ok, _Group, _Data} ->
-            resource_error(not_connected, <<"resource not connected">>);
+            ?RESOURCE_ERROR(not_connected, "resource not connected");
         {error, not_found} ->
-            resource_error(not_found, <<"resource not found">>)
+            ?RESOURCE_ERROR(not_found, "resource not found")
     end.
 
-call_batch_query(Id, Batch) ->
-    ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, length(Batch)),
-    case emqx_resource_manager:ets_lookup(Id) of
-        {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} ->
-            try Mod:on_batch_query(Id, Batch, ResourceState) of
-                BatchResults -> BatchResults
-            catch
-                Err:Reason:ST ->
-                    ModB = atom_to_binary(Mod, utf8),
-                    Msg = <<"call failed, func: ", ModB/binary, ":on_batch_query/3">>,
-                    ?EXPAND(exception_error(Reason, Msg, {Err, Reason, ST}), Batch)
-            end;
-        {ok, _Group, _Data} ->
-            ?EXPAND(resource_error(not_connected, <<"resource not connected">>), Batch);
-        {error, not_found} ->
-            ?EXPAND(resource_error(not_found, <<"resource not found">>), Batch)
-    end.
+%% the result list is already expanded by `Mod:on_batch_query/3`
+maybe_expand_batch_result(Results, _Batch) when is_list(Results) ->
+    Results;
+%% `Mod:on_batch_query/3` returned a single result for the whole batch, so it needs to be expanded
+maybe_expand_batch_result(Result, Batch) ->
+    ?EXPAND(Result, Batch).
 
-resource_error(Reason, Msg) ->
-    {error, {resource_error, #{reason => Reason, msg => Msg}}}.
-exception_error(Reason, Msg, Details) ->
-    {error, {exception, #{reason => Reason, msg => Msg, details => Details}}}.
+%%==============================================================================
+
+-spec name(id(), integer()) -> atom().
+name(Id, Index) ->
+    list_to_atom(lists:concat([?MODULE, ":", binary_to_list(Id), ":", Index])).
+
+disk_queue_dir(Id) ->
+    filename:join([emqx:data_dir(), Id, queue]).
 
-%% ==========================================
 ensure_flush_timer(St = #{tref := undefined, batch_time := T}) ->
     Ref = make_ref(),
     TRef = erlang:send_after(T, self(), {flush, Ref}),
@@ -239,14 +326,3 @@ cancel_flush_timer(St = #{tref := undefined}) ->
 cancel_flush_timer(St = #{tref := {TRef, _Ref}}) ->
     _ = erlang:cancel_timer(TRef),
     St#{tref => undefined}.
-
-query_mfa(InsertMode, Request, SyncTimeout) ->
-    {?MODULE, query_fun(InsertMode), query_args(InsertMode, Request, SyncTimeout)}.
-
-query_fun(<<"sync">>) -> query;
-query_fun(<<"async">>) -> query_async.
-
-query_args(<<"sync">>, Request, SyncTimeout) ->
-    [Request, SyncTimeout];
-query_args(<<"async">>, Request, _) ->
-    [Request].
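
Below is a minimal usage sketch of the worker pool this patch introduces. It is not part of the patch: the module name, resource id, request term, and option values are hypothetical, and it assumes the emqx_resource application (including emqx_resource_sup) is already running. Only emqx_resource_sup:start_workers/2, stop_workers/2, emqx_resource_worker:query/3, and query_async/4 are taken from the patch; queries that share the same key are hashed to the same pool worker by gproc_pool.

%% Hypothetical caller of the new worker-pool API (sketch only).
-module(resource_worker_usage_sketch).

-export([demo/0]).

demo() ->
    ResId = <<"my_resource">>,
    %% Options understood by emqx_resource_sup:start_workers/2 and
    %% emqx_resource_worker:init/1 in this patch.
    Opts = #{worker_pool_size => 4, batch_enabled => true, queue_enabled => true},
    ok = emqx_resource_sup:start_workers(ResId, Opts),

    %% Synchronous query: the second argument is the hash key used by
    %% gproc_pool to pick a worker, so requests with the same key are
    %% serialized on the same emqx_resource_worker process.
    SyncResult = emqx_resource_worker:query(ResId, <<"key-1">>, {send_message, <<"hello">>}),

    %% Asynchronous query: the reply is delivered through {Fun, Args}.
    ReplyFun = {fun(Result, Tag) -> io:format("async ~p -> ~p~n", [Tag, Result]) end, tag1},
    ok = emqx_resource_worker:query_async(ResId, <<"key-1">>, {send_message, <<"world">>}, ReplyFun),

    ok = emqx_resource_sup:stop_workers(ResId, Opts),
    SyncResult.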
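
The block/1 and resume/1 API added in this patch takes a pid or registered name. The worker that serves a given key can be located through the same gproc pool that start_workers/2 sets up; the helper module below is hypothetical and only illustrates that lookup.

%% Hypothetical helpers: block/resume the pool member that serves Key.
-module(resource_worker_admin_sketch).

-export([block_worker_for/2, resume_worker_for/2]).

block_worker_for(ResId, Key) ->
    %% gproc_pool:pick_worker/2 returns the pid registered in the slot
    %% that Key hashes to, i.e. the same worker query/3 would use.
    Pid = gproc_pool:pick_worker(ResId, Key),
    emqx_resource_worker:block(Pid).

resume_worker_for(ResId, Key) ->
    Pid = gproc_pool:pick_worker(ResId, Key),
    emqx_resource_worker:resume(Pid).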
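
For connector authors, the sketch below illustrates the result shapes that handle_query_result/3 in this patch understands; the module name, request format, and transport call are hypothetical. `ok` and `{ok, _}` count as success, `{error, _}` counts as failed, and `{resource_down, Reason}` bumps the resource_down counter and moves the worker to the blocked state, where requests are appended to the queue and replayed by do_resume/1.

%% Illustrative connector callbacks (not part of this patch).
-module(example_connector_sketch).

-export([on_query/3, on_batch_query/3]).

on_query(_ResId, {send_message, Msg}, #{conn := Conn} = _State) ->
    case do_send(Conn, Msg) of
        ok ->
            %% counted as success by the worker
            ok;
        {error, timeout} = Err ->
            %% counted as failed; the worker keeps running
            Err;
        {error, econnrefused} ->
            %% tells the worker the resource is down: it blocks and
            %% queues subsequent requests for later replay
            {resource_down, econnrefused}
    end.

on_batch_query(_ResId, _Batch, _State) ->
    %% Returning one result for the whole batch is allowed; the worker
    %% expands it over every request (see maybe_expand_batch_result/2).
    ok.

%% hypothetical transport function
do_send(_Conn, _Msg) -> ok.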