fix emqttd_parser, emqttd_serialiser
This commit is contained in:
parent
684c562cc7
commit
6fedab1e12
|
@ -69,7 +69,7 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) ->
|
||||||
{ok, Peername} = emqttd_net:peername(Sock),
|
{ok, Peername} = emqttd_net:peername(Sock),
|
||||||
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
{ok, ConnStr} = emqttd_net:connection_string(Sock, inbound),
|
||||||
lager:info("Connect from ~s", [ConnStr]),
|
lager:info("Connect from ~s", [ConnStr]),
|
||||||
ParserState = emqttd_parser:init(PacketOpts),
|
ParserState = emqtt_parser:init(PacketOpts),
|
||||||
ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
|
ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts),
|
||||||
State = control_throttle(#state{transport = Transport,
|
State = control_throttle(#state{transport = Transport,
|
||||||
socket = NewSock,
|
socket = NewSock,
|
||||||
|
@ -183,7 +183,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
|
||||||
parse_state = ParseState,
|
parse_state = ParseState,
|
||||||
proto_state = ProtoState,
|
proto_state = ProtoState,
|
||||||
conn_name = ConnStr}) ->
|
conn_name = ConnStr}) ->
|
||||||
case emqttd_parser:parse(Bytes, ParseState) of
|
case emqtt_parser:parse(Bytes, ParseState) of
|
||||||
{more, ParseState1} ->
|
{more, ParseState1} ->
|
||||||
{noreply,
|
{noreply,
|
||||||
control_throttle(State #state{parse_state = ParseState1}),
|
control_throttle(State #state{parse_state = ParseState1}),
|
||||||
|
@ -192,7 +192,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts,
|
||||||
received_stats(Packet),
|
received_stats(Packet),
|
||||||
case emqttd_protocol:received(Packet, ProtoState) of
|
case emqttd_protocol:received(Packet, ProtoState) of
|
||||||
{ok, ProtoState1} ->
|
{ok, ProtoState1} ->
|
||||||
process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts),
|
process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(PacketOpts),
|
||||||
proto_state = ProtoState1});
|
proto_state = ProtoState1});
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]),
|
||||||
|
|
|
@ -232,7 +232,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
|
||||||
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) ->
|
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) ->
|
||||||
trace(send, Packet, State),
|
trace(send, Packet, State),
|
||||||
sent_stats(Packet),
|
sent_stats(Packet),
|
||||||
Data = emqttd_serialiser:serialise(Packet),
|
Data = emqtt_serialiser:serialise(Packet),
|
||||||
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
|
||||||
emqttd_metrics:inc('bytes/sent', size(Data)),
|
emqttd_metrics:inc('bytes/sent', size(Data)),
|
||||||
Transport:send(Sock, Data),
|
Transport:send(Sock, Data),
|
||||||
|
|
|
@ -187,7 +187,7 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi
|
||||||
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
||||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
||||||
%%TODO: should be gen_event and notification...
|
%%TODO: should be gen_event and notification...
|
||||||
emqttd_msg_store:redeliver([Name || {Name, _} <- Topics], self()),
|
[emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics],
|
||||||
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
||||||
|
|
||||||
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
subscribe(SessPid, Topics) when is_pid(SessPid) ->
|
||||||
|
|
Loading…
Reference in New Issue