diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 990a4286f..285bc57b6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1192,21 +1192,25 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), RuleTriggerTimes = lists:flatten(RuleTriggerTimes0), - StopAfterRenderVal = + TraceMetadata = case Requests of %% We know that the batch is not mixed since we prevent this by %% using a stop_after function in the replayq:pop call [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] -> - true; + #{ + rule_ids => RuleIDs, + client_ids => ClientIDs, + rule_trigger_ts => RuleTriggerTimes, + stop_action_after_render => true + }; [?QUERY(_, _, _, _, _TraceCTX) | _] -> - false + #{ + rule_ids => RuleIDs, + client_ids => ClientIDs, + rule_trigger_ts => RuleTriggerTimes + } end, - logger:update_process_metadata(#{ - rule_ids => RuleIDs, - client_ids => ClientIDs, - rule_trigger_ts => RuleTriggerTimes, - stop_action_after_render => StopAfterRenderVal - }), + logger:update_process_metadata(TraceMetadata), ok; set_rule_id_trace_meta_data(Request) -> set_rule_id_trace_meta_data([Request]), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index b49d3a084..25c5e846b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -538,30 +538,40 @@ do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> end. do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) -> - StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false), + StopAfterRenderMap = + case maps:get(stop_action_after_render, TraceMeta, false) of + false -> + #{}; + true -> + #{stop_action_after_render => true} + end, case TraceMeta of #{ rule_id := RuleID, clientid := ClientID, rule_trigger_ts := Timestamp } -> - #{ - rule_id => RuleID, - clientid => ClientID, - action_id => Action, - stop_action_after_render => StopAfterRender, - rule_trigger_ts => Timestamp - }; + maps:merge( + #{ + rule_id => RuleID, + clientid => ClientID, + action_id => Action, + rule_trigger_ts => Timestamp + }, + StopAfterRenderMap + ); #{ rule_id := RuleID, rule_trigger_ts := Timestamp } -> - #{ - rule_id => RuleID, - action_id => Action, - stop_action_after_render => StopAfterRender, - rule_trigger_ts => Timestamp - } + maps:merge( + #{ + rule_id => RuleID, + action_id => Action, + rule_trigger_ts => Timestamp + }, + StopAfterRenderMap + ) end. action_info({bridge, BridgeType, BridgeName, _ResId}) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index c9548f8b9..a7a464842 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -263,7 +263,6 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> }, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_success">>, @@ -357,9 +356,10 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> ok; CheckBatchesFunRec(CurCount) -> receive - [{_, #{<<"stop_after_render">> := StopValue}} | _] = List -> + [{_, FirstMsg} | _] = List -> + StopValue = maps:get(<<"stop_after_render">>, FirstMsg, false), [ - ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg) + ?assertEqual(StopValue, maps:get(<<"stop_after_render">>, Msg, false)) || {_, Msg} <- List ], Len = length(List), @@ -420,7 +420,6 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"reason">> := <<"MY REASON">>, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_failed">>, @@ -492,7 +491,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"reason">> := <<"request_expired">>, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_failed">>, @@ -518,7 +516,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> }, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ERROR">> }, <<"msg">> := SendErrorMsg,