diff --git a/apps/emqx/include/emqx_trace.hrl b/apps/emqx/include/emqx_trace.hrl index 5c50fa706..476794223 100644 --- a/apps/emqx/include/emqx_trace.hrl +++ b/apps/emqx/include/emqx_trace.hrl @@ -20,9 +20,14 @@ -record(?TRACE, { name :: binary() | undefined | '_', - type :: clientid | topic | ip_address | undefined | '_', + type :: clientid | topic | ip_address | ruleid | undefined | '_', filter :: - emqx_types:topic() | emqx_types:clientid() | emqx_trace:ip_address() | undefined | '_', + emqx_types:topic() + | emqx_types:clientid() + | emqx_trace:ip_address() + | emqx_trace:ruleid() + | undefined + | '_', enable = true :: boolean() | '_', payload_encode = text :: hex | text | hidden | '_', extra = #{} :: map() | '_', diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index f3a5be084..8151c19b5 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -66,6 +66,9 @@ -export_type([ip_address/0]). -type ip_address() :: string(). +-export_type([ruleid/0]). +-type ruleid() :: binary(). + publish(#message{topic = <<"$SYS/", _/binary>>}) -> ignore; publish(#message{from = From, topic = Topic, payload = Payload}) when @@ -517,6 +520,9 @@ to_trace(#{type := ip_address, ip_address := Filter} = Trace, Rec) -> Error -> Error end; +to_trace(#{type := ruleid, ruleid := Filter} = Trace, Rec) -> + Trace0 = maps:without([type, ruleid], Trace), + to_trace(Trace0, Rec#?TRACE{type = ruleid, filter = Filter}); to_trace(#{type := Type}, _Rec) -> {error, io_lib:format("required ~s field", [Type])}; to_trace(#{payload_encode := PayloadEncode} = Trace, Rec) -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 313826cde..3af543013 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -33,6 +33,7 @@ %% For logger handler filters callbacks -export([ + filter_ruleid/2, filter_clientid/2, filter_topic/2, filter_ip_address/2 @@ -133,6 +134,12 @@ uninstall(HandlerId) -> running() -> lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)). +-spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. +filter_ruleid(#{meta := Meta = #{ruleid := RuleId}} = Log, {MatchId, _Name}) -> + filter_ret(RuleId =:= MatchId andalso is_trace(Meta), Log); +filter_ruleid(_Log, _ExpectId) -> + stop. + -spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop. filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) -> filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log); @@ -164,7 +171,9 @@ filters(#{type := clientid, filter := Filter, name := Name}) -> filters(#{type := topic, filter := Filter, name := Name}) -> [{topic, {fun ?MODULE:filter_topic/2, {ensure_bin(Filter), Name}}}]; filters(#{type := ip_address, filter := Filter, name := Name}) -> - [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]. + [{ip_address, {fun ?MODULE:filter_ip_address/2, {ensure_list(Filter), Name}}}]; +filters(#{type := ruleid, filter := Filter, name := Name}) -> + [{ruleid, {fun ?MODULE:filter_ruleid/2, {ensure_bin(Filter), Name}}}]. formatter(#{type := _Type, payload_encode := PayloadEncode}) -> {emqx_trace_formatter, #{ @@ -184,7 +193,8 @@ filter_traces(#{id := Id, level := Level, dst := Dst, filters := Filters}, Acc) [{Type, {FilterFun, {Filter, Name}}}] when Type =:= topic orelse Type =:= clientid orelse - Type =:= ip_address + Type =:= ip_address orelse + Type =:= ruleid -> [Init#{type => Type, filter => Filter, name => Name, filter_fun => FilterFun} | Acc]; _ -> diff --git a/apps/emqx/test/emqx_trace_handler_SUITE.erl b/apps/emqx/test/emqx_trace_handler_SUITE.erl index 59a472f3e..85a9c056b 100644 --- a/apps/emqx/test/emqx_trace_handler_SUITE.erl +++ b/apps/emqx/test/emqx_trace_handler_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("snabbkaffe/include/test_macros.hrl"). -include_lib("common_test/include/ct.hrl"). -define(CLIENT, [ @@ -29,11 +30,12 @@ {password, <<"pass">>} ]). -all() -> [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8]. +all() -> + [t_trace_clientid, t_trace_topic, t_trace_ip_address, t_trace_clientid_utf8, t_trace_rule_id]. init_per_suite(Config) -> Apps = emqx_cth_suite:start( - [emqx], + [emqx, emqx_rule_engine], #{work_dir => emqx_cth_suite:work_dir(Config)} ), [{apps, Apps} | Config]. @@ -205,6 +207,79 @@ t_trace_topic(_Config) -> ?assertEqual([], emqx_trace_handler:running()), emqtt:disconnect(T). +create_rule(Name, SQL) -> + Rule = emqx_rule_engine_SUITE:make_simple_rule(Name, SQL), + {ok, _} = emqx_rule_engine:create_rule(Rule). + +t_trace_rule_id(_Config) -> + %% Start MQTT Client + {ok, T} = emqtt:start_link(?CLIENT), + emqtt:connect(T), + %% Create rules + create_rule( + <<"test_rule_id_1">>, + <<"select 1 as rule_number from \"rule_1_topic\"">> + ), + create_rule( + <<"test_rule_id_2">>, + <<"select 2 as rule_number from \"rule_2_topic\"">> + ), + %% Start tracing + ok = emqx_trace_handler:install( + "CLI-RULE-1", ruleid, <<"test_rule_id_1">>, all, "tmp/rule_trace_1.log" + ), + ok = emqx_trace_handler:install( + "CLI-RULE-2", ruleid, <<"test_rule_id_2">>, all, "tmp/rule_trace_2.log" + ), + emqx_trace:check(), + ok = filesync("CLI-RULE-1", ruleid), + ok = filesync("CLI-RULE-2", ruleid), + + %% Verify the tracing file exits + ?assert(filelib:is_regular("tmp/rule_trace_1.log")), + ?assert(filelib:is_regular("tmp/rule_trace_2.log")), + + %% Get current traces + ?assertMatch( + [ + #{ + type := ruleid, + filter := <<"test_rule_id_1">>, + level := debug, + dst := "tmp/rule_trace_1.log", + name := <<"CLI-RULE-1">> + }, + #{ + type := ruleid, + filter := <<"test_rule_id_2">>, + name := <<"CLI-RULE-2">>, + level := debug, + dst := "tmp/rule_trace_2.log" + } + ], + emqx_trace_handler:running() + ), + + %% Trigger rule + emqtt:publish(T, <<"rule_1_topic">>, <<"my_traced_message">>), + ?retry( + 100, + 5, + begin + ok = filesync("CLI-RULE-1", ruleid), + {ok, Bin} = file:read_file("tmp/rule_trace_1.log"), + ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_traced_message">>])) + end + ), + ok = filesync("CLI-RULE-2", ruleid), + ?assert(filelib:file_size("tmp/rule_trace_2.log") =:= 0), + + %% Stop tracing + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-1">>), + ok = emqx_trace_handler:uninstall(ruleid, <<"CLI-RULE-2">>), + ?assertEqual([], emqx_trace_handler:running()), + emqtt:disconnect(T). + t_trace_ip_address(_Config) -> {ok, T} = emqtt:start_link(?CLIENT), emqtt:connect(T), @@ -272,11 +347,11 @@ t_trace_ip_address(_Config) -> filesync(Name, Type) -> ct:sleep(50), - filesync(Name, Type, 3). + filesync(Name, Type, 5). %% sometime the handler process is not started yet. -filesync(_Name, _Type, 0) -> - ok; +filesync(Name, Type, 0) -> + ct:fail("Handler process not started ~p ~p", [Name, Type]); filesync(Name0, Type, Retry) -> Name = case is_binary(Name0) of diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index 5cdbc65ff..19edc229d 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -222,7 +222,7 @@ fields(trace) -> )}, {type, hoconsc:mk( - hoconsc:enum([clientid, topic, ip_address]), + hoconsc:enum([clientid, topic, ip_address, ruleid]), #{ description => ?DESC(filter_type), required => true, @@ -257,6 +257,15 @@ fields(trace) -> example => <<"127.0.0.1">> } )}, + {ruleid, + hoconsc:mk( + binary(), + #{ + description => ?DESC(ruleid_field), + required => false, + example => <<"my_rule">> + } + )}, {status, hoconsc:mk( hoconsc:enum([running, stopped, waiting]), diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index cb93bc9d6..ef7b5a191 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -122,6 +122,56 @@ t_http_test(_Config) -> unload(), ok. +t_http_test_rule_trace(_Config) -> + emqx_trace:clear(), + load(), + %% create + Name = atom_to_binary(?FUNCTION_NAME), + Trace = [ + {<<"name">>, Name}, + {<<"type">>, <<"ruleid">>}, + {<<"ruleid">>, Name} + ], + + {ok, Create} = request_api(post, api_path("trace"), Trace), + ?assertMatch(#{<<"name">> := Name}, json(Create)), + + {ok, List} = request_api(get, api_path("trace")), + [Data] = json(List), + ?assertEqual(Name, maps:get(<<"name">>, Data)), + + %% update + {ok, Update} = request_api(put, api_path(iolist_to_binary(["trace/", Name, "/stop"])), #{}), + ?assertEqual( + #{ + <<"enable">> => false, + <<"name">> => Name + }, + json(Update) + ), + {ok, List1} = request_api(get, api_path("trace")), + [Data1] = json(List1), + Node = atom_to_binary(node()), + ?assertMatch( + #{ + <<"status">> := <<"stopped">>, + <<"name">> := Name, + <<"log_size">> := #{Node := _}, + <<"start_at">> := _, + <<"end_at">> := _, + <<"type">> := <<"ruleid">>, + <<"ruleid">> := Name + }, + Data1 + ), + + %% delete + {ok, Delete} = request_api(delete, api_path(["trace/", Name])), + ?assertEqual(<<>>, Delete), + + unload(), + ok. + t_create_failed(_Config) -> load(), Trace = [{<<"type">>, <<"topic">>}, {<<"topic">>, <<"/x/y/z">>}], @@ -252,13 +302,16 @@ t_log_file(_Config) -> ok. create_trace(Name, ClientId, Start) -> + create_trace(Name, clientid, ClientId, Start). + +create_trace(Name, Type, TypeValue, Start) -> ?check_trace( #{timetrap => 900}, begin {ok, _} = emqx_trace:create([ {<<"name">>, Name}, - {<<"type">>, clientid}, - {<<"clientid">>, ClientId}, + {<<"type">>, Type}, + {atom_to_binary(Type), TypeValue}, {<<"start_at">>, Start} ]), ?block_until(#{?snk_kind := update_trace_done}) @@ -268,6 +321,16 @@ create_trace(Name, ClientId, Start) -> end ). +create_rule_trace(RuleId) -> + Now = erlang:system_time(second), + emqx_mgmt_api_trace_SUITE:create_trace(atom_to_binary(?FUNCTION_NAME), ruleid, RuleId, Now - 2). + +t_create_rule_trace(_Config) -> + load(), + create_rule_trace(atom_to_binary(?FUNCTION_NAME)), + unload(), + ok. + t_stream_log(_Config) -> emqx_trace:clear(), load(), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f51908772..9a307e2c3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -69,6 +69,9 @@ apply_rule_discard_result(Rule, Columns, Envs) -> ok. apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> + ?TRACE("APPLY_RULE", "rule_activated", #{ + ruleid => RuleID, input => Columns, environment => Envs + }), ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'), clear_rule_payload(), try diff --git a/rel/i18n/emqx_mgmt_api_trace.hocon b/rel/i18n/emqx_mgmt_api_trace.hocon index 67462ab43..ba07d7d53 100644 --- a/rel/i18n/emqx_mgmt_api_trace.hocon +++ b/rel/i18n/emqx_mgmt_api_trace.hocon @@ -80,6 +80,11 @@ client_ip_addess.desc: client_ip_addess.label: """Client IP Address""" +ruleid.desc: +"""Specify the Rule ID if the trace type is 'ruleid'.""" +ruleid.label: +"""Rule ID""" + trace_status.desc: """trace status""" trace_status.label: