diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c610df76c..990a4286f 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1190,7 +1190,8 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> %% Get the rule ids from requests RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests), ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), - RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), + RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), + RuleTriggerTimes = lists:flatten(RuleTriggerTimes0), StopAfterRenderVal = case Requests of %% We know that the batch is not mixed since we prevent this by @@ -1203,7 +1204,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> logger:update_process_metadata(#{ rule_ids => RuleIDs, client_ids => ClientIDs, - rule_trigger_times => RuleTriggerTimes, + rule_trigger_ts => RuleTriggerTimes, stop_action_after_render => StopAfterRenderVal }), ok; @@ -1221,18 +1222,29 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) -> collect_client_id(?QUERY(_, _, _, _, _), Acc) -> Acc. -collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) -> +collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) -> [Time | Acc]; collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) -> Acc. unset_rule_id_trace_meta_data() -> - logger:update_process_metadata(#{ - rule_ids => #{}, - client_ids => #{}, - stop_action_after_render => false, - rule_trigger_times => [] - }). + case logger:get_process_metadata() of + undefined -> + ok; + OldLoggerProcessMetadata -> + NewLoggerProcessMetadata = + maps:without( + [ + rule_ids, + client_ids, + stop_action_after_render, + rule_trigger_ts + ], + OldLoggerProcessMetadata + ), + logger:set_process_metadata(NewLoggerProcessMetadata), + ok + end. %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1 extract_connector_id(Id) when is_binary(Id) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 01baf1ed2..b49d3a084 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -144,12 +144,12 @@ set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) -> logger:update_process_metadata(#{ clientid => ClientID, rule_id => RuleID, - rule_trigger_time => rule_trigger_time(Columns) + rule_trigger_ts => [rule_trigger_time(Columns)] }); set_process_trace_metadata(RuleID, Columns) -> logger:update_process_metadata(#{ rule_id => RuleID, - rule_trigger_time => rule_trigger_time(Columns) + rule_trigger_ts => [rule_trigger_time(Columns)] }). rule_trigger_time(Columns) -> @@ -161,16 +161,26 @@ rule_trigger_time(Columns) -> end. reset_process_trace_metadata(#{clientid := _ClientID}) -> - Meta = logger:get_process_metadata(), - Meta1 = maps:remove(clientid, Meta), - Meta2 = maps:remove(rule_id, Meta1), - Meta3 = maps:remove(rule_trigger_time, Meta2), - logger:set_process_metadata(Meta3); + Meta0 = logger:get_process_metadata(), + Meta1 = maps:without( + [ + clientid, + rule_id, + rule_trigger_ts + ], + Meta0 + ), + logger:set_process_metadata(Meta1); reset_process_trace_metadata(_) -> - Meta = logger:get_process_metadata(), - Meta1 = maps:remove(rule_id, Meta), - Meta2 = maps:remove(rule_trigger_time, Meta1), - logger:set_process_metadata(Meta2). + Meta0 = logger:get_process_metadata(), + Meta1 = maps:without( + [ + rule_id, + rule_trigger_ts + ], + Meta0 + ), + logger:set_process_metadata(Meta1). do_apply_rule( #{ @@ -533,24 +543,24 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta #{ rule_id := RuleID, clientid := ClientID, - rule_trigger_time := Timestamp + rule_trigger_ts := Timestamp } -> #{ rule_id => RuleID, clientid => ClientID, action_id => Action, stop_action_after_render => StopAfterRender, - rule_trigger_time => Timestamp + rule_trigger_ts => Timestamp }; #{ rule_id := RuleID, - rule_trigger_time := Timestamp + rule_trigger_ts := Timestamp } -> #{ rule_id => RuleID, action_id => Action, stop_action_after_render => StopAfterRender, - rule_trigger_time => Timestamp + rule_trigger_ts => Timestamp } end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index b9fb1a0a3..c9548f8b9 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -216,18 +216,15 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> end ) end, - %% Check that rule_trigger_time meta field is present in all log entries + %% Check that rule_trigger_ts meta field is present in all log entries Log0 = read_rule_trace_file(TraceName, TraceType, Now), Log1 = binary:split(Log0, <<"\n">>, [global, trim]), Log2 = lists:join(<<",\n">>, Log1), Log3 = iolist_to_binary(["[", Log2, "]"]), {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]), - [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries, + [#{<<"meta">> := #{<<"rule_trigger_ts">> := [RuleTriggerTime]}} | _] = LogEntries, [ - ?assert( - (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse - (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, []))) - ) + ?assert(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_ts">>, Meta, []))) || #{<<"meta">> := Meta} <- LogEntries ], ok. @@ -265,7 +262,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> <<"result">> := <<"ok">> }, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -422,7 +419,7 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"clientid">> := _, <<"reason">> := <<"MY REASON">>, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -433,8 +430,7 @@ t_apply_rule_test_format_action_failed(_Config) -> ), MetaMap = maps:get(<<"meta">>, LastEntryJSON), ?assert(not maps:is_key(<<"client_ids">>, MetaMap)), - ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)), - ?assert(not maps:is_key(<<"rule_trigger_times">>, MetaMap)) + ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)) end, do_apply_rule_test_format_action_failed_test(1, CheckFun). @@ -495,7 +491,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"clientid">> := _, <<"reason">> := <<"request_expired">>, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -512,7 +508,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"level">> := <<"debug">>, <<"meta">> := #{ - <<"client_ids">> := [], <<"clientid">> := _, <<"id">> := _, <<"reason">> := @@ -522,9 +517,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"msg">> := <<"MY_RECOVERABLE_REASON">> }, <<"rule_id">> := _, - <<"rule_ids">> := [], - <<"rule_trigger_time">> := _, - <<"rule_trigger_times">> := [], + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ERROR">> }, @@ -532,7 +525,10 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"time">> := _ }, ReasonEntryJSON - ) + ), + MetaMap = maps:get(<<"meta">>, ReasonEntryJSON), + ?assert(not maps:is_key(<<"client_ids">>, MetaMap)), + ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)) end. meck_test_connector_recoverable_errors(Reason) ->