feat: support aysnc callback to connector modules

This commit is contained in:
Shawn 2022-08-08 17:52:44 +08:00
parent f1419d52f1
commit 35fe70b887
14 changed files with 173 additions and 89 deletions

View File

@ -22,6 +22,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -31,6 +32,8 @@
-define(DEFAULT_POOL_SIZE, 8). -define(DEFAULT_POOL_SIZE, 8).
callback_mode() -> always_sync.
on_start(InstId, Opts) -> on_start(InstId, Opts) ->
PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolName = emqx_plugin_libs_pool:pool_name(InstId),
PoolOpts = [ PoolOpts = [

View File

@ -23,6 +23,8 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
suite() -> [{timetrap, {seconds, 60}}].
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -164,6 +165,8 @@ ref(Field) -> hoconsc:ref(?MODULE, Field).
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start( on_start(
InstId, InstId,
#{ #{

View File

@ -25,6 +25,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -42,6 +43,8 @@ roots() ->
fields(_) -> []. fields(_) -> [].
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start( on_start(
InstId, InstId,
#{ #{

View File

@ -25,6 +25,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -139,6 +140,8 @@ mongo_fields() ->
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start( on_start(
InstId, InstId,
Config = #{ Config = #{

View File

@ -24,6 +24,7 @@
%% API and callbacks for supervisor %% API and callbacks for supervisor
-export([ -export([
callback_mode/0,
start_link/0, start_link/0,
init/1, init/1,
create_bridge/1, create_bridge/1,
@ -139,6 +140,8 @@ on_message_received(Msg, HookPoint, ResId) ->
emqx:run_hook(HookPoint, [Msg]). emqx:run_hook(HookPoint, [Msg]).
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start(InstId, Conf) -> on_start(InstId, Conf) ->
InstanceId = binary_to_atom(InstId, utf8), InstanceId = binary_to_atom(InstId, utf8),
?SLOG(info, #{ ?SLOG(info, #{

View File

@ -24,6 +24,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -73,6 +74,8 @@ server(desc) -> ?DESC("server");
server(_) -> undefined. server(_) -> undefined.
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
-spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}.
on_start( on_start(
InstId, InstId,

View File

@ -27,6 +27,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -66,6 +67,8 @@ server(desc) -> ?DESC("server");
server(_) -> undefined. server(_) -> undefined.
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start( on_start(
InstId, InstId,
#{ #{

View File

@ -26,6 +26,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -112,6 +113,8 @@ servers(desc) -> ?DESC("servers");
servers(_) -> undefined. servers(_) -> undefined.
%% =================================================================== %% ===================================================================
callback_mode() -> always_sync.
on_start( on_start(
InstId, InstId,
#{ #{

View File

@ -25,6 +25,7 @@
-type resource_data() :: #{ -type resource_data() :: #{
id := resource_id(), id := resource_id(),
mod := module(), mod := module(),
callback_mode := always_sync | async_if_possible,
config := resource_config(), config := resource_config(),
state := resource_state(), state := resource_state(),
status := resource_status(), status := resource_status(),

View File

@ -75,8 +75,7 @@
%% stop the instance %% stop the instance
stop/1, stop/1,
%% query the instance %% query the instance
query/2, query/2
query_async/3
]). ]).
%% Direct calls to the callback module %% Direct calls to the callback module
@ -224,12 +223,12 @@ reset_metrics(ResId) ->
%% ================================================================================= %% =================================================================================
-spec query(resource_id(), Request :: term()) -> Result :: term(). -spec query(resource_id(), Request :: term()) -> Result :: term().
query(ResId, Request) -> query(ResId, Request) ->
emqx_resource_worker:query(ResId, Request). query(ResId, Request, #{}).
-spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) -> -spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) ->
Result :: term(). Result :: term().
query_async(ResId, Request, ReplyFun) -> query(ResId, Request, Opts) ->
emqx_resource_worker:query_async(ResId, Request, ReplyFun). emqx_resource_worker:query(ResId, Request, Opts).
-spec start(resource_id()) -> ok | {error, Reason :: term()}. -spec start(resource_id()) -> ok | {error, Reason :: term()}.
start(ResId) -> start(ResId) ->

View File

@ -53,7 +53,7 @@
-export([init/1, callback_mode/0, handle_event/4, terminate/3]). -export([init/1, callback_mode/0, handle_event/4, terminate/3]).
% State record % State record
-record(data, {id, manager_id, group, mod, config, opts, status, state, error}). -record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}).
-define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(SHORT_HEALTHCHECK_INTERVAL, 1000).
-define(HEALTHCHECK_INTERVAL, 15000). -define(HEALTHCHECK_INTERVAL, 15000).
@ -259,6 +259,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) ->
manager_id = MgrId, manager_id = MgrId,
group = Group, group = Group,
mod = ResourceType, mod = ResourceType,
callback_mode = ResourceType:callback_mode(),
config = Config, config = Config,
opts = Opts, opts = Opts,
status = connecting, status = connecting,
@ -559,10 +560,12 @@ maybe_reply(Actions, undefined, _Reply) ->
maybe_reply(Actions, From, Reply) -> maybe_reply(Actions, From, Reply) ->
[{reply, From, Reply} | Actions]. [{reply, From, Reply} | Actions].
-spec data_record_to_external_map_with_metrics(#data{}) -> resource_data().
data_record_to_external_map_with_metrics(Data) -> data_record_to_external_map_with_metrics(Data) ->
#{ #{
id => Data#data.id, id => Data#data.id,
mod => Data#data.mod, mod => Data#data.mod,
callback_mode => Data#data.callback_mode,
config => Data#data.config, config => Data#data.config,
status => Data#data.status, status => Data#data.status,
state => Data#data.state, state => Data#data.state,

View File

@ -27,11 +27,9 @@
-export([ -export([
start_link/3, start_link/3,
query/2,
query/3, query/3,
query_async/3,
query_async/4,
block/1, block/1,
block/2,
resume/1 resume/1
]). ]).
@ -46,6 +44,8 @@
-export([queue_item_marshaller/1, estimate_size/1]). -export([queue_item_marshaller/1, estimate_size/1]).
-export([reply_after_query/4, batch_reply_after_query/4]).
-define(RESUME_INTERVAL, 15000). -define(RESUME_INTERVAL, 15000).
%% count %% count
@ -55,8 +55,8 @@
-define(Q_ITEM(REQUEST), {q_item, REQUEST}). -define(Q_ITEM(REQUEST), {q_item, REQUEST}).
-define(QUERY(FROM, REQUEST), {FROM, REQUEST}). -define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}).
-define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}). -define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}).
-define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]).
-define(RESOURCE_ERROR(Reason, Msg), -define(RESOURCE_ERROR(Reason, Msg),
@ -65,10 +65,17 @@
-define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}).
-type id() :: binary(). -type id() :: binary().
-type query() :: {query, from(), request()}.
-type request() :: term(). -type request() :: term().
-type result() :: term(). -type result() :: term().
-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
-type from() :: pid() | reply_fun(). -type from() :: pid() | reply_fun().
-type query_opts() :: #{
%% The key used for picking a resource worker
pick_key => term()
}.
-export_type([query_opts/0]).
-callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) ->
{{from(), result()}, NewCbState :: term()}. {{from(), result()}, NewCbState :: term()}.
@ -78,26 +85,20 @@ callback_mode() -> [state_functions].
start_link(Id, Index, Opts) -> start_link(Id, Index, Opts) ->
gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []).
-spec query(id(), request()) -> Result :: term(). -spec query(id(), request(), query_opts()) -> Result :: term().
query(Id, Request) -> query(Id, Request, Opts) ->
query(Id, self(), Request). PickKey = maps:get(pick_key, Opts, self()),
Timeout = maps:get(timeout, Opts, infinity),
-spec query(id(), term(), request()) -> Result :: term(). pick_call(Id, PickKey, {query, Request}, Timeout).
query(Id, PickKey, Request) ->
pick_query(call, Id, PickKey, {query, Request}).
-spec query_async(id(), request(), reply_fun()) -> Result :: term().
query_async(Id, Request, ReplyFun) ->
query_async(Id, self(), Request, ReplyFun).
-spec query_async(id(), term(), request(), reply_fun()) -> Result :: term().
query_async(Id, PickKey, Request, ReplyFun) ->
pick_query(cast, Id, PickKey, {query, Request, ReplyFun}).
-spec block(pid() | atom()) -> ok. -spec block(pid() | atom()) -> ok.
block(ServerRef) -> block(ServerRef) ->
gen_statem:cast(ServerRef, block). gen_statem:cast(ServerRef, block).
-spec block(pid() | atom(), [query()]) -> ok.
block(ServerRef, Query) ->
gen_statem:cast(ServerRef, {block, Query}).
-spec resume(pid() | atom()) -> ok. -spec resume(pid() | atom()) -> ok.
resume(ServerRef) -> resume(ServerRef) ->
gen_statem:cast(ServerRef, resume). gen_statem:cast(ServerRef, resume).
@ -121,6 +122,12 @@ init({Id, Index, Opts}) ->
St = #{ St = #{
id => Id, id => Id,
index => Index, index => Index,
%% query_mode = dynamic | sync | async
%% TODO:
%% dynamic mode is async mode when things are going well, but becomes sync mode
%% if the resource worker is overloaded
query_mode => maps:get(query_mode, Opts, sync),
async_reply_fun => maps:get(async_reply_fun, Opts, undefined),
batch_enabled => maps:get(batch_enabled, Opts, false), batch_enabled => maps:get(batch_enabled, Opts, false),
batch_size => BatchSize, batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
@ -135,9 +142,11 @@ running(cast, resume, _St) ->
keep_state_and_data; keep_state_and_data;
running(cast, block, St) -> running(cast, block, St) ->
{next_state, block, St}; {next_state, block, St};
running(cast, {query, Request, ReplyFun}, St) -> running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
query_or_acc(ReplyFun, Request, St); Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
running({call, From}, {query, Request}, St) -> {next_state, block, St#{queue := Q1}};
running({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
From = maybe_quick_return(QM, From0, ReplyFun),
query_or_acc(From, Request, St); query_or_acc(From, Request, St);
running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
flush(St#{tref := undefined}); flush(St#{tref := undefined});
@ -149,13 +158,15 @@ running(info, Info, _St) ->
blocked(cast, block, _St) -> blocked(cast, block, _St) ->
keep_state_and_data; keep_state_and_data;
blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) ->
Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]),
{keep_state, St#{queue := Q1}};
blocked(cast, resume, St) -> blocked(cast, resume, St) ->
do_resume(St); do_resume(St);
blocked(state_timeout, resume, St) -> blocked(state_timeout, resume, St) ->
do_resume(St); do_resume(St);
blocked(cast, {query, Request, ReplyFun}, St) -> blocked({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) ->
handle_blocked(ReplyFun, Request, St); From = maybe_quick_return(QM, From0, ReplyFun),
blocked({call, From}, {query, Request}, St) ->
handle_blocked(From, Request, St). handle_blocked(From, Request, St).
terminate(_Reason, #{id := Id, index := Index}) -> terminate(_Reason, #{id := Id, index := Index}) ->
@ -173,15 +184,23 @@ estimate_size(QItem) ->
size(queue_item_marshaller(QItem)). size(queue_item_marshaller(QItem)).
%%============================================================================== %%==============================================================================
pick_query(Fun, Id, Key, Query) -> maybe_quick_return(sync, From, _ReplyFun) ->
From;
maybe_quick_return(async, From, ReplyFun) ->
ok = gen_statem:reply(From),
ReplyFun.
pick_call(Id, Key, Query, Timeout) ->
try gproc_pool:pick_worker(Id, Key) of try gproc_pool:pick_worker(Id, Key) of
Pid when is_pid(Pid) -> Pid when is_pid(Pid) ->
gen_statem:Fun(Pid, Query); gen_statem:call(Pid, Query, {clean_timeout, Timeout});
_ -> _ ->
?RESOURCE_ERROR(not_created, "resource not found") ?RESOURCE_ERROR(not_created, "resource not found")
catch catch
error:badarg -> error:badarg ->
?RESOURCE_ERROR(not_created, "resource not found") ?RESOURCE_ERROR(not_created, "resource not found");
exit:{timeout, _} ->
?RESOURCE_ERROR(timeout, "call resource timeout")
end. end.
do_resume(#{queue := undefined} = St) -> do_resume(#{queue := undefined} = St) ->
@ -190,8 +209,8 @@ do_resume(#{queue := Q, id := Id} = St) ->
case replayq:peek(Q) of case replayq:peek(Q) of
empty -> empty ->
{next_state, running, St}; {next_state, running, St};
?Q_ITEM(First) -> ?Q_ITEM(FirstQuery) ->
Result = call_query(Id, First), Result = call_query(sync, Id, FirstQuery, 1),
case handle_query_result(Id, Result, false) of case handle_query_result(Id, Result, false) of
%% Send failed because resource down %% Send failed because resource down
true -> true ->
@ -206,8 +225,8 @@ do_resume(#{queue := Q, id := Id} = St) ->
handle_blocked(From, Request, #{id := Id, queue := Q} = St) -> handle_blocked(From, Request, #{id := Id, queue := Q} = St) ->
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(From, Request, Error), false), _ = reply_caller(Id, ?REPLY(From, Request, Error)),
{keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}. {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}.
drop_head(Q) -> drop_head(Q) ->
{Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}),
@ -221,17 +240,18 @@ query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Lef
true -> flush(St); true -> flush(St);
false -> {keep_state, ensure_flush_timer(St)} false -> {keep_state, ensure_flush_timer(St)}
end; end;
query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id} = St) -> query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) ->
case send_query(From, Request, Id) of case send_query(QM, From, Request, Id) of
true -> true ->
{next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}; Query = ?QUERY(From, Request),
{next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}};
false -> false ->
{keep_state, St} {keep_state, St}
end. end.
send_query(From, Request, Id) -> send_query(QM, From, Request, Id) ->
Result = call_query(Id, Request), Result = call_query(QM, Id, ?QUERY(From, Request), 1),
reply_caller(Id, ?REPLY(From, Request, Result), false). reply_caller(Id, ?REPLY(From, Request, Result)).
flush(#{acc := []} = St) -> flush(#{acc := []} = St) ->
{keep_state, St}; {keep_state, St};
@ -240,31 +260,37 @@ flush(
id := Id, id := Id,
acc := Batch, acc := Batch,
batch_size := Size, batch_size := Size,
queue := Q0 queue := Q0,
query_mode := QM
} = St } = St
) -> ) ->
BatchResults = maybe_expand_batch_result(call_batch_query(Id, Batch), Batch), Result = call_query(QM, Id, Batch, length(Batch)),
St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}),
case batch_reply_caller(Id, BatchResults) of case batch_reply_caller(Id, Result, Batch) of
true -> true ->
Q1 = maybe_append_queue(Q0, [?Q_ITEM(Request) || ?QUERY(_, Request) <- Batch]), Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]),
{next_state, blocked, St1#{queue := Q1}}; {next_state, blocked, St1#{queue := Q1}};
false -> false ->
{keep_state, St1} {keep_state, St1}
end. end.
maybe_append_queue(undefined, _Request) -> undefined; maybe_append_queue(undefined, _Items) -> undefined;
maybe_append_queue(Q, Request) -> replayq:append(Q, Request). maybe_append_queue(Q, Items) -> replayq:append(Q, Items).
batch_reply_caller(Id, BatchResults) -> batch_reply_caller(Id, BatchResult, Batch) ->
lists:foldl( lists:foldl(
fun(Reply, BlockWorker) -> fun(Reply, BlockWorker) ->
reply_caller(Id, Reply, BlockWorker) reply_caller(Id, Reply, BlockWorker)
end, end,
false, false,
BatchResults %% the `Mod:on_batch_query/3` returns a single result for a batch,
%% so we need to expand
?EXPAND(BatchResult, Batch)
). ).
reply_caller(Id, Reply) ->
reply_caller(Id, Reply, false).
reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) ->
handle_query_result(Id, Result, BlockWorker); handle_query_result(Id, Result, BlockWorker);
reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) ->
@ -295,45 +321,77 @@ handle_query_result(Id, Result, BlockWorker) ->
emqx_metrics_worker:inc(?RES_METRICS, Id, success), emqx_metrics_worker:inc(?RES_METRICS, Id, success),
BlockWorker. BlockWorker.
call_query(Id, Request) -> call_query(QM, Id, Query, QueryLen) ->
do_call_query(on_query, Id, Request, 1).
call_batch_query(Id, Batch) ->
do_call_query(on_batch_query, Id, Batch, length(Batch)).
do_call_query(Fun, Id, Data, Count) ->
case emqx_resource_manager:ets_lookup(Id) of case emqx_resource_manager:ets_lookup(Id) of
{ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} ->
ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, Count), ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, QueryLen),
try Mod:Fun(Id, Data, ResourceState) of apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt);
%% if the callback module (connector) wants to return an error that
%% makes the current resource goes into the `error` state, it should
%% return `{resource_down, Reason}`
Result -> Result
catch
Err:Reason:ST ->
Msg = io_lib:format(
"call query failed, func: ~s:~s/3, error: ~0p",
[Mod, Fun, {Err, Reason, ST}]
),
?RESOURCE_ERROR(exception, Msg)
end;
{ok, _Group, #{status := stopped}} -> {ok, _Group, #{status := stopped}} ->
?RESOURCE_ERROR(stopped, "resource stopped or disabled"); ?RESOURCE_ERROR(stopped, "resource stopped or disabled");
{ok, _Group, _Data} -> {ok, _Group, #{status := S}} when S == connecting; S == disconnected ->
?RESOURCE_ERROR(not_connected, "resource not connected"); ?RESOURCE_ERROR(not_connected, "resource not connected");
{error, not_found} -> {error, not_found} ->
?RESOURCE_ERROR(not_found, "resource not found") ?RESOURCE_ERROR(not_found, "resource not found")
end. end.
%% the result is already expaned by the `Mod:on_query/3` -define(APPLY_RESOURCE(EXPR, REQ),
maybe_expand_batch_result(Results, _Batch) when is_list(Results) -> try
Results; %% if the callback module (connector) wants to return an error that
%% the `Mod:on_query/3` returns a sinle result for a batch, so it is need expand %% makes the current resource goes into the `error` state, it should
maybe_expand_batch_result(Result, Batch) -> %% return `{resource_down, Reason}`
?EXPAND(Result, Batch). EXPR
catch
ERR:REASON:STACKTRACE ->
MSG = io_lib:format(
"call query failed, func: ~s, id: ~s, error: ~0p, Request: ~0p",
[??EXPR, Id, {ERR, REASON, STACKTRACE}, REQ],
[{chars_limit, 1024}]
),
?RESOURCE_ERROR(exception, MSG)
end
).
apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request), ResSt) ->
?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request);
apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) ->
ReplyFun = fun ?MODULE:reply_after_query/4,
?APPLY_RESOURCE(
begin
_ = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt),
ok_async
end,
Request
);
apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
Requests = [Request || ?QUERY(_From, Request) <- Batch],
?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch);
apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) ->
Requests = [Request || ?QUERY(_From, Request) <- Batch],
ReplyFun = fun ?MODULE:batch_reply_after_query/4,
?APPLY_RESOURCE(
begin
_ = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt),
ok_async
end,
Batch
).
reply_after_query(Pid, Id, ?QUERY(From, Request) = Query, Result) ->
case reply_caller(Id, ?REPLY(From, Request, Result)) of
true -> ?MODULE:block(Pid, [Query]);
false -> ok
end.
batch_reply_after_query(Pid, Id, Batch, Result) ->
case batch_reply_caller(Id, Result, Batch) of
true -> ?MODULE:block(Pid, Batch);
false -> ok
end.
%%============================================================================== %%==============================================================================
call_mode(sync, _) -> sync;
call_mode(async, always_sync) -> sync;
call_mode(async, async_if_possible) -> async.
is_ok_result(ok) -> is_ok_result(ok) ->
true; true;

View File

@ -22,6 +22,7 @@
%% callbacks of behaviour emqx_resource %% callbacks of behaviour emqx_resource
-export([ -export([
callback_mode/0,
on_start/2, on_start/2,
on_stop/2, on_stop/2,
on_query/3, on_query/3,
@ -49,6 +50,8 @@ register(required) -> true;
register(default) -> false; register(default) -> false;
register(_) -> undefined. register(_) -> undefined.
callback_mode() -> always_sync.
on_start(_InstId, #{create_error := true}) -> on_start(_InstId, #{create_error := true}) ->
error("some error"); error("some error");
on_start(InstId, #{name := Name, stop_error := true} = Opts) -> on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
@ -58,12 +61,6 @@ on_start(InstId, #{name := Name, stop_error := true} = Opts) ->
stop_error => true, stop_error => true,
pid => spawn_counter_process(Name, Register) pid => spawn_counter_process(Name, Register)
}}; }};
on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false),
{ok, Opts#{
id => InstId,
pid => spawn_counter_process(Name, Register)
}};
on_start(InstId, #{name := Name} = Opts) -> on_start(InstId, #{name := Name} = Opts) ->
Register = maps:get(register, Opts, false), Register = maps:get(register, Opts, false),
{ok, Opts#{ {ok, Opts#{