diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index c1213459c..34469f835 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -48,7 +48,11 @@ format_meta_map(Meta) -> format_meta_map(Meta, Encode). format_meta_map(Meta, Encode) -> - format_meta_map(Meta, Encode, [{packet, fun format_packet/2}, {payload, fun format_payload/2}]). + format_meta_map(Meta, Encode, [ + {packet, fun format_packet/2}, + {payload, fun format_payload/2}, + {<<"payload">>, fun format_payload/2} + ]). format_meta_map(Meta, _Encode, []) -> Meta; @@ -61,9 +65,21 @@ format_meta_map(Meta, Encode, [{Name, FormatFun} | Rest]) -> format_meta_map(Meta, Encode, Rest) end. +format_meta_data(Meta0, Encode) when is_map(Meta0) -> + Meta1 = format_meta_map(Meta0, Encode), + maps:map(fun(_K, V) -> format_meta_data(V, Encode) end, Meta1); +format_meta_data(Meta, Encode) when is_list(Meta) -> + [format_meta_data(Item, Encode) || Item <- Meta]; +format_meta_data(Meta, Encode) when is_tuple(Meta) -> + List = erlang:tuple_to_list(Meta), + FormattedList = [format_meta_data(Item, Encode) || Item <- List], + erlang:list_to_tuple(FormattedList); +format_meta_data(Meta, _Encode) -> + Meta. + format_meta(Meta0, Encode) -> Meta1 = maps:without([msg, clientid, peername, trace_tag], Meta0), - Meta2 = format_meta_map(Meta1, Encode), + Meta2 = format_meta_data(Meta1, Encode), kvs_to_iolist(lists:sort(fun compare_meta_kvs/2, maps:to_list(Meta2))). %% packet always goes first; payload always goes last diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index 85b846349..e06473650 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -42,7 +42,7 @@ format( %% an external call to create the JSON text Time = emqx_utils_calendar:now_to_rfc3339(microsecond), LogMap2 = LogMap1#{time => Time}, - LogMap3 = prepare_log_map(LogMap2, PEncode), + LogMap3 = prepare_log_data(LogMap2, PEncode), [emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"]. %%%----------------------------------------------------------------- @@ -85,9 +85,17 @@ do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) - do_maybe_format_msg(Msg, Meta, Config) -> emqx_logger_jsonfmt:format_msg(Msg, Meta, Config). -prepare_log_map(LogMap, PEncode) -> +prepare_log_data(LogMap, PEncode) when is_map(LogMap) -> NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)], - maps:from_list(NewKeyValuePairs). + maps:from_list(NewKeyValuePairs); +prepare_log_data(V, PEncode) when is_list(V) -> + [prepare_log_data(Item, PEncode) || Item <- V]; +prepare_log_data(V, PEncode) when is_tuple(V) -> + List = erlang:tuple_to_list(V), + PreparedList = [prepare_log_data(Item, PEncode) || Item <- List], + erlang:list_to_tuple(PreparedList); +prepare_log_data(V, _PEncode) -> + V. prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when is_integer(I1), @@ -118,6 +126,8 @@ prepare_key_value(payload = K, V, PEncode) -> V end, {K, NewV}; +prepare_key_value(<<"payload">>, V, PEncode) -> + prepare_key_value(payload, V, PEncode); prepare_key_value(packet = K, V, PEncode) -> NewV = try @@ -167,10 +177,8 @@ prepare_key_value(action_id = K, V, _PEncode) -> _:_ -> {K, V} end; -prepare_key_value(K, V, PEncode) when is_map(V) -> - {K, prepare_log_map(V, PEncode)}; -prepare_key_value(K, V, _PEncode) -> - {K, V}. +prepare_key_value(K, V, PEncode) -> + {K, prepare_log_data(V, PEncode)}. format_packet(undefined, _) -> ""; format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode). diff --git a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl index 88b161033..bc9f37935 100644 --- a/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl +++ b/apps/emqx_bridge_s3/src/emqx_bridge_s3_connector.erl @@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels}) ChannelState = #{mode := direct} -> run_simple_upload(InstId, Tag, Data, ChannelState, Config); ChannelState = #{mode := aggregated} -> - run_aggregated_upload(InstId, [Data], ChannelState); + run_aggregated_upload(InstId, Tag, [Data], ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) -> case maps:get(Tag, Channels, undefined) of ChannelState = #{mode := aggregated} -> Records = [Data0 | [Data || {_, Data} <- Rest]], - run_aggregated_upload(InstId, Records, ChannelState); + run_aggregated_upload(InstId, Tag, Records, ChannelState); undefined -> {error, {unrecoverable_error, {invalid_message_tag, Tag}}} end. @@ -362,8 +362,12 @@ run_simple_upload( {error, map_error(Reason)} end. -run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) -> +run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) -> Timestamp = erlang:system_time(second), + emqx_trace:rendered_action_template(ChannelID, #{ + mode => aggregated, + records => Records + }), case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of ok -> ?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}), diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index a7a464842..0f0395013 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -91,13 +91,16 @@ end_per_testcase(_TestCase, _Config) -> ok. t_basic_apply_rule_trace_ruleid(Config) -> - basic_apply_rule_test_helper(get_action(Config), ruleid, false). + basic_apply_rule_test_helper(get_action(Config), ruleid, false, text). + +t_basic_apply_rule_trace_ruleid_hidden_payload(Config) -> + basic_apply_rule_test_helper(get_action(Config), ruleid, false, hidden). t_basic_apply_rule_trace_clientid(Config) -> - basic_apply_rule_test_helper(get_action(Config), clientid, false). + basic_apply_rule_test_helper(get_action(Config), clientid, false, text). t_basic_apply_rule_trace_ruleid_stop_after_render(Config) -> - basic_apply_rule_test_helper(get_action(Config), ruleid, true). + basic_apply_rule_test_helper(get_action(Config), ruleid, true, text). get_action(Config) -> case ?config(group_name, Config) of @@ -135,10 +138,10 @@ republish_action() -> console_print_action() -> #{<<"function">> => <<"console">>}. -basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> +basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) -> %% Create Rule RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]), - SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>, + SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>, {ok, #{<<"id">> := RuleId}} = emqx_bridge_testlib:create_rule_and_action( Action, @@ -157,12 +160,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> clientid -> ClientId end, - create_trace(TraceName, TraceType, TraceValue), + create_trace(TraceName, TraceType, TraceValue, PayloadEncode), %% =================================== Context = #{ clientid => ClientId, event_type => message_publish, - payload => <<"{\"msg\": \"hello\"}">>, + payload => <<"{\"msg\": \"my_payload_msg\"}">>, qos => 1, topic => RuleTopic, username => <<"u_emqx">> @@ -179,6 +182,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> begin Bin = read_rule_trace_file(TraceName, TraceType, Now), io:format("THELOG:~n~s", [Bin]), + case PayloadEncode of + hidden -> + ?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>])); + text -> + ?assertNotEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>])) + end, ?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])), ?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])), case Action of @@ -273,7 +282,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) -> do_final_log_check(_, _) -> ok. -create_trace(TraceName, TraceType, TraceValue) -> +create_trace(TraceName, TraceType, TraceValue, PayloadEncode) -> Now = erlang:system_time(second) - 10, Start = Now, End = Now + 60, @@ -283,7 +292,8 @@ create_trace(TraceName, TraceType, TraceValue) -> TraceType => TraceValue, start_at => Start, end_at => End, - formatter => json + formatter => json, + payload_encode => PayloadEncode }, {ok, _} = CreateRes = emqx_trace:create(Trace), emqx_common_test_helpers:on_exit(fun() -> @@ -323,7 +333,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> ?FUNCTION_NAME, SQL ), - create_trace(Name, ruleid, RuleID), + create_trace(Name, ruleid, RuleID, text), Now = erlang:system_time(second) - 10, %% Stop ParmsStopAfterRender = apply_rule_parms(true, Name), @@ -588,7 +598,7 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) ?FUNCTION_NAME, SQL ), - create_trace(Name, ruleid, RuleID), + create_trace(Name, ruleid, RuleID, text), Now = erlang:system_time(second) - 10, %% Stop ParmsNoStopAfterRender = apply_rule_parms(false, Name),