fix: use reply_to instead of async_reply_fun

This commit is contained in:
Stefan Strigler 2023-06-28 16:09:05 +02:00
parent 1363108678
commit 40dd34a704
2 changed files with 12 additions and 19 deletions

View File

@ -228,7 +228,7 @@ send_message(BridgeType, BridgeName, ResId, Message, ReplyTo) ->
{error, bridge_not_found}; {error, bridge_not_found};
#{enable := true} = Config -> #{enable := true} = Config ->
QueryOpts0 = query_opts(Config), QueryOpts0 = query_opts(Config),
QueryOpts = QueryOpts0#{async_reply_fun => ReplyTo}, QueryOpts = QueryOpts0#{reply_to => ReplyTo},
emqx_resource:query(ResId, {send_message, Message}, QueryOpts); emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
#{enable := false} -> #{enable := false} ->
{error, bridge_stopped} {error, bridge_stopped}

View File

@ -163,9 +163,7 @@ simple_async_query(Id, Request, QueryOpts0) ->
Ref = make_request_ref(), Ref = make_request_ref(),
Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts), Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
_ = handle_query_result(Id, Result, _HasBeenSent = false), _ = handle_query_result(Id, Result, _HasBeenSent = false),
maybe_apply_async_reply_fun( maybe_reply_to(Result, QueryOpts).
Result, QueryOpts
).
simple_query_opts() -> simple_query_opts() ->
ensure_expire_at(#{simple_query => true, timeout => infinity}). ensure_expire_at(#{simple_query => true, timeout => infinity}).
@ -323,23 +321,23 @@ pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
receive receive
{MRef, Response} -> {MRef, Response} ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
maybe_apply_async_reply_fun(Response, QueryOpts); maybe_reply_to(Response, QueryOpts);
{'DOWN', MRef, process, Pid, Reason} -> {'DOWN', MRef, process, Pid, Reason} ->
error({worker_down, Reason}) error({worker_down, Reason})
after Timeout -> after Timeout ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
receive receive
{MRef, Response} -> {MRef, Response} ->
maybe_apply_async_reply_fun(Response, QueryOpts) maybe_reply_to(Response, QueryOpts)
after 0 -> after 0 ->
error(timeout) error(timeout)
end end
end end
end). end).
pick_cast(Id, Key, Query) -> pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
?PICK(Id, Key, Pid, begin ?PICK(Id, Key, Pid, begin
ReplyTo = undefined, ReplyTo = maps:get(reply_to, QueryOpts, undefined),
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
ok ok
end). end).
@ -1058,12 +1056,9 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
end end
). ).
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts) -> apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
maybe_apply_async_reply_fun( ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
QueryOpts
);
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
?tp(call_query_async, #{ ?tp(call_query_async, #{
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
@ -1096,7 +1091,7 @@ apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, R
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
}), }),
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
maybe_apply_async_reply_fun( maybe_reply_to(
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
QueryOpts QueryOpts
); );
@ -1131,12 +1126,10 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
Batch Batch
). ).
maybe_apply_async_reply_fun(Result, #{async_reply_fun := {ReplyFun, Args}}) when maybe_reply_to(Result, #{reply_to := ReplyTo}) ->
is_function(ReplyFun) do_reply_caller(ReplyTo, Result),
->
_ = erlang:apply(ReplyFun, Args ++ [Result]),
Result; Result;
maybe_apply_async_reply_fun(Result, _) -> maybe_reply_to(Result, _) ->
Result. Result.
handle_async_reply( handle_async_reply(