From 87dafdd7b2866a454544282c07d95343b42d754b Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 2 Jul 2015 23:22:27 +0800 Subject: [PATCH] client_id --- include/emqttd.hrl | 11 ++++----- include/emqttd_protocol.hrl | 6 ++--- src/emqttd_access_control.erl | 2 +- src/emqttd_access_rule.erl | 6 ++--- src/emqttd_auth_clientid.erl | 18 +++++++------- src/emqttd_mod_autosub.erl | 2 +- src/emqttd_mod_presence.erl | 2 +- src/emqttd_packet.erl | 2 +- src/emqttd_parser.erl | 2 +- src/emqttd_protocol.erl | 42 ++++++++++++++++---------------- src/emqttd_serialiser.erl | 2 +- src/emqttd_session.erl | 46 +++++++++++++++++------------------ src/emqttd_sm.erl | 31 +++++++++++------------ src/emqttd_sm_sup.erl | 10 +++++--- src/emqttd_stats.erl | 1 + 15 files changed, 93 insertions(+), 90 deletions(-) diff --git a/include/emqttd.hrl b/include/emqttd.hrl index bf76cb389..3540fbcda 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -83,12 +83,11 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - clientid :: binary() | undefined, + client_id :: binary() | undefined, username :: binary() | undefined, ipaddress :: inet:ip_address(), - client_pid :: pid(), - client_mon :: reference(), clean_sess :: boolean(), + client_pid :: pid(), proto_ver :: 3 | 4 }). @@ -98,7 +97,7 @@ %% MQTT Session %%------------------------------------------------------------------------------ -record(mqtt_session, { - clientid, + client_id, session_pid, subscriptions = [] }). @@ -108,16 +107,16 @@ %%------------------------------------------------------------------------------ %% MQTT Message %%------------------------------------------------------------------------------ --type mqtt_msgid() :: undefined | 1..16#ffff. +-type mqtt_msgid() :: binary(). -record(mqtt_message, { + msgid :: mqtt_msgid(), %% Unique Message ID topic :: binary(), %% Topic that the message is published to from :: binary() | atom(), %% ClientId of publisher qos = 0 :: 0 | 1 | 2, %% Message QoS retain = false :: boolean(), %% Retain flag dup = false :: boolean(), %% Dup flag sys = false :: boolean(), %% $SYS flag - msgid :: mqtt_msgid(), %% Message ID payload :: binary(), %% Payload timestamp :: erlang:timestamp() %% Timestamp }). diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 35637e3d5..22a32bc09 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -32,7 +32,7 @@ -define(MQTT_PROTO_V311, 4). -define(PROTOCOL_NAMES, [ - {?MQTT_PROTO_V31, <<"MQIsdp">>}, + {?MQTT_PROTO_V31, <<"MQIsdp">>}, {?MQTT_PROTO_V311, <<"MQTT">>}]). -type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311. @@ -122,11 +122,11 @@ %%------------------------------------------------------------------------------ %% MQTT Packets %%------------------------------------------------------------------------------ --type mqtt_clientid() :: binary(). +-type mqtt_client_id() :: binary(). -type mqtt_packet_id() :: 1..16#ffff | undefined. -record(mqtt_packet_connect, { - clientid = <<>> :: mqtt_clientid(), + client_id = <<>> :: mqtt_client_id(), proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), proto_name = <<"MQTT">> :: binary(), will_retain = false :: boolean(), diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 82ac2552d..fa01a5ccb 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -97,7 +97,7 @@ check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subsc [] -> allow; AclMods -> check_acl(Client, PubSub, Topic, AclMods) end. -check_acl(#mqtt_client{clientid = ClientId}, PubSub, Topic, []) -> +check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), allow; check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 0b0fdc6ba..6d7e895f7 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -110,7 +110,7 @@ match_who(_Client, {user, all}) -> true; match_who(_Client, {client, all}) -> true; -match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> +match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) -> true; match_who(#mqtt_client{username = Username}, {user, Username}) -> true; @@ -145,9 +145,9 @@ feed_var(Client, Pattern) -> feed_var(Client, Pattern, []). feed_var(_Client, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) -> +feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) -> feed_var(Client, Words, [<<"$c">>|Acc]); -feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) -> +feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) -> feed_var(Client, Words, [ClientId |Acc]); feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) -> feed_var(Client, Words, [<<"$u">>|Acc]); diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index e5171a239..2b0ef60c4 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -41,7 +41,7 @@ -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). --record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}). +-record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}). %%%============================================================================= %%% API @@ -52,7 +52,7 @@ %% @end %%------------------------------------------------------------------------------ add_clientid(ClientId) when is_binary(ClientId) -> - R = #mqtt_auth_clientid{clientid = ClientId}, + R = #mqtt_auth_clientid{client_id = ClientId}, mnesia:transaction(fun() -> mnesia:write(R) end). %%------------------------------------------------------------------------------ @@ -60,7 +60,7 @@ add_clientid(ClientId) when is_binary(ClientId) -> %% @end %%------------------------------------------------------------------------------ add_clientid(ClientId, Password) -> - R = #mqtt_auth_clientid{clientid = ClientId, password = Password}, + R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, mnesia:transaction(fun() -> mnesia:write(R) end). %%------------------------------------------------------------------------------ @@ -99,15 +99,15 @@ init(Opts) -> end, {ok, Opts}. -check(#mqtt_client{clientid = undefined}, _Password, []) -> +check(#mqtt_client{client_id = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) -> +check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) -> check_clientid_only(ClientId, IpAddress); -check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> +check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> check_clientid_only(ClientId, IpAddress); check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; -check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> +check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) -> case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of [] -> {error, "ClientId Not Found"}; [#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext?? @@ -129,11 +129,11 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) -> case string:tokens(Line, " ") of [ClientIdS] -> ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)), - [#mqtt_auth_clientid{clientid = ClientId} | Clients]; + [#mqtt_auth_clientid{client_id = ClientId} | Clients]; [ClientId, IpAddr0] -> IpAddr = string:strip(IpAddr0, right, $\n), Range = esockd_access:range(IpAddr), - [#mqtt_auth_clientid{clientid = list_to_binary(ClientId), + [#mqtt_auth_clientid{client_id = list_to_binary(ClientId), ipaddr = {IpAddr, Range}}|Clients]; BadLine -> lager:error("BadLine in clients.config: ~s", [BadLine]), diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index c5a1e136e..45461be88 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -45,7 +45,7 @@ load(Opts) -> {?MODULE, client_connected, [Topics]}), {ok, #state{topics = Topics}}. -client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> +client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 248eb6bcf..017a8418f 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -39,7 +39,7 @@ load(Opts) -> emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. -client_connected(ConnAck, #mqtt_client{clientid = ClientId, +client_connected(ConnAck, #mqtt_client{client_id = ClientId, username = Username, ipaddress = IpAddress, clean_sess = CleanSess, diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index 6cb6b415d..c2098c2ff 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -96,7 +96,7 @@ format_variable(#mqtt_packet_connect{ will_flag = WillFlag, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId, + client_id = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = Username, diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index bcc9fa1b4..c0a398af8 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -112,7 +112,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) will_flag = bool(WillFlag), clean_sess = bool(CleanSession), keep_alive = KeepAlive, - clientid = ClientId, + client_id = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = UserName, diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 9be6b41fc..1ade6405e 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -47,7 +47,7 @@ proto_ver, proto_name, username, - clientid, + client_id, clean_sess, session, will_msg, @@ -70,25 +70,25 @@ init(Peername, SendFun, Opts) -> info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, - clientid = ClientId, + client_id = ClientId, clean_sess = CleanSess, will_msg = WillMsg}) -> [{proto_ver, ProtoVer}, {proto_name, ProtoName}, - {clientid, ClientId}, + {client_id, ClientId}, {clean_sess, CleanSess}, {will_msg, WillMsg}]. -clientid(#proto_state{clientid = ClientId}) -> +clientid(#proto_state{client_id = ClientId}) -> ClientId. client(#proto_state{peername = {Addr, _Port}, - clientid = ClientId, + client_id = ClientId, username = Username, clean_sess = CleanSess, proto_ver = ProtoVer, client_pid = Pid}) -> - #mqtt_client{clientid = ClientId, + #mqtt_client{client_id = ClientId, username = Username, ipaddress = Addr, clean_sess = CleanSess, @@ -126,12 +126,12 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId} = Var, + client_id = ClientId} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, username = Username, - clientid = ClientId, + client_id = ClientId, clean_sess = CleanSess}, trace(recv, Packet, State1), @@ -142,7 +142,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} case emqttd_access_control:auth(client(State1), Password) of ok -> %% Generate clientId if null - State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, + State2 = State1#proto_state{client_id = clientid(ClientId, State1)}, %%Starting session {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), @@ -167,7 +167,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} send(?CONNACK_PACKET(ReturnCode1), State3); handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), - State = #proto_state{clientid = ClientId}) -> + State = #proto_state{client_id = ClientId}) -> case check_acl(publish, Topic, State) of allow -> @@ -199,7 +199,7 @@ handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessio handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) -> +handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) -> AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> @@ -233,10 +233,10 @@ handle(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) -> emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); -publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of ok -> send(?PUBACK_PACKET(?PUBACK, PacketId), State); @@ -245,7 +245,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = Cli lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error]) end; -publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of ok -> send(?PUBACK_PACKET(?PUBREC, PacketId), State); @@ -267,11 +267,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when SendFun(Data), {ok, State}. -trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> +trace(recv, Packet, #proto_state{peername = Peername, client_id = ClientId}) -> lager:info([{client, ClientId}], "RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]); -trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> +trace(send, Packet, #proto_state{peername = Peername, client_id = ClientId}) -> lager:info([{client, ClientId}], "SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]). @@ -282,10 +282,10 @@ redeliver({?PUBREL, PacketId}, State) -> shutdown(duplicate_id, _State) -> quiet; %% -shutdown(_, #proto_state{clientid = undefined}) -> +shutdown(_, #proto_state{client_id = undefined}) -> ignore; -shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> +shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) -> lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p", [ClientId, emqttd_net:format(Peername), Error]), send_willmsg(ClientId, WillMsg), @@ -333,16 +333,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). -validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen}) +validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen}) when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) -> true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) -> +validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. diff --git a/src/emqttd_serialiser.erl b/src/emqttd_serialiser.erl index e109c19a3..471904bce 100644 --- a/src/emqttd_serialiser.erl +++ b/src/emqttd_serialiser.erl @@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type, VariableBin/binary, PayloadBin/binary>>. -serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId, +serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, proto_ver = ProtoVer, proto_name = ProtoName, will_retain = WillRetain, diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 93c9fd54b..3b2fb4382 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -73,7 +73,7 @@ clean_sess = true, %% ClientId: Identifier of Session - clientid :: binary(), + client_id :: binary(), %% Client Pid linked with session client_pid :: pid(), @@ -133,7 +133,7 @@ %% @doc Start a session. %% @end %%------------------------------------------------------------------------------ --spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}. +-spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}. start_link(CleanSess, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). @@ -141,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) -> %% @doc Resume a session. %% @end %%------------------------------------------------------------------------------ --spec resume(pid(), mqtt_clientid(), pid()) -> ok. +-spec resume(pid(), mqtt_client_id(), pid()) -> ok. resume(Session, ClientId, ClientPid) -> gen_server:cast(Session, {resume, ClientId, ClientPid}). @@ -149,7 +149,7 @@ resume(Session, ClientId, ClientPid) -> %% @doc Destroy a session. %% @end %%------------------------------------------------------------------------------ --spec destroy(pid(), mqtt_clientid()) -> ok. +-spec destroy(pid(), mqtt_client_id()) -> ok. destroy(Session, ClientId) -> gen_server:call(Session, {destroy, ClientId}). @@ -217,7 +217,7 @@ init([CleanSess, ClientId, ClientPid]) -> SessEnv = emqttd:env(mqtt, session), Session = #session{ clean_sess = CleanSess, - clientid = ClientId, + client_id = ClientId, client_pid = ClientPid, subscriptions = [], inflight_queue = [], @@ -234,7 +234,7 @@ init([CleanSess, ClientId, ClientPid]) -> timestamp = os:timestamp()}, {ok, Session, hibernate}. -handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, +handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> %% subscribe first and don't care if the subscriptions have been existed @@ -264,7 +264,7 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, end, Subscriptions, Topics), {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; -handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, +handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> %% unsubscribe from topic tree @@ -285,7 +285,7 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId {reply, ok, Session#session{subscriptions = Subscriptions1}}; handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, - Session = #session{clientid = ClientId, + Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of @@ -294,12 +294,12 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message " + lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message " "for too many awaiting_rel: ~p", [ClientId, Msg]), {reply, {error, dropped}, Session} end; -handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) -> +handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) -> lager:warning("Session ~s destroyed", [ClientId]), {stop, {shutdown, destroy}, ok, Session}; @@ -309,7 +309,7 @@ handle_call(Req, _From, State) -> handle_cast({resume, ClientId, ClientPid}, Session) -> - #session{clientid = ClientId, + #session{client_id = ClientId, client_pid = OldClientPid, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, @@ -349,7 +349,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> {noreply, dequeue(Session2), hibernate}; %% PUBRAC -handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) -> +handle_cast({puback, MsgId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> case maps:find(MsgId, Awaiting) of {ok, {_, TRef}} -> cancel_timer(TRef), @@ -361,7 +361,7 @@ handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ac end; %% PUBREC -handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, +handle_cast({pubrec, MsgId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, await_rel_timeout = Timeout}) -> @@ -377,7 +377,7 @@ handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, end; %% PUBREL -handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, +handle_cast({pubrel, MsgId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> case maps:find(MsgId, AwaitingRel) of {ok, {Msg, TRef}} -> @@ -390,7 +390,7 @@ handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, end; %% PUBCOMP -handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> +handle_cast({pubcomp, MsgId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) -> case maps:find(MsgId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), @@ -417,7 +417,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, {noreply, Session}; handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, - Session = #session{clientid = ClientId, message_queue = MsgQ}) + Session = #session{client_id = ClientId, message_queue = MsgQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case check_inflight(Session) of @@ -433,7 +433,7 @@ handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = unde %% just remove awaiting {noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}}; -handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> case maps:find(MsgId, AwaitingAck) of @@ -451,7 +451,7 @@ handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = Client {noreply, Session} end; -handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_rel, MsgId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> case maps:find(MsgId, AwaitingRel) of {ok, {Msg, _TRef}} -> @@ -463,7 +463,7 @@ handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = Client {noreply, Session} end; -handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_comp, MsgId}, Session = #session{client_id = ClientId, awaiting_comp = Awaiting}) -> case maps:find(MsgId, Awaiting) of {ok, _TRef} -> @@ -481,25 +481,25 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, {stop, normal, Session}; handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - clientid = ClientId, + client_id = ClientId, client_pid = ClientPid, expired_after = Expires}) -> lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; -handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId, +handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, client_pid = ClientPid}) -> lager:error("Session ~s received unexpected EXIT:" " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), {noreply, Session}; -handle_info(session_expired, Session = #session{clientid = ClientId}) -> +handle_info(session_expired, Session = #session{client_id = ClientId}) -> lager:error("Session ~s expired, shutdown now!", [ClientId]), {stop, {shutdown, expired}, Session}; -handle_info(Info, Session = #session{clientid = ClientId}) -> +handle_info(Info, Session = #session{client_id = ClientId}) -> lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), {noreply, Session}. diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 59e32f2a1..80d9a9a66 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -34,6 +34,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_sm). -author("Feng Lee "). @@ -67,12 +68,11 @@ %% @doc Start a session manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - %ClientStatsFun :: fun(), - SessStatsFun :: fun(). -start_link(Id, SessStatsFun) -> - gen_server:start_link(?MODULE, [Id, SessStatsFun], []). + StatsFun :: {fun(), fun()}. +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -103,7 +103,7 @@ start_session(CleanSess, ClientId) -> -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, _}] -> SessPid; + [{_Clean, _, SessPid, _}] -> SessPid; [] -> undefined end. @@ -129,7 +129,7 @@ init([Id, StatsFun]) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> Reply = case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, _MRef}] -> + [{_Clean, _, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> @@ -139,7 +139,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, MRef}] -> + [{_Clean, _, SessPid, MRef}] -> erlang:demonitor(MRef, [flush]), emqttd_session:destroy(SessPid, ClientId); [] -> @@ -149,7 +149,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({destroy_session, ClientId}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, MRef}] -> + [{_Clean, _, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), ets:delete(?SESSION_TAB, ClientId); @@ -165,7 +165,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}), + ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -184,13 +184,14 @@ code_change(_OldVsn, State, _Extra) -> new_session(CleanSess, ClientId, ClientPid) -> case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), + MRef = erlang:monitor(process, SessPid), + ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}), {ok, SessPid}; {error, Error} -> {error, Error} end. -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?SESSION_TAB, size)), State. - - +setstats(State = #state{statsfun = {CFun, SFun}}) -> + CFun(ets:info(?SESSION_TAB, size)), + SFun(ets:select_count(?SESSION_TAB, [{{true, '_', '_', '_'}, [], [true]}])), + State. diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 03ee63a47..8f5dbd0a4 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_sm_sup). -author("Feng Lee "). @@ -42,19 +43,20 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(emqttd_sm:table(), [set, named_table, public, + ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2}, {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), - %%ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), - SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I, SessStatsFun]}, + {Name, {emqttd_sm, start_link, [I, statsfun()]}, permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. +statsfun() -> + {emqttd_stats:statsfun('clients/count', 'clients/max'), + emqttd_stats:statsfun('sessions/count', 'sessions/max')}. diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 1f005369e..24ef12a6c 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -124,6 +124,7 @@ setstat(Stat, Val) -> %%------------------------------------------------------------------------------ %% @doc Set stats with max +%% TODO: this is wrong... %% @end %%------------------------------------------------------------------------------ -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().