%%-------------------------------------------------------------------- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- %% @doc %% %% Stomp Frame: %% %% COMMAND %% header1:value1 %% header2:value2 %% %% Body^@ %% %% Stomp 1.2 BNF grammar: %% %% NULL = %% LF = %% CR = %% EOL = [CR] LF %% OCTET = %% %% frame-stream = 1*frame %% %% frame = command EOL %% *( header EOL ) %% EOL %% *OCTET %% NULL %% *( EOL ) %% %% command = client-command | server-command %% %% client-command = "SEND" %% | "SUBSCRIBE" %% | "UNSUBSCRIBE" %% | "BEGIN" %% | "COMMIT" %% | "ABORT" %% | "ACK" %% | "NACK" %% | "DISCONNECT" %% | "CONNECT" %% | "STOMP" %% %% server-command = "CONNECTED" %% | "MESSAGE" %% | "RECEIPT" %% | "ERROR" %% %% header = header-name ":" header-value %% header-name = 1* %% header-value = * %% %% @end -module(emqx_stomp_frame). -include("emqx_stomp.hrl"). -export([ init_parer_state/1 , parse/2 , serialize/1 ]). -export([ make/2 , make/3 , format/1 ]). -define(NULL, 0). -define(CR, $\r). -define(LF, $\n). -define(BSL, $\\). -define(COLON, $:). -define(IS_ESC(Ch), Ch == ?CR; Ch == ?LF; Ch == ?BSL; Ch == ?COLON). -record(parser_state, {cmd, headers = [], hdname, acc = <<>> :: binary(), limit}). -record(frame_limit, {max_header_num, max_header_length, max_body_length}). -type(result() :: {ok, stomp_frame(), binary()} | {more, parser()} | {error, any()}). -type(parser() :: #{phase := none | command | headers | hdname | hdvalue | body, pre => binary(), state := #parser_state{}}). %% @doc Initialize a parser -spec init_parer_state([proplists:property()]) -> parser(). init_parer_state(Opts) -> #{phase => none, state => #parser_state{limit = limit(Opts)}}. limit(Opts) -> #frame_limit{max_header_num = g(max_header_num, Opts, ?MAX_HEADER_NUM), max_header_length = g(max_header_length, Opts, ?MAX_BODY_LENGTH), max_body_length = g(max_body_length, Opts, ?MAX_BODY_LENGTH)}. g(Key, Opts, Val) -> proplists:get_value(Key, Opts, Val). -spec parse(binary(), parser()) -> result(). parse(<<>>, Parser) -> {more, Parser}; parse(Bytes, #{phase := body, len := Len, state := State}) -> parse(body, Bytes, State, Len); parse(Bytes, Parser = #{pre := Pre}) -> parse(<
>, maps:without([pre], Parser));
parse(<>, Parser = #{phase := none}) ->
    parse(Rest, Parser);
parse(<>, Parser = #{phase := none}) ->
    case byte_size(Rest) of
        0 -> {more, Parser};
        _ -> parse(Rest, Parser)
    end;
parse(<>, #{phase := Phase, state := State}) ->
    parse(Phase, <>, State);
parse(<>, Parser) ->
    {more, Parser#{pre => <>}};
parse(<>, _Parser) ->
    {error, linefeed_expected};

parse(<>, Parser = #{phase := Phase}) when Phase =:= hdname; Phase =:= hdvalue ->
    {more, Parser#{pre => <>}};
parse(<>, #{phase := Phase, state := State}) when Phase =:= hdname; Phase =:= hdvalue ->
    parse(Phase, Rest, acc(unescape(Ch), State));

parse(Bytes, #{phase := none, state := State}) ->
    parse(command, Bytes, State).

%% @private
parse(command, <>, State = #parser_state{acc = Acc}) ->
    parse(headers, Rest, State#parser_state{cmd = Acc, acc = <<>>});
parse(command, <>, State) ->
    parse(command, Rest, acc(Ch, State));

parse(headers, <>, State) ->
    parse(body, Rest, State, content_len(State#parser_state{acc = <<>>}));
parse(headers, Bin, State) ->
    parse(hdname, Bin, State);

parse(hdname, <>, _State) ->
    {error, unexpected_linefeed};
parse(hdname, <>, State = #parser_state{acc = Acc}) ->
    parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <>, State) ->
    parse(hdname, Rest, acc(Ch, State));

parse(hdvalue, <>, State = #parser_state{headers = Headers, hdname = Name, acc = Acc}) ->
    parse(headers, Rest, State#parser_state{headers = add_header(Name, Acc, Headers), hdname = undefined, acc = <<>>});
parse(hdvalue, <>, State) ->
    parse(hdvalue, Rest, acc(Ch, State)).

%% @private
parse(body, <<>>, State, Length) ->
    {more, #{phase => body, length => Length, state => State}};
parse(body, Bin, State, none) ->
    case binary:split(Bin, <>) of
        [Chunk, Rest] ->
            {ok, new_frame(acc(Chunk, State)), Rest};
        [Chunk] ->
            {more, #{phase => body, length => none, state => acc(Chunk, State)}}
    end;
parse(body, Bin, State, Len) when byte_size(Bin) >= (Len+1) ->
    <> = Bin,
    {ok, new_frame(acc(Chunk, State)), Rest};
parse(body, Bin, State, Len) ->
    {more, #{phase => body, length => Len - byte_size(Bin), state => acc(Bin, State)}}.

add_header(Name, Value, Headers) ->
    case lists:keyfind(Name, 1, Headers) of
        {Name, _} -> Headers;
        false     -> [{Name, Value} | Headers]
    end.

content_len(#parser_state{headers = Headers}) ->
    case lists:keyfind(<<"content-length">>, 1, Headers) of
        {_, Val} -> list_to_integer(binary_to_list(Val));
        false    -> none
    end.

new_frame(#parser_state{cmd = Cmd, headers = Headers, acc = Acc}) ->
    #stomp_frame{command = Cmd, headers = Headers, body = Acc}.

acc(Chunk, State = #parser_state{acc = Acc}) when is_binary(Chunk) ->
    State#parser_state{acc = <>};
acc(Ch, State = #parser_state{acc = Acc}) ->
    State#parser_state{acc = <>}.

%% \r (octet 92 and 114) translates to carriage return (octet 13)
%% \n (octet 92 and 110) translates to line feed (octet 10)
%% \c (octet 92 and 99) translates to : (octet 58)
%% \\ (octet 92 and 92) translates to \ (octet 92)
unescape($r)  -> ?CR;
unescape($n)  -> ?LF;
unescape($c)  -> ?COLON;
unescape($\\) -> ?BSL;
unescape(_Ch) -> {error, cannnot_unescape}.

serialize(#stomp_frame{command = Cmd, headers = Headers, body = Body}) ->
    Headers1 = lists:keydelete(<<"content-length">>, 1, Headers),
    Headers2 =
    case iolist_size(Body) of
        0   -> Headers1;
        Len -> Headers1 ++ [{<<"content-length">>, Len}]
    end,
    [Cmd, ?LF, [serialize(header, Header) || Header <- Headers2], ?LF, Body, 0].

serialize(header, {Name, Val}) when is_integer(Val) ->
    [escape(Name), ?COLON, integer_to_list(Val), ?LF];
serialize(header, {Name, Val}) ->
    [escape(Name), ?COLON, escape(Val), ?LF].

escape(Bin) when is_binary(Bin) ->
    << <<(escape(Ch))/binary>> || <> <= Bin >>;
escape(?CR)    -> <>;
escape(?LF)    -> <>;
escape(?BSL)   -> <>;
escape(?COLON) -> <>;
escape(Ch)     -> <>.


%% @doc Make a frame
make(<<"CONNECTED">>, Headers) ->
    #stomp_frame{command = <<"CONNECTED">>,
                 headers = [{<<"server">>, ?STOMP_SERVER} | Headers]};

make(Command, Headers) ->
    #stomp_frame{command = Command, headers = Headers}.

make(Command, Headers, Body) ->
    #stomp_frame{command = Command, headers = Headers, body = Body}.

%% @doc Format a frame
format(Frame) -> serialize(Frame).