diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 4145a92a7..52f96bcd2 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -970,6 +970,12 @@ close_socket(State = #state{socket = Socket}) -> %% Inc incoming/outgoing stats inc_incoming_stats(Ctx, FrameMod, Packet) -> + do_inc_incoming_stats(FrameMod:type(Packet), Ctx, FrameMod, Packet). + +%% If a mailformed packet is received, the type of the packet is undefined. +do_inc_incoming_stats(undefined, _Ctx, _FrameMod, _Packet) -> + ok; +do_inc_incoming_stats(Type, Ctx, FrameMod, Packet) -> inc_counter(recv_pkt, 1), case FrameMod:is_message(Packet) of true -> @@ -978,9 +984,7 @@ inc_incoming_stats(Ctx, FrameMod, Packet) -> false -> ok end, - Name = list_to_atom( - lists:concat(["packets.", FrameMod:type(Packet), ".received"]) - ), + Name = list_to_atom(lists:concat(["packets.", Type, ".received"])), emqx_gateway_ctx:metrics_inc(Ctx, Name). inc_outgoing_stats(Ctx, FrameMod, Packet) -> diff --git a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src index 38da1e18b..1fda99700 100644 --- a/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src +++ b/apps/emqx_gateway_stomp/src/emqx_gateway_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_gateway_stomp, [ {description, "Stomp Gateway"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [kernel, stdlib, emqx, emqx_gateway]}, {env, []}, diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 316432dea..07dfd5f46 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -499,7 +499,7 @@ handle_in( [{MountedTopic, SubOpts} | _] -> NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - handle_out(receipt, receipt_id(Headers), NChannel1) + handle_out_and_update(receipt, receipt_id(Headers), NChannel1) end; {error, ErrMsg, NChannel} -> ?SLOG(error, #{ @@ -541,7 +541,7 @@ handle_in( false -> {ok, Channel} end, - handle_out(receipt, receipt_id(Headers), NChannel); + handle_out_and_update(receipt, receipt_id(Headers), NChannel); %% XXX: How to ack a frame ??? handle_in(Frame = ?PACKET(?CMD_ACK, Headers), Channel) -> case header(<<"transaction">>, Headers) of @@ -638,12 +638,12 @@ handle_in( ] end, {ok, Outgoings, Channel}; +handle_in({frame_error, Reason}, Channel = #channel{conn_state = idle}) -> + shutdown(Reason, Channel); handle_in({frame_error, Reason}, Channel = #channel{conn_state = _ConnState}) -> - ?SLOG(error, #{ - msg => "unexpected_frame_error", - reason => Reason - }), - shutdown(Reason, Channel). + ErrMsg = io_lib:format("Frame error: ~0p", [Reason]), + Frame = error_frame(undefined, ErrMsg), + shutdown(Reason, Frame, Channel). with_transaction(Headers, Channel = #channel{transaction = Trans}, Fun) -> Id = header(<<"transaction">>, Headers), @@ -769,6 +769,12 @@ handle_out(receipt, ReceiptId, Channel) -> Frame = receipt_frame(ReceiptId), {ok, {outgoing, Frame}, Channel}. +handle_out_and_update(receipt, undefined, Channel) -> + {ok, [{event, updated}], Channel}; +handle_out_and_update(receipt, ReceiptId, Channel) -> + Frame = receipt_frame(ReceiptId), + {ok, [{outgoing, Frame}, {event, updated}], Channel}. + %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- @@ -812,7 +818,7 @@ handle_call( ), NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - reply({ok, {MountedTopic, NSubOpts}}, NChannel1); + reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1); {error, ErrMsg, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic", @@ -841,6 +847,7 @@ handle_call( ), reply( ok, + [{event, updated}], Channel#channel{ subscriptions = lists:keydelete(MountedTopic, 2, Subs) } @@ -1107,6 +1114,9 @@ terminate(Reason, #channel{ reply(Reply, Channel) -> {reply, Reply, Channel}. +reply(Reply, Msgs, Channel) -> + {reply, Reply, Msgs, Channel}. + shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl index 4913d6b2a..561f9e229 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_frame.erl @@ -129,8 +129,8 @@ initial_parse_state(Opts) -> limit(Opts) -> #frame_limit{ - max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM), - max_header_length = g(max_header_length, Opts, ?MAX_HEADER_LENGTH), + max_header_num = g(max_headers, Opts, ?MAX_HEADER_NUM), + max_header_length = g(max_headers_length, Opts, ?MAX_HEADER_LENGTH), max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH) }. @@ -243,7 +243,9 @@ content_len(#parser_state{headers = Headers}) -> false -> none end. -new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc}) -> +new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc, limit = Limit}) -> + ok = check_max_headers(Headers, Limit), + ok = check_max_body(Acc, Limit), #stomp_frame{command = Cmd, headers = Headers, body = Acc}. acc(Chunk, State = #parser_state{acc = Acc}) when is_binary(Chunk) -> @@ -261,6 +263,57 @@ unescape($c) -> ?COLON; unescape($\\) -> ?BSL; unescape(_Ch) -> error(cannnot_unescape). +check_max_headers( + Headers, + #frame_limit{ + max_header_num = MaxNum, + max_header_length = MaxLen + } +) -> + HeadersLen = length(Headers), + case HeadersLen > MaxNum of + true -> + error( + {too_many_headers, #{ + max_header_num => MaxNum, + received_headers_num => length(Headers) + }} + ); + false -> + ok + end, + lists:foreach( + fun({Name, Val}) -> + Len = byte_size(Name) + byte_size(Val), + case Len > MaxLen of + true -> + error( + {too_long_header, #{ + max_header_length => MaxLen, + found_header_length => Len + }} + ); + false -> + ok + end + end, + Headers + ). + +check_max_body(Acc, #frame_limit{max_body_length = MaxLen}) -> + Len = byte_size(Acc), + case Len > MaxLen of + true -> + error( + {too_long_body, #{ + max_body_length => MaxLen, + received_body_length => Len + }} + ); + false -> + ok + end. + %%-------------------------------------------------------------------- %% Serialize funcs %%-------------------------------------------------------------------- @@ -330,7 +383,10 @@ make(Command, Headers, Body) -> #stomp_frame{command = Command, headers = Headers, body = Body}. %% @doc Format a frame -format(Frame) -> serialize_pkt(Frame, #{}). +format({frame_error, _Reason} = Error) -> + Error; +format(Frame) -> + serialize_pkt(Frame, #{}). is_message(#stomp_frame{command = CMD}) when CMD == ?CMD_SEND; @@ -373,4 +429,6 @@ type(?CMD_RECEIPT) -> type(?CMD_ERROR) -> error; type(?CMD_HEARTBEAT) -> - heartbeat. + heartbeat; +type(_) -> + undefined. diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 4323cf32f..196ed703c 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -40,7 +40,12 @@ " username = \"${Packet.headers.login}\"\n" " password = \"${Packet.headers.passcode}\"\n" " }\n" - " listeners.tcp.default {\n" + " frame {\n" + " max_headers = 10\n" + " max_headers_length = 100\n" + " max_body_length = 1024\n" + " }\n" + " listeners.tcp.default {\n" " bind = 61613\n" " }\n" "}\n" @@ -256,6 +261,10 @@ t_subscribe(_) -> ] ), + %% assert subscription stats + [ClientInfo1] = clients(), + ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), + %% Unsubscribe gen_tcp:send( Sock, @@ -278,6 +287,10 @@ t_subscribe(_) -> }, _, _} = parse(Data2), + %% assert subscription stats + [ClientInfo2] = clients(), + ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), + gen_tcp:send( Sock, serialize( @@ -697,6 +710,129 @@ t_sticky_packets_truncate_after_headers(_) -> ?assert(false, "waiting message timeout") end end). + +t_frame_error_in_connect(_) -> + with_connection(fun(Sock) -> + gen_tcp:send( + Sock, + serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>}, + {<<"custome_header1">>, <<"val">>}, + {<<"custome_header2">>, <<"val">>}, + {<<"custome_header3">>, <<"val">>}, + {<<"custome_header4">>, <<"val">>}, + {<<"custome_header5">>, <<"val">>}, + {<<"custome_header6">>, <<"val">>} + ] + ) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end). + +t_frame_error_too_many_headers(_) -> + Frame = serialize( + <<"SEND">>, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"custome_header1">>, <<"val">>}, + {<<"custome_header2">>, <<"val">>}, + {<<"custome_header3">>, <<"val">>}, + {<<"custome_header4">>, <<"val">>}, + {<<"custome_header5">>, <<"val">>}, + {<<"custome_header6">>, <<"val">>}, + {<<"custome_header7">>, <<"val">>}, + {<<"custome_header8">>, <<"val">>}, + {<<"custome_header9">>, <<"val">>}, + {<<"custome_header10">>, <<"val">>} + ], + <<"test">> + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +t_frame_error_too_long_header(_) -> + LongHeaderVal = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(50), upper), + Frame = serialize( + <<"SEND">>, + [ + {<<"destination">>, <<"/queue/foo">>}, + {<<"custome_header10">>, LongHeaderVal} + ], + <<"test">> + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +t_frame_error_too_long_body(_) -> + LongBody = emqx_utils:bin_to_hexstr(crypto:strong_rand_bytes(513), upper), + Frame = serialize( + <<"SEND">>, + [{<<"destination">>, <<"/queue/foo">>}], + LongBody + ), + Assert = + fun(Sock) -> + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, ErrorFrame, _, _} = parse(Data), + ?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame), + ?assertMatch( + match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}]) + ), + ?assertMatch({error, closed}, gen_tcp:recv(Sock, 0)) + end, + test_frame_error(Frame, Assert). + +test_frame_error(Frame, AssertFun) -> + with_connection(fun(Sock) -> + gen_tcp:send( + Sock, + serialize( + <<"CONNECT">>, + [ + {<<"accept-version">>, ?STOMP_VER}, + {<<"host">>, <<"127.0.0.1:61613">>}, + {<<"login">>, <<"guest">>}, + {<<"passcode">>, <<"guest">>}, + {<<"heart-beat">>, <<"0,0">>} + ] + ) + ), + {ok, Data} = gen_tcp:recv(Sock, 0), + {ok, + #stomp_frame{ + command = <<"CONNECTED">>, + headers = _, + body = _ + }, + _, _} = parse(Data), + gen_tcp:send(Sock, Frame), + AssertFun(Sock) + end). + t_rest_clienit_info(_) -> with_connection(fun(Sock) -> gen_tcp:send( @@ -802,10 +938,14 @@ t_rest_clienit_info(_) -> {200, Subs1} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(2, length(Subs1)), + {200, StompClient2} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 2}, StompClient2), {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"), {200, Subs2} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(1, length(Subs2)), + {200, StompClient3} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 1}, StompClient3), %% kickout {204, _} = request(delete, ClientPath), @@ -844,9 +984,9 @@ serialize(Command, Headers, Body) -> parse(Data) -> ProtoEnv = #{ - max_headers => 10, - max_header_length => 1024, - max_body_length => 8192 + max_headers => 1024, + max_header_length => 10240, + max_body_length => 81920 }, Parser = emqx_stomp_frame:initial_parse_state(ProtoEnv), emqx_stomp_frame:parse(Data, Parser). @@ -855,3 +995,7 @@ get_field(command, #stomp_frame{command = Command}) -> Command; get_field(body, #stomp_frame{body = Body}) -> Body. + +clients() -> + {200, Clients} = request(get, "/gateways/stomp/clients"), + maps:get(data, Clients). diff --git a/changes/ce/fix-10977.en.md b/changes/ce/fix-10977.en.md new file mode 100644 index 000000000..9bd0d6b60 --- /dev/null +++ b/changes/ce/fix-10977.en.md @@ -0,0 +1 @@ +Fix delay in updating subscription count metric and correct configuration issues in Stomp gateway.