diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 236847000..44c728c78 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -69,7 +69,7 @@ init([SockArgs = {Transport, Sock, _SockFun}, PacketOpts]) -> {ok, Peername} = emqttd_net:peername(Sock), {ok, ConnStr} = emqttd_net:connection_string(Sock, inbound), lager:info("Connect from ~s", [ConnStr]), - ParserState = emqttd_parser:init(PacketOpts), + ParserState = emqtt_parser:init(PacketOpts), ProtoState = emqttd_protocol:init({Transport, NewSock, Peername}, PacketOpts), State = control_throttle(#state{transport = Transport, socket = NewSock, @@ -183,7 +183,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, parse_state = ParseState, proto_state = ProtoState, conn_name = ConnStr}) -> - case emqttd_parser:parse(Bytes, ParseState) of + case emqtt_parser:parse(Bytes, ParseState) of {more, ParseState1} -> {noreply, control_throttle(State #state{parse_state = ParseState1}), @@ -192,7 +192,7 @@ process_received_bytes(Bytes, State = #state{packet_opts = PacketOpts, received_stats(Packet), case emqttd_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - process_received_bytes(Rest, State#state{parse_state = emqttd_parser:init(PacketOpts), + process_received_bytes(Rest, State#state{parse_state = emqtt_parser:init(PacketOpts), proto_state = ProtoState1}); {error, Error} -> lager:error("MQTT protocol error ~p for connection ~p~n", [Error, ConnStr]), diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 64caf9853..694a6ec35 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -232,7 +232,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) -> trace(send, Packet, State), sent_stats(Packet), - Data = emqttd_serialiser:serialise(Packet), + Data = emqtt_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), emqttd_metrics:inc('bytes/sent', size(Data)), Transport:send(Sock, Data), diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 2f3f78a2a..6f1a0960c 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -187,7 +187,7 @@ subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topi SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics), {ok, GrantedQos} = emqttd_pubsub:subscribe(Topics), %%TODO: should be gen_event and notification... - emqttd_msg_store:redeliver([Name || {Name, _} <- Topics], self()), + [emqttd_msg_store:redeliver(Name, self()) || {Name, _} <- Topics], {ok, SessState#session_state{submap = SubMap1}, GrantedQos}; subscribe(SessPid, Topics) when is_pid(SessPid) ->