dump
This commit is contained in:
parent
899569dd34
commit
8bbce8d0b7
|
@ -84,6 +84,7 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_packet_connect, {
|
-record(mqtt_packet_connect, {
|
||||||
proto_ver,
|
proto_ver,
|
||||||
|
proto_name,
|
||||||
will_retain,
|
will_retain,
|
||||||
will_qos,
|
will_qos,
|
||||||
will_flag,
|
will_flag,
|
||||||
|
|
|
@ -75,7 +75,7 @@ handle_call({go, Sock}, _From, #state{socket = Sock}) ->
|
||||||
conn_state = running,
|
conn_state = running,
|
||||||
conserve = false,
|
conserve = false,
|
||||||
parse_state = emqtt_packet:initial_state(),
|
parse_state = emqtt_packet:initial_state(),
|
||||||
proto_state = emqtt_protocol:initial_state(Sock)}), 10000};
|
proto_state = emqtt_protocol:initial_state(Sock, Peername)}), 10000};
|
||||||
|
|
||||||
handle_call(info, _From, State = #state{
|
handle_call(info, _From, State = #state{
|
||||||
conn_name=ConnName, proto_state = ProtoState}) ->
|
conn_name=ConnName, proto_state = ProtoState}) ->
|
||||||
|
|
|
@ -31,6 +31,8 @@
|
||||||
|
|
||||||
-export([parse/2, serialise/1]).
|
-export([parse/2, serialise/1]).
|
||||||
|
|
||||||
|
-export([dump/1]).
|
||||||
|
|
||||||
-define(MAX_LEN, 16#fffffff).
|
-define(MAX_LEN, 16#fffffff).
|
||||||
-define(HIGHBIT, 2#10000000).
|
-define(HIGHBIT, 2#10000000).
|
||||||
-define(LOWBITS, 2#01111111).
|
-define(LOWBITS, 2#01111111).
|
||||||
|
@ -86,6 +88,7 @@ parse_frame(Bin, #mqtt_packet_header{ type = Type,
|
||||||
wrap(Header,
|
wrap(Header,
|
||||||
#mqtt_packet_connect{
|
#mqtt_packet_connect{
|
||||||
proto_ver = ProtoVersion,
|
proto_ver = ProtoVersion,
|
||||||
|
proto_name = ProtoName,
|
||||||
will_retain = bool(WillRetain),
|
will_retain = bool(WillRetain),
|
||||||
will_qos = WillQos,
|
will_qos = WillQos,
|
||||||
will_flag = bool(WillFlag),
|
will_flag = bool(WillFlag),
|
||||||
|
@ -255,3 +258,78 @@ opt(X) when is_integer(X) -> X.
|
||||||
protocol_name_approved(Ver, Name) ->
|
protocol_name_approved(Ver, Name) ->
|
||||||
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
||||||
|
|
||||||
|
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) when
|
||||||
|
Payload =:= undefined orelse Payload =:= <<>> ->
|
||||||
|
dump_header(Header, dump_variable(Variable));
|
||||||
|
|
||||||
|
dump(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->
|
||||||
|
dump_header(Header, dump_variable(Variable, Payload)).
|
||||||
|
|
||||||
|
dump_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) ->
|
||||||
|
io_lib:format("~s(Qos=~p, Retain=~s, Dup=~s, ~s)", [dump_type(Type), QoS, Retain, Dup, S]).
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_connect {
|
||||||
|
proto_ver = ProtoVer,
|
||||||
|
proto_name = ProtoName,
|
||||||
|
will_retain = WillRetain,
|
||||||
|
will_qos = WillQoS,
|
||||||
|
will_flag = WillFlag,
|
||||||
|
clean_sess = CleanSess,
|
||||||
|
keep_alive = KeepAlive,
|
||||||
|
client_id = ClientId,
|
||||||
|
will_topic = WillTopic,
|
||||||
|
will_msg = WillMsg,
|
||||||
|
username = Username,
|
||||||
|
password = Password} ) ->
|
||||||
|
io_lib:format("ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanSess=~s, KeepAlive=~p, Username=~s, Password=~s",
|
||||||
|
[ClientId, ProtoName, ProtoVer, CleanSess, KeepAlive, Username, Password]); %%TODO: Will
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_connack {
|
||||||
|
ack_flags = AckFlags,
|
||||||
|
return_code = ReturnCode } ) ->
|
||||||
|
io_lib:format("AckFlags=~p, RetainCode=~p", [AckFlags, ReturnCode]);
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_publish {
|
||||||
|
topic_name = TopicName,
|
||||||
|
packet_id = PacketId} ) ->
|
||||||
|
io_lib:format("TopicName=~s, PacketId=~p", [TopicName, PacketId]);
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_puback {
|
||||||
|
packet_id = PacketId } ) ->
|
||||||
|
io_lib:format("PacketId=~p", [PacketId]);
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_subscribe {
|
||||||
|
packet_id = PacketId,
|
||||||
|
topic_table = TopicTable }) ->
|
||||||
|
L = [{Name, QoS} || #mqtt_topic{name = Name, qos = QoS} <- TopicTable],
|
||||||
|
io_lib:format("PacketId=~p, TopicTable=~p", [PacketId, L]);
|
||||||
|
|
||||||
|
dump_variable( #mqtt_packet_suback {
|
||||||
|
packet_id = PacketId,
|
||||||
|
qos_table = QosTable} ) ->
|
||||||
|
io_lib:format("PacketId=~p, QosTable=~p", [PacketId, QosTable]);
|
||||||
|
|
||||||
|
dump_variable(PacketId) when is_integer(PacketId) ->
|
||||||
|
io_lib:format("PacketId=~p", [PacketId]);
|
||||||
|
|
||||||
|
%%TODO: not right...
|
||||||
|
dump_variable(undefined) -> "".
|
||||||
|
|
||||||
|
dump_variable(Variable, Payload) ->
|
||||||
|
dump_variable(Variable).
|
||||||
|
|
||||||
|
dump_type(?CONNECT) -> "CONNECT";
|
||||||
|
dump_type(?CONNACK) -> "CONNACK";
|
||||||
|
dump_type(?PUBLISH) -> "PUBLISH";
|
||||||
|
dump_type(?PUBACK) -> "PUBACK";
|
||||||
|
dump_type(?PUBREC) -> "PUBREC";
|
||||||
|
dump_type(?PUBREL) -> "PUBREL";
|
||||||
|
dump_type(?PUBCOMP) -> "PUBCOMP";
|
||||||
|
dump_type(?SUBSCRIBE) -> "SUBSCRIBE";
|
||||||
|
dump_type(?SUBACK) -> "SUBACK";
|
||||||
|
dump_type(?UNSUBSCRIBE) -> "UNSUBSCRIBE";
|
||||||
|
dump_type(?UNSUBACK) -> "UNSUBACK";
|
||||||
|
dump_type(?PINGREQ) -> "PINGREQ";
|
||||||
|
dump_type(?PINGRESP) -> "PINGRESP";
|
||||||
|
dump_type(?DISCONNECT) -> "DISCONNECT".
|
||||||
|
|
||||||
|
|
|
@ -26,9 +26,12 @@
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-record(proto_state, {
|
-record(proto_state, {
|
||||||
socket,
|
socket,
|
||||||
connected = false, %received CONNECT action?
|
peer_name,
|
||||||
|
connected = false, %received CONNECT action?
|
||||||
|
proto_vsn,
|
||||||
|
proto_name,
|
||||||
packet_id,
|
packet_id,
|
||||||
client_id,
|
client_id,
|
||||||
clean_sess,
|
clean_sess,
|
||||||
|
@ -40,7 +43,7 @@
|
||||||
|
|
||||||
-type proto_state() :: #proto_state{}.
|
-type proto_state() :: #proto_state{}.
|
||||||
|
|
||||||
-export([initial_state/1]).
|
-export([initial_state/2]).
|
||||||
|
|
||||||
-export([handle_packet/2, send_packet/2, client_terminated/1]).
|
-export([handle_packet/2, send_packet/2, client_terminated/1]).
|
||||||
|
|
||||||
|
@ -49,24 +52,29 @@
|
||||||
-define(PACKET_TYPE(Packet, Type),
|
-define(PACKET_TYPE(Packet, Type),
|
||||||
Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}).
|
Packet = #mqtt_packet { header = #mqtt_packet_header { type = Type }}).
|
||||||
|
|
||||||
initial_state(Socket) ->
|
initial_state(Socket, Peername) ->
|
||||||
#proto_state{
|
#proto_state{
|
||||||
socket = Socket,
|
socket = Socket,
|
||||||
|
peer_name = Peername,
|
||||||
packet_id = 1,
|
packet_id = 1,
|
||||||
subscriptions = [],
|
subscriptions = [],
|
||||||
awaiting_ack = gb_trees:empty(),
|
awaiting_ack = gb_trees:empty(),
|
||||||
awaiting_rel = gb_trees:empty()
|
awaiting_rel = gb_trees:empty()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
info(#proto_state{ packet_id = PacketId,
|
info(#proto_state{ proto_vsn = ProtoVsn,
|
||||||
|
proto_name = ProtoName,
|
||||||
|
packet_id = PacketId,
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
will_msg = WillMsg,
|
will_msg = WillMsg,
|
||||||
subscriptions= Subs }) ->
|
subscriptions= Subs }) ->
|
||||||
[ {packet_id, PacketId},
|
[ {packet_id, PacketId},
|
||||||
{client_id, ClientId},
|
{proto_vsn, ProtoVsn},
|
||||||
{clean_sess, CleanSess},
|
{proto_name, ProtoName},
|
||||||
{will_msg, WillMsg},
|
{client_id, ClientId},
|
||||||
|
{clean_sess, CleanSess},
|
||||||
|
{will_msg, WillMsg},
|
||||||
{subscriptions, Subs} ].
|
{subscriptions, Subs} ].
|
||||||
|
|
||||||
-spec handle_packet(Packet, State) -> {ok, NewState} | {error, any()} when
|
-spec handle_packet(Packet, State) -> {ok, NewState} | {error, any()} when
|
||||||
|
@ -80,7 +88,7 @@ info(#proto_state{ packet_id = PacketId,
|
||||||
handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) ->
|
handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = false}) ->
|
||||||
handle_packet(?CONNECT, Packet, State#proto_state{connected = true});
|
handle_packet(?CONNECT, Packet, State#proto_state{connected = true});
|
||||||
|
|
||||||
handle_packet(?PACKET_TYPE(Packet, ?CONNECT), State = #proto_state{connected = true}) ->
|
handle_packet(?PACKET_TYPE(_Packet, ?CONNECT), State = #proto_state{connected = true}) ->
|
||||||
{error, protocol_bad_connect, State};
|
{error, protocol_bad_connect, State};
|
||||||
|
|
||||||
%%Received other packets when CONNECT not arrived.
|
%%Received other packets when CONNECT not arrived.
|
||||||
|
@ -88,8 +96,8 @@ handle_packet(_Packet, State = #proto_state{connected = false}) ->
|
||||||
{error, protocol_not_connected, State};
|
{error, protocol_not_connected, State};
|
||||||
|
|
||||||
handle_packet(?PACKET_TYPE(Packet, Type),
|
handle_packet(?PACKET_TYPE(Packet, Type),
|
||||||
State = #proto_state{client_id = ClientId}) ->
|
State = #proto_state { peer_name = PeerName, client_id = ClientId }) ->
|
||||||
lager:info("packet from ~s: ~p", [ClientId, Packet]),
|
lager:info("RECV from ~s/~s: ~s", [PeerName, ClientId, emqtt_packet:dump(Packet)]),
|
||||||
case validate_packet(Type, Packet) of
|
case validate_packet(Type, Packet) of
|
||||||
ok ->
|
ok ->
|
||||||
handle_packet(Type, Packet, State);
|
handle_packet(Type, Packet, State);
|
||||||
|
@ -97,29 +105,28 @@ handle_packet(?PACKET_TYPE(Packet, Type),
|
||||||
{error, Reason, State}
|
{error, Reason, State}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
handle_packet(?CONNECT, #mqtt_packet {
|
handle_packet(?CONNECT, Packet = #mqtt_packet {
|
||||||
variable = #mqtt_packet_connect {
|
variable = #mqtt_packet_connect {
|
||||||
username = Username,
|
username = Username,
|
||||||
password = Password,
|
password = Password,
|
||||||
proto_ver = ProtoVersion,
|
proto_ver = ProtoVersion,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
keep_alive = KeepAlive,
|
keep_alive = KeepAlive,
|
||||||
client_id = ClientId } = Var },
|
client_id = ClientId } = Var },
|
||||||
State0 = #proto_state{socket = Sock}) ->
|
State = #proto_state{ peer_name = PeerName} ) ->
|
||||||
|
lager:info("RECV from ~s/~s: ~s", [PeerName, ClientId, emqtt_packet:dump(Packet)]),
|
||||||
State = State0#proto_state{client_id = ClientId},
|
|
||||||
{ReturnCode, State1} =
|
{ReturnCode, State1} =
|
||||||
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
|
||||||
valid_client_id(ClientId)} of
|
valid_client_id(ClientId)} of
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
{?CONNACK_PROTO_VER, State};
|
{?CONNACK_PROTO_VER, State#proto_state{client_id = ClientId}};
|
||||||
{_, false} ->
|
{_, false} ->
|
||||||
{?CONNACK_INVALID_ID, State};
|
{?CONNACK_INVALID_ID, State#proto_state{client_id = ClientId}};
|
||||||
_ ->
|
_ ->
|
||||||
case emqtt_auth:check(Username, Password) of
|
case emqtt_auth:check(Username, Password) of
|
||||||
false ->
|
false ->
|
||||||
lager:error("MQTT login failed - no credentials"),
|
lager:error("MQTT login failed - no credentials"),
|
||||||
{?CONNACK_CREDENTIALS, State};
|
{?CONNACK_CREDENTIALS, State#proto_state{client_id = ClientId}};
|
||||||
true ->
|
true ->
|
||||||
lager:info("connect from clientid: ~s, keepalive: ~p", [ClientId, KeepAlive]),
|
lager:info("connect from clientid: ~s, keepalive: ~p", [ClientId, KeepAlive]),
|
||||||
start_keepalive(KeepAlive),
|
start_keepalive(KeepAlive),
|
||||||
|
@ -129,10 +136,9 @@ handle_packet(?CONNECT, #mqtt_packet {
|
||||||
client_id = ClientId }}
|
client_id = ClientId }}
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
lager:info("[SENT] MQTT CONNACK: ~p", [ReturnCode]),
|
send_packet( #mqtt_packet {
|
||||||
send_packet(Sock, #mqtt_packet {
|
header = #mqtt_packet_header { type = ?CONNACK },
|
||||||
header = #mqtt_packet_header { type = ?CONNACK },
|
variable = #mqtt_packet_connack{ return_code = ReturnCode }}, State ),
|
||||||
variable = #mqtt_packet_connack{ return_code = ReturnCode }}),
|
|
||||||
{ok, State1};
|
{ok, State1};
|
||||||
|
|
||||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||||
|
@ -143,21 +149,19 @@ handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||||
header = #mqtt_packet_header { qos = ?QOS_1 },
|
header = #mqtt_packet_header { qos = ?QOS_1 },
|
||||||
variable = #mqtt_packet_publish{packet_id = PacketId}},
|
variable = #mqtt_packet_publish{packet_id = PacketId}},
|
||||||
State=#proto_state{socket=Sock}) ->
|
State) ->
|
||||||
emqtt_router:route(make_message(Packet)),
|
emqtt_router:route(make_message(Packet)),
|
||||||
send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBACK },
|
send_packet( make_packet(?PUBACK, PacketId), State ),
|
||||||
variable = #mqtt_packet_puback { packet_id = PacketId}}),
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
handle_packet(?PUBLISH, Packet = #mqtt_packet {
|
||||||
header = #mqtt_packet_header { qos = ?QOS_2 },
|
header = #mqtt_packet_header { qos = ?QOS_2 },
|
||||||
variable = #mqtt_packet_publish{packet_id = PacketId}},
|
variable = #mqtt_packet_publish{packet_id = PacketId}},
|
||||||
State=#proto_state{socket=Sock}) ->
|
State) ->
|
||||||
%%FIXME: this is not right...should store it first...
|
%%FIXME: this is not right...should store it first...
|
||||||
emqtt_router:route(make_message(Packet)),
|
emqtt_router:route(make_message(Packet)),
|
||||||
put({msg, PacketId}, pubrec),
|
put({msg, PacketId}, pubrec),
|
||||||
send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header{ type = ?PUBREC },
|
send_packet( make_packet(?PUBREC, PacketId), State ),
|
||||||
variable = #mqtt_packet_puback { packet_id = PacketId}}),
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PUBACK, #mqtt_packet {}, State) ->
|
handle_packet(?PUBACK, #mqtt_packet {}, State) ->
|
||||||
|
@ -165,25 +169,20 @@ handle_packet(?PUBACK, #mqtt_packet {}, State) ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PUBREC, #mqtt_packet {
|
handle_packet(?PUBREC, #mqtt_packet {
|
||||||
variable = #mqtt_packet_puback { packet_id = PktId }},
|
variable = #mqtt_packet_puback { packet_id = PacketId }},
|
||||||
State=#proto_state{socket=Sock}) ->
|
State) ->
|
||||||
%FIXME Later: should release the message here
|
%FIXME Later: should release the message here
|
||||||
send_packet(Sock, #mqtt_packet {
|
send_packet( make_packet(?PUBREL, PacketId), State ),
|
||||||
header = #mqtt_packet_header { type = ?PUBREL},
|
|
||||||
variable = #mqtt_packet_puback { packet_id = PktId}}),
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PUBREL, #mqtt_packet {
|
handle_packet(?PUBREL, #mqtt_packet { variable = #mqtt_packet_puback { packet_id = PacketId}}, State) ->
|
||||||
variable = #mqtt_packet_puback { packet_id = PktId}},
|
|
||||||
State=#proto_state{socket=Sock}) ->
|
|
||||||
%%FIXME: not right...
|
%%FIXME: not right...
|
||||||
erase({msg, PktId}),
|
erase({msg, PacketId}),
|
||||||
send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?PUBCOMP},
|
send_packet( make_packet(?PUBCOMP, PacketId), State ),
|
||||||
variable = #mqtt_packet_puback { packet_id = PktId}}),
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PUBCOMP, #mqtt_packet {
|
handle_packet(?PUBCOMP, #mqtt_packet {
|
||||||
variable = #mqtt_packet_puback{packet_id = _PktId}}, State) ->
|
variable = #mqtt_packet_puback{packet_id = _PacketId}}, State) ->
|
||||||
%TODO: fixme later
|
%TODO: fixme later
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
|
@ -192,7 +191,7 @@ handle_packet(?SUBSCRIBE, #mqtt_packet {
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
topic_table = Topics},
|
topic_table = Topics},
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
State = #proto_state{socket=Sock}) ->
|
State) ->
|
||||||
|
|
||||||
%%FIXME: this is not right...
|
%%FIXME: this is not right...
|
||||||
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
[emqtt_pubsub:subscribe({Name, Qos}, self()) ||
|
||||||
|
@ -200,10 +199,10 @@ handle_packet(?SUBSCRIBE, #mqtt_packet {
|
||||||
|
|
||||||
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
GrantedQos = [Qos || #mqtt_topic{qos=Qos} <- Topics],
|
||||||
|
|
||||||
send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
send_packet(#mqtt_packet { header = #mqtt_packet_header { type = ?SUBACK },
|
||||||
variable = #mqtt_packet_suback{
|
variable = #mqtt_packet_suback{
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
qos_table = GrantedQos }}),
|
qos_table = GrantedQos }}, State),
|
||||||
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
|
@ -212,27 +211,30 @@ handle_packet(?UNSUBSCRIBE, #mqtt_packet {
|
||||||
packet_id = PacketId,
|
packet_id = PacketId,
|
||||||
topic_table = Topics },
|
topic_table = Topics },
|
||||||
payload = undefined},
|
payload = undefined},
|
||||||
State = #proto_state{socket = Sock, client_id = ClientId}) ->
|
State = #proto_state{client_id = ClientId}) ->
|
||||||
|
|
||||||
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
[emqtt_pubsub:unsubscribe(Name, self()) || #mqtt_topic{name=Name} <- Topics],
|
||||||
|
|
||||||
send_packet(Sock, #mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
send_packet(#mqtt_packet { header = #mqtt_packet_header {type = ?UNSUBACK },
|
||||||
variable = #mqtt_packet_suback{packet_id = PacketId }}),
|
variable = #mqtt_packet_suback{packet_id = PacketId }}, State),
|
||||||
|
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?PINGREQ, #mqtt_packet{}, #proto_state{socket=Sock}=State) ->
|
handle_packet(?PINGREQ, #mqtt_packet{}, State) ->
|
||||||
send_packet(Sock, make_packet(?PINGRESP)),
|
send_packet(make_packet(?PINGRESP), State),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{client_id=ClientId}) ->
|
handle_packet(?DISCONNECT, #mqtt_packet{}, State=#proto_state{peer_name = PeerName, client_id = ClientId}) ->
|
||||||
lager:info("~s disconnected", [ClientId]),
|
lager:info("RECV from ~s/~s: DISCONNECT", [PeerName, ClientId]),
|
||||||
{stop, State}.
|
{stop, State}.
|
||||||
|
|
||||||
|
|
||||||
make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
|
make_packet(Type) when Type >= ?CONNECT andalso Type =< ?DISCONNECT ->
|
||||||
#mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
|
#mqtt_packet{ header = #mqtt_packet_header { type = Type } }.
|
||||||
|
|
||||||
|
make_packet(PubAck, PacketId) when PubAck >= ?PUBACK andalso PubAck =< ?PUBREC ->
|
||||||
|
#mqtt_packet { header = #mqtt_packet_header { type = PubAck},
|
||||||
|
variable = #mqtt_packet_puback { packet_id = PacketId}}.
|
||||||
|
|
||||||
-spec send_message(Message, State) -> {ok, NewState} when
|
-spec send_message(Message, State) -> {ok, NewState} when
|
||||||
Message :: mqtt_message(),
|
Message :: mqtt_message(),
|
||||||
State :: proto_state(),
|
State :: proto_state(),
|
||||||
|
@ -244,7 +246,7 @@ send_message(Message = #mqtt_message{
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
dup = Dup,
|
dup = Dup,
|
||||||
payload = Payload},
|
payload = Payload},
|
||||||
State = #proto_state{socket = Sock, packet_id = PacketId}) ->
|
State = #proto_state{packet_id = PacketId}) ->
|
||||||
|
|
||||||
Packet = #mqtt_packet {
|
Packet = #mqtt_packet {
|
||||||
header = #mqtt_packet_header {
|
header = #mqtt_packet_header {
|
||||||
|
@ -260,7 +262,7 @@ send_message(Message = #mqtt_message{
|
||||||
end },
|
end },
|
||||||
payload = Payload},
|
payload = Payload},
|
||||||
|
|
||||||
send_packet(Sock, Packet),
|
send_packet(Packet, State),
|
||||||
if
|
if
|
||||||
Qos == ?QOS_0 ->
|
Qos == ?QOS_0 ->
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
@ -268,8 +270,8 @@ send_message(Message = #mqtt_message{
|
||||||
{ok, next_packet_id(State)}
|
{ok, next_packet_id(State)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
send_packet(Sock, Packet) ->
|
send_packet(Packet, #proto_state{socket = Sock, peer_name = PeerName, client_id = ClientId}) ->
|
||||||
lager:info("send packet:~p", [Packet]),
|
lager:info("SENT to ~s/~s: ~p", [PeerName, ClientId, emqtt_packet:dump(Packet)]),
|
||||||
%%FIXME Later...
|
%%FIXME Later...
|
||||||
erlang:port_command(Sock, emqtt_packet:serialise(Packet)).
|
erlang:port_command(Sock, emqtt_packet:serialise(Packet)).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue