diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 9659a2961..7391028ee 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -228,7 +228,7 @@ send_message(BridgeType, BridgeName, ResId, Message, ReplyTo) -> {error, bridge_not_found}; #{enable := true} = Config -> QueryOpts0 = query_opts(Config), - QueryOpts = QueryOpts0#{async_reply_fun => ReplyTo}, + QueryOpts = QueryOpts0#{reply_to => ReplyTo}, emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> {error, bridge_stopped} diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 6824290b2..0d021b615 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -163,9 +163,7 @@ simple_async_query(Id, Request, QueryOpts0) -> Ref = make_request_ref(), Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), _ = handle_query_result(Id, Result, _HasBeenSent = false), - maybe_apply_async_reply_fun( - Result, QueryOpts - ). + maybe_reply_to(Result, QueryOpts). simple_query_opts() -> ensure_expire_at(#{simple_query => true, timeout => infinity}). @@ -323,23 +321,23 @@ pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) -> receive {MRef, Response} -> erlang:demonitor(MRef, [flush]), - maybe_apply_async_reply_fun(Response, QueryOpts); + maybe_reply_to(Response, QueryOpts); {'DOWN', MRef, process, Pid, Reason} -> error({worker_down, Reason}) after Timeout -> erlang:demonitor(MRef, [flush]), receive {MRef, Response} -> - maybe_apply_async_reply_fun(Response, QueryOpts) + maybe_reply_to(Response, QueryOpts) after 0 -> error(timeout) end end end). -pick_cast(Id, Key, Query) -> +pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) -> ?PICK(Id, Key, Pid, begin - ReplyTo = undefined, + ReplyTo = maps:get(reply_to, QueryOpts, undefined), erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), ok end). @@ -1058,12 +1056,9 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), - maybe_apply_async_reply_fun( - ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), - QueryOpts - ); + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1096,7 +1091,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), - maybe_apply_async_reply_fun( + maybe_reply_to( ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), QueryOpts ); @@ -1131,12 +1126,10 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re Batch ). -maybe_apply_async_reply_fun(Result, #{async_reply_fun := {ReplyFun, Args}}) when - is_function(ReplyFun) --> - _ = erlang:apply(ReplyFun, Args ++ [Result]), +maybe_reply_to(Result, #{reply_to := ReplyTo}) -> + do_reply_caller(ReplyTo, Result), Result; -maybe_apply_async_reply_fun(Result, _) -> +maybe_reply_to(Result, _) -> Result. handle_async_reply(