diff --git a/apps/emqx/include/emqx_mqtt.hrl b/apps/emqx/include/emqx_mqtt.hrl index d8922fb98..4d0188f71 100644 --- a/apps/emqx/include/emqx_mqtt.hrl +++ b/apps/emqx/include/emqx_mqtt.hrl @@ -680,6 +680,7 @@ end). -define(THROW_SERIALIZE_ERROR(Reason), erlang:throw({?FRAME_SERIALIZE_ERROR, Reason})). -define(MAX_PAYLOAD_FORMAT_SIZE, 1024). +-define(TRUNCATED_PAYLOAD_SIZE, 100). -define(MAX_PAYLOAD_FORMAT_LIMIT(Bin), (byte_size(Bin) =< ?MAX_PAYLOAD_FORMAT_SIZE)). -endif. diff --git a/apps/emqx/src/emqx_packet.erl b/apps/emqx/src/emqx_packet.erl index 1f7aaa4c9..9cb23be2e 100644 --- a/apps/emqx/src/emqx_packet.erl +++ b/apps/emqx/src/emqx_packet.erl @@ -55,6 +55,8 @@ format/2 ]). +-export([format_truncated_payload/3]). + -define(TYPE_NAMES, {'CONNECT', 'CONNACK', 'PUBLISH', 'PUBACK', 'PUBREC', 'PUBREL', 'PUBCOMP', 'SUBSCRIBE', 'SUBACK', 'UNSUBSCRIBE', 'UNSUBACK', 'PINGREQ', 'PINGRESP', 'DISCONNECT', 'AUTH'} @@ -614,21 +616,33 @@ format_password(undefined) -> ""; format_password(<<>>) -> ""; format_password(_Password) -> "******". +format_payload(_, hidden) -> + "Payload=******"; format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> ["Payload=", unicode:characters_to_list(Payload)]; format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> ["Payload(hex)=", binary:encode_hex(Payload)]; -format_payload(_, hidden) -> - "Payload=******"; -format_payload(<> = Payload, _) -> +format_payload(<> = Payload, Type) -> [ "Payload=", - Part, - "... The ", - integer_to_list(byte_size(Payload) - 100), - " bytes of this log are truncated" + format_truncated_payload(Part, byte_size(Payload), Type) ]. +format_truncated_payload(Bin, Size, Type) -> + Bin2 = + case Type of + text -> Bin; + hex -> binary:encode_hex(Bin) + end, + unicode:characters_to_list( + [ + Bin2, + "... The ", + integer_to_list(Size - ?TRUNCATED_PAYLOAD_SIZE), + " bytes of this log are truncated" + ] + ). + i(true) -> 1; i(false) -> 0; i(I) when is_integer(I) -> I. diff --git a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl index 42623e91a..be3d858f5 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_formatter.erl @@ -76,13 +76,8 @@ format_payload(_, hidden) -> format_payload(Payload, text) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> unicode:characters_to_list(Payload); format_payload(Payload, hex) when ?MAX_PAYLOAD_FORMAT_LIMIT(Payload) -> binary:encode_hex(Payload); -format_payload(<> = Payload, _) -> - [ - Part, - "... The ", - integer_to_list(byte_size(Payload) - 100), - " bytes of this log are truncated" - ]. +format_payload(<> = Payload, Type) -> + emqx_packet:format_truncated_payload(Part, byte_size(Payload), Type). to_iolist(Atom) when is_atom(Atom) -> atom_to_list(Atom); to_iolist(Int) when is_integer(Int) -> integer_to_list(Int); diff --git a/apps/emqx/test/emqx_trace_SUITE.erl b/apps/emqx/test/emqx_trace_SUITE.erl index 1bbe084fd..ce7d7e887 100644 --- a/apps/emqx/test/emqx_trace_SUITE.erl +++ b/apps/emqx/test/emqx_trace_SUITE.erl @@ -311,6 +311,60 @@ t_client_event(_Config) -> ?assert(erlang:byte_size(Bin3) > 0), ok. +t_client_huge_payload_truncated(_Config) -> + ClientId = <<"client-truncated1">>, + Now = erlang:system_time(second), + Name = <<"test_client_id_truncated1">>, + {ok, _} = emqx_trace:create([ + {<<"name">>, Name}, + {<<"type">>, clientid}, + {<<"clientid">>, ClientId}, + {<<"start_at">>, Now} + ]), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + {ok, Client} = emqtt:start_link([{clean_start, true}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + emqtt:ping(Client), + NormalPayload = iolist_to_binary(lists:duplicate(1024, "x")), + ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]), + HugePayload1 = iolist_to_binary(lists:duplicate(1025, "y")), + ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]), + HugePayload2 = iolist_to_binary(lists:duplicate(1024 * 10, "y")), + ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + {ok, _} = emqx_trace:create([ + {<<"name">>, <<"test_topic">>}, + {<<"type">>, topic}, + {<<"topic">>, <<"/test">>}, + {<<"start_at">>, Now} + ]), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), + {ok, Bin} = file:read_file(emqx_trace:log_file(Name, Now)), + ok = emqtt:publish(Client, <<"/test">>, #{}, NormalPayload, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload1, [{qos, 0}]), + ok = emqtt:publish(Client, <<"/test">>, #{}, HugePayload2, [{qos, 0}]), + ok = emqtt:disconnect(Client), + ok = emqx_trace_handler_SUITE:filesync(Name, clientid), + ok = emqx_trace_handler_SUITE:filesync(<<"test_topic">>, topic), + {ok, Bin2} = file:read_file(emqx_trace:log_file(Name, Now)), + {ok, Bin3} = file:read_file(emqx_trace:log_file(<<"test_topic">>, Now)), + ct:pal("Bin ~p Bin2 ~p Bin3 ~p", [byte_size(Bin), byte_size(Bin2), byte_size(Bin3)]), + ?assert(erlang:byte_size(Bin) > 1024), + ?assert(erlang:byte_size(Bin) < erlang:byte_size(Bin2)), + ?assert(erlang:byte_size(Bin3) > 1024), + + %% Don't have format crash + CrashBin = <<"CRASH">>, + ?assertEqual(nomatch, binary:match(Bin, [CrashBin])), + ?assertEqual(nomatch, binary:match(Bin2, [CrashBin])), + ?assertEqual(nomatch, binary:match(Bin3, [CrashBin])), + %% have "this log are truncated" for huge payload + TruncatedLog = <<"this log are truncated">>, + ?assertNotEqual(nomatch, binary:match(Bin, [TruncatedLog])), + ?assertNotEqual(nomatch, binary:match(Bin2, [TruncatedLog])), + ?assertNotEqual(nomatch, binary:match(Bin3, [TruncatedLog])), + ok. + t_get_log_filename(_Config) -> Now = erlang:system_time(second), Name = <<"name1">>,