diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index f0b877dc5..a9cc37778 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2012-2017 Feng Lee . +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ -module(emqttd_protocol). +-author("Feng Lee "). + -include("emqttd.hrl"). -include("emqttd_protocol.hrl"). @@ -25,9 +27,11 @@ -import(proplists, [get_value/2, get_value/3]). %% API --export([init/3, info/1, clientid/1, client/1, session/1]). +-export([init/3, info/1, stats/1, clientid/1, client/1, session/1]). --export([received/2, handle/2, send/2, redeliver/2, shutdown/2]). +-export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). + +-export([received/2, send/2]). -export([process/2]). @@ -39,17 +43,20 @@ session, ws_initial_headers, %% Headers from first HTTP request for websocket client connected_at}). --type proto_state() :: #proto_state{}. +-type(proto_state() :: #proto_state{}). -define(INFO_KEYS, [client_id, username, clean_sess, proto_ver, proto_name, keepalive, will_msg, ws_initial_headers, connected_at]). +-define(STATS_KEYS, [recv_pkt, recv_msg, send_pkt, send_msg]). + -define(LOG(Level, Format, Args, State), lager:Level([{client, State#proto_state.client_id}], "Client(~s@~s): " ++ Format, [State#proto_state.client_id, esockd_net:format(State#proto_state.peername) | Args])). %% @doc Init protocol init(Peername, SendFun, Opts) -> + lists:foreach(fun(K) -> put(K, 0) end, ?STATS_KEYS), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), #proto_state{peername = Peername, @@ -61,6 +68,9 @@ init(Peername, SendFun, Opts) -> info(ProtoState) -> ?record_to_proplist(proto_state, ProtoState, ?INFO_KEYS). +stats(_ProtoState) -> + [{K, get(K)} || K <- ?STATS_KEYS]. + clientid(#proto_state{client_id = ClientId}) -> ClientId. @@ -115,22 +125,22 @@ received(Packet = ?PACKET(_Type), State) -> {error, Reason, State} end. -handle({subscribe, RawTopicTable}, ProtoState = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> +subscribe(RawTopicTable, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> TopicTable = parse_topic_table(RawTopicTable), - case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of + case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> emqttd_session:subscribe(Session, TopicTable1); {stop, _} -> ok end, - {ok, ProtoState}; + {ok, ProtoState}. -handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId, - username = Username, - session = Session}) -> - case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of +unsubscribe(RawTopics, ProtoState = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> emqttd_session:unsubscribe(Session, TopicTable); {stop, _} -> @@ -138,6 +148,9 @@ handle({unsubscribe, RawTopics}, ProtoState = #proto_state{client_id = ClientId, end, {ok, ProtoState}. +%% @doc Send PUBREL +pubrel(PacketId, State) -> send(?PUBREL_PACKET(PacketId), State). + process(Packet = ?CONNECT_PACKET(Var), State0) -> #mqtt_packet_connect{proto_ver = ProtoVer, @@ -187,7 +200,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) -> {ReturnCode, false, State1} end, %% Run hooks - emqttd:run_hooks('client.connected', [ReturnCode1], client(State3)), + emqttd_hooks:run('client.connected', [ReturnCode1], client(State3)), %% Send connack send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), %% stop if authentication failure @@ -220,8 +233,11 @@ process(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); %% TODO: refactor later... -process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session = Session, - client_id = ClientId, username = Username, is_superuser = IsSuperuser}) -> +process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), + State = #proto_state{session = Session, + client_id = ClientId, + username = Username, + is_superuser = IsSuperuser}) -> Client = client(State), TopicTable = parse_topic_table(RawTopicTable), AllowDenies = if IsSuperuser -> []; @@ -232,7 +248,7 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session ?LOG(error, "Cannot SUBSCRIBE ~p for ACL Deny", [TopicTable], State), send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State); false -> - case emqttd:run_hooks('client.subscribe', [ClientId, Username], TopicTable) of + case emqttd_hooks:run('client.subscribe', [ClientId, Username], TopicTable) of {ok, TopicTable1} -> emqttd_session:subscribe(Session, PacketId, TopicTable1), {ok, State}; {stop, _} -> @@ -244,9 +260,11 @@ process(?SUBSCRIBE_PACKET(PacketId, RawTopicTable), State = #proto_state{session process(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), State = #proto_state{ - client_id = ClientId, username = Username, session = Session}) -> - case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of +process(?UNSUBSCRIBE_PACKET(PacketId, RawTopics), + State = #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> + case emqttd_hooks:run('client.unsubscribe', [ClientId, Username], parse_topics(RawTopics)) of {ok, TopicTable} -> emqttd_session:unsubscribe(Session, TopicTable); {stop, _} -> @@ -262,7 +280,9 @@ process(?PACKET(?DISCONNECT), State) -> {stop, normal, State#proto_state{will_msg = undefined}}. publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), - #proto_state{client_id = ClientId, username = Username, session = Session}) -> + #proto_state{client_id = ClientId, + username = Username, + session = Session}) -> Msg = emqttd_message:from_packet(Username, ClientId, Packet), emqttd_session:publish(Session, Msg); @@ -287,7 +307,7 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), -spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username}) when is_record(Msg, mqtt_message) -> - emqttd:run_hooks('message.delivered', [ClientId, Username], Msg), + emqttd_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqttd_message:to_packet(Msg), State); send(Packet, State = #proto_state{sendfun = SendFun}) @@ -297,15 +317,15 @@ send(Packet, State = #proto_state{sendfun = SendFun}) SendFun(Packet), {ok, State}. -trace(recv, Packet, ProtoState) -> +trace(recv, Packet = ?PACKET(Type), ProtoState) -> + inc(recv_pkt), ?IF(Type =:= ?PUBLISH, inc(recv_msg), ok), ?LOG(info, "RECV ~s", [emqttd_packet:format(Packet)], ProtoState); -trace(send, Packet, ProtoState) -> +trace(send, Packet = ?PACKET(Type), ProtoState) -> + inc(send_pkt), ?IF(Type =:= ?PUBLISH, inc(send_msg), ok), ?LOG(info, "SEND ~s", [emqttd_packet:format(Packet)], ProtoState). -%% @doc redeliver PUBREL PacketId -redeliver({?PUBREL, PacketId}, State) -> - send(?PUBREL_PACKET(PacketId), State). +inc(Key) -> put(Key, get(Key) + 1). stop_if_auth_failure(RC, State) when RC == ?CONNACK_CREDENTIALS; RC == ?CONNACK_AUTH -> {stop, {shutdown, auth_failure}, State}; @@ -325,7 +345,7 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) -> ?LOG(info, "Shutdown for ~p", [Error], State), Client = client(State), send_willmsg(Client, WillMsg), - emqttd:run_hooks('client.disconnected', [Error], Client), + emqttd_hooks:run('client.disconnected', [Error], Client), %% let it down %% emqttd_cm:unreg(ClientId). ok. @@ -375,19 +395,19 @@ validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen}) - when (size(ClientId) >= 1) andalso (size(ClientId) =< MaxLen) -> + when (byte_size(ClientId) >= 1) andalso (byte_size(ClientId) =< MaxLen) -> true; %% Issue#599: Null clientId and clean_sess = false validate_clientid(#mqtt_packet_connect{client_id = ClientId, clean_sess = CleanSess}, _ProtoState) - when size(ClientId) == 0 andalso (not CleanSess) -> + when byte_size(ClientId) == 0 andalso (not CleanSess) -> false; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V4, client_id = ClientId}, _ProtoState) - when size(ClientId) =:= 0 -> + when byte_size(ClientId) =:= 0 -> true; validate_clientid(#mqtt_packet_connect{proto_ver = ProtoVer,