feat(rule tracing): add rule trigger time meta data field

Fixes:
https://emqx.atlassian.net/browse/EMQX-12025
This commit is contained in:
Kjell Winblad 2024-04-22 13:51:48 +02:00
parent f9eda1883f
commit e9d498dde2
3 changed files with 54 additions and 14 deletions

View File

@ -1163,6 +1163,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
%% Get the rule ids from requests %% Get the rule ids from requests
RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests), RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests),
StopAfterRenderVal = StopAfterRenderVal =
case Requests of case Requests of
%% We know that the batch is not mixed since we prevent this by %% We know that the batch is not mixed since we prevent this by
@ -1173,7 +1174,10 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
false false
end, end,
logger:update_process_metadata(#{ logger:update_process_metadata(#{
rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal rule_ids => RuleIDs,
client_ids => ClientIDs,
rule_trigger_times => RuleTriggerTimes,
stop_action_after_render => StopAfterRenderVal
}), }),
ok; ok;
set_rule_id_trace_meta_data(Request) -> set_rule_id_trace_meta_data(Request) ->
@ -1190,9 +1194,17 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
collect_client_id(?QUERY(_, _, _, _, _), Acc) -> collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
Acc. Acc.
collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) ->
[Time | Acc];
collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) ->
Acc.
unset_rule_id_trace_meta_data() -> unset_rule_id_trace_meta_data() ->
logger:update_process_metadata(#{ logger:update_process_metadata(#{
rule_ids => #{}, client_ids => #{}, stop_action_after_render => false rule_ids => #{},
client_ids => #{},
stop_action_after_render => false,
rule_trigger_times => []
}). }).
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1 %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1

View File

@ -139,25 +139,35 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
reset_process_trace_metadata(Columns) reset_process_trace_metadata(Columns)
end. end.
set_process_trace_metadata(RuleID, #{clientid := ClientID}) -> set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) ->
logger:update_process_metadata(#{
clientid => ClientID
}),
set_process_trace_metadata(RuleID, maps:remove(clientid, Columns));
set_process_trace_metadata(RuleID, Columns) ->
EventTimestamp =
case Columns of
#{timestamp := Timestamp} ->
Timestamp;
_ ->
erlang:system_time(millisecond)
end,
logger:update_process_metadata(#{ logger:update_process_metadata(#{
rule_id => RuleID, rule_id => RuleID,
clientid => ClientID rule_trigger_time => EventTimestamp
});
set_process_trace_metadata(RuleID, _) ->
logger:update_process_metadata(#{
rule_id => RuleID
}). }).
reset_process_trace_metadata(#{clientid := _ClientID}) -> reset_process_trace_metadata(#{clientid := _ClientID}) ->
Meta = logger:get_process_metadata(), Meta = logger:get_process_metadata(),
Meta1 = maps:remove(clientid, Meta), Meta1 = maps:remove(clientid, Meta),
Meta2 = maps:remove(rule_id, Meta1), Meta2 = maps:remove(rule_id, Meta1),
logger:set_process_metadata(Meta2); Meta3 = maps:remove(rule_trigger_time, Meta2),
logger:set_process_metadata(Meta3);
reset_process_trace_metadata(_) -> reset_process_trace_metadata(_) ->
Meta = logger:get_process_metadata(), Meta = logger:get_process_metadata(),
Meta1 = maps:remove(rule_id, Meta), Meta1 = maps:remove(rule_id, Meta),
logger:set_process_metadata(Meta1). Meta2 = maps:remove(rule_trigger_time, Meta1),
logger:set_process_metadata(Meta2).
do_apply_rule( do_apply_rule(
#{ #{
@ -499,21 +509,25 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta
case TraceMeta of case TraceMeta of
#{ #{
rule_id := RuleID, rule_id := RuleID,
clientid := ClientID clientid := ClientID,
rule_trigger_time := Timestamp
} -> } ->
#{ #{
rule_id => RuleID, rule_id => RuleID,
clientid => ClientID, clientid => ClientID,
action_id => Action, action_id => Action,
stop_action_after_render => StopAfterRender stop_action_after_render => StopAfterRender,
rule_trigger_time => Timestamp
}; };
#{ #{
rule_id := RuleID rule_id := RuleID,
rule_trigger_time := Timestamp
} -> } ->
#{ #{
rule_id => RuleID, rule_id => RuleID,
action_id => Action, action_id => Action,
stop_action_after_render => StopAfterRender stop_action_after_render => StopAfterRender,
rule_trigger_time => Timestamp
} }
end. end.

View File

@ -159,6 +159,20 @@ basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
end end
) )
end, end,
%% Check that rule_trigger_time meta field is present in all log entries
Log0 = read_rule_trace_file(TraceName, TraceType, Now),
Log1 = binary:split(Log0, <<"\n">>, [global, trim]),
Log2 = lists:join(<<",\n">>, Log1),
Log3 = iolist_to_binary(["[", Log2, "]"]),
{ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]),
[#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries,
[
?assert(
(maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse
(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, [])))
)
|| #{<<"meta">> := Meta} <- LogEntries
],
emqx_trace:delete(TraceName), emqx_trace:delete(TraceName),
ok. ok.