diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 9bdc1b3c2..9659a2961 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -51,7 +51,7 @@ -export([ send_message/2, - send_message/4 + send_message/5 ]). -export([config_key_path/0]). @@ -220,14 +220,15 @@ send_to_matched_egress_bridges(Topic, Msg) -> send_message(BridgeId, Message) -> {BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId), ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - send_message(BridgeType, BridgeName, ResId, Message). + send_message(BridgeType, BridgeName, ResId, Message, undefined). -send_message(BridgeType, BridgeName, ResId, Message) -> +send_message(BridgeType, BridgeName, ResId, Message, ReplyTo) -> case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of not_found -> {error, bridge_not_found}; #{enable := true} = Config -> - QueryOpts = query_opts(Config), + QueryOpts0 = query_opts(Config), + QueryOpts = QueryOpts0#{async_reply_fun => ReplyTo}, emqx_resource:query(ResId, {send_message, Message}, QueryOpts); #{enable := false} -> {error, bridge_stopped} diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8f93e0def..a8d9a6c15 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -308,7 +308,7 @@ code_change(_OldVsn, State, _Extra) -> end ). -pick_call(Id, Key, Query, Timeout) -> +pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) -> ?PICK(Id, Key, Pid, begin MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), ReplyTo = {fun ?MODULE:reply_call/2, [MRef]}, @@ -316,14 +316,14 @@ pick_call(Id, Key, Query, Timeout) -> receive {MRef, Response} -> erlang:demonitor(MRef, [flush]), - Response; + maybe_apply_async_reply_fun(Response, QueryOpts); {'DOWN', MRef, process, Pid, Reason} -> error({worker_down, Reason}) after Timeout -> erlang:demonitor(MRef, [flush]), receive {MRef, Response} -> - Response + maybe_apply_async_reply_fun(Response, QueryOpts) after 0 -> error(timeout) end @@ -1051,9 +1051,12 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> end ). -apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts) -> ?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}), - ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request); + maybe_apply_async_reply_fun( + ?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request), + QueryOpts + ); apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) -> ?tp(call_query_async, #{ id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async @@ -1081,12 +1084,15 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re end, Request ); -apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) -> +apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync }), Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch), - ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch); + maybe_apply_async_reply_fun( + ?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch), + QueryOpts + ); apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) -> ?tp(call_batch_query_async, #{ id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async @@ -1118,6 +1124,14 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re Batch ). +maybe_apply_async_reply_fun(Result, #{async_reply_fun := {ReplyFun, Args}}) when + is_function(ReplyFun) +-> + _ = erlang:apply(ReplyFun, Args ++ [Result]), + Result; +maybe_apply_async_reply_fun(Result, _) -> + Result. + handle_async_reply( #{ request_ref := Ref, diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index c0afa0939..7b981b781 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -23,7 +23,8 @@ -export([ apply_rule/3, apply_rules/3, - clear_rule_payload/0 + clear_rule_payload/0, + inc_action_metrics/2 ]). -import( @@ -323,9 +324,7 @@ handle_action_list(RuleId, Actions, Selected, Envs) -> handle_action(RuleId, ActId, Selected, Envs) -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'), try - Result = do_handle_action(ActId, Selected, Envs), - inc_action_metrics(Result, RuleId), - Result + do_handle_action(RuleId, ActId, Selected, Envs) catch throw:out_of_service -> ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'), @@ -345,21 +344,24 @@ handle_action(RuleId, ActId, Selected, Envs) -> }) end. -do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> +do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) -> ?TRACE( "BRIDGE", "bridge_action", #{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)} ), - case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of + ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]}, + case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, ReplyTo) of {error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped -> throw(out_of_service); Result -> Result end; -do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) -> +do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, Envs) -> %% the function can also throw 'out_of_service' - Mod:Func(Selected, Envs, Args). + Result = Mod:Func(Selected, Envs, Args), + inc_action_metrics(RuleId, Result), + Result. eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); @@ -512,14 +514,18 @@ nested_put(Alias, Val, Columns0) -> Columns = handle_alias(Alias, Columns0), emqx_rule_maps:nested_put(Alias, Val, Columns). +inc_action_metrics(RuleId, Result) -> + _ = do_inc_action_metrics(RuleId, Result), + Result. + -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). -inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> +do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> +do_inc_action_metrics(RuleId, ?RESOURCE_ERROR_M(R, _)) when ?IS_RES_DOWN(R) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); -inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) -> +do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'); -inc_action_metrics(R, RuleId) -> +do_inc_action_metrics(RuleId, R) -> case is_ok_result(R) of false -> emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),