diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 5d53234f7..41e5673d9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -32,7 +32,8 @@ expire_at => infinity | integer(), async_reply_fun => reply_fun(), simple_query => boolean(), - is_buffer_supported => boolean() + is_buffer_supported => boolean(), + reply_to => reply_fun() }. -type resource_data() :: #{ id := resource_id(), diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0d021b615..5f352e181 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -62,7 +62,7 @@ -define(COLLECT_REQ_LIMIT, 1000). -define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}). -define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}). --define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)). +-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)). -define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} @@ -149,9 +149,10 @@ simple_sync_query(Id, Request, QueryOpts0) -> QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), - maybe_reply_to(Result, QueryOpts). + Result. %% simple async-query the resource without batching and queuing. -spec simple_async_query(id(), request(), query_opts()) -> term(). @@ -161,9 +162,12 @@ simple_async_query(Id, Request, QueryOpts0) -> QueryOpts = maps:merge(simple_query_opts(), QueryOpts0), emqx_resource_metrics:matched_inc(Id), Ref = make_request_ref(), - Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), + ReplyTo = maps:get(reply_to, QueryOpts0, undefined), + Result = call_query( + async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts + ), _ = handle_query_result(Id, Result, _HasBeenSent = false), - maybe_reply_to(Result, QueryOpts). + Result. simple_query_opts() -> ensure_expire_at(#{simple_query => true, timeout => infinity}). @@ -1056,9 +1060,14 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun( + sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts +) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), - ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); + maybe_reply_to( + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), + QueryOpts + ); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1086,7 +1095,9 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re end, Request ); -apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> +apply_query_fun( + sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts +) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 7a769c5ed..f05159e30 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -200,7 +200,11 @@ init_per_group(metrics_fail_simple, Config) -> (_) -> simple_async end), meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), - meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}), + meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) -> + Result = {error, {unrecoverable_error, mecked_failure}}, + erlang:apply(ReplyFun, Args ++ [Result]), + Result + end), [{mecked, [?BRIDGE_IMPL]} | Config]; init_per_group(_Groupname, Config) -> Config.