client presence management

This commit is contained in:
Feng Lee 2015-05-28 21:55:20 +08:00
parent 54245b61eb
commit d3616a2701
10 changed files with 162 additions and 117 deletions

View File

@ -77,9 +77,12 @@
%% MQTT Client
%%------------------------------------------------------------------------------
-record(mqtt_client, {
clientid :: binary(),
clientid :: binary() | undefined,
username :: binary() | undefined,
ipaddr :: inet:ip_address()
ipaddress :: inet:ip_address(),
clean_sess :: boolean(),
proto_ver :: 3 | 4,
client_pid :: pid()
}).
-type mqtt_client() :: #mqtt_client{}.

View File

@ -114,9 +114,9 @@ match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) ->
true;
match_who(#mqtt_client{username = Username}, {user, Username}) ->
true;
match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) ->
match_who(#mqtt_client{ipaddress = undefined}, {ipaddr, _Tup}) ->
false;
match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) ->
match_who(#mqtt_client{ipaddress = IP}, {ipaddr, {_CDIR, Start, End}}) ->
I = esockd_access:atoi(IP),
I >= Start andalso I =< End;
match_who(_Client, _Who) ->

View File

@ -101,10 +101,10 @@ init(Opts) ->
check(#mqtt_client{clientid = undefined}, _Password, []) ->
{error, "ClientId undefined"};
check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, []) ->
check_clientid_only(ClientId, IpAddr);
check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddr);
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) ->
check_clientid_only(ClientId, IpAddress);
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddress);
check(_Client, undefined, [{password, yes}|_]) ->
{error, "Password undefined"};
check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) ->

View File

@ -87,11 +87,13 @@ handle_call(info, _From, State = #state{conn_name=ConnName,
proto_state = ProtoState}) ->
{reply, [{conn_name, ConnName} | emqttd_protocol:info(ProtoState)], State};
handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_call(Req, _From, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected request - ~p",[emqttd_net:format(Peername), Req]),
{reply, {error, unsupported_request}, State}.
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_cast(Msg, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
{noreply, State}.
handle_info(timeout, State) ->
stop({shutdown, timeout}, State);
@ -102,7 +104,7 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
%% need transfer data???
%% emqttd_client:transfer(NewPid, Data),
lager:error("Shutdown for duplicate clientid: ~s, conn:~s",
[emqttd_protocol:clientid(ProtoState), ConnName]),
[emqttd_protocol:clientid(ProtoState), ConnName]),
stop({shutdown, duplicate_id}, State);
%%TODO: ok??
@ -158,11 +160,10 @@ handle_info({keepalive, timeout}, State = #state{peername = Peername, keepalive
handle_info(Info, State = #state{peername = Peername}) ->
lager:critical("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
{stop, {badinfo, Info}, State}.
{noreply, State}.
terminate(Reason, #state{peername = Peername, keepalive = KeepAlive, proto_state = ProtoState}) ->
lager:info("Client ~s: ~p terminated, reason: ~p~n", [emqttd_net:format(Peername), self(), Reason]),
notify(disconnected, Reason, ProtoState),
lager:info("Client ~s terminated, reason: ~p", [emqttd_net:format(Peername), Reason]),
emqttd_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of
{undefined, _} -> ok;
@ -231,7 +232,7 @@ control_throttle(State = #state{conn_state = Flow,
{_, _} -> run_socket(State)
end.
stop(Reason, State ) ->
stop(Reason, State) ->
{stop, Reason, State}.
received_stats(?PACKET(Type)) ->
@ -253,14 +254,3 @@ inc(?DISCONNECT) ->
inc(_) ->
ignore.
%%TODO: should be moved to emqttd_protocol... for event emitted when protocol shutdown...
notify(disconnected, _Reason, undefined) -> ingore;
notify(disconnected, {shutdown, Reason}, ProtoState) ->
%emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason});
ok;
notify(disconnected, Reason, ProtoState) ->
%emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), Reason}).
ok.

View File

@ -28,6 +28,8 @@
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-behaviour(gen_server).
-define(SERVER, ?MODULE).
@ -69,10 +71,10 @@ table() -> ?CLIENT_TAB.
%% @doc Lookup client pid with clientId
%% @end
%%------------------------------------------------------------------------------
-spec lookup(ClientId :: binary()) -> pid() | undefined.
-spec lookup(ClientId :: binary()) -> mqtt_client() | undefined.
lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(?CLIENT_TAB, ClientId) of
[{_, Pid, _}] -> Pid;
[Client] -> Client;
[] -> undefined
end.

View File

@ -29,22 +29,32 @@
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
-include("emqttd.hrl").
-behaviour(emqttd_gen_mod).
-export([load/1, subscribe/2, unload/1]).
-export([load/1, client_connected/3, unload/1]).
-record(state, {topics}).
load(Opts) ->
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
emqttd_broker:hook(client_connected, {?MODULE, subscribe},
{?MODULE, subscribe, [Topics]}),
emqttd_broker:hook(client_connected, {?MODULE, client_connected},
{?MODULE, client_connected, [Topics]}),
{ok, #state{topics = Topics}}.
subscribe({Client, ClientId}, Topics) ->
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) ->
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
[Client ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics].
[ClientPid ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics];
client_connected(_ConnAck, _Client, _Topics) ->
ignore.
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, subscribe}).
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}).

View File

@ -28,26 +28,52 @@
-include_lib("emqtt/include/emqtt.hrl").
-include("emqttd.hrl").
-export([load/1, unload/1]).
-export([client_connected/2, client_disconnected/2]).
-export([client_connected/3, client_disconnected/3]).
load(Opts) ->
emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
{ok, Opts}.
client_connected({Client, ClientId}, _Opts) ->
Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])),
Payload = iolist_to_binary(mochijson2:encode([{ts, emqttd_util:timestamp()}])),
emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}).
client_connected(ConnAck, #mqtt_client{clientid = ClientId,
username = Username,
ipaddress = IpAddress,
clean_sess = CleanSess,
proto_ver = ProtoVer}, _Opts) ->
Sess = case CleanSess of
true -> false;
false -> true
end,
Json = mochijson2:encode([{username, Username},
{ipaddress, emqttd_net:ntoa(IpAddress)},
{session, Sess},
{protocol, ProtoVer},
{connack, ConnAck},
{ts, emqttd_vm:timestamp()}]),
Message = #mqtt_message{topic = topic(connected, ClientId),
payload = iolist_to_binary(Json)},
emqttd_pubsub:publish(presence, Message).
client_disconnected({ClientId, Reason}, _Opts) ->
Topic = emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])),
Payload = iolist_to_binary(mochijson2:encode([{reason, Reason}, {ts, emqttd_util:timestamp()}])),
emqttd_pubsub:publish(presence, #mqtt_message{topic = Topic, payload = Payload}).
client_disconnected(Reason, ClientId, _Opts) ->
Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]),
emqttd_pubsub:publish(presence, #mqtt_message{topic = topic(disconnected, ClientId),
payload = iolist_to_binary(Json)}).
unload(_Opts) ->
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}),
emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}).
topic(connected, ClientId) ->
emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
topic(disconnected, ClientId) ->
emqtt_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
reason(Reason) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error;
reason(_) -> internal_error.

View File

@ -29,19 +29,18 @@
-author("Feng Lee <feng@emqtt.io>").
-include_lib("emqtt/include/emqtt.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
-include("emqttd.hrl").
%% API
-export([init/3, clientid/1]).
-export([init/3, info/1, clientid/1, client/1]).
-export([received/2, send/2, redeliver/2, shutdown/2]).
-export([handle/2]).
-export([info/1]).
%% Protocol State
-record(proto_state, {
peername,
@ -49,30 +48,29 @@
connected = false, %received CONNECT action?
proto_ver,
proto_name,
%packet_id,
username,
clientid,
clean_sess,
session, %% session state or session pid
session, %% session state or session pid
will_msg,
max_clientid_len = ?MAX_CLIENTID_LEN
max_clientid_len = ?MAX_CLIENTID_LEN,
client_pid
}).
-type proto_state() :: #proto_state{}.
%%------------------------------------------------------------------------------
%% @doc Init protocol
%% @end
%%------------------------------------------------------------------------------
init(Peername, SendFun, Opts) ->
MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
#proto_state{
peername = Peername,
sendfun = SendFun,
max_clientid_len = MaxLen}.
max_clientid_len = MaxLen,
client_pid = self()}.
clientid(#proto_state{clientid = ClientId}) -> ClientId.
client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) ->
#mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}.
%%SHOULD be registered in emqttd_cm
info(#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName,
clientid = ClientId,
@ -80,11 +78,27 @@ info(#proto_state{proto_ver = ProtoVer,
will_msg = WillMsg}) ->
[{proto_ver, ProtoVer},
{proto_name, ProtoName},
{clientid, ClientId},
{clientid, ClientId},
{clean_sess, CleanSess},
{will_msg, WillMsg}].
%%CONNECT Client requests a connection to a Server
clientid(#proto_state{clientid = ClientId}) ->
ClientId.
client(#proto_state{peername = {Addr, _Port},
clientid = ClientId,
username = Username,
clean_sess = CleanSess,
proto_ver = ProtoVer,
client_pid = Pid}) ->
#mqtt_client{clientid = ClientId,
username = Username,
ipaddress = Addr,
clean_sess = CleanSess,
proto_ver = ProtoVer,
client_pid = Pid}.
%% CONNECT Client requests a connection to a Server
%%A Client can only send the CONNECT Packet once over a Network Connection.
-spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}.
@ -107,42 +121,45 @@ received(Packet = ?PACKET(_Type), State) ->
{error, Reason, State}
end.
handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {Addr, _}}) ->
handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername = {Addr, _}}) ->
#mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
password = Password,
clean_sess = CleanSess,
keep_alive = KeepAlive,
clientid = ClientId} = Var,
clientid = ClientId} = Var,
trace(recv, Packet, State#proto_state{clientid = ClientId}), %%TODO: fix later...
State1 = State0#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName,
username = Username,
clientid = ClientId,
clean_sess = CleanSess},
State1 = State#proto_state{proto_ver = ProtoVer,
username = Username,
clientid = ClientId,
clean_sess = CleanSess},
{ReturnCode1, State2} =
case validate_connect(Var, State) of
trace(recv, Packet, State1),
{ReturnCode1, State3} =
case validate_connect(Var, State1) of
?CONNACK_ACCEPT ->
Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr},
case emqttd_access_control:auth(Client, Password) of
case emqttd_access_control:auth(client(State1), Password) of
ok ->
%% Generate one if null
ClientId1 = clientid(ClientId, State),
%% Register clientId
emqttd_cm:register(ClientId1),
%% Generate clientId if null
State2 = State1#proto_state{clientid = clientid(ClientId, State1)},
%% Register the client to cm
emqttd_cm:register(client(State2)),
%%Starting session
{ok, Session} = emqttd_session:start({CleanSess, ClientId1, self()}),
{ok, Session} = emqttd_session:start({CleanSess, clientid(State2), self()}),
%% Start keepalive
start_keepalive(KeepAlive),
%% Run hooks
emqttd_broker:foreach_hooks(client_connected, [{self(), ClientId1}]),
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
session = Session,
will_msg = willmsg(Var)}};
%% ACCEPT
{?CONNACK_ACCEPT, State2#proto_state{session = Session, will_msg = willmsg(Var)}};
{error, Reason}->
lager:error("~s@~s: username '~s' login failed - ~s",
lager:error("~s@~s: username '~s', login failed - ~s",
[ClientId, emqttd_net:format(Peername), Username, Reason]),
{?CONNACK_CREDENTIALS, State1}
@ -150,9 +167,10 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
ReturnCode ->
{ReturnCode, State1}
end,
%%TODO: this is not right...
notify(connected, ReturnCode1, State2),
send(?CONNACK_PACKET(ReturnCode1), State2);
%% Run hooks
emqttd_broker:foreach_hooks(client_connected, [ReturnCode1, client(State3)]),
%% Send connack
send(?CONNACK_PACKET(ReturnCode1), State3);
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
@ -251,7 +269,6 @@ send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
%% message from session
send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
send(emqtt_message:to_packet(Message), State);
%% message(qos1, qos2) not from session
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
@ -279,12 +296,21 @@ trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State).
shutdown(duplicate_id, _State) ->
quiet; %%
shutdown(normal, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
[ClientId, emqttd_net:format(Peername)]),
try_unregister(ClientId),
emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]);
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
send_willmsg(ClientId, WillMsg),
try_unregister(ClientId, self()),
lager:info([{client, ClientId}], "Protocol ~s@~s Shutdown: ~p",
lager:info([{client, ClientId}], "Protocol ~s@~s: Shutdown for ~p",
[ClientId, emqttd_net:format(Peername), Error]),
ok.
send_willmsg(ClientId, WillMsg),
try_unregister(ClientId),
emqttd_broker:foreach_hooks(client_disconnected, [Error, ClientId]).
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqtt_message:from_packet(Packet).
@ -377,8 +403,8 @@ validate_qos(undefined) -> true;
validate_qos(Qos) when Qos =< ?QOS_2 -> true;
validate_qos(_) -> false.
try_unregister(undefined, _) -> ok;
try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId).
try_unregister(undefined) -> ok;
try_unregister(ClientId) -> emqttd_cm:unregister(ClientId).
%% publish ACL is cached in process dictionary.
check_acl(publish, Topic, State) ->
@ -411,18 +437,3 @@ inc(?PINGRESP) ->
inc(_) ->
ingore.
notify(connected, ReturnCode, #proto_state{peername = Peername,
proto_ver = ProtoVer,
clientid = ClientId,
clean_sess = CleanSess}) ->
Sess = case CleanSess of
true -> false;
false -> true
end,
Params = [{from, emqttd_net:format(Peername)},
{protocol, ProtoVer},
{session, Sess},
{connack, ReturnCode}].
%emqttd_event:notify({connected, ClientId, Params}).

View File

@ -30,8 +30,7 @@
-export([apply_module_attributes/1,
all_module_attributes/1,
cancel_timer/1,
timestamp/0, microsecs/0]).
cancel_timer/1]).
-export([integer_to_binary/1]).
@ -91,11 +90,4 @@ cancel_timer(Ref) ->
integer_to_binary(I) when is_integer(I) ->
list_to_binary(integer_to_list(I)).
timestamp() ->
{MegaSecs, Secs, _MicroSecs} = os:timestamp(),
MegaSecs * 1000000 + Secs.
microsecs() ->
{Mega, Sec, Micro} = erlang:now(),
(Mega * 1000000 + Sec) * 1000000 + Micro.

View File

@ -24,12 +24,23 @@
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_vm).
-author("Feng Lee <feng@emqtt.io>").
-export([timestamp/0, microsecs/0]).
-export([loads/0]).
timestamp() ->
{MegaSecs, Secs, _MicroSecs} = os:timestamp(),
MegaSecs * 1000000 + Secs.
microsecs() ->
{Mega, Sec, Micro} = erlang:now(),
(Mega * 1000000 + Sec) * 1000000 + Micro.
loads() ->
[{load1, ftos(cpu_sup:avg1()/256)},
{load5, ftos(cpu_sup:avg5()/256)},