diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c604bf61..5aa1df195 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ emqttd ChangeLog ================== -0.9.0-alpha (2015-06-14) +0.9.0-alpha (2015-07-xx) ------------------------- Session and Queue @@ -19,6 +19,12 @@ Alarm Protocol Compliant +Global msgid + +Hooks + +Plugins + 0.8.6-beta (2015-06-17) ------------------------- diff --git a/doc/uuid.md b/doc/uuid.md new file mode 100644 index 000000000..5b7bd0aa0 --- /dev/null +++ b/doc/uuid.md @@ -0,0 +1,19 @@ +## Mongodb ObjectId + +* 4-byte value representing the seconds since the Unix epoch, +* 3-byte machine identifier, +* 2-byte process id, and +* 3-byte counter, starting with a random value. + +## Flake Id + +* 64bits Timestamp +* 48bits WorkerId +* 16bits Sequence + +## emqttd Id + +* 64bits Timestamp: erlang:now(), erlang:system_time +* 48bits (node+pid): Node + Pid -> Integer +* 16bits Sequence: PktId + diff --git a/include/emqttd.hrl b/include/emqttd.hrl index bf76cb389..2ea71811c 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -83,12 +83,11 @@ %% MQTT Client %%------------------------------------------------------------------------------ -record(mqtt_client, { - clientid :: binary() | undefined, + client_id :: binary() | undefined, username :: binary() | undefined, ipaddress :: inet:ip_address(), - client_pid :: pid(), - client_mon :: reference(), clean_sess :: boolean(), + client_pid :: pid(), proto_ver :: 3 | 4 }). @@ -98,7 +97,7 @@ %% MQTT Session %%------------------------------------------------------------------------------ -record(mqtt_session, { - clientid, + client_id, session_pid, subscriptions = [] }). @@ -108,18 +107,20 @@ %%------------------------------------------------------------------------------ %% MQTT Message %%------------------------------------------------------------------------------ --type mqtt_msgid() :: undefined | 1..16#ffff. +-type mqtt_msgid() :: binary() | undefined. +-type mqtt_pktid() :: 1..16#ffff | undefined. -record(mqtt_message, { + msgid :: mqtt_msgid(), %% Unique Message ID + pktid :: 1..16#ffff, %% PacketId topic :: binary(), %% Topic that the message is published to from :: binary() | atom(), %% ClientId of publisher qos = 0 :: 0 | 1 | 2, %% Message QoS retain = false :: boolean(), %% Retain flag dup = false :: boolean(), %% Dup flag sys = false :: boolean(), %% $SYS flag - msgid :: mqtt_msgid(), %% Message ID payload :: binary(), %% Payload - timestamp :: erlang:timestamp() %% Timestamp + timestamp :: erlang:timestamp() %% os:timestamp }). -type mqtt_message() :: #mqtt_message{}. diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 35637e3d5..22a32bc09 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -32,7 +32,7 @@ -define(MQTT_PROTO_V311, 4). -define(PROTOCOL_NAMES, [ - {?MQTT_PROTO_V31, <<"MQIsdp">>}, + {?MQTT_PROTO_V31, <<"MQIsdp">>}, {?MQTT_PROTO_V311, <<"MQTT">>}]). -type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311. @@ -122,11 +122,11 @@ %%------------------------------------------------------------------------------ %% MQTT Packets %%------------------------------------------------------------------------------ --type mqtt_clientid() :: binary(). +-type mqtt_client_id() :: binary(). -type mqtt_packet_id() :: 1..16#ffff | undefined. -record(mqtt_packet_connect, { - clientid = <<>> :: mqtt_clientid(), + client_id = <<>> :: mqtt_client_id(), proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), proto_name = <<"MQTT">> :: binary(), will_retain = false :: boolean(), diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 82ac2552d..fa01a5ccb 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -97,7 +97,7 @@ check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subsc [] -> allow; AclMods -> check_acl(Client, PubSub, Topic, AclMods) end. -check_acl(#mqtt_client{clientid = ClientId}, PubSub, Topic, []) -> +check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) -> lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), allow; check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> diff --git a/src/emqttd_access_rule.erl b/src/emqttd_access_rule.erl index 0b0fdc6ba..6d7e895f7 100644 --- a/src/emqttd_access_rule.erl +++ b/src/emqttd_access_rule.erl @@ -110,7 +110,7 @@ match_who(_Client, {user, all}) -> true; match_who(_Client, {client, all}) -> true; -match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> +match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) -> true; match_who(#mqtt_client{username = Username}, {user, Username}) -> true; @@ -145,9 +145,9 @@ feed_var(Client, Pattern) -> feed_var(Client, Pattern, []). feed_var(_Client, [], Acc) -> lists:reverse(Acc); -feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) -> +feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) -> feed_var(Client, Words, [<<"$c">>|Acc]); -feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) -> +feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) -> feed_var(Client, Words, [ClientId |Acc]); feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) -> feed_var(Client, Words, [<<"$u">>|Acc]); diff --git a/src/emqttd_alarm.erl b/src/emqttd_alarm.erl index 0c7061490..f3a25ff68 100644 --- a/src/emqttd_alarm.erl +++ b/src/emqttd_alarm.erl @@ -121,12 +121,10 @@ terminate(_, _) -> ok. alarm_msg(Type, AlarmId, Json) -> - #mqtt_message{from = alarm, - qos = 1, - sys = true, - topic = topic(Type, AlarmId), - payload = iolist_to_binary(Json), - timestamp = os:timestamp()}. + Msg = emqttd_message:make(alarm, + topic(Type, AlarmId), + iolist_to_binary(Json)), + emqttd_message:set_flag(sys, Msg). topic(alert, AlarmId) -> emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); diff --git a/src/emqttd_auth_clientid.erl b/src/emqttd_auth_clientid.erl index e5171a239..2b0ef60c4 100644 --- a/src/emqttd_auth_clientid.erl +++ b/src/emqttd_auth_clientid.erl @@ -41,7 +41,7 @@ -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). --record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}). +-record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}). %%%============================================================================= %%% API @@ -52,7 +52,7 @@ %% @end %%------------------------------------------------------------------------------ add_clientid(ClientId) when is_binary(ClientId) -> - R = #mqtt_auth_clientid{clientid = ClientId}, + R = #mqtt_auth_clientid{client_id = ClientId}, mnesia:transaction(fun() -> mnesia:write(R) end). %%------------------------------------------------------------------------------ @@ -60,7 +60,7 @@ add_clientid(ClientId) when is_binary(ClientId) -> %% @end %%------------------------------------------------------------------------------ add_clientid(ClientId, Password) -> - R = #mqtt_auth_clientid{clientid = ClientId, password = Password}, + R = #mqtt_auth_clientid{client_id = ClientId, password = Password}, mnesia:transaction(fun() -> mnesia:write(R) end). %%------------------------------------------------------------------------------ @@ -99,15 +99,15 @@ init(Opts) -> end, {ok, Opts}. -check(#mqtt_client{clientid = undefined}, _Password, []) -> +check(#mqtt_client{client_id = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) -> +check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) -> check_clientid_only(ClientId, IpAddress); -check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> +check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> check_clientid_only(ClientId, IpAddress); check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; -check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> +check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) -> case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of [] -> {error, "ClientId Not Found"}; [#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext?? @@ -129,11 +129,11 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) -> case string:tokens(Line, " ") of [ClientIdS] -> ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)), - [#mqtt_auth_clientid{clientid = ClientId} | Clients]; + [#mqtt_auth_clientid{client_id = ClientId} | Clients]; [ClientId, IpAddr0] -> IpAddr = string:strip(IpAddr0, right, $\n), Range = esockd_access:range(IpAddr), - [#mqtt_auth_clientid{clientid = list_to_binary(ClientId), + [#mqtt_auth_clientid{client_id = list_to_binary(ClientId), ipaddr = {IpAddr, Range}}|Clients]; BadLine -> lager:error("BadLine in clients.config: ~s", [BadLine]), diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 2129a6bdb..38183a573 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) -> true -> true = erlang:monitor_node(Node, true), State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}), - emqttd_pubsub:subscribe({SubTopic, ?QOS_0}), + emqttd_pubsub:subscribe({SubTopic, State#state.qos}), {ok, State}; false -> {stop, {cannot_connect, Node}} @@ -107,7 +107,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) -> - lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]), + lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]), {noreply, State}; handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) -> @@ -159,14 +159,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -%TODO: qos is not right... -transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos, - topic_prefix = Prefix, +transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> - Msg1 = - if - Qos =:= undefined -> Msg; - true -> Msg#mqtt_message{qos = Qos} - end, - Msg1#mqtt_message{topic = <>}. + Msg#mqtt_message{topic = <>}. diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index e4deb6414..730e70f16 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -186,7 +186,7 @@ foldl_hooks(Hook, Args, Acc0) -> case ets:lookup(?BROKER_TAB, {hook, Hook}) of [{_, Hooks}] -> lists:foldl(fun({_Name, {M, F, A}}, Acc) -> - apply(M, F, [Acc, Args++A]) + apply(M, F, lists:append([Args, [Acc], A])) end, Acc0, Hooks); [] -> Acc0 @@ -286,23 +286,15 @@ create_topic(Topic) -> retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), - publish(#mqtt_message{from = broker, - retain = true, - topic = <<"$SYS/brokers">>, - payload = Payload}). + Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). retain(Topic, Payload) when is_binary(Payload) -> - publish(#mqtt_message{from = broker, - retain = true, - topic = emqttd_topic:systop(Topic), - payload = Payload}). + Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), + emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)). publish(Topic, Payload) when is_binary(Payload) -> - publish( #mqtt_message{from = broker, - topic = emqttd_topic:systop(Topic), - payload = Payload}). - -publish(Msg) -> + Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), emqttd_pubsub:publish(Msg). uptime(#state{started_at = Ts}) -> diff --git a/src/emqttd_guid.erl b/src/emqttd_guid.erl new file mode 100644 index 000000000..a6897b73f --- /dev/null +++ b/src/emqttd_guid.erl @@ -0,0 +1,126 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% +%%% Generate global unique id for mqtt message. +%%% +%%% -------------------------------------------------------- +%%% | Timestamp | NodeID + PID | Sequence | +%%% |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->| +%%% -------------------------------------------------------- +%%% +%%% 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp +%%% 2. NodeId: encode node() to 2 bytes integer +%%% 3. Pid: encode pid to 4 bytes integer +%%% 4. Sequence: 2 bytes sequence no per pid +%%% +%%% @end +%%%----------------------------------------------------------------------------- + +-module(emqttd_guid). + +-export([gen/0]). + +-define(MAX_SEQ, 16#FFFF). + +-type guid() :: <<_:128>>. + +-spec gen() -> guid(). +gen() -> + Guid = case get(guid) of + undefined -> new(); + {_Ts, NPid, Seq} -> next(NPid, Seq) + end, + put(guid, Guid), enc(Guid). + +new() -> + {ts(), npid(), 0}. + +next(NPid, Seq) when Seq >= ?MAX_SEQ -> + {ts(), NPid, 0}; +next(NPid, Seq) -> + {ts(), NPid, Seq + 1}. + +enc({Ts, NPid, Seq}) -> + <>. + +ts() -> + case erlang:function_exported(erlang, system_time, 1) of + true -> %% R18 + erlang:system_time(micro_seconds); + false -> + {MegaSeconds, Seconds, MicroSeconds} = os:timestamp(), + (MegaSeconds * 1000000 + Seconds) * 1000000 + MicroSeconds + end. + +%% Copied from https://github.com/okeuday/uuid.git. +npid() -> + <> = + crypto:hash(sha, erlang:list_to_binary(erlang:atom_to_list(node()))), + + % later, when the pid format changes, handle the different format + ExternalTermFormatVersion = 131, + PidExtType = 103, + <> = erlang:term_to_binary(self()), + % 72 bits for the Erlang pid + <> = binary:part(PidBin, erlang:byte_size(PidBin), -9), + + % reduce the 160 bit NodeData checksum to 16 bits + NodeByte1 = ((((((((NodeD01 bxor NodeD02) + bxor NodeD03) + bxor NodeD04) + bxor NodeD05) + bxor NodeD06) + bxor NodeD07) + bxor NodeD08) + bxor NodeD09) + bxor NodeD10, + NodeByte2 = (((((((((NodeD11 bxor NodeD12) + bxor NodeD13) + bxor NodeD14) + bxor NodeD15) + bxor NodeD16) + bxor NodeD17) + bxor NodeD18) + bxor NodeD19) + bxor NodeD20) + bxor PidCR1, + + % reduce the Erlang pid to 32 bits + PidByte1 = PidID1 bxor PidSR4, + PidByte2 = PidID2 bxor PidSR3, + PidByte3 = PidID3 bxor PidSR2, + PidByte4 = PidID4 bxor PidSR1, + + <> = <>, + NPid. + diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 8b7afe54e..3e66670e6 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -49,14 +49,11 @@ handle_request('POST', "/mqtt/publish", Req) -> Qos = int(get_value("qos", Params, "0")), Retain = bool(get_value("retain", Params, "0")), Topic = list_to_binary(get_value("topic", Params)), - Message = list_to_binary(get_value("message", Params)), + Payload = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_pubsub:publish(#mqtt_message{from = http, - qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + Msg = emqttd_message:make(http, Qos, Topic, Payload), + emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index d96e5d922..4d3eaea83 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -32,12 +32,39 @@ -include("emqttd_protocol.hrl"). --export([from_packet/1, from_packet/2, to_packet/1]). +-export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]). -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). -export([format/1]). +%%------------------------------------------------------------------------------ +%% @doc Make a message +%% @end +%%------------------------------------------------------------------------------ +-spec make(From, Topic, Payload) -> mqtt_message() when + From :: atom() | binary(), + Topic :: binary(), + Payload :: binary(). +make(From, Topic, Payload) -> + #mqtt_message{topic = Topic, + from = From, + payload = Payload, + timestamp = os:timestamp()}. + +-spec make(From, Qos, Topic, Payload) -> mqtt_message() when + From :: atom() | binary(), + Qos :: mqtt_qos(), + Topic :: binary(), + Payload :: binary(). +make(From, Qos, Topic, Payload) -> + #mqtt_message{msgid = msgid(Qos), + topic = Topic, + from = From, + qos = Qos, + payload = Payload, + timestamp = os:timestamp()}. + %%------------------------------------------------------------------------------ %% @doc Message from Packet %% @end @@ -50,12 +77,14 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, payload = Payload}) -> - #mqtt_message{msgid = PacketId, + #mqtt_message{msgid = msgid(Qos), + pktid = PacketId, qos = Qos, retain = Retain, dup = Dup, topic = Topic, - payload = Payload}; + payload = Payload, + timestamp = os:timestamp()}; from_packet(#mqtt_packet_connect{will_flag = false}) -> undefined; @@ -64,38 +93,44 @@ from_packet(#mqtt_packet_connect{will_retain = Retain, will_qos = Qos, will_topic = Topic, will_msg = Msg}) -> - #mqtt_message{retain = Retain, - qos = Qos, - topic = Topic, - dup = false, - payload = Msg}. + #mqtt_message{msgid = msgid(Qos), + topic = Topic, + retain = Retain, + qos = Qos, + dup = false, + payload = Msg, + timestamp = os:timestamp()}. from_packet(ClientId, Packet) -> Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}. +msgid(?QOS_0) -> + undefined; +msgid(_Qos) -> + emqttd_guid:gen(). + %%------------------------------------------------------------------------------ %% @doc Message to packet %% @end %%------------------------------------------------------------------------------ -spec to_packet(mqtt_message()) -> mqtt_packet(). -to_packet(#mqtt_message{msgid = MsgId, +to_packet(#mqtt_message{pktid = PkgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic, payload = Payload}) -> - PacketId = if - Qos =:= ?QOS_0 -> undefined; - true -> MsgId - end, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = Qos, retain = Retain, dup = Dup}, variable = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId}, + packet_id = if + Qos =:= ?QOS_0 -> undefined; + true -> PkgId + end + }, payload = Payload}. %%------------------------------------------------------------------------------ @@ -109,6 +144,8 @@ set_flag(Msg) -> -spec set_flag(atom(), mqtt_message()) -> mqtt_message(). set_flag(dup, Msg = #mqtt_message{dup = false}) -> Msg#mqtt_message{dup = true}; +set_flag(sys, Msg = #mqtt_message{sys = false}) -> + Msg#mqtt_message{sys = true}; set_flag(retain, Msg = #mqtt_message{retain = false}) -> Msg#mqtt_message{retain = true}; set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. @@ -133,7 +170,7 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message %% @end %%------------------------------------------------------------------------------ -format(#mqtt_message{msgid=MsgId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) -> - io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", - [MsgId, Qos, Retain, Dup, Topic]). +format(#mqtt_message{msgid=MsgId, pktid = PktId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) -> + io_lib:format("Message(MsgId=~p, PktId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", + [MsgId, PktId, Qos, Retain, Dup, Topic]). diff --git a/src/emqttd_metrics.erl b/src/emqttd_metrics.erl index f0d45ec02..863d67676 100644 --- a/src/emqttd_metrics.erl +++ b/src/emqttd_metrics.erl @@ -77,7 +77,7 @@ -define(SYSTOP_MESSAGES, [ {counter, 'messages/received'}, % Messages received {counter, 'messages/sent'}, % Messages sent - {gauge, 'messages/retained/count'},% Messagea retained + {gauge, 'messages/retained'}, % Messagea retained {gauge, 'messages/stored/count'}, % Messages stored {counter, 'messages/dropped'} % Messages dropped ]). @@ -222,9 +222,9 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= publish(Metric, Val) -> - emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric), - from = metrics, - payload = emqttd_util:integer_to_binary(Val)}). + Payload = emqttd_util:integer_to_binary(Val), + Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload), + emqttd_pubsub:publish(Msg). create_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); diff --git a/src/emqttd_mod_autosub.erl b/src/emqttd_mod_autosub.erl index c5a1e136e..86d0b3e2e 100644 --- a/src/emqttd_mod_autosub.erl +++ b/src/emqttd_mod_autosub.erl @@ -41,11 +41,11 @@ load(Opts) -> Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2], - emqttd_broker:hook(client_connected, {?MODULE, client_connected}, + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Topics]}), {ok, #state{topics = Topics}}. -client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> +client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; @@ -53,6 +53,5 @@ client_connected(_ConnAck, _Client, _Topics) -> ignore. unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, client_connected}). - + emqttd_broker:unhook('client.connected', {?MODULE, client_connected}). diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 248eb6bcf..63edf024b 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -35,11 +35,11 @@ -export([client_connected/3, client_disconnected/3]). load(Opts) -> - emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), - emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), + emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}), + emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), {ok, Opts}. -client_connected(ConnAck, #mqtt_client{clientid = ClientId, +client_connected(ConnAck, #mqtt_client{client_id = ClientId, username = Username, ipaddress = IpAddress, clean_sess = CleanSess, @@ -55,24 +55,25 @@ client_connected(ConnAck, #mqtt_client{clientid = ClientId, {protocol, ProtoVer}, {connack, ConnAck}, {ts, emqttd_util:now_to_secs()}]), - Message = #mqtt_message{from = presence, - qos = proplists:get_value(qos, Opts, 0), - topic = topic(connected, ClientId), - payload = iolist_to_binary(Json)}, - emqttd_pubsub:publish(Message). + Msg = emqttd_message:make(presence, + proplists:get_value(qos, Opts, 0), + topic(connected, ClientId), + iolist_to_binary(Json)), + emqttd_pubsub:publish(Msg). client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, {ts, emqttd_util:now_to_secs()}]), - emqttd_pubsub:publish(#mqtt_message{from = presence, - qos = proplists:get_value(qos, Opts, 0), - topic = topic(disconnected, ClientId), - payload = iolist_to_binary(Json)}). + Msg = emqttd_message:make(presence, + proplists:get_value(qos, Opts, 0), + topic(disconnected, ClientId), + iolist_to_binary(Json)), + emqttd_pubsub:publish(Msg). unload(_Opts) -> - emqttd_broker:unhook(client_connected, {?MODULE, client_connected}), - emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}). + emqttd_broker:unhook('client.connected', {?MODULE, client_connected}), + emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}). topic(connected, ClientId) -> emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); diff --git a/src/emqttd_mod_rewrite.erl b/src/emqttd_mod_rewrite.erl index eaaf287ae..4f29b51d1 100644 --- a/src/emqttd_mod_rewrite.erl +++ b/src/emqttd_mod_rewrite.erl @@ -35,7 +35,7 @@ -export([load/1, reload/1, unload/1]). --export([rewrite/2]). +-export([rewrite/3, rewrite/4]). %%%============================================================================= %%% API @@ -45,22 +45,22 @@ load(Opts) -> File = proplists:get_value(file, Opts), {ok, Terms} = file:consult(File), Sections = compile(Terms), - emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, + emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, {?MODULE, rewrite, [subscribe, Sections]}), - emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, + emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}, {?MODULE, rewrite, [unsubscribe, Sections]}), - emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, + emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish}, {?MODULE, rewrite, [publish, Sections]}). -rewrite(TopicTable, [subscribe, Sections]) -> +rewrite(_ClientId, TopicTable, subscribe, Sections) -> lager:info("rewrite subscribe: ~p", [TopicTable]), [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; -rewrite(Topics, [unsubscribe, Sections]) -> +rewrite(_ClientId, Topics, unsubscribe, Sections) -> lager:info("rewrite unsubscribe: ~p", [Topics]), - [match_topic(Topic, Sections) || Topic <- Topics]; + [match_topic(Topic, Sections) || Topic <- Topics]. -rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> +rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) -> %%TODO: this will not work if the client is always online. RewriteTopic = case get({rewrite, Topic}) of @@ -83,9 +83,9 @@ reload(File) -> end. unload(_) -> - emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}), - emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}), - emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}). + emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}), + emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), + emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}). %%%============================================================================= %%% Internal functions @@ -116,7 +116,6 @@ match_rule(Topic, []) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> - %%TODO: stupid??? how to replace $1, $2? Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), iolist_to_binary(lists:foldl( fun({Var, Val}, Acc) -> diff --git a/src/emqttd_packet.erl b/src/emqttd_packet.erl index 6cb6b415d..c2098c2ff 100644 --- a/src/emqttd_packet.erl +++ b/src/emqttd_packet.erl @@ -96,7 +96,7 @@ format_variable(#mqtt_packet_connect{ will_flag = WillFlag, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId, + client_id = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = Username, diff --git a/src/emqttd_parser.erl b/src/emqttd_parser.erl index bcc9fa1b4..c0a398af8 100644 --- a/src/emqttd_parser.erl +++ b/src/emqttd_parser.erl @@ -112,7 +112,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) will_flag = bool(WillFlag), clean_sess = bool(CleanSession), keep_alive = KeepAlive, - clientid = ClientId, + client_id = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = UserName, diff --git a/src/emqttd_protocol.erl b/src/emqttd_protocol.erl index 9be6b41fc..28ae04d56 100644 --- a/src/emqttd_protocol.erl +++ b/src/emqttd_protocol.erl @@ -47,7 +47,7 @@ proto_ver, proto_name, username, - clientid, + client_id, clean_sess, session, will_msg, @@ -70,25 +70,25 @@ init(Peername, SendFun, Opts) -> info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, - clientid = ClientId, + client_id = ClientId, clean_sess = CleanSess, will_msg = WillMsg}) -> [{proto_ver, ProtoVer}, {proto_name, ProtoName}, - {clientid, ClientId}, + {client_id, ClientId}, {clean_sess, CleanSess}, {will_msg, WillMsg}]. -clientid(#proto_state{clientid = ClientId}) -> +clientid(#proto_state{client_id = ClientId}) -> ClientId. client(#proto_state{peername = {Addr, _Port}, - clientid = ClientId, + client_id = ClientId, username = Username, clean_sess = CleanSess, proto_ver = ProtoVer, client_pid = Pid}) -> - #mqtt_client{clientid = ClientId, + #mqtt_client{client_id = ClientId, username = Username, ipaddress = Addr, clean_sess = CleanSess, @@ -126,12 +126,12 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - clientid = ClientId} = Var, + client_id = ClientId} = Var, State1 = State0#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, username = Username, - clientid = ClientId, + client_id = ClientId, clean_sess = CleanSess}, trace(recv, Packet, State1), @@ -142,7 +142,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} case emqttd_access_control:auth(client(State1), Password) of ok -> %% Generate clientId if null - State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, + State2 = State1#proto_state{client_id = clientid(ClientId, State1)}, %%Starting session {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), @@ -167,7 +167,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername} send(?CONNACK_PACKET(ReturnCode1), State3); handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), - State = #proto_state{clientid = ClientId}) -> + State = #proto_state{client_id = ClientId}) -> case check_acl(publish, Topic, State) of allow -> @@ -199,7 +199,7 @@ handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessio handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> send(?SUBACK_PACKET(PacketId, []), State); -handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) -> +handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) -> AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> @@ -207,7 +207,7 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> - TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable), + TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable), %%TODO: GrantedQos should be renamed. {ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1), send(?SUBACK_PACKET(PacketId, GrantedQos), State) @@ -221,8 +221,9 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) -> handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) -> send(?UNSUBACK_PACKET(PacketId), State); -handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) -> - Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics), +handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId, + session = Session}) -> + Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics), ok = emqttd_session:unsubscribe(Session, Topics1), send(?UNSUBACK_PACKET(PacketId), State); @@ -233,10 +234,10 @@ handle(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. -publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) -> emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); -publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of ok -> send(?PUBACK_PACKET(?PUBACK, PacketId), State); @@ -245,7 +246,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = Cli lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error]) end; -publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> +publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) -> case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of ok -> send(?PUBACK_PACKET(?PUBREC, PacketId), State); @@ -267,11 +268,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when SendFun(Data), {ok, State}. -trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> +trace(recv, Packet, #proto_state{peername = Peername, client_id = ClientId}) -> lager:info([{client, ClientId}], "RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]); -trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> +trace(send, Packet, #proto_state{peername = Peername, client_id = ClientId}) -> lager:info([{client, ClientId}], "SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]). @@ -282,10 +283,10 @@ redeliver({?PUBREL, PacketId}, State) -> shutdown(duplicate_id, _State) -> quiet; %% -shutdown(_, #proto_state{clientid = undefined}) -> +shutdown(_, #proto_state{client_id = undefined}) -> ignore; -shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> +shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) -> lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p", [ClientId, emqttd_net:format(Peername), Error]), send_willmsg(ClientId, WillMsg), @@ -333,16 +334,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). -validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen}) +validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen}) when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) -> true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) -> +validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 3699bfc9f..83f321fcd 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -157,19 +157,18 @@ cast(Msg) -> %% @end %%------------------------------------------------------------------------------ -spec publish(Msg :: mqtt_message()) -> ok. -publish(#mqtt_message{topic=Topic, from = From} = Msg) -> +publish(#mqtt_message{from = From} = Msg) -> trace(publish, From, Msg), - %%TODO:call hooks here... - %%Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg), + Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg), %% Retain message first. Don't create retained topic. - case emqttd_msg_store:retain(Msg) of + case emqttd_retained:retain(Msg1) of ok -> %TODO: why unset 'retain' flag? - publish(Topic, emqttd_message:unset_flag(Msg)); + publish(Topic, emqttd_message:unset_flag(Msg1)); ignore -> - publish(Topic, Msg) + publish(Topic, Msg1) end. publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) -> diff --git a/src/emqttd_msg_store.erl b/src/emqttd_retained.erl similarity index 77% rename from src/emqttd_msg_store.erl rename to src/emqttd_retained.erl index fb2df9171..5c1b03610 100644 --- a/src/emqttd_msg_store.erl +++ b/src/emqttd_retained.erl @@ -24,7 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_msg_store). +-module(emqttd_retained). -author("Feng Lee "). @@ -37,21 +37,23 @@ -copy_mnesia({mnesia, [copy]}). %% API Function Exports --export([retain/1, redeliver/2]). +-export([retain/1, dispatch/2]). + +-record(mqtt_retained, {topic, message}). %%%============================================================================= %%% Mnesia callbacks %%%============================================================================= mnesia(boot) -> - ok = emqttd_mnesia:create_table(message, [ + ok = emqttd_mnesia:create_table(retained, [ {type, ordered_set}, {ram_copies, [node()]}, - {record_name, mqtt_message}, - {attributes, record_info(fields, mqtt_message)}]); + {record_name, mqtt_retained}, + {attributes, record_info(fields, mqtt_retained)}]); mnesia(copy) -> - ok = emqttd_mnesia:copy_table(message). + ok = emqttd_mnesia:copy_table(retained). %%%============================================================================= %%% API @@ -66,7 +68,7 @@ retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]); + mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]); retain(Msg = #mqtt_message{topic = Topic, retain = true, @@ -74,10 +76,10 @@ retain(Msg = #mqtt_message{topic = Topic, TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> + Retained = #mqtt_retained{topic = Topic, message = Msg}, lager:debug("Retained ~s", [emqttd_message:format(Msg)]), - mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]), - emqttd_metrics:set('messages/retained/count', - mnesia:table_info(message, size)); + mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]), + emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size)); {false, _}-> lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> @@ -99,31 +101,25 @@ env() -> end. %%%----------------------------------------------------------------------------- -%% @doc Redeliver retained messages to subscribed client +%% @doc Deliver retained messages to subscribed client %% @end %%%----------------------------------------------------------------------------- --spec redeliver(Topic, CPid) -> any() when +-spec dispatch(Topic, CPid) -> any() when Topic :: binary(), CPid :: pid(). -redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> +dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) -> + Msgs = case emqttd_topic:wildcard(Topic) of false -> - dispatch(CPid, mnesia:dirty_read(message, Topic)); + [Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)]; true -> - Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> + Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) -> case emqttd_topic:match(Name, Topic) of true -> [Msg|Acc]; false -> Acc end end, - Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]), - dispatch(CPid, lists:reverse(Msgs)) - end. - -dispatch(_CPid, []) -> - ignore; -dispatch(CPid, Msgs) when is_list(Msgs) -> - [CPid ! {dispatch, Msg} || Msg <- Msgs]; -dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> - CPid ! {dispatch, Msg}. + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained]) + end, + [CPid ! {dispatch, Msg} || Msg <- Msgs]. diff --git a/src/emqttd_serialiser.erl b/src/emqttd_serialiser.erl index e109c19a3..471904bce 100644 --- a/src/emqttd_serialiser.erl +++ b/src/emqttd_serialiser.erl @@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type, VariableBin/binary, PayloadBin/binary>>. -serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId, +serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, proto_ver = ProtoVer, proto_name = ProtoName, will_retain = WillRetain, diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 93c9fd54b..da2bfdf72 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -73,13 +73,13 @@ clean_sess = true, %% ClientId: Identifier of Session - clientid :: binary(), + client_id :: binary(), %% Client Pid linked with session client_pid :: pid(), - %% Last message id of the session - message_id = 1, + %% Last packet id of the session + packet_id = 1, %% Client’s subscriptions. subscriptions :: list(), @@ -133,7 +133,7 @@ %% @doc Start a session. %% @end %%------------------------------------------------------------------------------ --spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}. +-spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}. start_link(CleanSess, ClientId, ClientPid) -> gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). @@ -141,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) -> %% @doc Resume a session. %% @end %%------------------------------------------------------------------------------ --spec resume(pid(), mqtt_clientid(), pid()) -> ok. +-spec resume(pid(), mqtt_client_id(), pid()) -> ok. resume(Session, ClientId, ClientPid) -> gen_server:cast(Session, {resume, ClientId, ClientPid}). @@ -149,7 +149,7 @@ resume(Session, ClientId, ClientPid) -> %% @doc Destroy a session. %% @end %%------------------------------------------------------------------------------ --spec destroy(pid(), mqtt_clientid()) -> ok. +-spec destroy(pid(), mqtt_client_id()) -> ok. destroy(Session, ClientId) -> gen_server:call(Session, {destroy, ClientId}). @@ -182,21 +182,21 @@ publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> %% @doc PubAck message %% @end %%------------------------------------------------------------------------------ --spec puback(pid(), mqtt_msgid()) -> ok. -puback(Session, MsgId) -> - gen_server:cast(Session, {puback, MsgId}). +-spec puback(pid(), mqtt_packet_id()) -> ok. +puback(Session, PktId) -> + gen_server:cast(Session, {puback, PktId}). --spec pubrec(pid(), mqtt_msgid()) -> ok. -pubrec(Session, MsgId) -> - gen_server:cast(Session, {pubrec, MsgId}). +-spec pubrec(pid(), mqtt_packet_id()) -> ok. +pubrec(Session, PktId) -> + gen_server:cast(Session, {pubrec, PktId}). --spec pubrel(pid(), mqtt_msgid()) -> ok. -pubrel(Session, MsgId) -> - gen_server:cast(Session, {pubrel, MsgId}). +-spec pubrel(pid(), mqtt_packet_id()) -> ok. +pubrel(Session, PktId) -> + gen_server:cast(Session, {pubrel, PktId}). --spec pubcomp(pid(), mqtt_msgid()) -> ok. -pubcomp(Session, MsgId) -> - gen_server:cast(Session, {pubcomp, MsgId}). +-spec pubcomp(pid(), mqtt_packet_id()) -> ok. +pubcomp(Session, PktId) -> + gen_server:cast(Session, {pubcomp, PktId}). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topics @@ -217,7 +217,7 @@ init([CleanSess, ClientId, ClientPid]) -> SessEnv = emqttd:env(mqtt, session), Session = #session{ clean_sess = CleanSess, - clientid = ClientId, + client_id = ClientId, client_pid = ClientPid, subscriptions = [], inflight_queue = [], @@ -234,7 +234,7 @@ init([CleanSess, ClientId, ClientPid]) -> timestamp = os:timestamp()}, {ok, Session, hibernate}. -handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, +handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> %% subscribe first and don't care if the subscriptions have been existed @@ -258,13 +258,13 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, %% : 3.8.4 %% Where the Topic Filter is not identical to any existing Subscription’s filter, %% a new Subscription is created and all matching retained messages are sent. - emqttd_msg_store:redeliver(Topic, self()), + emqttd_retained:dispatch(Topic, self()), [{Topic, Qos} | Acc] end end, Subscriptions, Topics), {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; -handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, +handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId, subscriptions = Subscriptions}) -> %% unsubscribe from topic tree @@ -284,22 +284,22 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId {reply, ok, Session#session{subscriptions = Subscriptions1}}; -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, - Session = #session{clientid = ClientId, +handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From, + Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case check_awaiting_rel(Session) of true -> - TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}), - AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel), + TRef = timer(Timeout, {timeout, awaiting_rel, PktId}), + AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel), {reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; false -> - lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message " + lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message " "for too many awaiting_rel: ~p", [ClientId, Msg]), {reply, {error, dropped}, Session} end; -handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) -> +handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) -> lager:warning("Session ~s destroyed", [ClientId]), {stop, {shutdown, destroy}, ok, Session}; @@ -309,7 +309,7 @@ handle_call(Req, _From, State) -> handle_cast({resume, ClientId, ClientPid}, Session) -> - #session{clientid = ClientId, + #session{client_id = ClientId, client_pid = OldClientPid, inflight_queue = InflightQ, awaiting_ack = AwaitingAck, @@ -326,7 +326,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> true = link(ClientPid), %% Redeliver PUBREL - [ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)], + [ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)], %% Clear awaiting_ack timers [cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)], @@ -349,54 +349,54 @@ handle_cast({resume, ClientId, ClientPid}, Session) -> {noreply, dequeue(Session2), hibernate}; %% PUBRAC -handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) -> - case maps:find(MsgId, Awaiting) of +handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) -> + case maps:find(PktId, Awaiting) of {ok, {_, TRef}} -> cancel_timer(TRef), - Session1 = acked(MsgId, Session), + Session1 = acked(PktId, Session), {noreply, dequeue(Session1)}; error -> - lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]), {noreply, Session} end; %% PUBREC -handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, +handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp, await_rel_timeout = Timeout}) -> - case maps:find(MsgId, AwaitingAck) of + case maps:find(PktId, AwaitingAck) of {ok, {_, TRef}} -> cancel_timer(TRef), - TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}), - Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}), + TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}), + Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}), {noreply, dequeue(Session1)}; error -> - lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]), {noreply, Session} end; %% PUBREL -handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, +handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> - case maps:find(MsgId, AwaitingRel) of + case maps:find(PktId, AwaitingRel) of {ok, {Msg, TRef}} -> cancel_timer(TRef), emqttd_pubsub:publish(Msg), - {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; + {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; error -> - lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]), {noreply, Session} end; %% PUBCOMP -handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> - case maps:find(MsgId, AwaitingComp) of +handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) -> + case maps:find(PktId, AwaitingComp) of {ok, TRef} -> cancel_timer(TRef), - {noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}}; + {noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}}; error -> - lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]), + lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]), {noreply, Session} end; @@ -417,7 +417,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}}, {noreply, Session}; handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, - Session = #session{clientid = ClientId, message_queue = MsgQ}) + Session = #session{client_id = ClientId, message_queue = MsgQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case check_inflight(Session) of @@ -428,51 +428,51 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, {noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}} end; -handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined, awaiting_ack = AwaitingAck}) -> %% just remove awaiting - {noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}}; + {noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}}; -handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId, inflight_queue = InflightQ, awaiting_ack = AwaitingAck}) -> - case maps:find(MsgId, AwaitingAck) of + case maps:find(PktId, AwaitingAck) of {ok, {{0, _Timeout}, _TRef}} -> - Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), - awaiting_ack = maps:remove(MsgId, AwaitingAck)}, + Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), + awaiting_ack = maps:remove(PktId, AwaitingAck)}, {noreply, dequeue(Session1)}; {ok, {{Retries, Timeout}, _TRef}} -> - TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), - AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), + TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), + AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck), {noreply, Session#session{awaiting_ack = AwaitingAck1}}; error -> lager:error([{client, ClientId}], "Session ~s " - "cannot find Awaiting Ack:~p", [ClientId, MsgId]), + "cannot find Awaiting Ack:~p", [ClientId, PktId]), {noreply, Session} end; -handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId, awaiting_rel = AwaitingRel}) -> - case maps:find(MsgId, AwaitingRel) of + case maps:find(PktId, AwaitingRel) of {ok, {Msg, _TRef}} -> lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n" "Drop Message:~p", [ClientId, Msg]), - {noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}}; + {noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}}; error -> - lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]), + lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]), {noreply, Session} end; -handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, +handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = Awaiting}) -> - case maps:find(MsgId, Awaiting) of + case maps:find(PktId, Awaiting) of {ok, _TRef} -> lager:error([{client, ClientId}], "Session ~s " - "Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]), - {noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}}; + "Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]), + {noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}}; error -> lager:error([{client, ClientId}], "Session ~s " - "Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]), + "Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]), {noreply, Session} end; @@ -481,25 +481,25 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, {stop, normal, Session}; handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, - clientid = ClientId, + client_id = ClientId, client_pid = ClientPid, expired_after = Expires}) -> lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), TRef = timer(Expires, session_expired), {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; -handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId, +handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId, client_pid = ClientPid}) -> lager:error("Session ~s received unexpected EXIT:" " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), {noreply, Session}; -handle_info(session_expired, Session = #session{clientid = ClientId}) -> +handle_info(session_expired, Session = #session{client_id = ClientId}) -> lager:error("Session ~s expired, shutdown now!", [ClientId]), {stop, {shutdown, expired}, Session}; -handle_info(Info, Session = #session{clientid = ClientId}) -> +handle_info(Info, Session = #session{client_id = ClientId}) -> lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), {noreply, Session}. @@ -566,13 +566,13 @@ dequeue2(Session = #session{message_queue = Q}) -> deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) -> ClientPid ! {deliver, Msg}, Session; -deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId, +deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId, client_pid = ClientPid, inflight_queue = InflightQ}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> - Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false}, + Msg1 = Msg#mqtt_message{pktid = PktId, dup = false}, ClientPid ! {deliver, Msg1}, - await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})). + await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})). redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) -> deliver(Msg, Session); @@ -585,23 +585,23 @@ redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = Client %%------------------------------------------------------------------------------ %% Awaiting ack for qos1, qos2 message %%------------------------------------------------------------------------------ -await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting, +await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting, unack_retries = Retries, unack_timeout = Timeout}) -> - TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}), - Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting), + TRef = timer(Timeout, {timeout, awaiting_ack, PktId}), + Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting), Session#session{awaiting_ack = Awaiting1}. -acked(MsgId, Session = #session{inflight_queue = InflightQ, +acked(PktId, Session = #session{inflight_queue = InflightQ, awaiting_ack = Awaiting}) -> - Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ), - awaiting_ack = maps:remove(MsgId, Awaiting)}. + Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ), + awaiting_ack = maps:remove(PktId, Awaiting)}. -next_msgid(Session = #session{message_id = 16#ffff}) -> - Session#session{message_id = 1}; +next_packet_id(Session = #session{packet_id = 16#ffff}) -> + Session#session{packet_id = 1}; -next_msgid(Session = #session{message_id = MsgId}) -> - Session#session{message_id = MsgId + 1}. +next_packet_id(Session = #session{packet_id = Id}) -> + Session#session{packet_id = Id + 1}. timer(Timeout, TimeoutMsg) -> erlang:send_after(Timeout * 1000, self(), TimeoutMsg). diff --git a/src/emqttd_sm.erl b/src/emqttd_sm.erl index 59e32f2a1..80d9a9a66 100644 --- a/src/emqttd_sm.erl +++ b/src/emqttd_sm.erl @@ -34,6 +34,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_sm). -author("Feng Lee "). @@ -67,12 +68,11 @@ %% @doc Start a session manager %% @end %%------------------------------------------------------------------------------ --spec start_link(Id, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when +-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when Id :: pos_integer(), - %ClientStatsFun :: fun(), - SessStatsFun :: fun(). -start_link(Id, SessStatsFun) -> - gen_server:start_link(?MODULE, [Id, SessStatsFun], []). + StatsFun :: {fun(), fun()}. +start_link(Id, StatsFun) -> + gen_server:start_link(?MODULE, [Id, StatsFun], []). %%------------------------------------------------------------------------------ %% @doc Pool name. @@ -103,7 +103,7 @@ start_session(CleanSess, ClientId) -> -spec lookup_session(binary()) -> pid() | undefined. lookup_session(ClientId) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, _}] -> SessPid; + [{_Clean, _, SessPid, _}] -> SessPid; [] -> undefined end. @@ -129,7 +129,7 @@ init([Id, StatsFun]) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> Reply = case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, _MRef}] -> + [{_Clean, _, SessPid, _MRef}] -> emqttd_session:resume(SessPid, ClientId, ClientPid), {ok, SessPid}; [] -> @@ -139,7 +139,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, MRef}] -> + [{_Clean, _, SessPid, MRef}] -> erlang:demonitor(MRef, [flush]), emqttd_session:destroy(SessPid, ClientId); [] -> @@ -149,7 +149,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({destroy_session, ClientId}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of - [{_, SessPid, MRef}] -> + [{_Clean, _, SessPid, MRef}] -> emqttd_session:destroy(SessPid, ClientId), erlang:demonitor(MRef, [flush]), ets:delete(?SESSION_TAB, ClientId); @@ -165,7 +165,7 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> - ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}), + ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}), {noreply, setstats(State)}; handle_info(_Info, State) -> @@ -184,13 +184,14 @@ code_change(_OldVsn, State, _Extra) -> new_session(CleanSess, ClientId, ClientPid) -> case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of {ok, SessPid} -> - ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), + MRef = erlang:monitor(process, SessPid), + ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}), {ok, SessPid}; {error, Error} -> {error, Error} end. -setstats(State = #state{statsfun = StatsFun}) -> - StatsFun(ets:info(?SESSION_TAB, size)), State. - - +setstats(State = #state{statsfun = {CFun, SFun}}) -> + CFun(ets:info(?SESSION_TAB, size)), + SFun(ets:select_count(?SESSION_TAB, [{{true, '_', '_', '_'}, [], [true]}])), + State. diff --git a/src/emqttd_sm_sup.erl b/src/emqttd_sm_sup.erl index 03ee63a47..8f5dbd0a4 100644 --- a/src/emqttd_sm_sup.erl +++ b/src/emqttd_sm_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_sm_sup). -author("Feng Lee "). @@ -42,19 +43,20 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(emqttd_sm:table(), [set, named_table, public, + ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2}, {write_concurrency, true}]), Schedulers = erlang:system_info(schedulers), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), - %%ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), - SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), Children = lists:map( fun(I) -> Name = {emqttd_sm, I}, gproc_pool:add_worker(emqttd_sm:pool(), Name, I), - {Name, {emqttd_sm, start_link, [I, SessStatsFun]}, + {Name, {emqttd_sm, start_link, [I, statsfun()]}, permanent, 10000, worker, [emqttd_sm]} end, lists:seq(1, Schedulers)), {ok, {{one_for_all, 10, 100}, Children}}. +statsfun() -> + {emqttd_stats:statsfun('clients/count', 'clients/max'), + emqttd_stats:statsfun('sessions/count', 'sessions/max')}. diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 1f005369e..8fa51fcc2 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -124,6 +124,7 @@ setstat(Stat, Val) -> %%------------------------------------------------------------------------------ %% @doc Set stats with max +%% TODO: this is wrong... %% @end %%------------------------------------------------------------------------------ -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). @@ -174,9 +175,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= publish(Stat, Val) -> - emqttd_pubsub:publish(#mqtt_message{from = stats, - topic = stats_topic(Stat), - payload = emqttd_util:integer_to_binary(Val)}). + Msg = emqttd_message:make(stats, stats_topic(Stat), + emqttd_util:integer_to_binary(Val)), + emqttd_pubsub:publish(Msg). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/test/emqttd_access_control_tests.erl b/test/emqttd_access_control_tests.erl index 6b55a0fae..7db0490ef 100644 --- a/test/emqttd_access_control_tests.erl +++ b/test/emqttd_access_control_tests.erl @@ -71,8 +71,8 @@ unregister_mod_test() -> check_acl_test() -> with_acl( fun() -> - User1 = #mqtt_client{clientid = <<"client1">>, username = <<"testuser">>}, - User2 = #mqtt_client{clientid = <<"client2">>, username = <<"xyz">>}, + User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>}, + User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>}, ?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)), ?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1">>)), ?assertEqual(deny, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1/x/y">>)), diff --git a/test/emqttd_access_rule_tests.erl b/test/emqttd_access_rule_tests.erl index 8f25a787f..2bae086ea 100644 --- a/test/emqttd_access_rule_tests.erl +++ b/test/emqttd_access_rule_tests.erl @@ -53,8 +53,8 @@ compile_test() -> ?assertEqual({deny, all}, compile({deny, all})). match_test() -> - User = #mqtt_client{ipaddress = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>}, - User2 = #mqtt_client{ipaddress = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>}, + User = #mqtt_client{ipaddress = {127,0,0,1}, client_id = <<"testClient">>, username = <<"TestUser">>}, + User2 = #mqtt_client{ipaddress = {192,168,0,10}, client_id = <<"testClient">>, username = <<"TestUser">>}, ?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})), ?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})), diff --git a/test/emqttd_guid_tests.erl b/test/emqttd_guid_tests.erl new file mode 100644 index 000000000..1ddf0610c --- /dev/null +++ b/test/emqttd_guid_tests.erl @@ -0,0 +1,38 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- + +-module(emqttd_guid_tests). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +gen_test() -> + Guid1 = emqttd_guid:gen(), + Guid2 = emqttd_guid:gen(), + ?assertMatch(<<_:128>>, Guid1), + ?assertEqual(true, Guid2 >= Guid1). + +-endif. + + + diff --git a/test/emqttd_parser_tests.erl b/test/emqttd_parser_tests.erl index fb15019eb..c415b4907 100644 --- a/test/emqttd_parser_tests.erl +++ b/test/emqttd_parser_tests.erl @@ -26,10 +26,10 @@ %%%----------------------------------------------------------------------------- -module(emqttd_parser_tests). --include("emqttd_protocol.hrl"). - -ifdef(TEST). +-include("emqttd_protocol.hrl"). + -include_lib("eunit/include/eunit.hrl"). parse_connect_test() -> @@ -43,7 +43,7 @@ parse_connect_test() -> retain = false}, variable = #mqtt_packet_connect{proto_ver = 3, proto_name = <<"MQIsdp">>, - clientid = <<"mosqpub/10451-iMac.loca">>, + client_id = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)), %% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined) @@ -55,7 +55,7 @@ parse_connect_test() -> retain = false}, variable = #mqtt_packet_connect{proto_ver = 4, proto_name = <<"MQTT">>, - clientid = <<"mosqpub/10451-iMac.loca">>, + client_id = <<"mosqpub/10451-iMac.loca">>, clean_sess = true, keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)), @@ -68,7 +68,7 @@ parse_connect_test() -> retain = false}, variable = #mqtt_packet_connect{proto_ver = 4, proto_name = <<"MQTT">>, - clientid = <<>>, + client_id = <<>>, clean_sess = true, keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)), %%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg)) @@ -80,7 +80,7 @@ parse_connect_test() -> retain = false}, variable = #mqtt_packet_connect{proto_ver = 3, proto_name = <<"MQIsdp">>, - clientid = <<"mosqpub/10452-iMac.loca">>, + client_id = <<"mosqpub/10452-iMac.loca">>, clean_sess = true, keep_alive = 60, will_retain = false,