From c72682d81b175194490ddbae691c0553bc0e8799 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 15 May 2024 14:32:15 +0200 Subject: [PATCH 01/11] fix: remove query mode from redis action trace as it only supports sync Fixes: https://emqx.atlassian.net/browse/EMQX-12336 --- apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl index 28f7c6f8e..535d6e13c 100644 --- a/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl +++ b/apps/emqx_bridge_redis/src/emqx_bridge_redis_connector.erl @@ -119,7 +119,7 @@ on_query( ), emqx_trace:rendered_action_template( MessageTag, - #{command => Cmd, batch => false, mode => sync} + #{command => Cmd, batch => false} ), Result = query(InstId, {cmd, Cmd}, RedisConnSt), ?tp( @@ -143,7 +143,7 @@ on_batch_query( [{ChannelID, _} | _] = BatchData, emqx_trace:rendered_action_template( ChannelID, - #{commands => Cmds, batch => ture, mode => sync} + #{commands => Cmds, batch => ture} ), Result = query(InstId, {cmds, Cmds}, RedisConnSt), ?tp( From 246bce85eca6fb14ec174028f987de99875022e0 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 15 May 2024 14:49:33 +0200 Subject: [PATCH 02/11] fix: make mysql action rendered trace include parameters When doing non-batch inserts with the MySQL action, prepared statements are used. We therefore need to include the parameters to the prepared statement when tracing the rendered templates. 
Fixes: https://emqx.atlassian.net/browse/EMQX-12335 --- apps/emqx_mysql/src/emqx_mysql.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index 3ad2fb564..6311d66f2 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -507,7 +507,13 @@ on_sql_query( LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?TRACE("QUERY", "mysql_connector_received", LogMeta), ChannelID = maps:get(channel_id, State, no_channel), - emqx_trace:rendered_action_template(ChannelID, #{sql => SQLOrKey}), + emqx_trace:rendered_action_template( + ChannelID, + #{ + sql_or_key => SQLOrKey, + parameters => Params + } + ), Worker = ecpool:get_client(PoolName), case ecpool_worker:client(Worker) of {ok, Conn} -> From 9fd8e930be9bcf5df8446c2e086a1a37143dee87 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 11:19:48 +0200 Subject: [PATCH 03/11] fix(best_effort_json): only do tuple list to map without losing pairs --- apps/emqx/src/emqx_logger_jsonfmt.erl | 11 +++++++ .../test/emqx_mgmt_api_trace_SUITE.erl | 29 +++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 776c8f753..8c760f96c 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -219,6 +219,8 @@ best_effort_unicode(Input, Config) -> best_effort_json_obj(List, Config) when is_list(List) -> try + %% We should only do this if there are no duplicated keys + check_no_dup_tuple_list(List), json_obj(maps:from_list(List), Config) catch _:_ -> @@ -232,6 +234,15 @@ best_effort_json_obj(Map, Config) -> do_format_msg("~p", [Map], Config) end. 
+check_no_dup_tuple_list(List) -> + %% Crash if this is not a tuple list + lists:foreach(fun({_, _}) -> ok end, List), + Items = [K || {K, _} <- List], + NumberOfItems = length(Items), + %% Crash if there are duplicates + NumberOfItems = maps:size(maps:from_keys(Items, true)), + ok. + json(A, _) when is_atom(A) -> A; json(I, _) when is_integer(I) -> I; json(F, _) when is_float(F) -> F; diff --git a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl index 2a3e1d18a..ce29a67f6 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_trace_SUITE.erl @@ -271,6 +271,16 @@ t_http_test_json_formatter(_Config) -> }), %% We should handle report style logging ?SLOG(error, #{msg => "recursive_republish_detected"}, #{topic => Topic}), + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + %% This will be converted to map + map_key => [{a, a}, {b, b}] + }), + ?TRACE("CUSTOM", "my_log_msg", #{ + topic => Topic, + %% We should not convert this to a map as we will lose information + map_key => [{a, a}, {a, b}] + }), ok = emqx_trace_handler_SUITE:filesync(Name, topic), {ok, _Detail2} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/log_detail")), {ok, Bin} = request_api(get, api_path("trace/" ++ binary_to_list(Name) ++ "/download")), @@ -425,6 +435,25 @@ t_http_test_json_formatter(_Config) -> }, NextFun() ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"map_key">> := #{ + <<"a">> := <<"a">>, + <<"b">> := <<"b">> + } + } + }, + NextFun() + ), + ?assertMatch( + #{ + <<"meta">> := #{ + <<"map_key">> := [_, _] + } + }, + NextFun() + ), {ok, Delete} = request_api(delete, api_path("trace/" ++ binary_to_list(Name))), ?assertEqual(<<>>, Delete), From 413ad60bdbc436a1190cbcbe34380592f0465b64 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 11:32:14 +0200 Subject: [PATCH 04/11] fix(cassandra): format rendered trace in a better way Fixes: 
https://emqx.atlassian.net/browse/EMQX-12393 --- .../src/emqx_bridge_cassandra_connector.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index 87da71449..df278b791 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -12,6 +12,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx/include/emqx_trace.hrl"). %% schema -export([roots/0, fields/1, desc/1, namespace/0]). @@ -273,11 +274,14 @@ do_batch_query(InstId, Requests, Async, #{pool_name := PoolName} = State) -> _ -> none end, emqx_trace:rendered_action_template(ChannelID, #{ - cqls => CQLs + cqls => #emqx_trace_format_func_data{data = CQLs, function = fun trace_format_cql_tuples/1} }), Res = exec_cql_batch_query(InstId, PoolName, Async, CQLs), handle_result(Res). +trace_format_cql_tuples(Tuples) -> + [CQL || {_, CQL} <- Tuples]. 
+ parse_request_to_cql({query, CQL}) -> {query, CQL, #{}}; parse_request_to_cql({query, CQL, Params}) -> From b7c2f4a6d704c35ad9ca8d7b37c8228d622f066f Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 15:40:38 +0200 Subject: [PATCH 05/11] fix(best_effort_json): make tuple list check more efficient --- apps/emqx/src/emqx_logger_jsonfmt.erl | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/apps/emqx/src/emqx_logger_jsonfmt.erl b/apps/emqx/src/emqx_logger_jsonfmt.erl index 8c760f96c..3f56bb5a4 100644 --- a/apps/emqx/src/emqx_logger_jsonfmt.erl +++ b/apps/emqx/src/emqx_logger_jsonfmt.erl @@ -219,9 +219,7 @@ best_effort_unicode(Input, Config) -> best_effort_json_obj(List, Config) when is_list(List) -> try - %% We should only do this if there are no duplicated keys - check_no_dup_tuple_list(List), - json_obj(maps:from_list(List), Config) + json_obj(convert_tuple_list_to_map(List), Config) catch _:_ -> [json(I, Config) || I <- List] @@ -234,14 +232,15 @@ best_effort_json_obj(Map, Config) -> do_format_msg("~p", [Map], Config) end. -check_no_dup_tuple_list(List) -> +%% This function will throw if the list do not only contain tuples or if there +%% are duplicate keys. +convert_tuple_list_to_map(List) -> %% Crash if this is not a tuple list - lists:foreach(fun({_, _}) -> ok end, List), - Items = [K || {K, _} <- List], - NumberOfItems = length(Items), + CandidateMap = maps:from_list(List), %% Crash if there are duplicates - NumberOfItems = maps:size(maps:from_keys(Items, true)), - ok. + NumberOfItems = length(List), + NumberOfItems = maps:size(CandidateMap), + CandidateMap. 
json(A, _) when is_atom(A) -> A; json(I, _) when is_integer(I) -> I; From 520e91c8fd9c3cc5770d9d852f6212adeaa5a808 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 16:50:09 +0200 Subject: [PATCH 06/11] fix(greptime trace): remove async info as it is confusing Fixes: https://emqx.atlassian.net/browse/EMQX-12385 --- .../src/emqx_bridge_greptimedb_connector.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 963f0efd0..1cd808e46 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -422,7 +422,7 @@ is_auth_key(_) -> %% ------------------------------------------------------------------------------------------------- %% Query do_query(InstId, Channel, Client, Points) -> - emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => false}), + emqx_trace:rendered_action_template(Channel, #{points => Points}), case greptimedb:write_batch(Client, Points) of {ok, #{response := {affected_rows, #{value := Rows}}}} -> ?SLOG(debug, #{ @@ -465,7 +465,7 @@ do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> connector => InstId, points => Points }), - emqx_trace:rendered_action_template(Channel, #{points => Points, is_async => true}), + emqx_trace:rendered_action_template(Channel, #{points => Points}), WrappedReplyFunAndArgs = {fun ?MODULE:reply_callback/2, [ReplyFunAndArgs]}, ok = greptimedb:async_write_batch(Client, Points, WrappedReplyFunAndArgs). 
From ed16e678bef79068aa3e2df5973d1107a8833cd4 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 17:31:44 +0200 Subject: [PATCH 07/11] fix(rule apply test): add default values to context Fixes: https://emqx.atlassian.net/browse/EMQX-12378 --- apps/emqx_rule_engine/src/emqx_rule_sqltester.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 6d393c24a..2236057e9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -52,7 +52,8 @@ do_apply_rule( do_apply_matched_rule( Rule, Context, - StopAfterRender + StopAfterRender, + EventTopics ); false -> {error, nomatch} @@ -61,17 +62,21 @@ do_apply_rule( case lists:member(InTopic, EventTopics) of true -> %% the rule is for both publish and events, test it directly - do_apply_matched_rule(Rule, Context, StopAfterRender); + do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics); false -> {error, nomatch} end end. -do_apply_matched_rule(Rule, Context, StopAfterRender) -> +do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics) -> update_process_trace_metadata(StopAfterRender), + FullContext = fill_default_values( + hd(EventTopics), + emqx_rule_maps:atom_key_map(Context) + ), ApplyRuleRes = emqx_rule_runtime:apply_rule( Rule, - Context, + FullContext, apply_rule_environment() ), reset_trace_process_metadata(StopAfterRender), @@ -99,6 +104,7 @@ apply_rule_environment() -> #{}. -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}. 
test(#{sql := Sql, context := Context}) -> + x:show(context, Context), case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> InTopic = get_in_topic(Context), From 2209b26fa5a4014e8ca6f463bd3be00b9a442fd5 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 16 May 2024 18:41:08 +0200 Subject: [PATCH 08/11] fix(rule trace): do not leak trace meta data when tracing action result Fixes: https://emqx.atlassian.net/browse/EMQX-12391 --- apps/emqx_rule_engine/src/emqx_rule_runtime.erl | 15 ++++++++++++++- apps/emqx_rule_engine/src/emqx_rule_sqltester.erl | 1 - .../emqx_rule_engine_api_rule_apply_SUITE.erl | 9 +++++---- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 1429561a7..01baf1ed2 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -740,7 +740,20 @@ nested_put(Alias, Val, Columns0) -> emqx_rule_maps:nested_put(Alias, Val, Columns). inc_action_metrics(TraceCtx, Result) -> - _ = do_inc_action_metrics(TraceCtx, Result), + SavedMetaData = logger:get_process_metadata(), + try + %% To not pollute the trace we temporary remove the process meta data + logger:unset_process_metadata(), + _ = do_inc_action_metrics(TraceCtx, Result) + after + %% Setting process metadata to undefined yields an error + case SavedMetaData of + undefined -> + ok; + _ -> + logger:set_process_metadata(SavedMetaData) + end + end, Result. do_inc_action_metrics( diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 2236057e9..449b8f52d 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -104,7 +104,6 @@ apply_rule_environment() -> #{}. -spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}. 
test(#{sql := Sql, context := Context}) -> - x:show(context, Context), case emqx_rule_sqlparser:parse(Sql) of {ok, Select} -> InTopic = get_in_topic(Context), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 9f03a7bda..b9fb1a0a3 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -419,13 +419,10 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"name">> := _, <<"type">> := <<"rule_engine_test">> }, - <<"client_ids">> := [], <<"clientid">> := _, <<"reason">> := <<"MY REASON">>, <<"rule_id">> := _, - <<"rule_ids">> := [], <<"rule_trigger_time">> := _, - <<"rule_trigger_times">> := [], <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -433,7 +430,11 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"time">> := _ }, LastEntryJSON - ) + ), + MetaMap = maps:get(<<"meta">>, LastEntryJSON), + ?assert(not maps:is_key(<<"client_ids">>, MetaMap)), + ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)), + ?assert(not maps:is_key(<<"rule_trigger_times">>, MetaMap)) end, do_apply_rule_test_format_action_failed_test(1, CheckFun). From 2e6db85578421d12c136c380e5687c3a5905e53d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 17 May 2024 14:13:36 +0200 Subject: [PATCH 09/11] fix(rule trace): rename rule_trigger_time(s) and cleanups This commit renames trace fields rule_trigger_time and rule_trigger_times to rule_trigger_ts and makes sure that the value for rule_trigger_ts will always be a list of timestamps. 
--- .../src/emqx_resource_buffer_worker.erl | 30 +++++++++----- .../src/emqx_rule_runtime.erl | 40 ++++++++++++------- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 28 ++++++------- 3 files changed, 58 insertions(+), 40 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index c610df76c..990a4286f 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1190,7 +1190,8 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> %% Get the rule ids from requests RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests), ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), - RuleTriggerTimes = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), + RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), + RuleTriggerTimes = lists:flatten(RuleTriggerTimes0), StopAfterRenderVal = case Requests of %% We know that the batch is not mixed since we prevent this by @@ -1203,7 +1204,7 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> logger:update_process_metadata(#{ rule_ids => RuleIDs, client_ids => ClientIDs, - rule_trigger_times => RuleTriggerTimes, + rule_trigger_ts => RuleTriggerTimes, stop_action_after_render => StopAfterRenderVal }), ok; @@ -1221,18 +1222,29 @@ collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) -> collect_client_id(?QUERY(_, _, _, _, _), Acc) -> Acc. -collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_time := Time}), Acc) -> +collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) -> [Time | Acc]; collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) -> Acc. unset_rule_id_trace_meta_data() -> - logger:update_process_metadata(#{ - rule_ids => #{}, - client_ids => #{}, - stop_action_after_render => false, - rule_trigger_times => [] - }). 
+ case logger:get_process_metadata() of + undefined -> + ok; + OldLoggerProcessMetadata -> + NewLoggerProcessMetadata = + maps:without( + [ + rule_ids, + client_ids, + stop_action_after_render, + rule_trigger_ts + ], + OldLoggerProcessMetadata + ), + logger:set_process_metadata(NewLoggerProcessMetadata), + ok + end. %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1 extract_connector_id(Id) when is_binary(Id) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 01baf1ed2..b49d3a084 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -144,12 +144,12 @@ set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) -> logger:update_process_metadata(#{ clientid => ClientID, rule_id => RuleID, - rule_trigger_time => rule_trigger_time(Columns) + rule_trigger_ts => [rule_trigger_time(Columns)] }); set_process_trace_metadata(RuleID, Columns) -> logger:update_process_metadata(#{ rule_id => RuleID, - rule_trigger_time => rule_trigger_time(Columns) + rule_trigger_ts => [rule_trigger_time(Columns)] }). rule_trigger_time(Columns) -> @@ -161,16 +161,26 @@ rule_trigger_time(Columns) -> end. reset_process_trace_metadata(#{clientid := _ClientID}) -> - Meta = logger:get_process_metadata(), - Meta1 = maps:remove(clientid, Meta), - Meta2 = maps:remove(rule_id, Meta1), - Meta3 = maps:remove(rule_trigger_time, Meta2), - logger:set_process_metadata(Meta3); + Meta0 = logger:get_process_metadata(), + Meta1 = maps:without( + [ + clientid, + rule_id, + rule_trigger_ts + ], + Meta0 + ), + logger:set_process_metadata(Meta1); reset_process_trace_metadata(_) -> - Meta = logger:get_process_metadata(), - Meta1 = maps:remove(rule_id, Meta), - Meta2 = maps:remove(rule_trigger_time, Meta1), - logger:set_process_metadata(Meta2). 
+ Meta0 = logger:get_process_metadata(), + Meta1 = maps:without( + [ + rule_id, + rule_trigger_ts + ], + Meta0 + ), + logger:set_process_metadata(Meta1). do_apply_rule( #{ @@ -533,24 +543,24 @@ do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta #{ rule_id := RuleID, clientid := ClientID, - rule_trigger_time := Timestamp + rule_trigger_ts := Timestamp } -> #{ rule_id => RuleID, clientid => ClientID, action_id => Action, stop_action_after_render => StopAfterRender, - rule_trigger_time => Timestamp + rule_trigger_ts => Timestamp }; #{ rule_id := RuleID, - rule_trigger_time := Timestamp + rule_trigger_ts := Timestamp } -> #{ rule_id => RuleID, action_id => Action, stop_action_after_render => StopAfterRender, - rule_trigger_time => Timestamp + rule_trigger_ts => Timestamp } end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index b9fb1a0a3..c9548f8b9 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -216,18 +216,15 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> end ) end, - %% Check that rule_trigger_time meta field is present in all log entries + %% Check that rule_trigger_ts meta field is present in all log entries Log0 = read_rule_trace_file(TraceName, TraceType, Now), Log1 = binary:split(Log0, <<"\n">>, [global, trim]), Log2 = lists:join(<<",\n">>, Log1), Log3 = iolist_to_binary(["[", Log2, "]"]), {ok, LogEntries} = emqx_utils_json:safe_decode(Log3, [return_maps]), - [#{<<"meta">> := #{<<"rule_trigger_time">> := RuleTriggerTime}} | _] = LogEntries, + [#{<<"meta">> := #{<<"rule_trigger_ts">> := [RuleTriggerTime]}} | _] = LogEntries, [ - ?assert( - (maps:get(<<"rule_trigger_time">>, Meta, no_time) =:= RuleTriggerTime) orelse - (lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_times">>, Meta, 
[]))) - ) + ?assert(lists:member(RuleTriggerTime, maps:get(<<"rule_trigger_ts">>, Meta, []))) || #{<<"meta">> := Meta} <- LogEntries ], ok. @@ -265,7 +262,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> <<"result">> := <<"ok">> }, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -422,7 +419,7 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"clientid">> := _, <<"reason">> := <<"MY REASON">>, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -433,8 +430,7 @@ t_apply_rule_test_format_action_failed(_Config) -> ), MetaMap = maps:get(<<"meta">>, LastEntryJSON), ?assert(not maps:is_key(<<"client_ids">>, MetaMap)), - ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)), - ?assert(not maps:is_key(<<"rule_trigger_times">>, MetaMap)) + ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)) end, do_apply_rule_test_format_action_failed_test(1, CheckFun). 
@@ -495,7 +491,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"clientid">> := _, <<"reason">> := <<"request_expired">>, <<"rule_id">> := _, - <<"rule_trigger_time">> := _, + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, @@ -512,7 +508,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"level">> := <<"debug">>, <<"meta">> := #{ - <<"client_ids">> := [], <<"clientid">> := _, <<"id">> := _, <<"reason">> := @@ -522,9 +517,7 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"msg">> := <<"MY_RECOVERABLE_REASON">> }, <<"rule_id">> := _, - <<"rule_ids">> := [], - <<"rule_trigger_time">> := _, - <<"rule_trigger_times">> := [], + <<"rule_trigger_ts">> := _, <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ERROR">> }, @@ -532,7 +525,10 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"time">> := _ }, ReasonEntryJSON - ) + ), + MetaMap = maps:get(<<"meta">>, ReasonEntryJSON), + ?assert(not maps:is_key(<<"client_ids">>, MetaMap)), + ?assert(not maps:is_key(<<"rule_ids">>, MetaMap)) end. 
meck_test_connector_recoverable_errors(Reason) -> From 5ce095f30ee6302bee6adf5ad603c43c500579f2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 17 May 2024 14:53:14 +0200 Subject: [PATCH 10/11] fix(rule trace): only include stop_after_rendering when value is true --- .../src/emqx_resource_buffer_worker.erl | 22 ++++++----- .../src/emqx_rule_runtime.erl | 38 ++++++++++++------- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 9 ++--- 3 files changed, 40 insertions(+), 29 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 990a4286f..285bc57b6 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1192,21 +1192,25 @@ set_rule_id_trace_meta_data(Requests) when is_list(Requests) -> ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests), RuleTriggerTimes0 = lists:foldl(fun collect_rule_trigger_times/2, [], Requests), RuleTriggerTimes = lists:flatten(RuleTriggerTimes0), - StopAfterRenderVal = + TraceMetadata = case Requests of %% We know that the batch is not mixed since we prevent this by %% using a stop_after function in the replayq:pop call [?QUERY(_, _, _, _, #{stop_action_after_render := true}) | _] -> - true; + #{ + rule_ids => RuleIDs, + client_ids => ClientIDs, + rule_trigger_ts => RuleTriggerTimes, + stop_action_after_render => true + }; [?QUERY(_, _, _, _, _TraceCTX) | _] -> - false + #{ + rule_ids => RuleIDs, + client_ids => ClientIDs, + rule_trigger_ts => RuleTriggerTimes + } end, - logger:update_process_metadata(#{ - rule_ids => RuleIDs, - client_ids => ClientIDs, - rule_trigger_ts => RuleTriggerTimes, - stop_action_after_render => StopAfterRenderVal - }), + logger:update_process_metadata(TraceMetadata), ok; set_rule_id_trace_meta_data(Request) -> set_rule_id_trace_meta_data([Request]), diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl 
b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index b49d3a084..25c5e846b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -538,30 +538,40 @@ do_handle_action_get_trace_inc_metrics_context(RuleID, Action) -> end. do_handle_action_get_trace_inc_metrics_context_unconditionally(Action, TraceMeta) -> - StopAfterRender = maps:get(stop_action_after_render, TraceMeta, false), + StopAfterRenderMap = + case maps:get(stop_action_after_render, TraceMeta, false) of + false -> + #{}; + true -> + #{stop_action_after_render => true} + end, case TraceMeta of #{ rule_id := RuleID, clientid := ClientID, rule_trigger_ts := Timestamp } -> - #{ - rule_id => RuleID, - clientid => ClientID, - action_id => Action, - stop_action_after_render => StopAfterRender, - rule_trigger_ts => Timestamp - }; + maps:merge( + #{ + rule_id => RuleID, + clientid => ClientID, + action_id => Action, + rule_trigger_ts => Timestamp + }, + StopAfterRenderMap + ); #{ rule_id := RuleID, rule_trigger_ts := Timestamp } -> - #{ - rule_id => RuleID, - action_id => Action, - stop_action_after_render => StopAfterRender, - rule_trigger_ts => Timestamp - } + maps:merge( + #{ + rule_id => RuleID, + action_id => Action, + rule_trigger_ts => Timestamp + }, + StopAfterRenderMap + ) end. 
action_info({bridge, BridgeType, BridgeName, _ResId}) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index c9548f8b9..a7a464842 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -263,7 +263,6 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> }, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_success">>, @@ -357,9 +356,10 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> ok; CheckBatchesFunRec(CurCount) -> receive - [{_, #{<<"stop_after_render">> := StopValue}} | _] = List -> + [{_, FirstMsg} | _] = List -> + StopValue = maps:get(<<"stop_after_render">>, FirstMsg, false), [ - ?assertMatch(#{<<"stop_after_render">> := StopValue}, Msg) + ?assertEqual(StopValue, maps:get(<<"stop_after_render">>, Msg, false)) || {_, Msg} <- List ], Len = length(List), @@ -420,7 +420,6 @@ t_apply_rule_test_format_action_failed(_Config) -> <<"reason">> := <<"MY REASON">>, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_failed">>, @@ -492,7 +491,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> <<"reason">> := <<"request_expired">>, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ACTION">> }, <<"msg">> := <<"action_failed">>, @@ -518,7 +516,6 @@ out_of_service_check_fun(SendErrorMsg, Reason) -> }, <<"rule_id">> := _, <<"rule_trigger_ts">> := _, - <<"stop_action_after_render">> := false, <<"trace_tag">> := <<"ERROR">> }, <<"msg">> := SendErrorMsg, From 3c87bcde46f3827bc7bbbe5294f1f0c181869193 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 20 May 2024 10:57:15 +0200 
Subject: [PATCH 11/11] fix(rule trace): restore logger metadata to its previous value --- .../src/emqx_resource_buffer_worker.erl | 27 ++++--------- .../src/emqx_rule_runtime.erl | 30 ++++----------- .../src/emqx_rule_sqltester.erl | 38 ++++++++++--------- 3 files changed, 34 insertions(+), 61 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 285bc57b6..5f269c112 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1174,12 +1174,13 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) -> {ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} -> {error, {unrecoverable_error, unhealthy_target}}; {ok, _Group, Resource} -> + PrevLoggerProcessMetadata = logger:get_process_metadata(), QueryResult = try set_rule_id_trace_meta_data(Query), do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource) after - unset_rule_id_trace_meta_data() + reset_logger_process_metadata(PrevLoggerProcessMetadata) end, QueryResult; {error, not_found} -> @@ -1216,6 +1217,11 @@ set_rule_id_trace_meta_data(Request) -> set_rule_id_trace_meta_data([Request]), ok. +reset_logger_process_metadata(undefined = _PrevProcessMetadata) -> + logger:unset_process_metadata(); +reset_logger_process_metadata(PrevProcessMetadata) -> + logger:set_process_metadata(PrevProcessMetadata). + collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) -> Acc#{RuleId => true}; collect_rule_id(?QUERY(_, _, _, _, _), Acc) -> @@ -1231,25 +1237,6 @@ collect_rule_trigger_times(?QUERY(_, _, _, _, #{rule_trigger_ts := Time}), Acc) collect_rule_trigger_times(?QUERY(_, _, _, _, _), Acc) -> Acc. 
-unset_rule_id_trace_meta_data() -> - case logger:get_process_metadata() of - undefined -> - ok; - OldLoggerProcessMetadata -> - NewLoggerProcessMetadata = - maps:without( - [ - rule_ids, - client_ids, - stop_action_after_render, - rule_trigger_ts - ], - OldLoggerProcessMetadata - ), - logger:set_process_metadata(NewLoggerProcessMetadata), - ok - end. - %% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1 extract_connector_id(Id) when is_binary(Id) -> case binary:split(Id, <<":">>, [global]) of diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 25c5e846b..003a2b5a3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -70,6 +70,7 @@ apply_rule_discard_result(Rule, Columns, Envs) -> ok. apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> + PrevProcessMetadata = logger:get_process_metadata(), set_process_trace_metadata(RuleID, Columns), trace_rule_sql( "rule_activated", @@ -137,7 +138,7 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) -> ), {error, {Error, StkTrace}} after - reset_process_trace_metadata(Columns) + reset_logger_process_metadata(PrevProcessMetadata) end. set_process_trace_metadata(RuleID, #{clientid := ClientID} = Columns) -> @@ -152,6 +153,11 @@ set_process_trace_metadata(RuleID, Columns) -> rule_trigger_ts => [rule_trigger_time(Columns)] }). +reset_logger_process_metadata(undefined = _PrevProcessMetadata) -> + logger:unset_process_metadata(); +reset_logger_process_metadata(PrevProcessMetadata) -> + logger:set_process_metadata(PrevProcessMetadata). + rule_trigger_time(Columns) -> case Columns of #{timestamp := Timestamp} -> @@ -160,28 +166,6 @@ rule_trigger_time(Columns) -> erlang:system_time(millisecond) end. 
-reset_process_trace_metadata(#{clientid := _ClientID}) -> - Meta0 = logger:get_process_metadata(), - Meta1 = maps:without( - [ - clientid, - rule_id, - rule_trigger_ts - ], - Meta0 - ), - logger:set_process_metadata(Meta1); -reset_process_trace_metadata(_) -> - Meta0 = logger:get_process_metadata(), - Meta1 = maps:without( - [ - rule_id, - rule_trigger_ts - ], - Meta0 - ), - logger:set_process_metadata(Meta1). - do_apply_rule( #{ id := RuleId, diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 449b8f52d..90a717930 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -69,18 +69,22 @@ do_apply_rule( end. do_apply_matched_rule(Rule, Context, StopAfterRender, EventTopics) -> - update_process_trace_metadata(StopAfterRender), - FullContext = fill_default_values( - hd(EventTopics), - emqx_rule_maps:atom_key_map(Context) - ), - ApplyRuleRes = emqx_rule_runtime:apply_rule( - Rule, - FullContext, - apply_rule_environment() - ), - reset_trace_process_metadata(StopAfterRender), - ApplyRuleRes. + PrevLoggerProcessMetadata = logger:get_process_metadata(), + try + update_process_trace_metadata(StopAfterRender), + FullContext = fill_default_values( + hd(EventTopics), + emqx_rule_maps:atom_key_map(Context) + ), + ApplyRuleRes = emqx_rule_runtime:apply_rule( + Rule, + FullContext, + apply_rule_environment() + ), + ApplyRuleRes + after + reset_logger_process_metadata(PrevLoggerProcessMetadata) + end. update_process_trace_metadata(true = _StopAfterRender) -> logger:update_process_metadata(#{ @@ -89,12 +93,10 @@ update_process_trace_metadata(true = _StopAfterRender) -> update_process_trace_metadata(false = _StopAfterRender) -> ok. 
-reset_trace_process_metadata(true = _StopAfterRender) -> - Meta = logger:get_process_metadata(), - NewMeta = maps:remove(stop_action_after_render, Meta), - logger:set_process_metadata(NewMeta); -reset_trace_process_metadata(false = _StopAfterRender) -> - ok. +reset_logger_process_metadata(undefined = _PrevProcessMetadata) -> + logger:unset_process_metadata(); +reset_logger_process_metadata(PrevProcessMetadata) -> + logger:set_process_metadata(PrevProcessMetadata). %% At the time of writing the environment passed to the apply rule function is %% not used at all for normal actions. When it is used for custom functions it