Merge pull request #11126 from sstrigler/EMQX-8842-fix-rule-metrics
fix(emqx_rule_engine): set inc_action_metrics as async_reply_fun
This commit is contained in:
commit
07cf250093
|
@ -51,7 +51,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
send_message/2,
|
send_message/2,
|
||||||
send_message/4
|
send_message/5
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([config_key_path/0]).
|
-export([config_key_path/0]).
|
||||||
|
@ -220,14 +220,14 @@ send_to_matched_egress_bridges(Topic, Msg) ->
|
||||||
send_message(BridgeId, Message) ->
|
send_message(BridgeId, Message) ->
|
||||||
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
{BridgeType, BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
||||||
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
ResId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
send_message(BridgeType, BridgeName, ResId, Message).
|
send_message(BridgeType, BridgeName, ResId, Message, #{}).
|
||||||
|
|
||||||
send_message(BridgeType, BridgeName, ResId, Message) ->
|
send_message(BridgeType, BridgeName, ResId, Message, QueryOpts0) ->
|
||||||
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
|
case emqx:get_config([?ROOT_KEY, BridgeType, BridgeName], not_found) of
|
||||||
not_found ->
|
not_found ->
|
||||||
{error, bridge_not_found};
|
{error, bridge_not_found};
|
||||||
#{enable := true} = Config ->
|
#{enable := true} = Config ->
|
||||||
QueryOpts = query_opts(Config),
|
QueryOpts = maps:merge(query_opts(Config), QueryOpts0),
|
||||||
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
|
emqx_resource:query(ResId, {send_message, Message}, QueryOpts);
|
||||||
#{enable := false} ->
|
#{enable := false} ->
|
||||||
{error, bridge_stopped}
|
{error, bridge_stopped}
|
||||||
|
|
|
@ -96,11 +96,10 @@ delete_all_bridges() ->
|
||||||
).
|
).
|
||||||
|
|
||||||
%% test helpers
|
%% test helpers
|
||||||
parse_and_check(Config, ConfigString, Name) ->
|
parse_and_check(BridgeType, BridgeName, ConfigString) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
|
||||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||||
#{<<"bridges">> := #{BridgeType := #{Name := BridgeConfig}}} = RawConf,
|
#{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf,
|
||||||
BridgeConfig.
|
BridgeConfig.
|
||||||
|
|
||||||
resource_id(Config) ->
|
resource_id(Config) ->
|
||||||
|
|
|
@ -119,13 +119,14 @@ bridge_config(TestCase, _TestGroup, Config) ->
|
||||||
Host = ?config(bridge_host, Config),
|
Host = ?config(bridge_host, Config),
|
||||||
Port = ?config(bridge_port, Config),
|
Port = ?config(bridge_port, Config),
|
||||||
Version = ?config(iotdb_version, Config),
|
Version = ?config(iotdb_version, Config),
|
||||||
|
Type = ?config(bridge_type, Config),
|
||||||
Name = <<
|
Name = <<
|
||||||
(atom_to_binary(TestCase))/binary, UniqueNum/binary
|
(atom_to_binary(TestCase))/binary, UniqueNum/binary
|
||||||
>>,
|
>>,
|
||||||
ServerURL = iotdb_server_url(Host, Port),
|
ServerURL = iotdb_server_url(Host, Port),
|
||||||
ConfigString =
|
ConfigString =
|
||||||
io_lib:format(
|
io_lib:format(
|
||||||
"bridges.iotdb.~s {\n"
|
"bridges.~s.~s {\n"
|
||||||
" enable = true\n"
|
" enable = true\n"
|
||||||
" base_url = \"~s\"\n"
|
" base_url = \"~s\"\n"
|
||||||
" authentication = {\n"
|
" authentication = {\n"
|
||||||
|
@ -142,12 +143,13 @@ bridge_config(TestCase, _TestGroup, Config) ->
|
||||||
" }\n"
|
" }\n"
|
||||||
"}\n",
|
"}\n",
|
||||||
[
|
[
|
||||||
|
Type,
|
||||||
Name,
|
Name,
|
||||||
ServerURL,
|
ServerURL,
|
||||||
Version
|
Version
|
||||||
]
|
]
|
||||||
),
|
),
|
||||||
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Config, ConfigString, Name)}.
|
{Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}.
|
||||||
|
|
||||||
make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
|
make_iotdb_payload(DeviceId, Measurement, Type, Value) ->
|
||||||
#{
|
#{
|
||||||
|
|
|
@ -32,7 +32,8 @@
|
||||||
expire_at => infinity | integer(),
|
expire_at => infinity | integer(),
|
||||||
async_reply_fun => reply_fun(),
|
async_reply_fun => reply_fun(),
|
||||||
simple_query => boolean(),
|
simple_query => boolean(),
|
||||||
is_buffer_supported => boolean()
|
is_buffer_supported => boolean(),
|
||||||
|
reply_to => reply_fun()
|
||||||
}.
|
}.
|
||||||
-type resource_data() :: #{
|
-type resource_data() :: #{
|
||||||
id := resource_id(),
|
id := resource_id(),
|
||||||
|
|
|
@ -292,7 +292,7 @@ query(ResId, Request, Opts) ->
|
||||||
{simple_sync, _} ->
|
{simple_sync, _} ->
|
||||||
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
|
||||||
%% so the buffer worker does not need to lookup the cache again
|
%% so the buffer worker does not need to lookup the cache again
|
||||||
emqx_resource_buffer_worker:simple_sync_query(ResId, Request);
|
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
|
||||||
{sync, _} ->
|
{sync, _} ->
|
||||||
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
|
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
|
||||||
{async, _} ->
|
{async, _} ->
|
||||||
|
|
|
@ -38,6 +38,7 @@
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
simple_sync_query/2,
|
simple_sync_query/2,
|
||||||
|
simple_sync_query/3,
|
||||||
simple_async_query/3
|
simple_async_query/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -61,7 +62,7 @@
|
||||||
-define(COLLECT_REQ_LIMIT, 1000).
|
-define(COLLECT_REQ_LIMIT, 1000).
|
||||||
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
|
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
|
||||||
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
||||||
-define(SIMPLE_QUERY(REQUEST), ?QUERY(undefined, REQUEST, false, infinity)).
|
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
|
||||||
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
||||||
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
|
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
|
||||||
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
|
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
|
||||||
|
@ -133,6 +134,10 @@ async_query(Id, Request, Opts0) ->
|
||||||
%% simple query the resource without batching and queuing.
|
%% simple query the resource without batching and queuing.
|
||||||
-spec simple_sync_query(id(), request()) -> term().
|
-spec simple_sync_query(id(), request()) -> term().
|
||||||
simple_sync_query(Id, Request) ->
|
simple_sync_query(Id, Request) ->
|
||||||
|
simple_sync_query(Id, Request, #{}).
|
||||||
|
|
||||||
|
-spec simple_sync_query(id(), request(), query_opts()) -> term().
|
||||||
|
simple_sync_query(Id, Request, QueryOpts0) ->
|
||||||
%% Note: since calling this function implies in bypassing the
|
%% Note: since calling this function implies in bypassing the
|
||||||
%% buffer workers, and each buffer worker index is used when
|
%% buffer workers, and each buffer worker index is used when
|
||||||
%% collecting gauge metrics, we use this dummy index. If this
|
%% collecting gauge metrics, we use this dummy index. If this
|
||||||
|
@ -141,10 +146,11 @@ simple_sync_query(Id, Request) ->
|
||||||
%% `emqx_resource_metrics:*_shift/3'.
|
%% `emqx_resource_metrics:*_shift/3'.
|
||||||
?tp(simple_sync_query, #{id => Id, request => Request}),
|
?tp(simple_sync_query, #{id => Id, request => Request}),
|
||||||
Index = undefined,
|
Index = undefined,
|
||||||
QueryOpts = simple_query_opts(),
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
||||||
emqx_resource_metrics:matched_inc(Id),
|
emqx_resource_metrics:matched_inc(Id),
|
||||||
Ref = make_request_ref(),
|
Ref = make_request_ref(),
|
||||||
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
||||||
|
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
|
||||||
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
@ -156,7 +162,10 @@ simple_async_query(Id, Request, QueryOpts0) ->
|
||||||
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
QueryOpts = maps:merge(simple_query_opts(), QueryOpts0),
|
||||||
emqx_resource_metrics:matched_inc(Id),
|
emqx_resource_metrics:matched_inc(Id),
|
||||||
Ref = make_request_ref(),
|
Ref = make_request_ref(),
|
||||||
Result = call_query(async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(Request), QueryOpts),
|
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
||||||
|
Result = call_query(
|
||||||
|
async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
|
||||||
|
),
|
||||||
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
@ -308,7 +317,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
pick_call(Id, Key, Query, Timeout) ->
|
pick_call(Id, Key, Query = {_, _, QueryOpts}, Timeout) ->
|
||||||
?PICK(Id, Key, Pid, begin
|
?PICK(Id, Key, Pid, begin
|
||||||
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
||||||
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
|
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
|
||||||
|
@ -316,23 +325,23 @@ pick_call(Id, Key, Query, Timeout) ->
|
||||||
receive
|
receive
|
||||||
{MRef, Response} ->
|
{MRef, Response} ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
Response;
|
maybe_reply_to(Response, QueryOpts);
|
||||||
{'DOWN', MRef, process, Pid, Reason} ->
|
{'DOWN', MRef, process, Pid, Reason} ->
|
||||||
error({worker_down, Reason})
|
error({worker_down, Reason})
|
||||||
after Timeout ->
|
after Timeout ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
receive
|
receive
|
||||||
{MRef, Response} ->
|
{MRef, Response} ->
|
||||||
Response
|
maybe_reply_to(Response, QueryOpts)
|
||||||
after 0 ->
|
after 0 ->
|
||||||
error(timeout)
|
error(timeout)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
pick_cast(Id, Key, Query) ->
|
pick_cast(Id, Key, Query = {query, _Request, QueryOpts}) ->
|
||||||
?PICK(Id, Key, Pid, begin
|
?PICK(Id, Key, Pid, begin
|
||||||
ReplyTo = undefined,
|
ReplyTo = maps:get(reply_to, QueryOpts, undefined),
|
||||||
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
|
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
|
||||||
ok
|
ok
|
||||||
end).
|
end).
|
||||||
|
@ -1051,9 +1060,14 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
apply_query_fun(sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, _QueryOpts) ->
|
apply_query_fun(
|
||||||
|
sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, QueryOpts
|
||||||
|
) ->
|
||||||
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
||||||
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request);
|
maybe_reply_to(
|
||||||
|
?APPLY_RESOURCE(call_query, Mod:on_query(Id, Request, ResSt), Request),
|
||||||
|
QueryOpts
|
||||||
|
);
|
||||||
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
|
apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, QueryOpts) ->
|
||||||
?tp(call_query_async, #{
|
?tp(call_query_async, #{
|
||||||
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
||||||
|
@ -1081,12 +1095,17 @@ apply_query_fun(async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, Re
|
||||||
end,
|
end,
|
||||||
Request
|
Request
|
||||||
);
|
);
|
||||||
apply_query_fun(sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, _QueryOpts) ->
|
apply_query_fun(
|
||||||
|
sync, Mod, Id, _Index, _Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts
|
||||||
|
) ->
|
||||||
?tp(call_batch_query, #{
|
?tp(call_batch_query, #{
|
||||||
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
||||||
}),
|
}),
|
||||||
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
||||||
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch);
|
maybe_reply_to(
|
||||||
|
?APPLY_RESOURCE(call_batch_query, Mod:on_batch_query(Id, Requests, ResSt), Batch),
|
||||||
|
QueryOpts
|
||||||
|
);
|
||||||
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, ResSt, QueryOpts) ->
|
||||||
?tp(call_batch_query_async, #{
|
?tp(call_batch_query_async, #{
|
||||||
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => async
|
||||||
|
@ -1118,6 +1137,12 @@ apply_query_fun(async, Mod, Id, Index, Ref, [?QUERY(_, _, _, _) | _] = Batch, Re
|
||||||
Batch
|
Batch
|
||||||
).
|
).
|
||||||
|
|
||||||
|
maybe_reply_to(Result, #{reply_to := ReplyTo}) ->
|
||||||
|
do_reply_caller(ReplyTo, Result),
|
||||||
|
Result;
|
||||||
|
maybe_reply_to(Result, _) ->
|
||||||
|
Result.
|
||||||
|
|
||||||
handle_async_reply(
|
handle_async_reply(
|
||||||
#{
|
#{
|
||||||
request_ref := Ref,
|
request_ref := Ref,
|
||||||
|
|
|
@ -23,7 +23,8 @@
|
||||||
-export([
|
-export([
|
||||||
apply_rule/3,
|
apply_rule/3,
|
||||||
apply_rules/3,
|
apply_rules/3,
|
||||||
clear_rule_payload/0
|
clear_rule_payload/0,
|
||||||
|
inc_action_metrics/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-import(
|
-import(
|
||||||
|
@ -323,9 +324,7 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
|
||||||
handle_action(RuleId, ActId, Selected, Envs) ->
|
handle_action(RuleId, ActId, Selected, Envs) ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
||||||
try
|
try
|
||||||
Result = do_handle_action(ActId, Selected, Envs),
|
do_handle_action(RuleId, ActId, Selected, Envs)
|
||||||
inc_action_metrics(Result, RuleId),
|
|
||||||
Result
|
|
||||||
catch
|
catch
|
||||||
throw:out_of_service ->
|
throw:out_of_service ->
|
||||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||||
|
@ -345,21 +344,29 @@ handle_action(RuleId, ActId, Selected, Envs) ->
|
||||||
})
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_handle_action({bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
|
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
|
||||||
|
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
|
||||||
?TRACE(
|
?TRACE(
|
||||||
"BRIDGE",
|
"BRIDGE",
|
||||||
"bridge_action",
|
"bridge_action",
|
||||||
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
||||||
),
|
),
|
||||||
case emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected) of
|
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
|
||||||
|
case
|
||||||
|
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
||||||
|
of
|
||||||
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
||||||
throw(out_of_service);
|
throw(out_of_service);
|
||||||
|
?RESOURCE_ERROR_M(R, _) when ?IS_RES_DOWN(R) ->
|
||||||
|
throw(out_of_service);
|
||||||
Result ->
|
Result ->
|
||||||
Result
|
Result
|
||||||
end;
|
end;
|
||||||
do_handle_action(#{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, Envs) ->
|
||||||
%% the function can also throw 'out_of_service'
|
%% the function can also throw 'out_of_service'
|
||||||
Mod:Func(Selected, Envs, Args).
|
Result = Mod:Func(Selected, Envs, Args),
|
||||||
|
inc_action_metrics(RuleId, Result),
|
||||||
|
Result.
|
||||||
|
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, may_decode_payload(Payload));
|
||||||
|
@ -512,14 +519,15 @@ nested_put(Alias, Val, Columns0) ->
|
||||||
Columns = handle_alias(Alias, Columns0),
|
Columns = handle_alias(Alias, Columns0),
|
||||||
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||||
|
|
||||||
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
|
inc_action_metrics(RuleId, Result) ->
|
||||||
inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
|
_ = do_inc_action_metrics(RuleId, Result),
|
||||||
|
Result.
|
||||||
|
|
||||||
|
do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
||||||
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
|
do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
|
||||||
inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) ->
|
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
|
||||||
inc_action_metrics(R, RuleId) ->
|
do_inc_action_metrics(RuleId, R) ->
|
||||||
case is_ok_result(R) of
|
case is_ok_result(R) of
|
||||||
false ->
|
false ->
|
||||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||||
|
|
|
@ -19,12 +19,11 @@
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
|
||||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||||
|
|
||||||
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
|
||||||
|
@ -38,7 +37,11 @@ all() ->
|
||||||
{group, runtime},
|
{group, runtime},
|
||||||
{group, events},
|
{group, events},
|
||||||
{group, telemetry},
|
{group, telemetry},
|
||||||
{group, bugs}
|
{group, bugs},
|
||||||
|
{group, metrics},
|
||||||
|
{group, metrics_simple},
|
||||||
|
{group, metrics_fail},
|
||||||
|
{group, metrics_fail_simple}
|
||||||
].
|
].
|
||||||
|
|
||||||
suite() ->
|
suite() ->
|
||||||
|
@ -116,6 +119,22 @@ groups() ->
|
||||||
{bugs, [], [
|
{bugs, [], [
|
||||||
t_sqlparse_payload_as,
|
t_sqlparse_payload_as,
|
||||||
t_sqlparse_nested_get
|
t_sqlparse_nested_get
|
||||||
|
]},
|
||||||
|
{metrics, [], [
|
||||||
|
t_rule_metrics_sync,
|
||||||
|
t_rule_metrics_async
|
||||||
|
]},
|
||||||
|
{metrics_simple, [], [
|
||||||
|
t_rule_metrics_sync,
|
||||||
|
t_rule_metrics_async
|
||||||
|
]},
|
||||||
|
{metrics_fail, [], [
|
||||||
|
t_rule_metrics_sync_fail,
|
||||||
|
t_rule_metrics_async_fail
|
||||||
|
]},
|
||||||
|
{metrics_fail_simple, [], [
|
||||||
|
t_rule_metrics_sync_fail,
|
||||||
|
t_rule_metrics_async_fail
|
||||||
]}
|
]}
|
||||||
].
|
].
|
||||||
|
|
||||||
|
@ -128,7 +147,7 @@ init_per_suite(Config) ->
|
||||||
emqx_rule_funcs_demo:module_info(),
|
emqx_rule_funcs_demo:module_info(),
|
||||||
application:load(emqx_conf),
|
application:load(emqx_conf),
|
||||||
ok = emqx_common_test_helpers:start_apps(
|
ok = emqx_common_test_helpers:start_apps(
|
||||||
[emqx_conf, emqx_rule_engine, emqx_authz],
|
[emqx_conf, emqx_rule_engine, emqx_authz, emqx_bridge],
|
||||||
fun set_special_configs/1
|
fun set_special_configs/1
|
||||||
),
|
),
|
||||||
Config.
|
Config.
|
||||||
|
@ -160,14 +179,41 @@ on_get_resource_status(_id, _) -> #{}.
|
||||||
group(_Groupname) ->
|
group(_Groupname) ->
|
||||||
[].
|
[].
|
||||||
|
|
||||||
|
-define(BRIDGE_IMPL, emqx_bridge_mqtt_connector).
|
||||||
init_per_group(registry, Config) ->
|
init_per_group(registry, Config) ->
|
||||||
Config;
|
Config;
|
||||||
|
init_per_group(metrics_fail, Config) ->
|
||||||
|
meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
|
||||||
|
meck:expect(?BRIDGE_IMPL, on_query_async, 4, {error, {unrecoverable_error, mecked_failure}}),
|
||||||
|
[{mecked, [?BRIDGE_IMPL]} | Config];
|
||||||
|
init_per_group(metrics_simple, Config) ->
|
||||||
|
meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
|
||||||
|
meck:expect(?BRIDGE_IMPL, query_mode, fun
|
||||||
|
(#{resource_opts := #{query_mode := sync}}) -> simple_sync;
|
||||||
|
(_) -> simple_async
|
||||||
|
end),
|
||||||
|
[{mecked, [?BRIDGE_IMPL]} | Config];
|
||||||
|
init_per_group(metrics_fail_simple, Config) ->
|
||||||
|
meck:new(?BRIDGE_IMPL, [non_strict, no_link, passthrough]),
|
||||||
|
meck:expect(?BRIDGE_IMPL, query_mode, fun
|
||||||
|
(#{resource_opts := #{query_mode := sync}}) -> simple_sync;
|
||||||
|
(_) -> simple_async
|
||||||
|
end),
|
||||||
|
meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}),
|
||||||
|
meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) ->
|
||||||
|
Result = {error, {unrecoverable_error, mecked_failure}},
|
||||||
|
erlang:apply(ReplyFun, Args ++ [Result]),
|
||||||
|
Result
|
||||||
|
end),
|
||||||
|
[{mecked, [?BRIDGE_IMPL]} | Config];
|
||||||
init_per_group(_Groupname, Config) ->
|
init_per_group(_Groupname, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_group(_Groupname, _Config) ->
|
end_per_group(_Groupname, Config) ->
|
||||||
ok.
|
case ?config(mecked, Config) of
|
||||||
|
undefined -> ok;
|
||||||
|
Mecked -> meck:unload(Mecked)
|
||||||
|
end.
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Testcase specific setup/teardown
|
%% Testcase specific setup/teardown
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -2822,6 +2868,114 @@ t_get_rule_ids_by_action_reference_ingress_bridge(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Test cases for rule metrics
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(BRIDGE_TYPE, <<"mqtt">>).
|
||||||
|
-define(BRIDGE_NAME, <<"bridge_over_troubled_water">>).
|
||||||
|
-define(BRIDGE_CONFIG(QMODE), #{
|
||||||
|
<<"server">> => <<"127.0.0.1:1883">>,
|
||||||
|
<<"username">> => <<"user1">>,
|
||||||
|
<<"password">> => <<"">>,
|
||||||
|
<<"proto_ver">> => <<"v4">>,
|
||||||
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
|
<<"egress">> =>
|
||||||
|
#{
|
||||||
|
<<"local">> =>
|
||||||
|
#{
|
||||||
|
<<"topic">> => <<"foo/#">>
|
||||||
|
},
|
||||||
|
<<"remote">> =>
|
||||||
|
#{
|
||||||
|
<<"topic">> => <<"bar/${topic}">>,
|
||||||
|
<<"payload">> => <<"${payload}">>,
|
||||||
|
<<"qos">> => <<"${qos}">>,
|
||||||
|
<<"retain">> => <<"${retain}">>
|
||||||
|
}
|
||||||
|
},
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{
|
||||||
|
<<"health_check_interval">> => <<"5s">>,
|
||||||
|
<<"query_mode">> => QMODE,
|
||||||
|
<<"request_ttl">> => <<"3s">>,
|
||||||
|
<<"worker_pool_size">> => 1
|
||||||
|
}
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(SUCCESSS_METRICS, #{
|
||||||
|
matched := 1,
|
||||||
|
'actions.total' := 1,
|
||||||
|
'actions.failed' := 0,
|
||||||
|
'actions.success' := 1
|
||||||
|
}).
|
||||||
|
-define(FAIL_METRICS, #{
|
||||||
|
matched := 1,
|
||||||
|
'actions.total' := 1,
|
||||||
|
'actions.failed' := 1,
|
||||||
|
'actions.success' := 0
|
||||||
|
}).
|
||||||
|
|
||||||
|
t_rule_metrics_sync(_Config) ->
|
||||||
|
do_test_rule_metrics_success(<<"sync">>).
|
||||||
|
|
||||||
|
t_rule_metrics_async(_Config) ->
|
||||||
|
do_test_rule_metrics_success(<<"async">>).
|
||||||
|
|
||||||
|
t_rule_metrics_sync_fail(_Config) ->
|
||||||
|
do_test_rule_metrics_fail(<<"sync">>).
|
||||||
|
|
||||||
|
t_rule_metrics_async_fail(_Config) ->
|
||||||
|
do_test_rule_metrics_fail(<<"async">>).
|
||||||
|
|
||||||
|
do_test_rule_metrics_success(QMode) ->
|
||||||
|
?assertMatch(
|
||||||
|
?SUCCESSS_METRICS,
|
||||||
|
do_test_rule_metrics(QMode)
|
||||||
|
).
|
||||||
|
|
||||||
|
do_test_rule_metrics_fail(QMode) ->
|
||||||
|
?assertMatch(
|
||||||
|
?FAIL_METRICS,
|
||||||
|
do_test_rule_metrics(QMode)
|
||||||
|
).
|
||||||
|
|
||||||
|
do_test_rule_metrics(QMode) ->
|
||||||
|
BridgeId = create_bridge(?BRIDGE_TYPE, ?BRIDGE_NAME, ?BRIDGE_CONFIG(QMode)),
|
||||||
|
RuleId = <<"rule:test_metrics_bridge_action">>,
|
||||||
|
{ok, #{id := RuleId}} =
|
||||||
|
emqx_rule_engine:create_rule(
|
||||||
|
#{
|
||||||
|
id => RuleId,
|
||||||
|
sql => <<"SELECT * FROM \"topic/#\"">>,
|
||||||
|
actions => [BridgeId]
|
||||||
|
}
|
||||||
|
),
|
||||||
|
timer:sleep(100),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
matched := 0,
|
||||||
|
'actions.total' := 0,
|
||||||
|
'actions.failed' := 0,
|
||||||
|
'actions.success' := 0
|
||||||
|
},
|
||||||
|
emqx_metrics_worker:get_counters(rule_metrics, RuleId)
|
||||||
|
),
|
||||||
|
MsgId = emqx_guid:gen(),
|
||||||
|
emqx:publish(#message{id = MsgId, topic = <<"topic/test">>, payload = <<"hello">>}),
|
||||||
|
timer:sleep(100),
|
||||||
|
on_exit(
|
||||||
|
fun() ->
|
||||||
|
emqx_rule_engine:delete_rule(RuleId),
|
||||||
|
emqx_bridge:remove(?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
emqx_metrics_worker:get_counters(rule_metrics, RuleId).
|
||||||
|
|
||||||
|
create_bridge(Type, Name, Config) ->
|
||||||
|
{ok, _Bridge} = emqx_bridge:create(Type, Name, Config),
|
||||||
|
emqx_bridge_resource:bridge_id(Type, Name).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal helpers
|
%% Internal helpers
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Rule metrics for async mode bridges will set failure counters correctly now.
|
Loading…
Reference in New Issue