fix(emqx_rule_engine): set inc_action_metrics as async_reply_fun
This commit is contained in:
parent
9708a02680
commit
ae636a52d7
|
|
@ -51,7 +51,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
send_message/2,
|
send_message/2,
|
||||||
send_message/4
|
send_message/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([config_key_path/0]).
|
-export([config_key_path/0]).
|
||||||
|
|
@ -220,14 +220,15 @@ send_to_matched_egress_bridges(Topic, Msg) ->
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
send_message(BridgeType, BridgeName, ResId, Message).
|
send_message(BridgeType, BridgeName, ResId, Message, undefined).
|
||||||
|
|
||||||
send_message(BridgeType, BridgeName, ResId, Message) ->
|
send_message(BridgeType, BridgeName, ResId, Message, ReplyTo) ->
|
||||||
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
|
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
|
||||||
not_found ->
|
not_found ->
|
||||||
{error, bridge_not_found};
|
{error, bridge_not_found};
|
||||||
#{enable := true} = Config ->
|
#{enable := true} = Config ->
|
||||||
QueryOpts = query_opts(Config),
|
QueryOpts0 = query_opts(Config),
|
||||||
|
QueryOpts = QueryOpts0#{async_reply_fun => ReplyTo},
|
||||||
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
|
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
|
||||||
#{enable := false} ->
|
#{enable := false} ->
|
||||||
{error, bridge_stopped}
|
{error, bridge_stopped}
|
||||||
|
|
|
||||||
|
|
@ -308,7 +308,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
pick_call(Id, Key, Query, Timeout) ->
|
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
|
||||||
?PICK(Id, Key, Pid, begin
|
?PICK(Id, Key, Pid, begin
|
||||||
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
||||||
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
|
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
|
||||||
|
|
@ -316,14 +316,14 @@ pick_call(Id, Key, Query, Timeout) ->
|
||||||
receive
|
receive
|
||||||
{MRef, Response} ->
|
{MRef, Response} ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
Response;
|
maybe_apply_async_reply_fun(Response, QueryOpts);
|
||||||
{'DOWN', MRef, process, Pid, Reason} ->
|
{'DOWN', MRef, process, Pid, Reason} ->
|
||||||
error({worker_down, Reason})
|
error({worker_down, Reason})
|
||||||
after Timeout ->
|
after Timeout ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
receive
|
receive
|
||||||
{MRef, Response} ->
|
{MRef, Response} ->
|
||||||
Response
|
maybe_apply_async_reply_fun(Response, QueryOpts)
|
||||||
after 0 ->
|
after 0 ->
|
||||||
error(timeout)
|
error(timeout)
|
||||||
end
|
end
|
||||||
|
|
@ -1051,9 +1051,12 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
|
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts) ->
|
||||||
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
||||||
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
|
maybe_apply_async_reply_fun(
|
||||||
|
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
|
||||||
|
QueryOpts
|
||||||
|
);
|
||||||
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
|
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
|
||||||
?tp(call_query_async, #{
|
?tp(call_query_async, #{
|
||||||
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
||||||
|
|
@ -1081,12 +1084,15 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
|
||||||
end,
|
end,
|
||||||
Request
|
Request
|
||||||
);
|
);
|
||||||
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
|
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
||||||
?tp(call_batch_query, #{
|
?tp(call_batch_query, #{
|
||||||
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
||||||
}),
|
}),
|
||||||
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
||||||
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
maybe_apply_async_reply_fun(
|
||||||
|
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
|
||||||
|
QueryOpts
|
||||||
|
);
|
||||||
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
||||||
?tp(call_batch_query_async, #{
|
?tp(call_batch_query_async, #{
|
||||||
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
||||||
|
|
@ -1118,6 +1124,14 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
|
||||||
Batch
|
Batch
|
||||||
).
|
).
|
||||||
|
|
||||||
|
maybe_apply_async_reply_fun(Result, #{async_reply_fun := {ReplyFun, Args}}) when
|
||||||
|
is_function(ReplyFun)
|
||||||
|
->
|
||||||
|
_ = erlang:apply(ReplyFun, Args ++ [Result]),
|
||||||
|
Result;
|
||||||
|
maybe_apply_async_reply_fun(Result, _) ->
|
||||||
|
Result.
|
||||||
|
|
||||||
handle_async_reply(
|
handle_async_reply(
|
||||||
#{
|
#{
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,8 @@
|
||||||
-export([
|
-export([
|
||||||
apply_rule/3,
|
apply_rule/3,
|
||||||
apply_rules/3,
|
apply_rules/3,
|
||||||
clear_rule_payload/0
|
clear_rule_payload/0,
|
||||||
|
inc_action_metrics/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-import(
|
-import(
|
||||||
|
|
@ -323,9 +324,7 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
|
||||||
handle_action(RuleId, ActId, Selected, Envs) ->
|
handle_action(RuleId, ActId, Selected, Envs) ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
||||||
try
|
try
|
||||||
Result = do_handle_action(ActId, Selected, Envs),
|
do_handle_action(RuleId, ActId, Selected, Envs)
|
||||||
inc_action_metrics(Result, RuleId),
|
|
||||||
Result
|
|
||||||
catch
|
catch
|
||||||
throw:out_of_service ->
|
throw:out_of_service ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||||
|
|
@ -345,21 +344,24 @@ handle_action(RuleId, ActId, Selected, Envs) ->
|
||||||
})
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
|
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
|
||||||
?TRACE(
|
?TRACE(
|
||||||
"BRIDGE",
|
"BRIDGE",
|
||||||
"bridge_action",
|
"bridge_action",
|
||||||
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
||||||
),
|
),
|
||||||
case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of
|
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
|
||||||
|
case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, ReplyTo) of
|
||||||
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
||||||
throw(out_of_service);
|
throw(out_of_service);
|
||||||
Result ->
|
Result ->
|
||||||
Result
|
Result
|
||||||
end;
|
end;
|
||||||
do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||||
%% the function can also throw 'out_of_service'
|
%% the function can also throw 'out_of_service'
|
||||||
Mod:Func(Selected, Envs, Args).
|
Result = Mod:Func(Selected, Envs, Args),
|
||||||
|
inc_action_metrics(RuleId, Result),
|
||||||
|
Result.
|
||||||
|
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, may_decode_payload(Payload));
|
||||||
|
|
@ -512,14 +514,18 @@ nested_put(Alias, Val, Columns0) ->
|
||||||
Columns = handle_alias(Alias, Columns0),
|
Columns = handle_alias(Alias, Columns0),
|
||||||
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||||
|
|
||||||
|
inc_action_metrics(RuleId, Result) ->
|
||||||
|
_ = do_inc_action_metrics(RuleId, Result),
|
||||||
|
Result.
|
||||||
|
|
||||||
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
|
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
|
||||||
inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
|
do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
||||||
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
|
do_inc_action_metrics(RuleId, ?RESOURCE_ERROR_M(R, _)) when ?IS_RES_DOWN(R) ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
||||||
inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) ->
|
do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
|
||||||
inc_action_metrics(R, RuleId) ->
|
do_inc_action_metrics(RuleId, R) ->
|
||||||
case is_ok_result(R) of
|
case is_ok_result(R) of
|
||||||
false ->
|
false ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue