chore(stomp): parse \n as heartbeat frame
This commit is contained in:
parent
75dab4dff0
commit
d2b6e41cd1
|
@ -710,10 +710,7 @@ parse_incoming(
|
||||||
) ->
|
) ->
|
||||||
try FrameMod:parse(Data, ParseState) of
|
try FrameMod:parse(Data, ParseState) of
|
||||||
{more, NParseState} ->
|
{more, NParseState} ->
|
||||||
if
|
{Packets, State#state{parse_state = NParseState}};
|
||||||
Data == <<$\n>> -> {[Data], State#state{parse_state = NParseState}};
|
|
||||||
true -> {Packets, State#state{parse_state = NParseState}}
|
|
||||||
end;
|
|
||||||
{ok, Packet, Rest, NParseState} ->
|
{ok, Packet, Rest, NParseState} ->
|
||||||
NState = State#state{parse_state = NParseState},
|
NState = State#state{parse_state = NParseState},
|
||||||
parse_incoming(Rest, [Packet | Packets], NState)
|
parse_incoming(Rest, [Packet | Packets], NState)
|
||||||
|
|
|
@ -654,7 +654,7 @@ handle_in(
|
||||||
end,
|
end,
|
||||||
{ok, Outgoings, Channel};
|
{ok, Outgoings, Channel};
|
||||||
handle_in(
|
handle_in(
|
||||||
<<$\n>>,
|
?PACKET(?CMD_HEARTBEAT),
|
||||||
Channel = #channel{heartbeat = Heartbeat}
|
Channel = #channel{heartbeat = Heartbeat}
|
||||||
) ->
|
) ->
|
||||||
NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat),
|
NewHeartbeat = emqx_stomp_heartbeat:reset(incoming, 0, Heartbeat),
|
||||||
|
|
|
@ -140,6 +140,9 @@ g(Key, Opts, Val) ->
|
||||||
-spec parse(binary(), parse_state()) -> parse_result().
|
-spec parse(binary(), parse_state()) -> parse_result().
|
||||||
parse(<<>>, Parser) ->
|
parse(<<>>, Parser) ->
|
||||||
{more, Parser};
|
{more, Parser};
|
||||||
|
%% treat the \n as a heartbeat frame
|
||||||
|
parse(<<$\n>>, Parser = #{phase := none}) ->
|
||||||
|
{ok, #stomp_frame{command = ?CMD_HEARTBEAT}, <<>>, Parser};
|
||||||
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
parse(Bytes, #{phase := body, length := Len, state := State}) ->
|
||||||
parse(body, Bytes, State, Len);
|
parse(body, Bytes, State, Len);
|
||||||
parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->
|
parse(<<?LF, Bytes/binary>>, #{phase := hdname, state := State}) ->
|
||||||
|
@ -346,9 +349,7 @@ serialize_pkt(
|
||||||
serialize_pkt(header, {Name, Val}) when is_integer(Val) ->
|
serialize_pkt(header, {Name, Val}) when is_integer(Val) ->
|
||||||
[escape(Name), ?COLON, integer_to_list(Val), ?LF];
|
[escape(Name), ?COLON, integer_to_list(Val), ?LF];
|
||||||
serialize_pkt(header, {Name, Val}) ->
|
serialize_pkt(header, {Name, Val}) ->
|
||||||
[escape(Name), ?COLON, escape(Val), ?LF];
|
[escape(Name), ?COLON, escape(Val), ?LF].
|
||||||
serialize_pkt(<<$\n>>, _SerializeOpts) ->
|
|
||||||
<<$\n>>.
|
|
||||||
|
|
||||||
escape(Bin) when is_binary(Bin) ->
|
escape(Bin) when is_binary(Bin) ->
|
||||||
<<<<(escape(Ch))/binary>> || <<Ch>> <= Bin>>;
|
<<<<(escape(Ch))/binary>> || <<Ch>> <= Bin>>;
|
||||||
|
|
Loading…
Reference in New Issue