diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 3508374c4..ccceb0b9d 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -21,12 +21,17 @@ -include("emqttd_protocol.hrl"). --export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]). +-export([make/3, make/4, from_packet/1, from_packet/2, from_packet/3, + to_packet/1]). -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). -export([format/1]). +-ifdef(TEST). +-compile(export_all). +-endif. + %% @doc Make a message -spec make(From, Topic, Payload) -> mqtt_message() when From :: atom() | binary(), @@ -72,12 +77,16 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, from_packet(#mqtt_packet_connect{will_flag = false}) -> undefined; -from_packet(#mqtt_packet_connect{will_retain = Retain, +from_packet(#mqtt_packet_connect{client_id = ClientId, + username = Username, + will_retain = Retain, will_qos = Qos, will_topic = Topic, will_msg = Msg}) -> #mqtt_message{msgid = msgid(Qos), topic = Topic, + from = ClientId, + sender = Username, retain = Retain, qos = Qos, dup = false, @@ -85,7 +94,12 @@ from_packet(#mqtt_packet_connect{will_retain = Retain, timestamp = os:timestamp()}. from_packet(ClientId, Packet) -> - Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}. + Msg = from_packet(Packet), + Msg#mqtt_message{from = ClientId}. + +from_packet(Username, ClientId, Packet) -> + Msg = from_packet(Packet), + Msg#mqtt_message{from = ClientId, sender = Username}. msgid(?QOS_0) -> undefined; @@ -140,10 +154,10 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) -> unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message -format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, +format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, sender = Sender, qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> - io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)", - [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]). + io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Sender=~s, Topic=~s)", + [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Sender, Topic]). i(true) -> 1; i(false) -> 0; diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index ff076f528..85c9fb0bc 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -224,8 +224,9 @@ process(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), - #proto_state{client_id = ClientId, session = Session}) -> - emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); + #proto_state{client_id = ClientId, username = Username, session = Session}) -> + Msg = emqttd_message:from_packet(Username, ClientId, Packet), + emqttd_session:publish(Session, Msg); publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> with_puback(?PUBACK, Packet, State); @@ -234,8 +235,10 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_2, _PacketId), State) -> with_puback(?PUBREC, Packet, State). with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), - State = #proto_state{client_id = ClientId, session = Session}) -> - Msg = emqttd_message:from_packet(ClientId, Packet), + State = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + Msg = emqttd_message:from_packet(Username, ClientId, Packet), case emqttd_session:publish(Session, Msg) of ok -> send(?PUBACK_PACKET(Type, PacketId), State);