From 95652c77d038275d5a829d341d81c1b2b66b20b5 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 15 Apr 2015 20:47:28 +0800 Subject: [PATCH] trace --- apps/emqttd/src/emqttd_access_rule.erl | 21 +++++++++-------- apps/emqttd/src/emqttd_plugin.erl | 7 +----- apps/emqttd/src/emqttd_protocol.erl | 31 +++++++++++++++----------- apps/emqttd/src/emqttd_router.erl | 4 ++-- 4 files changed, 31 insertions(+), 32 deletions(-) diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index 46524691e..2ad23b56b 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -119,29 +119,29 @@ match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) -> match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; -match_who(_User, _Who) -> +match_who(_Client, _Who) -> false. -match_topics(_User, _Topic, []) -> +match_topics(_Client, _Topic, []) -> false; -match_topics(User, Topic, [{pattern, PatternFilter}|Filters]) -> - TopicFilter = feed_var(User, PatternFilter), +match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) -> + TopicFilter = feed_var(Client, PatternFilter), case match_topic(emqtt_topic:words(Topic), TopicFilter) of true -> true; - false -> match_topics(User, Topic, Filters) + false -> match_topics(Client, Topic, Filters) end; -match_topics(User, Topic, [TopicFilter|Filters]) -> +match_topics(Client, Topic, [TopicFilter|Filters]) -> case match_topic(emqtt_topic:words(Topic), TopicFilter) of true -> true; - false -> match_topics(User, Topic, Filters) + false -> match_topics(Client, Topic, Filters) end. match_topic(Topic, TopicFilter) -> emqtt_topic:match(Topic, TopicFilter). -feed_var(User, Pattern) -> - feed_var(User, Pattern, []). -feed_var(_User, [], Acc) -> +feed_var(Client, Pattern) -> + feed_var(Client, Pattern, []). +feed_var(_Client, [], Acc) -> lists:reverse(Acc); feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) -> feed_var(Client, Words, [<<"$c">>|Acc]); @@ -154,4 +154,3 @@ feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) -> feed_var(Client, [W|Words], Acc) -> feed_var(Client, Words, [W|Acc]). - diff --git a/apps/emqttd/src/emqttd_plugin.erl b/apps/emqttd/src/emqttd_plugin.erl index 650b890c4..8963035e5 100644 --- a/apps/emqttd/src/emqttd_plugin.erl +++ b/apps/emqttd/src/emqttd_plugin.erl @@ -33,7 +33,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/0, allow/3]). +-export([start_link/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -46,11 +46,6 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -allow(subscribe, User, Topic) -> - true; -allow(publish, User, Topic) -> - true. - %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 5ec97378f..6390954e6 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -95,9 +95,8 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> received(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; -received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, - clientid = ClientId}) -> - lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), +received(Packet = ?PACKET(_Type), State) -> + trace(recv, Packet, State), case validate_packet(Packet) of ok -> handle(Packet, State); @@ -114,7 +113,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = keep_alive = KeepAlive, clientid = ClientId} = Var, - lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), + trace(recv, Packet, State), State1 = State#proto_state{proto_ver = ProtoVer, username = Username, @@ -149,7 +148,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}); + emqttd_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) end, @@ -159,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}), + emqttd_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBACK, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -170,7 +169,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case emqttd_acl:check({client(State), publish, Topic}) of allow -> - NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}), + NewSession = emqttd_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -218,20 +217,20 @@ handle(?PACKET(?DISCONNECT), State) -> -spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}. %% qos0 message send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> - send(emqttd_message:to_packet(Message), State); + send(emqtt_message:to_packet(Message), State); %% message from session send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) -> - send(emqttd_message:to_packet(Message), State); + send(emqtt_message:to_packet(Message), State); %% message(qos1, qos2) not from session send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session}) when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) -> {Message1, NewSession} = emqttd_session:store(Session, Message), - send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession}); + send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession}); -send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, clientid = ClientId}) when is_record(Packet, mqtt_packet) -> - lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), +send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) -> + trace(send, Packet, State), sent_stats(Packet), Data = emqttd_serialiser:serialise(Packet), lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]), @@ -239,6 +238,12 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername Transport:send(Sock, Data), {ok, State}. +trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> + lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); + +trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> + lager:debug("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). + %% @doc redeliver PUBREL PacketId redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). @@ -250,7 +255,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg ok. willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> - emqttd_message:from_packet(Packet). + emqtt_message:from_packet(Packet). clientid(<<>>, #proto_state{peername = Peername}) -> <<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>; diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index 0e6d8efb9..3a442daaf 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -64,11 +64,11 @@ start_link() -> %%------------------------------------------------------------------------------ -spec route(mqtt_message()) -> ok. route(Msg) -> - lager:debug("Route ~s", [emqttd_message:dump(Msg)]), + lager:debug("Route ~p", [emqtt_message:format(Msg)]), % TODO: need to retain? emqttd_retained:retain(Msg), % unset flag and pubsub - emqttd_pubsub:publish(emqttd_message:unset_flag(Msg)). + emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)). %%%============================================================================= %%% gen_server callbacks