From e9d498dde2aae4181678f9e9052730f0de40c94f Mon Sep 17 00:00:00 2001
From: Kjell Winblad
Date: Mon, 22 Apr 2024 13:51:48 +0200
Subject: [PATCH] feat(rule tracing): add rule trigger time meta data field

Fixes: https://emqx.atlassian.net/browse/EMQX-12025
---
 .../src/emqx_resource_buffer_worker.erl       | 16 +++++++-
 .../src/emqx_rule_runtime.erl                 | 38 +++++++++++++------
 .../emqx_rule_engine_api_rule_apply_SUITE.erl | 14 +++++++
 3 files changed, 54 insertions(+), 14 deletions(-)

diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index 1cbcfe0b8..e35453c94 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -1163,6 +1163,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
     %% Get the rule ids from requests
     RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
     ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
+    RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
     StopAfterRenderVal =
         case Requests of
             %% We know that the batch is not mixed since we prevent this by
@@ -1173,7 +1174,10 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
                 false
         end,
     logger:update_process_metadata(#{
-        rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal
+        rule_ids => RuleIDs,
+        client_ids => ClientIDs,
+        rule_trigger_times => RuleTriggerTimes,
+        stop_action_after_render => StopAfterRenderVal
     }),
     ok;
 set_rule_id_trace_meta_data(Request) ->
@@ -1190,9 +1194,17 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
 collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
     Acc.
 
+collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
+    [Time | Acc];
+collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
+    Acc.
+
 unset_rule_id_trace_meta_data() ->
     logger:update_process_metadata(#{
-        rule_ids => #{}, client_ids => #{}, stop_action_after_render => false
+        rule_ids => #{},
+        client_ids => #{},
+        stop_action_after_render => false,
+        rule_trigger_times => []
     }).
 
 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
index 3872fb973..f99341a9b 100644
--- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
+++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl
@@ -139,25 +139,35 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
         reset_process_trace_metadata(Columns)
     end.
 
-set_process_trace_metadata(RuleID, #{clientid := ClientID}) ->
+set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
+    logger:update_process_metadata(#{
+        clientid => ClientID
+    }),
+    set_process_trace_metadata(RuleID, maps:remove(clientid, Columns));
+set_process_trace_metadata(RuleID, Columns) ->
+    EventTimestamp =
+        case Columns of
+            #{timestamp := Timestamp} ->
+                Timestamp;
+            _ ->
+                erlang:system_time(millisecond)
+        end,
     logger:update_process_metadata(#{
         rule_id => RuleID,
-        clientid => ClientID
-    });
-set_process_trace_metadata(RuleID, _) ->
-    logger:update_process_metadata(#{
-        rule_id => RuleID
+        rule_trigger_time => EventTimestamp
     }).
 
 reset_process_trace_metadata(#{clientid := _ClientID}) ->
     Meta = logger:get_process_metadata(),
     Meta1 = maps:remove(clientid, Meta),
     Meta2 = maps:remove(rule_id, Meta1),
-    logger:set_process_metadata(Meta2);
+    Meta3 = maps:remove(rule_trigger_time, Meta2),
+    logger:set_process_metadata(Meta3);
 reset_process_trace_metadata(_) ->
     Meta = logger:get_process_metadata(),
     Meta1 = maps:remove(rule_id, Meta),
-    logger:set_process_metadata(Meta1).
+    Meta2 = maps:remove(rule_trigger_time, Meta1),
+    logger:set_process_metadata(Meta2).
 
 do_apply_rule(
     #{
@@ -499,21 +509,25 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta
     case TraceMeta of
         #{
             rule_id := RuleID,
-            clientid := ClientID
+            clientid := ClientID,
+            rule_trigger_time := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 clientid => ClientID,
                 action_id => Action,
-                stop_action_after_render => StopAfterRender
+                stop_action_after_render => StopAfterRender,
+                rule_trigger_time => Timestamp
             };
         #{
-            rule_id := RuleID
+            rule_id := RuleID,
+            rule_trigger_time := Timestamp
         } ->
             #{
                 rule_id => RuleID,
                 action_id => Action,
-                stop_action_after_render => StopAfterRender
+                stop_action_after_render => StopAfterRender,
+                rule_trigger_time => Timestamp
             }
     end.
 
diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
index b1e533d31..52fa1a2e5 100644
--- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
+++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl
@@ -159,6 +159,20 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
                 end
             )
     end,
+    %% Check that rule_trigger_time meta field is present in all log entries
+    Log0 = read_rule_trace_file(TraceName, TraceType, Now),
+    Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
+    Log2 = lists:join(<<",\n">>, Log1),
+    Log3 = iolist_to_binary(["[", Log2, "]"]),
+    {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
+    [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
+    [
+        ?assert(
+            (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
+                (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
+        )
+     || #{<<"meta">> := Meta} <- LogEntries
+    ],
     emqx_trace:delete(TraceName),
     ok.
 