diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl index cd8451ac9..480950143 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_jwks_connector.erl @@ -22,6 +22,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -31,6 +32,8 @@ -define(DEFAULT_POOL_SIZE, 8). +callback_mode() -> always_sync. + on_start(InstId, Opts) -> PoolName = emqx_plugin_libs_pool:pool_name(InstId), PoolOpts = [ diff --git a/apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl index 306fe3f13..0c49cc03a 100644 --- a/apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_cache_SUITE.erl @@ -23,6 +23,8 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +suite() -> [{timetrap, {seconds, 60}}]. + all() -> emqx_common_test_helpers:all(?MODULE). diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index e0d5ccfe0..c5a1b89db 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -164,6 +165,8 @@ ref(Field) -> hoconsc:ref(?MODULE, Field). %% =================================================================== +callback_mode() -> always_sync. + on_start( InstId, #{ diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 51d18b534..d53c0e41b 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -42,6 +43,8 @@ roots() -> fields(_) -> []. %% =================================================================== +callback_mode() -> always_sync. + on_start( InstId, #{ diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index db8b1e632..07208545f 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -25,6 +25,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -139,6 +140,8 @@ mongo_fields() -> %% =================================================================== +callback_mode() -> always_sync. + on_start( InstId, Config = #{ diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 66682caeb..e37f6a9a2 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -24,6 +24,7 @@ %% API and callbacks for supervisor -export([ + callback_mode/0, start_link/0, init/1, create_bridge/1, @@ -139,6 +140,8 @@ on_message_received(Msg, HookPoint, ResId) -> emqx:run_hook(HookPoint, [Msg]). %% =================================================================== +callback_mode() -> always_sync. + on_start(InstId, Conf) -> InstanceId = binary_to_atom(InstId, utf8), ?SLOG(info, #{ diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index e818bc6ef..b379e511c 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -24,6 +24,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -73,6 +74,8 @@ server(desc) -> ?DESC("server"); server(_) -> undefined. %% =================================================================== +callback_mode() -> always_sync. + -spec on_start(binary(), hoconsc:config()) -> {ok, state()} | {error, _}. on_start( InstId, diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index d31c1316f..4b188e5a5 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -27,6 +27,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -66,6 +67,8 @@ server(desc) -> ?DESC("server"); server(_) -> undefined. %% =================================================================== +callback_mode() -> always_sync. + on_start( InstId, #{ diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index 4826a170b..fae628d9e 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -26,6 +26,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -112,6 +113,8 @@ servers(desc) -> ?DESC("servers"); servers(_) -> undefined. %% =================================================================== +callback_mode() -> always_sync. + on_start( InstId, #{ diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 13ffff587..c691789c2 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -25,6 +25,7 @@ -type resource_data() :: #{ id := resource_id(), mod := module(), + callback_mode := always_sync | async_if_possible, config := resource_config(), state := resource_state(), status := resource_status(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index af047060c..a54a77e19 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -75,8 +75,7 @@ %% stop the instance stop/1, %% query the instance - query/2, - query_async/3 + query/2 ]). %% Direct calls to the callback module @@ -224,12 +223,12 @@ reset_metrics(ResId) -> %% ================================================================================= -spec query(resource_id(), Request :: term()) -> Result :: term(). query(ResId, Request) -> - emqx_resource_worker:query(ResId, Request). + query(ResId, Request, #{}). --spec query_async(resource_id(), Request :: term(), emqx_resource_worker:reply_fun()) -> +-spec query(resource_id(), Request :: term(), emqx_resource_worker:query_opts()) -> Result :: term(). -query_async(ResId, Request, ReplyFun) -> - emqx_resource_worker:query_async(ResId, Request, ReplyFun). +query(ResId, Request, Opts) -> + emqx_resource_worker:query(ResId, Request, Opts). -spec start(resource_id()) -> ok | {error, Reason :: term()}. start(ResId) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 7dad85d5b..d6e5a1493 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -53,7 +53,7 @@ -export([init/1, callback_mode/0, handle_event/4, terminate/3]). % State record --record(data, {id, manager_id, group, mod, config, opts, status, state, error}). +-record(data, {id, manager_id, group, mod, callback_mode, config, opts, status, state, error}). -define(SHORT_HEALTHCHECK_INTERVAL, 1000). -define(HEALTHCHECK_INTERVAL, 15000). @@ -259,6 +259,7 @@ start_link(MgrId, ResId, Group, ResourceType, Config, Opts) -> manager_id = MgrId, group = Group, mod = ResourceType, + callback_mode = ResourceType:callback_mode(), config = Config, opts = Opts, status = connecting, @@ -559,10 +560,12 @@ maybe_reply(Actions, undefined, _Reply) -> maybe_reply(Actions, From, Reply) -> [{reply, From, Reply} | Actions]. +-spec data_record_to_external_map_with_metrics(#data{}) -> resource_data(). data_record_to_external_map_with_metrics(Data) -> #{ id => Data#data.id, mod => Data#data.mod, + callback_mode => Data#data.callback_mode, config => Data#data.config, status => Data#data.status, state => Data#data.state, diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index 055fbfc53..2115fed86 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -27,11 +27,9 @@ -export([ start_link/3, - query/2, query/3, - query_async/3, - query_async/4, block/1, + block/2, resume/1 ]). @@ -46,6 +44,8 @@ -export([queue_item_marshaller/1, estimate_size/1]). +-export([reply_after_query/4, batch_reply_after_query/4]). + -define(RESUME_INTERVAL, 15000). %% count @@ -55,8 +55,8 @@ -define(Q_ITEM(REQUEST), {q_item, REQUEST}). --define(QUERY(FROM, REQUEST), {FROM, REQUEST}). --define(REPLY(FROM, REQUEST, RESULT), {FROM, REQUEST, RESULT}). +-define(QUERY(FROM, REQUEST), {query, FROM, REQUEST}). +-define(REPLY(FROM, REQUEST, RESULT), {reply, FROM, REQUEST, RESULT}). -define(EXPAND(RESULT, BATCH), [?REPLY(FROM, REQUEST, RESULT) || ?QUERY(FROM, REQUEST) <- BATCH]). -define(RESOURCE_ERROR(Reason, Msg), @@ -65,10 +65,17 @@ -define(RESOURCE_ERROR_M(Reason, Msg), {error, {resource_error, #{reason := Reason, msg := Msg}}}). -type id() :: binary(). +-type query() :: {query, from(), request()}. -type request() :: term(). -type result() :: term(). -type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined. -type from() :: pid() | reply_fun(). +-type query_opts() :: #{ + %% The key used for picking a resource worker + pick_key => term() +}. + +-export_type([query_opts/0]). -callback batcher_flush(Acc :: [{from(), request()}], CbState :: term()) -> {{from(), result()}, NewCbState :: term()}. @@ -78,26 +85,20 @@ callback_mode() -> [state_functions]. start_link(Id, Index, Opts) -> gen_statem:start_link({local, name(Id, Index)}, ?MODULE, {Id, Index, Opts}, []). --spec query(id(), request()) -> Result :: term(). -query(Id, Request) -> - query(Id, self(), Request). - --spec query(id(), term(), request()) -> Result :: term(). -query(Id, PickKey, Request) -> - pick_query(call, Id, PickKey, {query, Request}). - --spec query_async(id(), request(), reply_fun()) -> Result :: term(). -query_async(Id, Request, ReplyFun) -> - query_async(Id, self(), Request, ReplyFun). - --spec query_async(id(), term(), request(), reply_fun()) -> Result :: term(). -query_async(Id, PickKey, Request, ReplyFun) -> - pick_query(cast, Id, PickKey, {query, Request, ReplyFun}). +-spec query(id(), request(), query_opts()) -> Result :: term(). +query(Id, Request, Opts) -> + PickKey = maps:get(pick_key, Opts, self()), + Timeout = maps:get(timeout, Opts, infinity), + pick_call(Id, PickKey, {query, Request}, Timeout). -spec block(pid() | atom()) -> ok. block(ServerRef) -> gen_statem:cast(ServerRef, block). +-spec block(pid() | atom(), [query()]) -> ok. +block(ServerRef, Query) -> + gen_statem:cast(ServerRef, {block, Query}). + -spec resume(pid() | atom()) -> ok. resume(ServerRef) -> gen_statem:cast(ServerRef, resume). @@ -121,6 +122,12 @@ init({Id, Index, Opts}) -> St = #{ id => Id, index => Index, + %% query_mode = dynamic | sync | async + %% TODO: + %% dynamic mode is async mode when things are going well, but becomes sync mode + %% if the resource worker is overloaded + query_mode => maps:get(query_mode, Opts, sync), + async_reply_fun => maps:get(async_reply_fun, Opts, undefined), batch_enabled => maps:get(batch_enabled, Opts, false), batch_size => BatchSize, batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), @@ -135,9 +142,11 @@ running(cast, resume, _St) -> keep_state_and_data; running(cast, block, St) -> {next_state, block, St}; -running(cast, {query, Request, ReplyFun}, St) -> - query_or_acc(ReplyFun, Request, St); -running({call, From}, {query, Request}, St) -> +running(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> + Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), + {next_state, block, St#{queue := Q1}}; +running({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) -> + From = maybe_quick_return(QM, From0, ReplyFun), query_or_acc(From, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); @@ -149,13 +158,15 @@ running(info, Info, _St) -> blocked(cast, block, _St) -> keep_state_and_data; +blocked(cast, {block, [?QUERY(_, _) | _] = Batch}, #{queue := Q} = St) when is_list(Batch) -> + Q1 = maybe_append_queue(Q, [?Q_ITEM(Query) || Query <- Batch]), + {keep_state, St#{queue := Q1}}; blocked(cast, resume, St) -> do_resume(St); blocked(state_timeout, resume, St) -> do_resume(St); -blocked(cast, {query, Request, ReplyFun}, St) -> - handle_blocked(ReplyFun, Request, St); -blocked({call, From}, {query, Request}, St) -> +blocked({call, From0}, {query, Request}, #{query_mode := QM, async_reply_fun := ReplyFun} = St) -> + From = maybe_quick_return(QM, From0, ReplyFun), handle_blocked(From, Request, St). terminate(_Reason, #{id := Id, index := Index}) -> @@ -173,15 +184,23 @@ estimate_size(QItem) -> size(queue_item_marshaller(QItem)). %%============================================================================== -pick_query(Fun, Id, Key, Query) -> +maybe_quick_return(sync, From, _ReplyFun) -> + From; +maybe_quick_return(async, From, ReplyFun) -> + ok = gen_statem:reply(From), + ReplyFun. + +pick_call(Id, Key, Query, Timeout) -> try gproc_pool:pick_worker(Id, Key) of Pid when is_pid(Pid) -> - gen_statem:Fun(Pid, Query); + gen_statem:call(Pid, Query, {clean_timeout, Timeout}); _ -> ?RESOURCE_ERROR(not_created, "resource not found") catch error:badarg -> - ?RESOURCE_ERROR(not_created, "resource not found") + ?RESOURCE_ERROR(not_created, "resource not found"); + exit:{timeout, _} -> + ?RESOURCE_ERROR(timeout, "call resource timeout") end. do_resume(#{queue := undefined} = St) -> @@ -190,8 +209,8 @@ do_resume(#{queue := Q, id := Id} = St) -> case replayq:peek(Q) of empty -> {next_state, running, St}; - ?Q_ITEM(First) -> - Result = call_query(Id, First), + ?Q_ITEM(FirstQuery) -> + Result = call_query(sync, Id, FirstQuery, 1), case handle_query_result(Id, Result, false) of %% Send failed because resource down true -> @@ -206,8 +225,8 @@ do_resume(#{queue := Q, id := Id} = St) -> handle_blocked(From, Request, #{id := Id, queue := Q} = St) -> Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(From, Request, Error), false), - {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}. + _ = reply_caller(Id, ?REPLY(From, Request, Error)), + {keep_state, St#{queue := maybe_append_queue(Q, [?Q_ITEM(?QUERY(From, Request))])}}. drop_head(Q) -> {Q1, AckRef, _} = replayq:pop(Q, #{count_limit => 1}), @@ -221,17 +240,18 @@ query_or_acc(From, Request, #{batch_enabled := true, acc := Acc, acc_left := Lef true -> flush(St); false -> {keep_state, ensure_flush_timer(St)} end; -query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id} = St) -> - case send_query(From, Request, Id) of +query_or_acc(From, Request, #{batch_enabled := false, queue := Q, id := Id, query_mode := QM} = St) -> + case send_query(QM, From, Request, Id) of true -> - {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Request)])}}; + Query = ?QUERY(From, Request), + {next_state, blocked, St#{queue := maybe_append_queue(Q, [?Q_ITEM(Query)])}}; false -> {keep_state, St} end. -send_query(From, Request, Id) -> - Result = call_query(Id, Request), - reply_caller(Id, ?REPLY(From, Request, Result), false). +send_query(QM, From, Request, Id) -> + Result = call_query(QM, Id, ?QUERY(From, Request), 1), + reply_caller(Id, ?REPLY(From, Request, Result)). flush(#{acc := []} = St) -> {keep_state, St}; @@ -240,31 +260,37 @@ flush( id := Id, acc := Batch, batch_size := Size, - queue := Q0 + queue := Q0, + query_mode := QM } = St ) -> - BatchResults = maybe_expand_batch_result(call_batch_query(Id, Batch), Batch), + Result = call_query(QM, Id, Batch, length(Batch)), St1 = cancel_flush_timer(St#{acc_left := Size, acc := []}), - case batch_reply_caller(Id, BatchResults) of + case batch_reply_caller(Id, Result, Batch) of true -> - Q1 = maybe_append_queue(Q0, [?Q_ITEM(Request) || ?QUERY(_, Request) <- Batch]), + Q1 = maybe_append_queue(Q0, [?Q_ITEM(Query) || Query <- Batch]), {next_state, blocked, St1#{queue := Q1}}; false -> {keep_state, St1} end. -maybe_append_queue(undefined, _Request) -> undefined; -maybe_append_queue(Q, Request) -> replayq:append(Q, Request). +maybe_append_queue(undefined, _Items) -> undefined; +maybe_append_queue(Q, Items) -> replayq:append(Q, Items). -batch_reply_caller(Id, BatchResults) -> +batch_reply_caller(Id, BatchResult, Batch) -> lists:foldl( fun(Reply, BlockWorker) -> reply_caller(Id, Reply, BlockWorker) end, false, - BatchResults + %% the `Mod:on_batch_query/3` returns a single result for a batch, + %% so we need to expand + ?EXPAND(BatchResult, Batch) ). +reply_caller(Id, Reply) -> + reply_caller(Id, Reply, false). + reply_caller(Id, ?REPLY(undefined, _, Result), BlockWorker) -> handle_query_result(Id, Result, BlockWorker); reply_caller(Id, ?REPLY({ReplyFun, Args}, _, Result), BlockWorker) when is_function(ReplyFun) -> @@ -295,45 +321,77 @@ handle_query_result(Id, Result, BlockWorker) -> emqx_metrics_worker:inc(?RES_METRICS, Id, success), BlockWorker. -call_query(Id, Request) -> - do_call_query(on_query, Id, Request, 1). - -call_batch_query(Id, Batch) -> - do_call_query(on_batch_query, Id, Batch, length(Batch)). - -do_call_query(Fun, Id, Data, Count) -> +call_query(QM, Id, Query, QueryLen) -> case emqx_resource_manager:ets_lookup(Id) of - {ok, _Group, #{mod := Mod, state := ResourceState, status := connected}} -> - ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, Count), - try Mod:Fun(Id, Data, ResourceState) of - %% if the callback module (connector) wants to return an error that - %% makes the current resource goes into the `error` state, it should - %% return `{resource_down, Reason}` - Result -> Result - catch - Err:Reason:ST -> - Msg = io_lib:format( - "call query failed, func: ~s:~s/3, error: ~0p", - [Mod, Fun, {Err, Reason, ST}] - ), - ?RESOURCE_ERROR(exception, Msg) - end; + {ok, _Group, #{callback_mode := CM, mod := Mod, state := ResSt, status := connected}} -> + ok = emqx_metrics_worker:inc(?RES_METRICS, Id, matched, QueryLen), + apply_query_fun(call_mode(QM, CM), Mod, Id, Query, ResSt); {ok, _Group, #{status := stopped}} -> ?RESOURCE_ERROR(stopped, "resource stopped or disabled"); - {ok, _Group, _Data} -> + {ok, _Group, #{status := S}} when S == connecting; S == disconnected -> ?RESOURCE_ERROR(not_connected, "resource not connected"); {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") end. -%% the result is already expaned by the `Mod:on_query/3` -maybe_expand_batch_result(Results, _Batch) when is_list(Results) -> - Results; -%% the `Mod:on_query/3` returns a sinle result for a batch, so it is need expand -maybe_expand_batch_result(Result, Batch) -> - ?EXPAND(Result, Batch). +-define(APPLY_RESOURCE(EXPR, REQ), + try + %% if the callback module (connector) wants to return an error that + %% makes the current resource goes into the `error` state, it should + %% return `{resource_down, Reason}` + EXPR + catch + ERR:REASON:STACKTRACE -> + MSG = io_lib:format( + "call query failed, func: ~s, id: ~s, error: ~0p, Request: ~0p", + [??EXPR, Id, {ERR, REASON, STACKTRACE}, REQ], + [{chars_limit, 1024}] + ), + ?RESOURCE_ERROR(exception, MSG) + end +). + +apply_query_fun(sync, Mod, Id, ?QUERY(_From, Request), ResSt) -> + ?APPLY_RESOURCE(Mod:on_query(Id, Request, ResSt), Request); +apply_query_fun(async, Mod, Id, ?QUERY(_From, Request) = Query, ResSt) -> + ReplyFun = fun ?MODULE:reply_after_query/4, + ?APPLY_RESOURCE( + begin + _ = Mod:on_query_async(Id, Request, {ReplyFun, [self(), Id, Query]}, ResSt), + ok_async + end, + Request + ); +apply_query_fun(sync, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> + Requests = [Request || ?QUERY(_From, Request) <- Batch], + ?APPLY_RESOURCE(Mod:on_batch_query(Id, Requests, ResSt), Batch); +apply_query_fun(async, Mod, Id, [?QUERY(_, _) | _] = Batch, ResSt) -> + Requests = [Request || ?QUERY(_From, Request) <- Batch], + ReplyFun = fun ?MODULE:batch_reply_after_query/4, + ?APPLY_RESOURCE( + begin + _ = Mod:on_batch_query_async(Id, Requests, {ReplyFun, [self(), Id, Batch]}, ResSt), + ok_async + end, + Batch + ). + +reply_after_query(Pid, Id, ?QUERY(From, Request) = Query, Result) -> + case reply_caller(Id, ?REPLY(From, Request, Result)) of + true -> ?MODULE:block(Pid, [Query]); + false -> ok + end. + +batch_reply_after_query(Pid, Id, Batch, Result) -> + case batch_reply_caller(Id, Result, Batch) of + true -> ?MODULE:block(Pid, Batch); + false -> ok + end. %%============================================================================== +call_mode(sync, _) -> sync; +call_mode(async, always_sync) -> sync; +call_mode(async, async_if_possible) -> async. is_ok_result(ok) -> true; diff --git a/apps/emqx_resource/test/emqx_connector_demo.erl b/apps/emqx_resource/test/emqx_connector_demo.erl index e9c77e915..740f110ec 100644 --- a/apps/emqx_resource/test/emqx_connector_demo.erl +++ b/apps/emqx_resource/test/emqx_connector_demo.erl @@ -22,6 +22,7 @@ %% callbacks of behaviour emqx_resource -export([ + callback_mode/0, on_start/2, on_stop/2, on_query/3, @@ -49,6 +50,8 @@ register(required) -> true; register(default) -> false; register(_) -> undefined. +callback_mode() -> always_sync. + on_start(_InstId, #{create_error := true}) -> error("some error"); on_start(InstId, #{name := Name, stop_error := true} = Opts) -> @@ -58,12 +61,6 @@ on_start(InstId, #{name := Name, stop_error := true} = Opts) -> stop_error => true, pid => spawn_counter_process(Name, Register) }}; -on_start(InstId, #{name := Name} = Opts) -> - Register = maps:get(register, Opts, false), - {ok, Opts#{ - id => InstId, - pid => spawn_counter_process(Name, Register) - }}; on_start(InstId, #{name := Name} = Opts) -> Register = maps:get(register, Opts, false), {ok, Opts#{