diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index 92cec7703..7c421f75e 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -17,6 +17,10 @@ File format: * Fix updating `emqx_auth_mnesia.conf` password and restarting the new password does not take effect [#6717] * Fix import data crash when emqx_auth_mnesia's record is not empty [#6717] * Fix `os_mon.sysmem_high_watermark` may not alert after reboot. +* Enhancement: Log client status before killing it for holding the lock for too long. + [emqx-6959](https://github.com/emqx/emqx/pull/6959) + [ekka-144](https://github.com/emqx/ekka/pull/144) + [ekka-146](https://github.com/emqx/ekka/pull/146) ## v4.3.11 @@ -46,7 +50,7 @@ Important notes: * Clustering malfunction fixes [#6221, #6381] Mostly changes made in [ekka](https://github.com/emqx/ekka/pull/134)
From 0.8.1.4 to 0.8.1.6, fixes included intra-cluster RPC call timeouts,
- also fixed `ekka_locker` process crashe after killing a hanged lock owner. + also fixed `ekka_locker` process crashed after killing a hanged lock owner. * Improved log message when TCP proxy is in use but proxy_protocol configuration is not turned on [#6416]
"please check proxy_protocol config for specific listeners and zones" to hint a misconfiguration @@ -101,6 +105,8 @@ Important notes: properties such as protocol name, protocol version, username (if any) peer-host
etc. are filled as MQTT message headers. +* Format the message id to hex strings in the log message [#6961] + ## v4.3.0~10 Older version changes are not tracked here. diff --git a/apps/emqx_stomp/src/emqx_stomp_frame.erl b/apps/emqx_stomp/src/emqx_stomp_frame.erl index fa37c2f64..51f23e778 100644 --- a/apps/emqx_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_stomp/src/emqx_stomp_frame.erl @@ -123,6 +123,8 @@ parse(<<>>, Parser) -> parse(Bytes, #{phase := body, length := Len, state := State}) -> parse(body, Bytes, State, Len); +parse(<>, #{phase := hdname, state := State}) -> + parse(body, Bytes, State, content_len(State)); parse(Bytes, #{phase := Phase, state := State}) when Phase =/= none -> parse(Phase, Bytes, State); diff --git a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl index e2599ab51..f4503d791 100644 --- a/apps/emqx_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_stomp/test/emqx_stomp_SUITE.erl @@ -359,6 +359,35 @@ t_1000_msg_send(_) -> lists:foreach(fun(_) -> RecvFun() end, lists:seq(1, 1000)) end). +t_sticky_packets_truncate_after_headers(_) -> + with_connection(fun(Sock) -> + gen_tcp:send(Sock, serialize(<<"CONNECT">>, + [{<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}])), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, #stomp_frame{command = <<"CONNECTED">>, + headers = _, + body = _}, _} = parse(Data), + + Topic = <<"/queue/foo">>, + + emqx:subscribe(Topic), + gen_tcp:send(Sock, ["SEND\n", + "content-length:3\n", + "destination:/queue/foo\n"]), + timer:sleep(300), + gen_tcp:send(Sock, ["\nfoo",0]), + receive + {deliver, Topic, _Msg}-> + ok + after 100 -> + ?assert(false, "waiting message timeout") + end + end). + with_connection(DoFun) -> {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, 61613, diff --git a/rebar.config b/rebar.config index 9c2b72881..0778d3961 100644 --- a/rebar.config +++ b/rebar.config @@ -43,7 +43,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.4"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.7"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}} diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 66194a6bb..87b70308a 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -11,6 +11,7 @@ , {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} , {load_module,emqx,brutal_purge,soft_purge,[]} , {load_module,emqx_app,brutal_purge,soft_purge,[]} + , {load_module,emqx_message,brutal_purge,soft_purge,[]} , {load_module,emqx_limiter,brutal_purge,soft_purge,[]} ]}, {<<".*">>,[]} @@ -26,6 +27,7 @@ , {load_module,emqx_os_mon,brutal_purge,soft_purge,[]} , {load_module,emqx,brutal_purge,soft_purge,[]} , {load_module,emqx_app,brutal_purge,soft_purge,[]} + , {load_module,emqx_message,brutal_purge,soft_purge,[]} , {load_module,emqx_limiter,brutal_purge,soft_purge,[]} ]}, {<<".*">>,[]} diff --git a/src/emqx_message.erl b/src/emqx_message.erl index faae621d8..a0ee98434 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -342,7 +342,13 @@ format(#message{id = Id, flags = Flags, headers = Headers}) -> io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", - [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). + [printable_msg_id(Id), QoS, Topic, From, format(flags, Flags), + format(headers, Headers)]). + +printable_msg_id(undefined) -> + <<>>; +printable_msg_id(Id) -> + emqx_guid:to_hexstr(Id). format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);