diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index c25104fa5..64caf9853 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -148,7 +148,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - emqttd_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)}); + emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) end, @@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - emqttd_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}), + emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBACK, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -169,7 +169,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - NewSession = emqttd_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}), + NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -239,10 +239,10 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername {ok, State}. trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> - lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); + lager:info("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> - lager:debug("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). + lager:info("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). %% @doc redeliver PUBREL PacketId redeliver({?PUBREL, PacketId}, State) -> diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index 8b88233dc..cdadae663 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -63,7 +63,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok. route(From, Msg) -> - lager:debug("Route from ~s: ~p", [From, emqtt_message:format(Msg)]), + lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]), % TODO: retained message should be stored in emqttd_pubsub... % emqttd_retained:retain(Msg), % unset flag and pubsub diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 970d68695..35c00b532 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -33,7 +33,7 @@ %% API Function Exports -export([start/1, resume/3, - publish/2, + publish/3, puback/2, subscribe/2, unsubscribe/2, @@ -101,20 +101,20 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %% %% @end %%------------------------------------------------------------------------------ --spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session(). -publish(Session = #session_state{clientid = ClientId}, {?QOS_0, Message}) -> +-spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session(). +publish(Session, ClientId, {?QOS_0, Message}) -> emqttd_router:route(ClientId, Message), Session; -publish(Session = #session_state{clientid = ClientId}, {?QOS_1, Message}) -> +publish(Session, ClientId, {?QOS_1, Message}) -> emqttd_router:route(ClientId, Message), Session; -publish(SessState = #session_state{awaiting_rel = AwaitingRel}, +publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId, {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> %% store in awaiting_rel SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; -publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {publish, {?QOS_2, Message}}), +publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) -> + gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}), SessPid. %%------------------------------------------------------------------------------ @@ -310,8 +310,8 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{ msg_queue = emqttd_queue:clear(Queue), expire_timer = undefined}, hibernate}; -handle_cast({publish, {?QOS_2, Message}}, State) -> - NewState = publish(State, {?QOS_2, Message}), +handle_cast({publish, ClientId, {?QOS_2, Message}}, State) -> + NewState = publish(State, ClientId, {?QOS_2, Message}), {noreply, NewState}; handle_cast({puback, PacketId}, State) ->