diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 3f2cac46b..fa3f0a0f6 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -23,6 +23,7 @@ -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. -type callback_mode() :: always_sync | async_if_possible. +-type query_mode() :: async | sync | dynamic. -type result() :: term(). -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type query_opts() :: #{ @@ -34,6 +35,7 @@ id := resource_id(), mod := module(), callback_mode := callback_mode(), + query_mode := query_mode(), config := resource_config(), state := resource_state(), status := resource_status(), @@ -67,7 +69,7 @@ batch_time => pos_integer(), enable_queue => boolean(), queue_max_bytes => pos_integer(), - query_mode => async | sync | dynamic, + query_mode => query_mode(), resume_interval => pos_integer(), async_inflight_window => pos_integer() }. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 0295292dd..7f1e689ee 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -18,6 +18,7 @@ -include("emqx_resource.hrl"). -include("emqx_resource_utils.hrl"). +-include("emqx_resource_errors.hrl"). %% APIs for resource types @@ -254,7 +255,19 @@ query(ResId, Request) -> -spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> - emqx_resource_worker:query(ResId, Request, Opts). + case emqx_resource_manager:ets_lookup(ResId) of + {ok, _Group, #{query_mode := QM, status := connected}} -> + case QM of + sync -> emqx_resource_worker:sync_query(ResId, Request, Opts); + async -> emqx_resource_worker:async_query(ResId, Request, Opts) + end; + {ok, _Group, #{status := stopped}} -> + ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); + {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> + ?RESOURCE_ERROR(not_connected, "resource not connected"); + {error, not_found} -> + ?RESOURCE_ERROR(not_found, "resource not found") + end. -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2f6964380..82db194dc 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,9 @@ -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record --record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}). +-record(data, { + id, manager_id, group, mod, callback_mode, query_mode, config, opts, status, state, error +}). -type data() :: #data{}. -define(ETS_TABLE, ?MODULE). @@ -264,6 +266,11 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> group = Group, mod = ResourceType, callback_mode = emqx_resource:get_callback_mode(ResourceType), + %% query_mode = dynamic | sync | async + %% TODO: + %% dynamic mode is async mode when things are going well, but becomes sync mode + %% if the resource worker is overloaded + query_mode = maps:get(query_mode, Opts, sync), config = Config, opts = Opts, status = connecting, @@ -585,6 +592,7 @@ data_record_to_external_map_with_metrics(Data) -> id => Data#data.id, mod => Data#data.mod, callback_mode => Data#data.callback_mode, + query_mode => Data#data.query_mode, config => Data#data.config, status => Data#data.status, state => Data#data.state, diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 26b9706c9..e1b51ff12 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -29,7 +29,8 @@ -export([ start_link/3, - query/3, + sync_query/3, + async_query/3, block/1, block/2, resume/1 @@ -72,12 +73,17 @@ callback_mode() -> [state_functions, state_enter]. start_link(Id, Index, Opts) -> gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). --spec query(id(), request(), query_opts()) -> Result :: term(). -query(Id, Request, Opts) -> +-spec sync_query(id(), request(), query_opts()) -> Result :: term(). +sync_query(Id, Request, Opts) -> PickKey = maps:get(pick_key, Opts, self()), Timeout = maps:get(timeout, Opts, infinity), pick_call(Id, PickKey, {query, Request, Opts}, Timeout). +-spec async_query(id(), request(), query_opts()) -> Result :: term(). +async_query(Id, Request, Opts) -> + PickKey = maps:get(pick_key, Opts, self()), + pick_cast(Id, PickKey, {query, Request, Opts}). + %% simple query the resource without batching and queuing messages. -spec simple_sync_query(id(), request()) -> Result :: term(). simple_sync_query(Id, Request) -> @@ -125,11 +131,6 @@ init({Id, Index, Opts}) -> id => Id, index => Index, name => Name, - %% query_mode = dynamic | sync | async - %% TODO: - %% dynamic mode is async mode when things are going well, but becomes sync mode - %% if the resource worker is overloaded - query_mode => maps:get(query_mode, Opts, sync), async_inflight_window => maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT), enable_batch => maps:get(enable_batch, Opts, false), batch_size => BatchSize, @@ -151,9 +152,11 @@ running(cast, block, St) -> running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), {next_state, block, St#{queue := Q1}}; -running({call, From0}, {query, Request, Opts}, #{query_mode := QM} = St) -> - From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)), +running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); +running(cast, {query, Request, Opts}, St) -> + ReplayFun = maps:get(async_reply_fun, Opts, undefined), + query_or_acc(ReplayFun, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); running(info, {flush, _Ref}, _St) -> @@ -173,11 +176,15 @@ blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked({call, From0}, {query, Request, Opts}, #{id := Id, queue := Q, query_mode := QM} = St) -> - From = maybe_quick_return(QM, From0, maps:get(async_reply_fun, Opts, undefined)), +blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), _ = reply_caller(Id, ?REPLY(From, Request, Error)), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}. + {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}; +blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> + ReplayFun = maps:get(async_reply_fun, Opts, undefined), + Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), + _ = reply_caller(Id, ?REPLY(ReplayFun, Request, Error)), + {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(ReplayFun, Request))])}}. terminate(_Reason, #{id := Id, index := Index}) -> gproc_pool:disconnect_worker(Id, {Id, Index}). @@ -194,24 +201,25 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). %%============================================================================== -maybe_quick_return(sync, From, _ReplyFun) -> - From; -maybe_quick_return(async, From, ReplyFun) -> - gen_statem:reply(From, ok), - ReplyFun. - -pick_call(Id, Key, Query, Timeout) -> - try gproc_pool:pick_worker(Id, Key) of +-define(PICK(ID, KEY, EXPR), + try gproc_pool:pick_worker(ID, KEY) of Pid when is_pid(Pid) -> - gen_statem:call(Pid, Query, {clean_timeout, Timeout}); + EXPR; _ -> - ?RESOURCE_ERROR(not_created, "resource not found") + ?RESOURCE_ERROR(not_created, "resource not created") catch error:badarg -> - ?RESOURCE_ERROR(not_created, "resource not found"); + ?RESOURCE_ERROR(not_created, "resource not created"); exit:{timeout, _} -> ?RESOURCE_ERROR(timeout, "call resource timeout") - end. + end +). + +pick_call(Id, Key, Query, Timeout) -> + ?PICK(Id, Key, gen_statem:call(Pid, Query, {clean_timeout, Timeout})). + +pick_cast(Id, Key, Query) -> + ?PICK(Id, Key, gen_statem:cast(Pid, Query)). do_resume(#{queue := Q, id := Id, name := Name} = St) -> case inflight_get_first(Name) of @@ -264,12 +272,12 @@ query_or_acc(From, Request, #{enable_batch := true, acc := Acc, acc_left := Left true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query_mode := QM} = St) -> +query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id} = St) -> QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - case send_query(QM, From, Request, Id, QueryOpts) of + case send_query(From, Request, Id, QueryOpts) of true -> Query = ?QUERY(From, Request), {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}}; @@ -277,8 +285,8 @@ query_or_acc(From, Request, #{enable_batch := false, queue := Q, id := Id, query {keep_state, St} end. -send_query(QM, From, Request, Id, QueryOpts) -> - Result = call_query(QM, Id, ?QUERY(From, Request), QueryOpts), +send_query(From, Request, Id, QueryOpts) -> + Result = call_query(configured, Id, ?QUERY(From, Request), QueryOpts), reply_caller(Id, ?REPLY(From, Request, Result)). flush(#{acc := []} = St) -> @@ -288,15 +296,14 @@ flush( id := Id, acc := Batch, batch_size := Size, - queue := Q0, - query_mode := QM + queue := Q0 } = St ) -> QueryOpts = #{ inflight_name => maps:get(name, St), inflight_window => maps:get(async_inflight_window, St) }, - Result = call_query(QM, Id, Batch, QueryOpts), + Result = call_query(configured, Id, Batch, QueryOpts), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), case batch_reply_caller(Id, Result, Batch) of true -> @@ -362,9 +369,15 @@ handle_query_result(Id, Result, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, success), BlockWorker. -call_query(QM, Id, Query, QueryOpts) -> +call_query(QM0, Id, Query, QueryOpts) -> case emqx_resource_manager:ets_lookup(Id) of - {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} -> + {ok, _Group, #{mod := Mod, state := ResSt, status := connected} = Data} -> + QM = + case QM0 of + configured -> maps:get(query_mode, Data); + _ -> QM0 + end, + CM = maps:get(callback_mode, Data), apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt, QueryOpts); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index ddd671b75..06160f6c7 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -133,7 +133,7 @@ t_create_remove_local(_) -> {error, _} = emqx_resource:remove_local(?ID), ?assertMatch( - ?RESOURCE_ERROR(not_created), + ?RESOURCE_ERROR(not_found), emqx_resource:query(?ID, get_state) ), ?assertNot(is_process_alive(Pid)). @@ -183,7 +183,7 @@ t_query(_) -> {ok, #{pid := _}} = emqx_resource:query(?ID, get_state), ?assertMatch( - ?RESOURCE_ERROR(not_created), + ?RESOURCE_ERROR(not_found), emqx_resource:query(<<"unknown">>, get_state) ), @@ -371,6 +371,7 @@ t_query_counter_async_inflight(_) -> ok = emqx_resource:query(?ID, {inc_counter, 1}, #{ async_reply_fun => {Insert, [Tab0, tmp_query]} }), + timer:sleep(100), ?assertMatch([{_, {error, {resource_error, #{reason := blocked}}}}], ets:take(Tab0, tmp_query)), %% all response should be received after the resource is resumed.