feat(rule trace): add support for ruleid as a trace type

This commit is contained in:
Kjell Winblad 2024-03-27 11:11:16 +01:00
parent 069cd4fbb4
commit 59a442cdb5
8 changed files with 188 additions and 12 deletions

View File

@ -20,9 +20,14 @@
-record(?TRACE, {
name :: binary() | undefined | '_',
type :: clientid | topic | ip_address | undefined | '_',
type :: clientid | topic | ip_address | ruleid | undefined | '_',
filter ::
emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_',
emqx_types:topic()
| emqx_types:clientid()
| emqx_trace:ip_address()
| emqx_trace:ruleid()
| undefined
| '_',
enable = true :: boolean() | '_',
payload_encode = text :: hex | text | hidden | '_',
extra = #{} :: map() | '_',

View File

@ -66,6 +66,9 @@
-export_type([ip_address/0]).
-type ip_address() :: string().
-export_type([ruleid/0]).
-type ruleid() :: binary().
publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ignore;
publish(#message{from = From, topic = Topic, payload = Payload}) when
@ -517,6 +520,9 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) ->
Error ->
Error
end;
to_trace(#{type := ruleid, ruleid := Filter} = Trace, Rec) ->
Trace0 = maps:without([type, ruleid], Trace),
to_trace(Trace0, Rec#?TRACE{type = ruleid, filter = Filter});
to_trace(#{type := Type}, _Rec) ->
{error, io_lib:format("required ~s field", [Type])};
to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) ->

View File

@ -33,6 +33,7 @@
%% For logger handler filters callbacks
-export([
filter_ruleid/2,
filter_clientid/2,
filter_topic/2,
filter_ip_address/2
@ -133,6 +134,12 @@ uninstall(HandlerId) ->
running() ->
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
-spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_ruleid(#{meta := Meta = #{ruleid := RuleId}} = Log, {MatchId, _Name}) ->
filter_ret(RuleId =:= MatchId andalso is_trace(Meta), Log);
filter_ruleid(_Log, _ExpectId) ->
stop.
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
@ -164,7 +171,9 @@ filters(#{type := clientid, filter := Filter, name := Name}) ->
filters(#{type := topic, filter := Filter, name := Name}) ->
[{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}];
filters(#{type := ip_address, filter := Filter, name := Name}) ->
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}].
[{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}];
filters(#{type := ruleid, filter := Filter, name := Name}) ->
[{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}].
formatter(#{type := _Type, payload_encode := PayloadEncode}) ->
{emqx_trace_formatter, #{
@ -184,7 +193,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc)
[{Type, {FilterFun, {Filter, Name}}}] when
Type =:= topic orelse
Type =:= clientid orelse
Type =:= ip_address
Type =:= ip_address orelse
Type =:= ruleid
->
[Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc];
_ ->

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/test_macros.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLIENT, [
@ -29,11 +30,12 @@
{password, <<"pass">>}
]).
all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8].
all() ->
[t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8, t_trace_rule_id].
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[emqx],
[emqx, emqx_rule_engine],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[{apps, Apps} | Config].
@ -205,6 +207,79 @@ t_trace_topic(_Config) ->
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
create_rule(Name, SQL) ->
Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL),
{ok, _} = emqx_rule_engine:create_rule(Rule).
t_trace_rule_id(_Config) ->
%% Start MQTT Client
{ok, T} = emqtt:start_link(?CLIENT),
emqtt:connect(T),
%% Create rules
create_rule(
<<"test_rule_id_1">>,
<<"select 1 as rule_number from \"rule_1_topic\"">>
),
create_rule(
<<"test_rule_id_2">>,
<<"select 2 as rule_number from \"rule_2_topic\"">>
),
%% Start tracing
ok = emqx_trace_handler:install(
"CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log"
),
ok = emqx_trace_handler:install(
"CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log"
),
emqx_trace:check(),
ok = filesync("CLI-RULE-1", ruleid),
ok = filesync("CLI-RULE-2", ruleid),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/rule_trace_1.log")),
?assert(filelib:is_regular("tmp/rule_trace_2.log")),
%% Get current traces
?assertMatch(
[
#{
type := ruleid,
filter := <<"test_rule_id_1">>,
level := debug,
dst := "tmp/rule_trace_1.log",
name := <<"CLI-RULE-1">>
},
#{
type := ruleid,
filter := <<"test_rule_id_2">>,
name := <<"CLI-RULE-2">>,
level := debug,
dst := "tmp/rule_trace_2.log"
}
],
emqx_trace_handler:running()
),
%% Trigger rule
emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>),
?retry(
100,
5,
begin
ok = filesync("CLI-RULE-1", ruleid),
{ok, Bin} = file:read_file("tmp/rule_trace_1.log"),
?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>]))
end
),
ok = filesync("CLI-RULE-2", ruleid),
?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0),
%% Stop tracing
ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>),
ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>),
?assertEqual([], emqx_trace_handler:running()),
emqtt:disconnect(T).
t_trace_ip_address(_Config) ->
{ok, T} = emqtt:start_link(?CLIENT),
emqtt:connect(T),
@ -272,11 +347,11 @@ t_trace_ip_address(_Config) ->
filesync(Name, Type) ->
ct:sleep(50),
filesync(Name, Type, 3).
filesync(Name, Type, 5).
%% sometime the handler process is not started yet.
filesync(_Name, _Type, 0) ->
ok;
filesync(Name, Type, 0) ->
ct:fail("Handler process not started ~p ~p", [Name, Type]);
filesync(Name0, Type, Retry) ->
Name =
case is_binary(Name0) of

View File

@ -222,7 +222,7 @@ fields(trace) ->
)},
{type,
hoconsc:mk(
hoconsc:enum([clientid, topic, ip_address]),
hoconsc:enum([clientid, topic, ip_address, ruleid]),
#{
description => ?DESC(filter_type),
required => true,
@ -257,6 +257,15 @@ fields(trace) ->
example => <<"127.0.0.1">>
}
)},
{ruleid,
hoconsc:mk(
binary(),
#{
description => ?DESC(ruleid_field),
required => false,
example => <<"my_rule">>
}
)},
{status,
hoconsc:mk(
hoconsc:enum([running, stopped, waiting]),

View File

@ -122,6 +122,56 @@ t_http_test(_Config) ->
unload(),
ok.
t_http_test_rule_trace(_Config) ->
emqx_trace:clear(),
load(),
%% create
Name = atom_to_binary(?FUNCTION_NAME),
Trace = [
{<<"name">>, Name},
{<<"type">>, <<"ruleid">>},
{<<"ruleid">>, Name}
],
{ok, Create} = request_api(post, api_path("trace"), Trace),
?assertMatch(#{<<"name">> := Name}, json(Create)),
{ok, List} = request_api(get, api_path("trace")),
[Data] = json(List),
?assertEqual(Name, maps:get(<<"name">>, Data)),
%% update
{ok, Update} = request_api(put, api_path(iolist_to_binary(["trace/", Name, "/stop"])), #{}),
?assertEqual(
#{
<<"enable">> => false,
<<"name">> => Name
},
json(Update)
),
{ok, List1} = request_api(get, api_path("trace")),
[Data1] = json(List1),
Node = atom_to_binary(node()),
?assertMatch(
#{
<<"status">> := <<"stopped">>,
<<"name">> := Name,
<<"log_size">> := #{Node := _},
<<"start_at">> := _,
<<"end_at">> := _,
<<"type">> := <<"ruleid">>,
<<"ruleid">> := Name
},
Data1
),
%% delete
{ok, Delete} = request_api(delete, api_path(["trace/", Name])),
?assertEqual(<<>>, Delete),
unload(),
ok.
t_create_failed(_Config) ->
load(),
Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}],
@ -252,13 +302,16 @@ t_log_file(_Config) ->
ok.
create_trace(Name, ClientId, Start) ->
create_trace(Name, clientid, ClientId, Start).
create_trace(Name, Type, TypeValue, Start) ->
?check_trace(
#{timetrap => 900},
begin
{ok, _} = emqx_trace:create([
{<<"name">>, Name},
{<<"type">>, clientid},
{<<"clientid">>, ClientId},
{<<"type">>, Type},
{atom_to_binary(Type), TypeValue},
{<<"start_at">>, Start}
]),
?block_until(#{?snk_kind := update_trace_done})
@ -268,6 +321,16 @@ create_trace(Name, ClientId, Start) ->
end
).
create_rule_trace(RuleId) ->
Now = erlang:system_time(second),
emqx_mgmt_api_trace_SUITE:create_trace(atom_to_binary(?FUNCTION_NAME), ruleid, RuleId, Now - 2).
t_create_rule_trace(_Config) ->
load(),
create_rule_trace(atom_to_binary(?FUNCTION_NAME)),
unload(),
ok.
t_stream_log(_Config) ->
emqx_trace:clear(),
load(),

View File

@ -69,6 +69,9 @@ apply_rule_discard_result(Rule, Columns, Envs) ->
ok.
apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
?TRACE("APPLY_RULE", "rule_activated", #{
ruleid => RuleID, input => Columns, environment => Envs
}),
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
clear_rule_payload(),
try

View File

@ -80,6 +80,11 @@ client_ip_addess.desc:
client_ip_addess.label:
"""Client IP Address"""
ruleid.desc:
"""Specify the Rule ID if the trace type is 'ruleid'."""
ruleid.label:
"""Rule ID"""
trace_status.desc:
"""trace status"""
trace_status.label: