Merge pull request #13100 from kjellwinblad/kjell/s3_aggregate_only_has_result_trace/EMQX-12429

fix: add action rendered trace for s3 in aggregated mode
This commit is contained in:
Kjell Winblad 2024-05-23 14:29:18 +02:00 committed by GitHub
commit 0e5b02c373
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 61 additions and 23 deletions

View File

@ -48,7 +48,11 @@ format_meta_map(Meta) ->
format_meta_map(Meta, Encode).
format_meta_map(Meta, Encode) ->
format_meta_map(Meta, Encode, [{packet, fun format_packet/2}, {payload, fun format_payload/2}]).
format_meta_map(Meta, Encode, [
{packet, fun format_packet/2},
{payload, fun format_payload/2},
{<<"payload">>, fun format_payload/2}
]).
format_meta_map(Meta, _Encode, []) ->
Meta;
@ -61,9 +65,21 @@ format_meta_map(Meta, Encode, [{Name, FormatFun} | Rest]) ->
format_meta_map(Meta, Encode, Rest)
end.
format_meta_data(Meta0, Encode) when is_map(Meta0) ->
Meta1 = format_meta_map(Meta0, Encode),
maps:map(fun(_K, V) -> format_meta_data(V, Encode) end, Meta1);
format_meta_data(Meta, Encode) when is_list(Meta) ->
[format_meta_data(Item, Encode) || Item <- Meta];
format_meta_data(Meta, Encode) when is_tuple(Meta) ->
List = erlang:tuple_to_list(Meta),
FormattedList = [format_meta_data(Item, Encode) || Item <- List],
erlang:list_to_tuple(FormattedList);
format_meta_data(Meta, _Encode) ->
Meta.
format_meta(Meta0, Encode) ->
Meta1 = maps:without([msg, clientid, peername, trace_tag], Meta0),
Meta2 = format_meta_map(Meta1, Encode),
Meta2 = format_meta_data(Meta1, Encode),
kvs_to_iolist(lists:sort(fun compare_meta_kvs/2, maps:to_list(Meta2))).
%% packet always goes first; payload always goes last

View File

