%%-------------------------------------------------------------------- %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% @doc EMQ X Bridge Sysk Frame %%-------------------------------------------------------------------- -module(emqx_bridge_syskeeper_frame). %% API -export([ versions/0, current_version/0, make_state_with_conf/1, make_state/1, encode/3, parse/2, parse_handshake/1 ]). -export([ bool2int/1, int2bool/1, marshaller/1, serialize_variable_byte_integer/1, parse_variable_byte_integer/1 ]). -export_type([state/0, versions/0, handshake/0, forward/0, packet/0]). -include("emqx_bridge_syskeeper.hrl"). -type state() :: #{ handler := atom(), version := versions(), ack => boolean() }. -type versions() :: 1. -type handshake() :: #{type := handshake, version := versions()}. -type forward() :: #{type := forward, ack := boolean(), messages := list(map())}. -type heartbeat() :: #{type := heartbeat}. -type packet() :: handshake() | forward() | heartbeat(). -callback version() -> versions(). -callback encode(packet_type_val(), packet_data(), state()) -> binary(). -callback parse(packet_type(), binary(), state()) -> packet(). -define(HIGHBIT, 2#10000000). -define(LOWBITS, 2#01111111). -define(MULTIPLIER_MAX, 16#200000). -export_type([packet_type/0]). %%------------------------------------------------------------------- %%% API %%------------------------------------------------------------------- -spec versions() -> list(versions()). versions() -> [1]. -spec current_version() -> versions(). current_version() -> 1. -spec make_state_with_conf(map()) -> state(). make_state_with_conf(#{ack_mode := Mode}) -> State = make_state(current_version()), State#{ack => Mode =:= need_ack}. -spec make_state(versions()) -> state(). make_state(Version) -> case lists:member(Version, versions()) of true -> Handler = erlang:list_to_existing_atom( io_lib:format("emqx_bridge_syskeeper_frame_v~B", [Version]) ), #{ handler => Handler, version => Version }; _ -> erlang:throw({unsupport_version, Version}) end. -spec encode(packet_type(), term(), state()) -> binary(). encode(Type, Data, #{handler := Handler} = State) -> Handler:encode(packet_type_val(Type), Data, State). -spec parse(binary(), state()) -> _. parse(<> = Bin, #{handler := Handler} = State) -> Type = to_packet_type(TypeVal), Handler:parse(Type, Bin, State). parse_handshake(Data) -> State = make_state(1), parse_handshake(Data, State). parse_handshake(Data, #{version := Version} = State) -> case parse(Data, State) of {ok, #{type := handshake, version := Version} = Shake} -> {ok, {State, Shake}}; {ok, #{type := handshake, version := NewVersion}} -> State2 = make_state(NewVersion), parse_handshake(Data, State2); Error -> Error end. bool2int(true) -> 1; bool2int(_) -> 0. int2bool(1) -> true; int2bool(_) -> false. marshaller(Item) when is_binary(Item) -> erlang:binary_to_term(Item); marshaller(Item) -> erlang:term_to_binary(Item). serialize_variable_byte_integer(N) when N =< ?LOWBITS -> <<0:1, N:7>>; serialize_variable_byte_integer(N) -> <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. parse_variable_byte_integer(Bin) -> parse_variable_byte_integer(Bin, 1, 0). %%------------------------------------------------------------------- %%% Internal functions %%------------------------------------------------------------------- to_packet_type(?TYPE_HANDSHAKE) -> handshake; to_packet_type(?TYPE_FORWARD) -> forward; to_packet_type(?TYPE_HEARTBEAT) -> heartbeat. packet_type_val(handshake) -> ?TYPE_HANDSHAKE; packet_type_val(forward) -> ?TYPE_FORWARD; packet_type_val(heartbeat) -> ?TYPE_HEARTBEAT. parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) when Multiplier > ?MULTIPLIER_MAX -> {error, malformed_variable_byte_integer}; parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> {ok, Value + Len * Multiplier, Rest}; parse_variable_byte_integer(<<>>, _Multiplier, _Value) -> {error, incomplete}.