emqx/apps/emqx_stomp/src/emqx_stomp_frame.erl

257 lines
8.6 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc
%%
%% Stomp Frame:
%%
%% COMMAND
%% header1:value1
%% header2:value2
%%
%% Body^@
%%
%% Stomp 1.2 BNF grammar:
%%
%% NULL = <US-ASCII null (octet 0)>
%% LF = <US-ASCII line feed (aka newline) (octet 10)>
%% CR = <US-ASCII carriage return (octet 13)>
%% EOL = [CR] LF
%% OCTET = <any 8-bit sequence of data>
%%
%% frame-stream = 1*frame
%%
%% frame = command EOL
%% *( header EOL )
%% EOL
%% *OCTET
%% NULL
%% *( EOL )
%%
%% command = client-command | server-command
%%
%% client-command = "SEND"
%% | "SUBSCRIBE"
%% | "UNSUBSCRIBE"
%% | "BEGIN"
%% | "COMMIT"
%% | "ABORT"
%% | "ACK"
%% | "NACK"
%% | "DISCONNECT"
%% | "CONNECT"
%% | "STOMP"
%%
%% server-command = "CONNECTED"
%% | "MESSAGE"
%% | "RECEIPT"
%% | "ERROR"
%%
%% header = header-name ":" header-value
%% header-name = 1*<any OCTET except CR or LF or ":">
%% header-value = *<any OCTET except CR or LF or ":">
%%
%% @end
-module(emqx_stomp_frame).
-include("emqx_stomp.hrl").
-export([ init_parer_state/1
, parse/2
, serialize/1
]).
-export([ make/2
, make/3
, format/1
]).
-define(NULL, 0).
-define(CR, $\r).
-define(LF, $\n).
-define(BSL, $\\).
-define(COLON, $:).
-define(IS_ESC(Ch), Ch == ?CR; Ch == ?LF; Ch == ?BSL; Ch == ?COLON).
-record(parser_state, {cmd,
headers = [],
hdname,
acc = <<>> :: binary(),
limit}).
-record(frame_limit, {max_header_num, max_header_length, max_body_length}).
-type(result() :: {ok, stomp_frame(), binary()}
| {more, parser()}
| {error, any()}).
-type(parser() :: #{phase := none | command | headers | hdname | hdvalue | body,
pre => binary(),
state := #parser_state{}}).
%% @doc Initialize a parser
-spec init_parer_state([proplists:property()]) -> parser().
init_parer_state(Opts) ->
#{phase => none, state => #parser_state{limit = limit(Opts)}}.
limit(Opts) ->
#frame_limit{max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM),
max_header_length = g(max_header_length, Opts, ?MAX_BODY_LENGTH),
max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH)}.
g(Key, Opts, Val) ->
proplists:get_value(Key, Opts, Val).
-spec parse(binary(), parser()) -> result().
parse(<<>>, Parser) ->
{more, Parser};
parse(Bytes, #{phase := body, len := Len, state := State}) ->
parse(body, Bytes, State, Len);
parse(Bytes, Parser = #{pre := Pre}) ->
parse(<<Pre/binary, Bytes/binary>>, maps:without([pre], Parser));
parse(<<?CR, Rest/binary>>, Parser = #{phase := none}) ->
parse(Rest, Parser);
parse(<<?LF, Rest/binary>>, Parser = #{phase := none}) ->
case byte_size(Rest) of
0 -> {more, Parser};
_ -> parse(Rest, Parser)
end;
parse(<<?CR, ?LF, Rest/binary>>, #{phase := Phase, state := State}) ->
parse(Phase, <<?LF, Rest/binary>>, State);
parse(<<?CR>>, Parser) ->
{more, Parser#{pre => <<?CR>>}};
parse(<<?CR, _Ch:8, _Rest/binary>>, _Parser) ->
{error, linefeed_expected};
parse(<<?BSL>>, Parser = #{phase := Phase}) when Phase =:= hdname; Phase =:= hdvalue ->
{more, Parser#{pre => <<?BSL>>}};
parse(<<?BSL, Ch:8, Rest/binary>>, #{phase := Phase, state := State}) when Phase =:= hdname; Phase =:= hdvalue ->
parse(Phase, Rest, acc(unescape(Ch), State));
parse(Bytes, #{phase := none, state := State}) ->
parse(command, Bytes, State).
%% @private
parse(command, <<?LF, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
parse(command, <<Ch:8, Rest/binary>>, State) ->
parse(command, Rest, acc(Ch, State));
parse(headers, <<?LF, Rest/binary>>, State) ->
parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
parse(headers, Bin, State) ->
parse(hdname, Bin, State);
parse(hdname, <<?LF, _Rest/binary>>, _State) ->
{error, unexpected_linefeed};
parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <<Ch:8, Rest/binary>>, State) ->
parse(hdname, Rest, acc(Ch, State));
parse(hdvalue, <<?LF, Rest/binary>>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
parse(hdvalue, <<Ch:8, Rest/binary>>, State) ->
parse(hdvalue, Rest, acc(Ch, State)).
%% @private
parse(body, <<>>, State, Length) ->
{more, #{phase => body, length => Length, state => State}};
parse(body, Bin, State, none) ->
case binary:split(Bin, <<?NULL>>) of
[Chunk, Rest] ->
{ok, new_frame(acc(Chunk, State)), Rest};
[Chunk] ->
{more, #{phase => body, length => none, state => acc(Chunk, State)}}
end;
parse(body, Bin, State, Len) when byte_size(Bin) >= (Len+1) ->
<<Chunk:Len/binary, ?NULL, Rest/binary>> = Bin,
{ok, new_frame(acc(Chunk, State)), Rest};
parse(body, Bin, State, Len) ->
{more, #{phase => body, length => Len - byte_size(Bin), state => acc(Bin, State)}}.
add_header(Name, Value, Headers) ->
case lists:keyfind(Name, 1, Headers) of
{Name, _} -> Headers;
false -> [{Name, Value} | Headers]
end.
content_len(#parser_state{headers = Headers}) ->
case lists:keyfind(<<"content-length">>, 1, Headers) of
{_, Val} -> list_to_integer(binary_to_list(Val));
false -> none
end.
new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc}) ->
#stomp_frame{command = Cmd, headers = Headers, body = Acc}.
acc(Chunk, State = #parser_state{acc = Acc}) when is_binary(Chunk) ->
State#parser_state{acc = <<Acc/binary, Chunk/binary>>};
acc(Ch, State = #parser_state{acc = Acc}) ->
State#parser_state{acc = <<Acc/binary, Ch:8>>}.
%% \r (octet 92 and 114) translates to carriage return (octet 13)
%% \n (octet 92 and 110) translates to line feed (octet 10)
%% \c (octet 92 and 99) translates to : (octet 58)
%% \\ (octet 92 and 92) translates to \ (octet 92)
unescape($r) -> ?CR;
unescape($n) -> ?LF;
unescape($c) -> ?COLON;
unescape($\\) -> ?BSL;
unescape(_Ch) -> {error, cannnot_unescape}.
serialize(#stomp_frame{command = Cmd, headers = Headers, body = Body}) ->
Headers1 = lists:keydelete(<<"content-length">>, 1, Headers),
Headers2 =
case iolist_size(Body) of
0 -> Headers1;
Len -> Headers1 ++ [{<<"content-length">>, Len}]
end,
[Cmd, ?LF, [serialize(header, Header) || Header <- Headers2], ?LF, Body, 0].
serialize(header, {Name, Val}) when is_integer(Val) ->
[escape(Name), ?COLON, integer_to_list(Val), ?LF];
serialize(header, {Name, Val}) ->
[escape(Name), ?COLON, escape(Val), ?LF].
escape(Bin) when is_binary(Bin) ->
<< <<(escape(Ch))/binary>> || <<Ch>> <= Bin >>;
escape(?CR) -> <<?BSL, $r>>;
escape(?LF) -> <<?BSL, $n>>;
escape(?BSL) -> <<?BSL, ?BSL>>;
escape(?COLON) -> <<?BSL, $c>>;
escape(Ch) -> <<Ch>>.
%% @doc Make a frame
make(<<"CONNECTED">>, Headers) ->
#stomp_frame{command = <<"CONNECTED">>,
headers = [{<<"server">>, ?STOMP_SERVER} | Headers]};
make(Command, Headers) ->
#stomp_frame{command = Command, headers = Headers}.
make(Command, Headers, Body) ->
#stomp_frame{command = Command, headers = Headers, body = Body}.
%% @doc Format a frame
format(Frame) -> serialize(Frame).