@ -42,7 +42,7 @@ format(
%% an external call to create the JSON text
Time = emqx_utils_calendar:now_to_rfc3339(microsecond),
LogMap2 = LogMap1#{time => Time},
LogMap3 = prepare_log_map(LogMap2, PEncode),
LogMap3 = prepare_log_data(LogMap2, PEncode),
[emqx_logger_jsonfmt:best_effort_json(LogMap3, [force_utf8]), "\n"].
%%%-----------------------------------------------------------------
@ -85,9 +85,17 @@ do_maybe_format_msg({report, Report} = Msg, #{report_cb := Cb} = Meta, Config) -
do_maybe_format_msg(Msg, Meta, Config) ->
emqx_logger_jsonfmt:format_msg(Msg, Meta, Config).
prepare_log_map(LogMap, PEncode) ->
prepare_log_data(LogMap, PEncode) when is_map(LogMap) ->
NewKeyValuePairs = [prepare_key_value(K, V, PEncode) || {K, V} <- maps:to_list(LogMap)],
maps:from_list(NewKeyValuePairs).
maps:from_list(NewKeyValuePairs);
prepare_log_data(V, PEncode) when is_list(V) ->
[prepare_log_data(Item, PEncode) || Item <- V];
prepare_log_data(V, PEncode) when is_tuple(V) ->
List = erlang:tuple_to_list(V),
PreparedList = [prepare_log_data(Item, PEncode) || Item <- List],
erlang:list_to_tuple(PreparedList);
prepare_log_data(V, _PEncode) ->
V.
prepare_key_value(host, {I1, I2, I3, I4} = IP, _PEncode) when
is_integer(I1),
@ -118,6 +126,8 @@ prepare_key_value(payload = K, V, PEncode) ->
V
end,
{K, NewV};
prepare_key_value(<<"payload">>, V, PEncode) ->
prepare_key_value(payload, V, PEncode);
prepare_key_value(packet = K, V, PEncode) ->
NewV =
try
@ -167,10 +177,8 @@ prepare_key_value(action_id = K, V, _PEncode) ->
_:_ ->
{K, V}
end;
prepare_key_value(K, V, PEncode) when is_map(V) ->
{K, prepare_log_map(V, PEncode)};
prepare_key_value(K, V, _PEncode) ->
{K, V}.
prepare_key_value(K, V, PEncode) ->
{K, prepare_log_data(V, PEncode)}.
format_packet(undefined, _) -> "";
format_packet(Packet, Encode) -> emqx_packet:format(Packet, Encode).

View File

@ -310,7 +310,7 @@ on_query(InstId, {Tag, Data}, #{client_config := Config, channels := Channels})
ChannelState = #{mode := direct} ->
run_simple_upload(InstId, Tag, Data, ChannelState, Config);
ChannelState = #{mode := aggregated} ->
run_aggregated_upload(InstId, [Data], ChannelState);
run_aggregated_upload(InstId, Tag, [Data], ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
@ -321,7 +321,7 @@ on_batch_query(InstId, [{Tag, Data0} | Rest], #{channels := Channels}) ->
case maps:get(Tag, Channels, undefined) of
ChannelState = #{mode := aggregated} ->
Records = [Data0 | [Data || {_, Data} <- Rest]],
run_aggregated_upload(InstId, Records, ChannelState);
run_aggregated_upload(InstId, Tag, Records, ChannelState);
undefined ->
{error, {unrecoverable_error, {invalid_message_tag, Tag}}}
end.
@ -362,8 +362,12 @@ run_simple_upload(
{error, map_error(Reason)}
end.
run_aggregated_upload(InstId, Records, #{aggreg_id := AggregId}) ->
run_aggregated_upload(InstId, ChannelID, Records, #{aggreg_id := AggregId}) ->
Timestamp = erlang:system_time(second),
emqx_trace:rendered_action_template(ChannelID, #{
mode => aggregated,
records => Records
}),
case emqx_connector_aggregator:push_records(AggregId, Timestamp, Records) of
ok ->
?tp(s3_bridge_aggreg_push_ok, #{instance_id => InstId, name => AggregId}),

View File

@ -91,13 +91,16 @@ end_per_testcase(_TestCase, _Config) ->
ok.
t_basic_apply_rule_trace_ruleid(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, false).
basic_apply_rule_test_helper(get_action(Config), ruleid, false, text).
t_basic_apply_rule_trace_ruleid_hidden_payload(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, false, hidden).
t_basic_apply_rule_trace_clientid(Config) ->
basic_apply_rule_test_helper(get_action(Config), clientid, false).
basic_apply_rule_test_helper(get_action(Config), clientid, false, text).
t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
basic_apply_rule_test_helper(get_action(Config), ruleid, true).
basic_apply_rule_test_helper(get_action(Config), ruleid, true, text).
get_action(Config) ->
case ?config(group_name, Config) of
@ -135,10 +138,10 @@ republish_action() ->
console_print_action() ->
#{<<"function">> => <<"console">>}.
basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
basic_apply_rule_test_helper(Action, TraceType, StopAfterRender, PayloadEncode) ->
%% Create Rule
RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
SQL = <<"SELECT payload.id as id, payload as payload FROM \"", RuleTopic/binary, "\"">>,
{ok, #{<<"id">> := RuleId}} =
emqx_bridge_testlib:create_rule_and_action(
Action,
@ -157,12 +160,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
clientid ->
ClientId
end,
create_trace(TraceName, TraceType, TraceValue),
create_trace(TraceName, TraceType, TraceValue, PayloadEncode),
%% ===================================
Context = #{
clientid => ClientId,
event_type => message_publish,
payload => <<"{\"msg\": \"hello\"}">>,
payload => <<"{\"msg\": \"my_payload_msg\"}">>,
qos => 1,
topic => RuleTopic,
username => <<"u_emqx">>
@ -179,6 +182,12 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) ->
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG:~n~s", [Bin]),
case PayloadEncode of
hidden ->
?assertEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]));
text ->
?assertNotEqual(nomatch, binary:match(Bin, [<<"my_payload_msg">>]))
end,
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SQL_yielded_result">>])),
case Action of
@ -273,7 +282,7 @@ do_final_log_check(Action, Bin0) when is_binary(Action) ->
do_final_log_check(_, _) ->
ok.
create_trace(TraceName, TraceType, TraceValue) ->
create_trace(TraceName, TraceType, TraceValue, PayloadEncode) ->
Now = erlang:system_time(second) - 10,
Start = Now,
End = Now + 60,
@ -283,7 +292,8 @@ create_trace(TraceName, TraceType, TraceValue) ->
TraceType => TraceValue,
start_at => Start,
end_at => End,
formatter => json
formatter => json,
payload_encode => PayloadEncode
},
{ok, _} = CreateRes = emqx_trace:create(Trace),
emqx_common_test_helpers:on_exit(fun() ->
@ -323,7 +333,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
?FUNCTION_NAME,
SQL
),
create_trace(Name, ruleid, RuleID),
create_trace(Name, ruleid, RuleID, text),
Now = erlang:system_time(second) - 10,
%% Stop
ParmsStopAfterRender = apply_rule_parms(true, Name),
@ -588,7 +598,7 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun)
?FUNCTION_NAME,
SQL
),
create_trace(Name, ruleid, RuleID),
create_trace(Name, ruleid, RuleID, text),
Now = erlang:system_time(second) - 10,
%% Stop
ParmsNoStopAfterRender = apply_rule_parms(false, Name),