From e10c87b825ace3b9778e3f577ac5563152181001 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 May 2024 10:25:27 +0200 Subject: [PATCH 1/3] fix: add action rendered trace for s3 in aggregated mode Fixes: https://emqx.atlassian.net/browse/EMQX-12429 --- .../src/emqx_bridge_s3_connector.erl | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 88b161033..dd0362e5d 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ChannelState = #{mode := direct} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); ChannelState = #{mode := aggregated} -> - run_aggregated_upload(InstId, [Data], ChannelState); + run_aggregated_upload(InstId, Tag, [Data], ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> case maps:get(Tag, Channels, undefined) of ChannelState = #{mode := aggregated} -> Records = [Data0 | [Data || {_, Data} <- Rest]], - run_aggregated_upload(InstId, Records, ChannelState); + run_aggregated_upload(InstId, Tag, Records, ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -362,8 +362,15 @@ run_simple_upload( {error, map_error(Reason)} end. -run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> +run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), + emqx_trace:rendered_action_template(ChannelID, #{ + mode => aggregated, + records => #emqx_trace_format_func_data{ + function = fun render_records/1, + data = Records + } + }), case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), @@ -372,6 +379,13 @@ run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> {error, {unrecoverable_error, Reason}} end. +render_records(Records) -> + try + [unicode:characters_to_binary(R) || R <- Records] + catch + _:_ -> Records + end. + map_error({socket_error, _} = Reason) -> {recoverable_error, Reason}; map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 -> From fb7688ab94dcb04411441199a5c759e1d21be2b2 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 May 2024 12:30:56 +0200 Subject: [PATCH 2/3] fix(trace): make sure that the payload encode works with nested payloads This commit makes sure that the trace setting for payload encode works even when the payload is in a nested structure or when the payload key is a binary instead of an atom. Fixes: https://emqx.atlassian.net/browse/EMQX-12424 --- .../src/emqx_trace/emqx_trace_formatter.erl | 20 ++++++++++-- .../emqx_trace/emqx_trace_json_formatter.erl | 22 +++++++++---- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 32 ++++++++++++------- 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index c1213459c..34469f835 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -48,7 +48,11 @@ format_meta_map(Meta) -> format_meta_map(Meta, Encode). format_meta_map(Meta, Encode) -> - format_meta_map(Meta, Encode, [{packet, fun format_packet/2}, {payload, fun format_payload/2}]). + format_meta_map(Meta, Encode, [ + {packet, fun format_packet/2}, + {payload, fun format_payload/2}, + {<<"payload">>, fun format_payload/2} + ]). format_meta_map(Meta, _Encode, []) -> Meta; @@ -61,9 +65,21 @@ format_meta_map(Meta, Encode, [{Name, FormatFun} | Rest]) -> format_meta_map(Meta, Encode, Rest) end. +format_meta_data(Meta0, Encode) when is_map(Meta0) -> + Meta1 = format_meta_map(Meta0, Encode), + maps:map(fun(_K, V) -> format_meta_data(V, Encode) end, Meta1); +format_meta_data(Meta, Encode) when is_list(Meta) -> + [format_meta_data(Item, Encode) || Item <- Meta]; +format_meta_data(Meta, Encode) when is_tuple(Meta) -> + List = erlang:tuple_to_list(Meta), + FormattedList = [format_meta_data(Item, Encode) || Item <- List], + erlang:list_to_tuple(FormattedList); +format_meta_data(Meta, _Encode) -> + Meta. + format_meta(Meta0, Encode) -> Meta1 = maps:without([msg, clientid, peername, trace_tag], Meta0), - Meta2 = format_meta_map(Meta1, Encode), + Meta2 = format_meta_data(Meta1, Encode), kvs_to_iolist(lists:sort(fun compare_meta_kvs/2, maps:to_list(Meta2))). %% packet always goes first; payload always goes last diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 85b846349..e06473650 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -42,7 +42,7 @@ format( %% an external call to create the JSON text Time = emqx_utils_calendar:now_to_rfc3339(microsecond), LogMap2 = LogMap1#{time => Time}, - LogMap3 = prepare_log_map(LogMap2, PEncode), + LogMap3 = prepare_log_data(LogMap2, PEncode), [emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"]. %%%----------------------------------------------------------------- @@ -85,9 +85,17 @@ do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) - do_maybe_format_msg(Msg, Meta, Config) -> emqx_logger_jsonfmt:format_msg(Msg, Meta, Config). -prepare_log_map(LogMap, PEncode) -> +prepare_log_data(LogMap, PEncode) when is_map(LogMap) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], - maps:from_list(NewKeyValuePairs). + maps:from_list(NewKeyValuePairs); +prepare_log_data(V, PEncode) when is_list(V) -> + [prepare_log_data(Item, PEncode) || Item <- V]; +prepare_log_data(V, PEncode) when is_tuple(V) -> + List = erlang:tuple_to_list(V), + PreparedList = [prepare_log_data(Item, PEncode) || Item <- List], + erlang:list_to_tuple(PreparedList); +prepare_log_data(V, _PEncode) -> + V. prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when is_integer(I1), @@ -118,6 +126,8 @@ prepare_key_value(payload = K, V, PEncode) -> V end, {K, NewV}; +prepare_key_value(<<"payload">>, V, PEncode) -> + prepare_key_value(payload, V, PEncode); prepare_key_value(packet = K, V, PEncode) -> NewV = try @@ -167,10 +177,8 @@ prepare_key_value(action_id = K, V, _PEncode) -> _:_ -> {K, V} end; -prepare_key_value(K, V, PEncode) when is_map(V) -> - {K, prepare_log_map(V, PEncode)}; -prepare_key_value(K, V, _PEncode) -> - {K, V}. +prepare_key_value(K, V, PEncode) -> + {K, prepare_log_data(V, PEncode)}. format_packet(undefined, _) -> ""; format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index a7a464842..0f0395013 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -91,13 +91,16 @@ end_per_testcase(_TestCase, _Config) -> ok. t_basic_apply_rule_trace_ruleid(Config) -> - basic_apply_rule_test_helper(get_action(Config), ruleid, false). + basic_apply_rule_test_helper(get_action(Config), ruleid, false, text). + +t_basic_apply_rule_trace_ruleid_hidden_payload(Config) -> + basic_apply_rule_test_helper(get_action(Config), ruleid, false, hidden). t_basic_apply_rule_trace_clientid(Config) -> - basic_apply_rule_test_helper(get_action(Config), clientid, false). + basic_apply_rule_test_helper(get_action(Config), clientid, false, text). t_basic_apply_rule_trace_ruleid_stop_after_render(Config) -> - basic_apply_rule_test_helper(get_action(Config), ruleid, true). + basic_apply_rule_test_helper(get_action(Config), ruleid, true, text). get_action(Config) -> case ?config(group_name, Config) of @@ -135,10 +138,10 @@ republish_action() -> console_print_action() -> #{<<"function">> => <<"console">>}. -basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> +basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) -> %% Create Rule RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), - SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>, {ok, #{<<"id">> := RuleId}} = emqx_bridge_testlib:create_rule_and_action( Action, @@ -157,12 +160,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> clientid -> ClientId end, - create_trace(TraceName, TraceType, TraceValue), + create_trace(TraceName, TraceType, TraceValue, PayloadEncode), %% =================================== Context = #{ clientid => ClientId, event_type => message_publish, - payload => <<"{\"msg\": \"hello\"}">>, + payload => <<"{\"msg\": \"my_payload_msg\"}">>, qos => 1, topic => RuleTopic, username => <<"u_emqx">> @@ -179,6 +182,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> begin Bin = read_rule_trace_file(TraceName, TraceType, Now), io:format("THELOG:~n~s", [Bin]), + case PayloadEncode of + hidden -> + ?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>])); + text -> + ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>])) + end, ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])), case Action of @@ -273,7 +282,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> do_final_log_check(_, _) -> ok. -create_trace(TraceName, TraceType, TraceValue) -> +create_trace(TraceName, TraceType, TraceValue, PayloadEncode) -> Now = erlang:system_time(second) - 10, Start = Now, End = Now + 60, @@ -283,7 +292,8 @@ create_trace(TraceName, TraceType, TraceValue) -> TraceType => TraceValue, start_at => Start, end_at => End, - formatter => json + formatter => json, + payload_encode => PayloadEncode }, {ok, _} = CreateRes = emqx_trace:create(Trace), emqx_common_test_helpers:on_exit(fun() -> @@ -323,7 +333,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> ?FUNCTION_NAME, SQL ), - create_trace(Name, ruleid, RuleID), + create_trace(Name, ruleid, RuleID, text), Now = erlang:system_time(second) - 10, %% Stop ParmsStopAfterRender = apply_rule_parms(true, Name), @@ -588,7 +598,7 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) ?FUNCTION_NAME, SQL ), - create_trace(Name, ruleid, RuleID), + create_trace(Name, ruleid, RuleID, text), Now = erlang:system_time(second) - 10, %% Stop ParmsNoStopAfterRender = apply_rule_parms(false, Name), From 89b47e8ffccbeff5b9149f3fc2ab9866d4e611fa Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 23 May 2024 12:41:30 +0200 Subject: [PATCH 3/3] fix(s3 tracing): do not format records as IO data in aggregate mode --- apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index dd0362e5d..bc9f37935 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -366,10 +366,7 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), emqx_trace:rendered_action_template(ChannelID, #{ mode => aggregated, - records => #emqx_trace_format_func_data{ - function = fun render_records/1, - data = Records - } + records => Records }), case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> @@ -379,13 +376,6 @@ run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> {error, {unrecoverable_error, Reason}} end. -render_records(Records) -> - try - [unicode:characters_to_binary(R) || R <- Records] - catch - _:_ -> Records - end. - map_error({socket_error, _} = Reason) -> {recoverable_error, Reason}; map_error(Reason = {aws_error, Status, _, _Body}) when Status >= 500 ->