Merge branch 'dev' into dev-hd
This commit is contained in:
commit
474aea2c71
|
@ -2,7 +2,7 @@
|
||||||
emqttd ChangeLog
|
emqttd ChangeLog
|
||||||
==================
|
==================
|
||||||
|
|
||||||
0.9.0-alpha (2015-06-14)
|
0.9.0-alpha (2015-07-xx)
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
Session and Queue
|
Session and Queue
|
||||||
|
@ -19,6 +19,12 @@ Alarm
|
||||||
|
|
||||||
Protocol Compliant
|
Protocol Compliant
|
||||||
|
|
||||||
|
Global msgid
|
||||||
|
|
||||||
|
Hooks
|
||||||
|
|
||||||
|
Plugins
|
||||||
|
|
||||||
|
|
||||||
0.8.6-beta (2015-06-17)
|
0.8.6-beta (2015-06-17)
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
## Mongodb ObjectId
|
||||||
|
|
||||||
|
* 4-byte value representing the seconds since the Unix epoch,
|
||||||
|
* 3-byte machine identifier,
|
||||||
|
* 2-byte process id, and
|
||||||
|
* 3-byte counter, starting with a random value.
|
||||||
|
|
||||||
|
## Flake Id
|
||||||
|
|
||||||
|
* 64bits Timestamp
|
||||||
|
* 48bits WorkerId
|
||||||
|
* 16bits Sequence
|
||||||
|
|
||||||
|
## emqttd Id
|
||||||
|
|
||||||
|
* 64bits Timestamp: erlang:now(), erlang:system_time
|
||||||
|
* 48bits (node+pid): Node + Pid -> Integer
|
||||||
|
* 16bits Sequence: PktId
|
||||||
|
|
|
@ -83,12 +83,11 @@
|
||||||
%% MQTT Client
|
%% MQTT Client
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_client, {
|
-record(mqtt_client, {
|
||||||
clientid :: binary() | undefined,
|
client_id :: binary() | undefined,
|
||||||
username :: binary() | undefined,
|
username :: binary() | undefined,
|
||||||
ipaddress :: inet:ip_address(),
|
ipaddress :: inet:ip_address(),
|
||||||
client_pid :: pid(),
|
|
||||||
client_mon :: reference(),
|
|
||||||
clean_sess :: boolean(),
|
clean_sess :: boolean(),
|
||||||
|
client_pid :: pid(),
|
||||||
proto_ver :: 3 | 4
|
proto_ver :: 3 | 4
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -98,7 +97,7 @@
|
||||||
%% MQTT Session
|
%% MQTT Session
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-record(mqtt_session, {
|
-record(mqtt_session, {
|
||||||
clientid,
|
client_id,
|
||||||
session_pid,
|
session_pid,
|
||||||
subscriptions = []
|
subscriptions = []
|
||||||
}).
|
}).
|
||||||
|
@ -108,18 +107,20 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Message
|
%% MQTT Message
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-type mqtt_msgid() :: undefined | 1..16#ffff.
|
-type mqtt_msgid() :: binary() | undefined.
|
||||||
|
-type mqtt_pktid() :: 1..16#ffff | undefined.
|
||||||
|
|
||||||
-record(mqtt_message, {
|
-record(mqtt_message, {
|
||||||
|
msgid :: mqtt_msgid(), %% Unique Message ID
|
||||||
|
pktid :: 1..16#ffff, %% PacketId
|
||||||
topic :: binary(), %% Topic that the message is published to
|
topic :: binary(), %% Topic that the message is published to
|
||||||
from :: binary() | atom(), %% ClientId of publisher
|
from :: binary() | atom(), %% ClientId of publisher
|
||||||
qos = 0 :: 0 | 1 | 2, %% Message QoS
|
qos = 0 :: 0 | 1 | 2, %% Message QoS
|
||||||
retain = false :: boolean(), %% Retain flag
|
retain = false :: boolean(), %% Retain flag
|
||||||
dup = false :: boolean(), %% Dup flag
|
dup = false :: boolean(), %% Dup flag
|
||||||
sys = false :: boolean(), %% $SYS flag
|
sys = false :: boolean(), %% $SYS flag
|
||||||
msgid :: mqtt_msgid(), %% Message ID
|
|
||||||
payload :: binary(), %% Payload
|
payload :: binary(), %% Payload
|
||||||
timestamp :: erlang:timestamp() %% Timestamp
|
timestamp :: erlang:timestamp() %% os:timestamp
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type mqtt_message() :: #mqtt_message{}.
|
-type mqtt_message() :: #mqtt_message{}.
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
-define(MQTT_PROTO_V311, 4).
|
-define(MQTT_PROTO_V311, 4).
|
||||||
|
|
||||||
-define(PROTOCOL_NAMES, [
|
-define(PROTOCOL_NAMES, [
|
||||||
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
||||||
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
||||||
|
|
||||||
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
||||||
|
@ -122,11 +122,11 @@
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% MQTT Packets
|
%% MQTT Packets
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-type mqtt_clientid() :: binary().
|
-type mqtt_client_id() :: binary().
|
||||||
-type mqtt_packet_id() :: 1..16#ffff | undefined.
|
-type mqtt_packet_id() :: 1..16#ffff | undefined.
|
||||||
|
|
||||||
-record(mqtt_packet_connect, {
|
-record(mqtt_packet_connect, {
|
||||||
clientid = <<>> :: mqtt_clientid(),
|
client_id = <<>> :: mqtt_client_id(),
|
||||||
proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(),
|
proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(),
|
||||||
proto_name = <<"MQTT">> :: binary(),
|
proto_name = <<"MQTT">> :: binary(),
|
||||||
will_retain = false :: boolean(),
|
will_retain = false :: boolean(),
|
||||||
|
|
|
@ -97,7 +97,7 @@ check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subsc
|
||||||
[] -> allow;
|
[] -> allow;
|
||||||
AclMods -> check_acl(Client, PubSub, Topic, AclMods)
|
AclMods -> check_acl(Client, PubSub, Topic, AclMods)
|
||||||
end.
|
end.
|
||||||
check_acl(#mqtt_client{clientid = ClientId}, PubSub, Topic, []) ->
|
check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
|
||||||
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
|
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
|
||||||
allow;
|
allow;
|
||||||
check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
|
check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->
|
||||||
|
|
|
@ -110,7 +110,7 @@ match_who(_Client, {user, all}) ->
|
||||||
true;
|
true;
|
||||||
match_who(_Client, {client, all}) ->
|
match_who(_Client, {client, all}) ->
|
||||||
true;
|
true;
|
||||||
match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) ->
|
match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) ->
|
||||||
true;
|
true;
|
||||||
match_who(#mqtt_client{username = Username}, {user, Username}) ->
|
match_who(#mqtt_client{username = Username}, {user, Username}) ->
|
||||||
true;
|
true;
|
||||||
|
@ -145,9 +145,9 @@ feed_var(Client, Pattern) ->
|
||||||
feed_var(Client, Pattern, []).
|
feed_var(Client, Pattern, []).
|
||||||
feed_var(_Client, [], Acc) ->
|
feed_var(_Client, [], Acc) ->
|
||||||
lists:reverse(Acc);
|
lists:reverse(Acc);
|
||||||
feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) ->
|
feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) ->
|
||||||
feed_var(Client, Words, [<<"$c">>|Acc]);
|
feed_var(Client, Words, [<<"$c">>|Acc]);
|
||||||
feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) ->
|
feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) ->
|
||||||
feed_var(Client, Words, [ClientId |Acc]);
|
feed_var(Client, Words, [ClientId |Acc]);
|
||||||
feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) ->
|
feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) ->
|
||||||
feed_var(Client, Words, [<<"$u">>|Acc]);
|
feed_var(Client, Words, [<<"$u">>|Acc]);
|
||||||
|
|
|
@ -121,12 +121,10 @@ terminate(_, _) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
alarm_msg(Type, AlarmId, Json) ->
|
alarm_msg(Type, AlarmId, Json) ->
|
||||||
#mqtt_message{from = alarm,
|
Msg = emqttd_message:make(alarm,
|
||||||
qos = 1,
|
topic(Type, AlarmId),
|
||||||
sys = true,
|
iolist_to_binary(Json)),
|
||||||
topic = topic(Type, AlarmId),
|
emqttd_message:set_flag(sys, Msg).
|
||||||
payload = iolist_to_binary(Json),
|
|
||||||
timestamp = os:timestamp()}.
|
|
||||||
|
|
||||||
topic(alert, AlarmId) ->
|
topic(alert, AlarmId) ->
|
||||||
emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
|
emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);
|
||||||
|
|
|
@ -41,7 +41,7 @@
|
||||||
|
|
||||||
-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
|
-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
|
||||||
|
|
||||||
-record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}).
|
-record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -52,7 +52,7 @@
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
add_clientid(ClientId) when is_binary(ClientId) ->
|
add_clientid(ClientId) when is_binary(ClientId) ->
|
||||||
R = #mqtt_auth_clientid{clientid = ClientId},
|
R = #mqtt_auth_clientid{client_id = ClientId},
|
||||||
mnesia:transaction(fun() -> mnesia:write(R) end).
|
mnesia:transaction(fun() -> mnesia:write(R) end).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -60,7 +60,7 @@ add_clientid(ClientId) when is_binary(ClientId) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
add_clientid(ClientId, Password) ->
|
add_clientid(ClientId, Password) ->
|
||||||
R = #mqtt_auth_clientid{clientid = ClientId, password = Password},
|
R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
|
||||||
mnesia:transaction(fun() -> mnesia:write(R) end).
|
mnesia:transaction(fun() -> mnesia:write(R) end).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -99,15 +99,15 @@ init(Opts) ->
|
||||||
end,
|
end,
|
||||||
{ok, Opts}.
|
{ok, Opts}.
|
||||||
|
|
||||||
check(#mqtt_client{clientid = undefined}, _Password, []) ->
|
check(#mqtt_client{client_id = undefined}, _Password, []) ->
|
||||||
{error, "ClientId undefined"};
|
{error, "ClientId undefined"};
|
||||||
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) ->
|
check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) ->
|
||||||
check_clientid_only(ClientId, IpAddress);
|
check_clientid_only(ClientId, IpAddress);
|
||||||
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) ->
|
check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) ->
|
||||||
check_clientid_only(ClientId, IpAddress);
|
check_clientid_only(ClientId, IpAddress);
|
||||||
check(_Client, undefined, [{password, yes}|_]) ->
|
check(_Client, undefined, [{password, yes}|_]) ->
|
||||||
{error, "Password undefined"};
|
{error, "Password undefined"};
|
||||||
check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) ->
|
check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) ->
|
||||||
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
|
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
|
||||||
[] -> {error, "ClientId Not Found"};
|
[] -> {error, "ClientId Not Found"};
|
||||||
[#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext??
|
[#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext??
|
||||||
|
@ -129,11 +129,11 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) ->
|
||||||
case string:tokens(Line, " ") of
|
case string:tokens(Line, " ") of
|
||||||
[ClientIdS] ->
|
[ClientIdS] ->
|
||||||
ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)),
|
ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)),
|
||||||
[#mqtt_auth_clientid{clientid = ClientId} | Clients];
|
[#mqtt_auth_clientid{client_id = ClientId} | Clients];
|
||||||
[ClientId, IpAddr0] ->
|
[ClientId, IpAddr0] ->
|
||||||
IpAddr = string:strip(IpAddr0, right, $\n),
|
IpAddr = string:strip(IpAddr0, right, $\n),
|
||||||
Range = esockd_access:range(IpAddr),
|
Range = esockd_access:range(IpAddr),
|
||||||
[#mqtt_auth_clientid{clientid = list_to_binary(ClientId),
|
[#mqtt_auth_clientid{client_id = list_to_binary(ClientId),
|
||||||
ipaddr = {IpAddr, Range}}|Clients];
|
ipaddr = {IpAddr, Range}}|Clients];
|
||||||
BadLine ->
|
BadLine ->
|
||||||
lager:error("BadLine in clients.config: ~s", [BadLine]),
|
lager:error("BadLine in clients.config: ~s", [BadLine]),
|
||||||
|
|
|
@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) ->
|
||||||
true ->
|
true ->
|
||||||
true = erlang:monitor_node(Node, true),
|
true = erlang:monitor_node(Node, true),
|
||||||
State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
|
State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
|
||||||
emqttd_pubsub:subscribe({SubTopic, ?QOS_0}),
|
emqttd_pubsub:subscribe({SubTopic, State#state.qos}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
false ->
|
false ->
|
||||||
{stop, {cannot_connect, Node}}
|
{stop, {cannot_connect, Node}}
|
||||||
|
@ -107,7 +107,7 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) ->
|
handle_info({dispatch, Msg}, State = #state{node = Node, status = down}) ->
|
||||||
lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]),
|
lager:error("Bridge Dropped Msg for ~p Down: ~s", [Node, emqttd_message:format(Msg)]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
|
handle_info({dispatch, Msg}, State = #state{node = Node, status = up}) ->
|
||||||
|
@ -159,14 +159,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
%TODO: qos is not right...
|
transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix,
|
||||||
transform(Msg = #mqtt_message{topic = Topic}, #state{qos = Qos,
|
|
||||||
topic_prefix = Prefix,
|
|
||||||
topic_suffix = Suffix}) ->
|
topic_suffix = Suffix}) ->
|
||||||
Msg1 =
|
Msg#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
|
||||||
if
|
|
||||||
Qos =:= undefined -> Msg;
|
|
||||||
true -> Msg#mqtt_message{qos = Qos}
|
|
||||||
end,
|
|
||||||
Msg1#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
|
|
||||||
|
|
||||||
|
|
|
@ -186,7 +186,7 @@ foldl_hooks(Hook, Args, Acc0) ->
|
||||||
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
|
case ets:lookup(?BROKER_TAB, {hook, Hook}) of
|
||||||
[{_, Hooks}] ->
|
[{_, Hooks}] ->
|
||||||
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
|
lists:foldl(fun({_Name, {M, F, A}}, Acc) ->
|
||||||
apply(M, F, [Acc, Args++A])
|
apply(M, F, lists:append([Args, [Acc], A]))
|
||||||
end, Acc0, Hooks);
|
end, Acc0, Hooks);
|
||||||
[] ->
|
[] ->
|
||||||
Acc0
|
Acc0
|
||||||
|
@ -286,23 +286,15 @@ create_topic(Topic) ->
|
||||||
|
|
||||||
retain(brokers) ->
|
retain(brokers) ->
|
||||||
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
|
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
|
||||||
publish(#mqtt_message{from = broker,
|
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
|
||||||
retain = true,
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
|
||||||
topic = <<"$SYS/brokers">>,
|
|
||||||
payload = Payload}).
|
|
||||||
|
|
||||||
retain(Topic, Payload) when is_binary(Payload) ->
|
retain(Topic, Payload) when is_binary(Payload) ->
|
||||||
publish(#mqtt_message{from = broker,
|
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
||||||
retain = true,
|
emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)).
|
||||||
topic = emqttd_topic:systop(Topic),
|
|
||||||
payload = Payload}).
|
|
||||||
|
|
||||||
publish(Topic, Payload) when is_binary(Payload) ->
|
publish(Topic, Payload) when is_binary(Payload) ->
|
||||||
publish( #mqtt_message{from = broker,
|
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
||||||
topic = emqttd_topic:systop(Topic),
|
|
||||||
payload = Payload}).
|
|
||||||
|
|
||||||
publish(Msg) ->
|
|
||||||
emqttd_pubsub:publish(Msg).
|
emqttd_pubsub:publish(Msg).
|
||||||
|
|
||||||
uptime(#state{started_at = Ts}) ->
|
uptime(#state{started_at = Ts}) ->
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||||
|
%%%
|
||||||
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
|
%%% furnished to do so, subject to the following conditions:
|
||||||
|
%%%
|
||||||
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
|
%%% copies or substantial portions of the Software.
|
||||||
|
%%%
|
||||||
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
%%% SOFTWARE.
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% Generate global unique id for mqtt message.
|
||||||
|
%%%
|
||||||
|
%%% --------------------------------------------------------
|
||||||
|
%%% | Timestamp | NodeID + PID | Sequence |
|
||||||
|
%%% |<------- 64bits ------->|<--- 48bits --->|<- 16bits ->|
|
||||||
|
%%% --------------------------------------------------------
|
||||||
|
%%%
|
||||||
|
%%% 1. Timestamp: erlang:system_time if Erlang >= R18, otherwise os:timestamp
|
||||||
|
%%% 2. NodeId: encode node() to 2 bytes integer
|
||||||
|
%%% 3. Pid: encode pid to 4 bytes integer
|
||||||
|
%%% 4. Sequence: 2 bytes sequence no per pid
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqttd_guid).
|
||||||
|
|
||||||
|
-export([gen/0]).
|
||||||
|
|
||||||
|
-define(MAX_SEQ, 16#FFFF).
|
||||||
|
|
||||||
|
-type guid() :: <<_:128>>.
|
||||||
|
|
||||||
|
-spec gen() -> guid().
|
||||||
|
gen() ->
|
||||||
|
Guid = case get(guid) of
|
||||||
|
undefined -> new();
|
||||||
|
{_Ts, NPid, Seq} -> next(NPid, Seq)
|
||||||
|
end,
|
||||||
|
put(guid, Guid), enc(Guid).
|
||||||
|
|
||||||
|
new() ->
|
||||||
|
{ts(), npid(), 0}.
|
||||||
|
|
||||||
|
next(NPid, Seq) when Seq >= ?MAX_SEQ ->
|
||||||
|
{ts(), NPid, 0};
|
||||||
|
next(NPid, Seq) ->
|
||||||
|
{ts(), NPid, Seq + 1}.
|
||||||
|
|
||||||
|
enc({Ts, NPid, Seq}) ->
|
||||||
|
<<Ts:64, NPid:48, Seq:16>>.
|
||||||
|
|
||||||
|
ts() ->
|
||||||
|
case erlang:function_exported(erlang, system_time, 1) of
|
||||||
|
true -> %% R18
|
||||||
|
erlang:system_time(micro_seconds);
|
||||||
|
false ->
|
||||||
|
{MegaSeconds, Seconds, MicroSeconds} = os:timestamp(),
|
||||||
|
(MegaSeconds * 1000000 + Seconds) * 1000000 + MicroSeconds
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Copied from https://github.com/okeuday/uuid.git.
|
||||||
|
npid() ->
|
||||||
|
<<NodeD01, NodeD02, NodeD03, NodeD04, NodeD05,
|
||||||
|
NodeD06, NodeD07, NodeD08, NodeD09, NodeD10,
|
||||||
|
NodeD11, NodeD12, NodeD13, NodeD14, NodeD15,
|
||||||
|
NodeD16, NodeD17, NodeD18, NodeD19, NodeD20>> =
|
||||||
|
crypto:hash(sha, erlang:list_to_binary(erlang:atom_to_list(node()))),
|
||||||
|
|
||||||
|
% later, when the pid format changes, handle the different format
|
||||||
|
ExternalTermFormatVersion = 131,
|
||||||
|
PidExtType = 103,
|
||||||
|
<<ExternalTermFormatVersion:8,
|
||||||
|
PidExtType:8,
|
||||||
|
PidBin/binary>> = erlang:term_to_binary(self()),
|
||||||
|
% 72 bits for the Erlang pid
|
||||||
|
<<PidID1:8, PidID2:8, PidID3:8, PidID4:8, % ID (Node specific, 15 bits)
|
||||||
|
PidSR1:8, PidSR2:8, PidSR3:8, PidSR4:8, % Serial (extra uniqueness)
|
||||||
|
PidCR1:8 % Node Creation Count
|
||||||
|
>> = binary:part(PidBin, erlang:byte_size(PidBin), -9),
|
||||||
|
|
||||||
|
% reduce the 160 bit NodeData checksum to 16 bits
|
||||||
|
NodeByte1 = ((((((((NodeD01 bxor NodeD02)
|
||||||
|
bxor NodeD03)
|
||||||
|
bxor NodeD04)
|
||||||
|
bxor NodeD05)
|
||||||
|
bxor NodeD06)
|
||||||
|
bxor NodeD07)
|
||||||
|
bxor NodeD08)
|
||||||
|
bxor NodeD09)
|
||||||
|
bxor NodeD10,
|
||||||
|
NodeByte2 = (((((((((NodeD11 bxor NodeD12)
|
||||||
|
bxor NodeD13)
|
||||||
|
bxor NodeD14)
|
||||||
|
bxor NodeD15)
|
||||||
|
bxor NodeD16)
|
||||||
|
bxor NodeD17)
|
||||||
|
bxor NodeD18)
|
||||||
|
bxor NodeD19)
|
||||||
|
bxor NodeD20)
|
||||||
|
bxor PidCR1,
|
||||||
|
|
||||||
|
% reduce the Erlang pid to 32 bits
|
||||||
|
PidByte1 = PidID1 bxor PidSR4,
|
||||||
|
PidByte2 = PidID2 bxor PidSR3,
|
||||||
|
PidByte3 = PidID3 bxor PidSR2,
|
||||||
|
PidByte4 = PidID4 bxor PidSR1,
|
||||||
|
|
||||||
|
<<NPid:48>> = <<NodeByte1:8, NodeByte2:8,
|
||||||
|
PidByte1:8, PidByte2:8,
|
||||||
|
PidByte3:8, PidByte4:8>>,
|
||||||
|
NPid.
|
||||||
|
|
|
@ -49,14 +49,11 @@ handle_request('POST', "/mqtt/publish", Req) ->
|
||||||
Qos = int(get_value("qos", Params, "0")),
|
Qos = int(get_value("qos", Params, "0")),
|
||||||
Retain = bool(get_value("retain", Params, "0")),
|
Retain = bool(get_value("retain", Params, "0")),
|
||||||
Topic = list_to_binary(get_value("topic", Params)),
|
Topic = list_to_binary(get_value("topic", Params)),
|
||||||
Message = list_to_binary(get_value("message", Params)),
|
Payload = list_to_binary(get_value("message", Params)),
|
||||||
case {validate(qos, Qos), validate(topic, Topic)} of
|
case {validate(qos, Qos), validate(topic, Topic)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
emqttd_pubsub:publish(#mqtt_message{from = http,
|
Msg = emqttd_message:make(http, Qos, Topic, Payload),
|
||||||
qos = Qos,
|
emqttd_pubsub:publish(Msg#mqtt_message{retain = Retain}),
|
||||||
retain = Retain,
|
|
||||||
topic = Topic,
|
|
||||||
payload = Message}),
|
|
||||||
Req:ok({"text/plan", <<"ok\n">>});
|
Req:ok({"text/plan", <<"ok\n">>});
|
||||||
{false, _} ->
|
{false, _} ->
|
||||||
Req:respond({400, [], <<"Bad QoS">>});
|
Req:respond({400, [], <<"Bad QoS">>});
|
||||||
|
|
|
@ -32,12 +32,39 @@
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-export([from_packet/1, from_packet/2, to_packet/1]).
|
-export([make/3, make/4, from_packet/1, from_packet/2, to_packet/1]).
|
||||||
|
|
||||||
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
|
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
|
||||||
|
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Make a message
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec make(From, Topic, Payload) -> mqtt_message() when
|
||||||
|
From :: atom() | binary(),
|
||||||
|
Topic :: binary(),
|
||||||
|
Payload :: binary().
|
||||||
|
make(From, Topic, Payload) ->
|
||||||
|
#mqtt_message{topic = Topic,
|
||||||
|
from = From,
|
||||||
|
payload = Payload,
|
||||||
|
timestamp = os:timestamp()}.
|
||||||
|
|
||||||
|
-spec make(From, Qos, Topic, Payload) -> mqtt_message() when
|
||||||
|
From :: atom() | binary(),
|
||||||
|
Qos :: mqtt_qos(),
|
||||||
|
Topic :: binary(),
|
||||||
|
Payload :: binary().
|
||||||
|
make(From, Qos, Topic, Payload) ->
|
||||||
|
#mqtt_message{msgid = msgid(Qos),
|
||||||
|
topic = Topic,
|
||||||
|
from = From,
|
||||||
|
qos = Qos,
|
||||||
|
payload = Payload,
|
||||||
|
timestamp = os:timestamp()}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Message from Packet
|
%% @doc Message from Packet
|
||||||
%% @end
|
%% @end
|
||||||
|
@ -50,12 +77,14 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
variable = #mqtt_packet_publish{topic_name = Topic,
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
packet_id = PacketId},
|
packet_id = PacketId},
|
||||||
payload = Payload}) ->
|
payload = Payload}) ->
|
||||||
#mqtt_message{msgid = PacketId,
|
#mqtt_message{msgid = msgid(Qos),
|
||||||
|
pktid = PacketId,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
dup = Dup,
|
dup = Dup,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = Payload};
|
payload = Payload,
|
||||||
|
timestamp = os:timestamp()};
|
||||||
|
|
||||||
from_packet(#mqtt_packet_connect{will_flag = false}) ->
|
from_packet(#mqtt_packet_connect{will_flag = false}) ->
|
||||||
undefined;
|
undefined;
|
||||||
|
@ -64,38 +93,44 @@ from_packet(#mqtt_packet_connect{will_retain = Retain,
|
||||||
will_qos = Qos,
|
will_qos = Qos,
|
||||||
will_topic = Topic,
|
will_topic = Topic,
|
||||||
will_msg = Msg}) ->
|
will_msg = Msg}) ->
|
||||||
#mqtt_message{retain = Retain,
|
#mqtt_message{msgid = msgid(Qos),
|
||||||
qos = Qos,
|
topic = Topic,
|
||||||
topic = Topic,
|
retain = Retain,
|
||||||
dup = false,
|
qos = Qos,
|
||||||
payload = Msg}.
|
dup = false,
|
||||||
|
payload = Msg,
|
||||||
|
timestamp = os:timestamp()}.
|
||||||
|
|
||||||
from_packet(ClientId, Packet) ->
|
from_packet(ClientId, Packet) ->
|
||||||
Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}.
|
Msg = from_packet(Packet), Msg#mqtt_message{from = ClientId}.
|
||||||
|
|
||||||
|
msgid(?QOS_0) ->
|
||||||
|
undefined;
|
||||||
|
msgid(_Qos) ->
|
||||||
|
emqttd_guid:gen().
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Message to packet
|
%% @doc Message to packet
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec to_packet(mqtt_message()) -> mqtt_packet().
|
-spec to_packet(mqtt_message()) -> mqtt_packet().
|
||||||
to_packet(#mqtt_message{msgid = MsgId,
|
to_packet(#mqtt_message{pktid = PkgId,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
dup = Dup,
|
dup = Dup,
|
||||||
topic = Topic,
|
topic = Topic,
|
||||||
payload = Payload}) ->
|
payload = Payload}) ->
|
||||||
|
|
||||||
PacketId = if
|
|
||||||
Qos =:= ?QOS_0 -> undefined;
|
|
||||||
true -> MsgId
|
|
||||||
end,
|
|
||||||
|
|
||||||
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
qos = Qos,
|
qos = Qos,
|
||||||
retain = Retain,
|
retain = Retain,
|
||||||
dup = Dup},
|
dup = Dup},
|
||||||
variable = #mqtt_packet_publish{topic_name = Topic,
|
variable = #mqtt_packet_publish{topic_name = Topic,
|
||||||
packet_id = PacketId},
|
packet_id = if
|
||||||
|
Qos =:= ?QOS_0 -> undefined;
|
||||||
|
true -> PkgId
|
||||||
|
end
|
||||||
|
},
|
||||||
payload = Payload}.
|
payload = Payload}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -109,6 +144,8 @@ set_flag(Msg) ->
|
||||||
-spec set_flag(atom(), mqtt_message()) -> mqtt_message().
|
-spec set_flag(atom(), mqtt_message()) -> mqtt_message().
|
||||||
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
|
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
|
||||||
Msg#mqtt_message{dup = true};
|
Msg#mqtt_message{dup = true};
|
||||||
|
set_flag(sys, Msg = #mqtt_message{sys = false}) ->
|
||||||
|
Msg#mqtt_message{sys = true};
|
||||||
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
|
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
|
||||||
Msg#mqtt_message{retain = true};
|
Msg#mqtt_message{retain = true};
|
||||||
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
|
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
|
||||||
|
@ -133,7 +170,7 @@ unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
|
||||||
%% @doc Format MQTT Message
|
%% @doc Format MQTT Message
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
format(#mqtt_message{msgid=MsgId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) ->
|
format(#mqtt_message{msgid=MsgId, pktid = PktId, qos=Qos, retain=Retain, dup=Dup, topic=Topic}) ->
|
||||||
io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
|
io_lib:format("Message(MsgId=~p, PktId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
|
||||||
[MsgId, Qos, Retain, Dup, Topic]).
|
[MsgId, PktId, Qos, Retain, Dup, Topic]).
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@
|
||||||
-define(SYSTOP_MESSAGES, [
|
-define(SYSTOP_MESSAGES, [
|
||||||
{counter, 'messages/received'}, % Messages received
|
{counter, 'messages/received'}, % Messages received
|
||||||
{counter, 'messages/sent'}, % Messages sent
|
{counter, 'messages/sent'}, % Messages sent
|
||||||
{gauge, 'messages/retained/count'},% Messagea retained
|
{gauge, 'messages/retained'}, % Messagea retained
|
||||||
{gauge, 'messages/stored/count'}, % Messages stored
|
{gauge, 'messages/stored/count'}, % Messages stored
|
||||||
{counter, 'messages/dropped'} % Messages dropped
|
{counter, 'messages/dropped'} % Messages dropped
|
||||||
]).
|
]).
|
||||||
|
@ -222,9 +222,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
publish(Metric, Val) ->
|
publish(Metric, Val) ->
|
||||||
emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric),
|
Payload = emqttd_util:integer_to_binary(Val),
|
||||||
from = metrics,
|
Msg = emqttd_message:make(metrics, metric_topic(Metric), Payload),
|
||||||
payload = emqttd_util:integer_to_binary(Val)}).
|
emqttd_pubsub:publish(Msg).
|
||||||
|
|
||||||
create_metric({gauge, Name}) ->
|
create_metric({gauge, Name}) ->
|
||||||
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
|
||||||
|
|
|
@ -41,11 +41,11 @@
|
||||||
|
|
||||||
load(Opts) ->
|
load(Opts) ->
|
||||||
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
|
Topics = [{list_to_binary(Topic), Qos} || {Topic, Qos} <- Opts, 0 =< Qos, Qos =< 2],
|
||||||
emqttd_broker:hook(client_connected, {?MODULE, client_connected},
|
emqttd_broker:hook('client.connected', {?MODULE, client_connected},
|
||||||
{?MODULE, client_connected, [Topics]}),
|
{?MODULE, client_connected, [Topics]}),
|
||||||
{ok, #state{topics = Topics}}.
|
{ok, #state{topics = Topics}}.
|
||||||
|
|
||||||
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) ->
|
client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) ->
|
||||||
F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
|
F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
|
||||||
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
|
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
|
||||||
|
|
||||||
|
@ -53,6 +53,5 @@ client_connected(_ConnAck, _Client, _Topics) ->
|
||||||
ignore.
|
ignore.
|
||||||
|
|
||||||
unload(_Opts) ->
|
unload(_Opts) ->
|
||||||
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}).
|
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -35,11 +35,11 @@
|
||||||
-export([client_connected/3, client_disconnected/3]).
|
-export([client_connected/3, client_disconnected/3]).
|
||||||
|
|
||||||
load(Opts) ->
|
load(Opts) ->
|
||||||
emqttd_broker:hook(client_connected, {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
|
emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
|
||||||
emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
|
emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
|
||||||
{ok, Opts}.
|
{ok, Opts}.
|
||||||
|
|
||||||
client_connected(ConnAck, #mqtt_client{clientid = ClientId,
|
client_connected(ConnAck, #mqtt_client{client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
ipaddress = IpAddress,
|
ipaddress = IpAddress,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
|
@ -55,24 +55,25 @@ client_connected(ConnAck, #mqtt_client{clientid = ClientId,
|
||||||
{protocol, ProtoVer},
|
{protocol, ProtoVer},
|
||||||
{connack, ConnAck},
|
{connack, ConnAck},
|
||||||
{ts, emqttd_util:now_to_secs()}]),
|
{ts, emqttd_util:now_to_secs()}]),
|
||||||
Message = #mqtt_message{from = presence,
|
Msg = emqttd_message:make(presence,
|
||||||
qos = proplists:get_value(qos, Opts, 0),
|
proplists:get_value(qos, Opts, 0),
|
||||||
topic = topic(connected, ClientId),
|
topic(connected, ClientId),
|
||||||
payload = iolist_to_binary(Json)},
|
iolist_to_binary(Json)),
|
||||||
emqttd_pubsub:publish(Message).
|
emqttd_pubsub:publish(Msg).
|
||||||
|
|
||||||
client_disconnected(Reason, ClientId, Opts) ->
|
client_disconnected(Reason, ClientId, Opts) ->
|
||||||
Json = mochijson2:encode([{clientid, ClientId},
|
Json = mochijson2:encode([{clientid, ClientId},
|
||||||
{reason, reason(Reason)},
|
{reason, reason(Reason)},
|
||||||
{ts, emqttd_util:now_to_secs()}]),
|
{ts, emqttd_util:now_to_secs()}]),
|
||||||
emqttd_pubsub:publish(#mqtt_message{from = presence,
|
Msg = emqttd_message:make(presence,
|
||||||
qos = proplists:get_value(qos, Opts, 0),
|
proplists:get_value(qos, Opts, 0),
|
||||||
topic = topic(disconnected, ClientId),
|
topic(disconnected, ClientId),
|
||||||
payload = iolist_to_binary(Json)}).
|
iolist_to_binary(Json)),
|
||||||
|
emqttd_pubsub:publish(Msg).
|
||||||
|
|
||||||
unload(_Opts) ->
|
unload(_Opts) ->
|
||||||
emqttd_broker:unhook(client_connected, {?MODULE, client_connected}),
|
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}),
|
||||||
emqttd_broker:unhook(client_disconnected, {?MODULE, client_disconnected}).
|
emqttd_broker:unhook('client.disconnected', {?MODULE, client_disconnected}).
|
||||||
|
|
||||||
topic(connected, ClientId) ->
|
topic(connected, ClientId) ->
|
||||||
emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
|
emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
|
|
||||||
-export([load/1, reload/1, unload/1]).
|
-export([load/1, reload/1, unload/1]).
|
||||||
|
|
||||||
-export([rewrite/2]).
|
-export([rewrite/3, rewrite/4]).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -45,22 +45,22 @@ load(Opts) ->
|
||||||
File = proplists:get_value(file, Opts),
|
File = proplists:get_value(file, Opts),
|
||||||
{ok, Terms} = file:consult(File),
|
{ok, Terms} = file:consult(File),
|
||||||
Sections = compile(Terms),
|
Sections = compile(Terms),
|
||||||
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
|
emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe},
|
||||||
{?MODULE, rewrite, [subscribe, Sections]}),
|
{?MODULE, rewrite, [subscribe, Sections]}),
|
||||||
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
|
emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
|
||||||
{?MODULE, rewrite, [unsubscribe, Sections]}),
|
{?MODULE, rewrite, [unsubscribe, Sections]}),
|
||||||
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
|
emqttd_broker:hook('client.publish', {?MODULE, rewrite_publish},
|
||||||
{?MODULE, rewrite, [publish, Sections]}).
|
{?MODULE, rewrite, [publish, Sections]}).
|
||||||
|
|
||||||
rewrite(TopicTable, [subscribe, Sections]) ->
|
rewrite(_ClientId, TopicTable, subscribe, Sections) ->
|
||||||
lager:info("rewrite subscribe: ~p", [TopicTable]),
|
lager:info("rewrite subscribe: ~p", [TopicTable]),
|
||||||
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
|
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
|
||||||
|
|
||||||
rewrite(Topics, [unsubscribe, Sections]) ->
|
rewrite(_ClientId, Topics, unsubscribe, Sections) ->
|
||||||
lager:info("rewrite unsubscribe: ~p", [Topics]),
|
lager:info("rewrite unsubscribe: ~p", [Topics]),
|
||||||
[match_topic(Topic, Sections) || Topic <- Topics];
|
[match_topic(Topic, Sections) || Topic <- Topics].
|
||||||
|
|
||||||
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
|
rewrite(Message=#mqtt_message{topic = Topic}, publish, Sections) ->
|
||||||
%%TODO: this will not work if the client is always online.
|
%%TODO: this will not work if the client is always online.
|
||||||
RewriteTopic =
|
RewriteTopic =
|
||||||
case get({rewrite, Topic}) of
|
case get({rewrite, Topic}) of
|
||||||
|
@ -83,9 +83,9 @@ reload(File) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unload(_) ->
|
unload(_) ->
|
||||||
emqttd_broker:unhook(client_subscribe, {?MODULE, rewrite_subscribe}),
|
emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}),
|
||||||
emqttd_broker:unhook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}),
|
emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
|
||||||
emqttd_broker:unhook(client_publish, {?MODULE, rewrite_publish}).
|
emqttd_broker:unhook('client.publish', {?MODULE, rewrite_publish}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
|
@ -116,7 +116,6 @@ match_rule(Topic, []) ->
|
||||||
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
|
match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
|
||||||
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
case re:run(Topic, MP, [{capture, all_but_first, list}]) of
|
||||||
{match, Captured} ->
|
{match, Captured} ->
|
||||||
%%TODO: stupid??? how to replace $1, $2?
|
|
||||||
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
|
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
|
||||||
iolist_to_binary(lists:foldl(
|
iolist_to_binary(lists:foldl(
|
||||||
fun({Var, Val}, Acc) ->
|
fun({Var, Val}, Acc) ->
|
||||||
|
|
|
@ -96,7 +96,7 @@ format_variable(#mqtt_packet_connect{
|
||||||
will_flag = WillFlag,
|
will_flag = WillFlag,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
keep_alive = KeepAlive,
|
keep_alive = KeepAlive,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
will_topic = WillTopic,
|
will_topic = WillTopic,
|
||||||
will_msg = WillMsg,
|
will_msg = WillMsg,
|
||||||
username = Username,
|
username = Username,
|
||||||
|
|
|
@ -112,7 +112,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
|
||||||
will_flag = bool(WillFlag),
|
will_flag = bool(WillFlag),
|
||||||
clean_sess = bool(CleanSession),
|
clean_sess = bool(CleanSession),
|
||||||
keep_alive = KeepAlive,
|
keep_alive = KeepAlive,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
will_topic = WillTopic,
|
will_topic = WillTopic,
|
||||||
will_msg = WillMsg,
|
will_msg = WillMsg,
|
||||||
username = UserName,
|
username = UserName,
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
proto_ver,
|
proto_ver,
|
||||||
proto_name,
|
proto_name,
|
||||||
username,
|
username,
|
||||||
clientid,
|
client_id,
|
||||||
clean_sess,
|
clean_sess,
|
||||||
session,
|
session,
|
||||||
will_msg,
|
will_msg,
|
||||||
|
@ -70,25 +70,25 @@ init(Peername, SendFun, Opts) ->
|
||||||
|
|
||||||
info(#proto_state{proto_ver = ProtoVer,
|
info(#proto_state{proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
will_msg = WillMsg}) ->
|
will_msg = WillMsg}) ->
|
||||||
[{proto_ver, ProtoVer},
|
[{proto_ver, ProtoVer},
|
||||||
{proto_name, ProtoName},
|
{proto_name, ProtoName},
|
||||||
{clientid, ClientId},
|
{client_id, ClientId},
|
||||||
{clean_sess, CleanSess},
|
{clean_sess, CleanSess},
|
||||||
{will_msg, WillMsg}].
|
{will_msg, WillMsg}].
|
||||||
|
|
||||||
clientid(#proto_state{clientid = ClientId}) ->
|
clientid(#proto_state{client_id = ClientId}) ->
|
||||||
ClientId.
|
ClientId.
|
||||||
|
|
||||||
client(#proto_state{peername = {Addr, _Port},
|
client(#proto_state{peername = {Addr, _Port},
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
client_pid = Pid}) ->
|
client_pid = Pid}) ->
|
||||||
#mqtt_client{clientid = ClientId,
|
#mqtt_client{client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
ipaddress = Addr,
|
ipaddress = Addr,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
|
@ -126,12 +126,12 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
||||||
password = Password,
|
password = Password,
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
keep_alive = KeepAlive,
|
keep_alive = KeepAlive,
|
||||||
clientid = ClientId} = Var,
|
client_id = ClientId} = Var,
|
||||||
|
|
||||||
State1 = State0#proto_state{proto_ver = ProtoVer,
|
State1 = State0#proto_state{proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
username = Username,
|
username = Username,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
clean_sess = CleanSess},
|
clean_sess = CleanSess},
|
||||||
|
|
||||||
trace(recv, Packet, State1),
|
trace(recv, Packet, State1),
|
||||||
|
@ -142,7 +142,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
||||||
case emqttd_access_control:auth(client(State1), Password) of
|
case emqttd_access_control:auth(client(State1), Password) of
|
||||||
ok ->
|
ok ->
|
||||||
%% Generate clientId if null
|
%% Generate clientId if null
|
||||||
State2 = State1#proto_state{clientid = clientid(ClientId, State1)},
|
State2 = State1#proto_state{client_id = clientid(ClientId, State1)},
|
||||||
|
|
||||||
%%Starting session
|
%%Starting session
|
||||||
{ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
|
{ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
|
||||||
|
@ -167,7 +167,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
||||||
send(?CONNACK_PACKET(ReturnCode1), State3);
|
send(?CONNACK_PACKET(ReturnCode1), State3);
|
||||||
|
|
||||||
handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
||||||
State = #proto_state{clientid = ClientId}) ->
|
State = #proto_state{client_id = ClientId}) ->
|
||||||
|
|
||||||
case check_acl(publish, Topic, State) of
|
case check_acl(publish, Topic, State) of
|
||||||
allow ->
|
allow ->
|
||||||
|
@ -199,7 +199,7 @@ handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessio
|
||||||
handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
|
||||||
send(?SUBACK_PACKET(PacketId, []), State);
|
send(?SUBACK_PACKET(PacketId, []), State);
|
||||||
|
|
||||||
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) ->
|
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) ->
|
||||||
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
|
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
|
||||||
case lists:member(deny, AllowDenies) of
|
case lists:member(deny, AllowDenies) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -207,7 +207,7 @@ handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid =
|
||||||
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
false ->
|
false ->
|
||||||
TopicTable1 = emqttd_broker:foldl_hooks(client_subscribe, [], TopicTable),
|
TopicTable1 = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable),
|
||||||
%%TODO: GrantedQos should be renamed.
|
%%TODO: GrantedQos should be renamed.
|
||||||
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
|
{ok, GrantedQos} = emqttd_session:subscribe(Session, TopicTable1),
|
||||||
send(?SUBACK_PACKET(PacketId, GrantedQos), State)
|
send(?SUBACK_PACKET(PacketId, GrantedQos), State)
|
||||||
|
@ -221,8 +221,9 @@ handle({subscribe, TopicTable}, State = #proto_state{session = Session}) ->
|
||||||
handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
|
||||||
send(?UNSUBACK_PACKET(PacketId), State);
|
send(?UNSUBACK_PACKET(PacketId), State);
|
||||||
|
|
||||||
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
|
handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{client_id = ClientId,
|
||||||
Topics1 = emqttd_broker:foldl_hooks(client_unsubscribe, [], Topics),
|
session = Session}) ->
|
||||||
|
Topics1 = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics),
|
||||||
ok = emqttd_session:unsubscribe(Session, Topics1),
|
ok = emqttd_session:unsubscribe(Session, Topics1),
|
||||||
send(?UNSUBACK_PACKET(PacketId), State);
|
send(?UNSUBACK_PACKET(PacketId), State);
|
||||||
|
|
||||||
|
@ -233,10 +234,10 @@ handle(?PACKET(?DISCONNECT), State) ->
|
||||||
% clean willmsg
|
% clean willmsg
|
||||||
{stop, normal, State#proto_state{will_msg = undefined}}.
|
{stop, normal, State#proto_state{will_msg = undefined}}.
|
||||||
|
|
||||||
publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{clientid = ClientId, session = Session}) ->
|
publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) ->
|
||||||
emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
|
emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
|
||||||
|
|
||||||
publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = ClientId, session = Session}) ->
|
publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
|
||||||
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
||||||
ok ->
|
ok ->
|
||||||
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
||||||
|
@ -245,7 +246,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = Cli
|
||||||
lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error])
|
lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error])
|
||||||
end;
|
end;
|
||||||
|
|
||||||
publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{clientid = ClientId, session = Session}) ->
|
publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
|
||||||
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
||||||
ok ->
|
ok ->
|
||||||
send(?PUBACK_PACKET(?PUBREC, PacketId), State);
|
send(?PUBACK_PACKET(?PUBREC, PacketId), State);
|
||||||
|
@ -267,11 +268,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when
|
||||||
SendFun(Data),
|
SendFun(Data),
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
trace(recv, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
|
||||||
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
|
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
|
||||||
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]);
|
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]);
|
||||||
|
|
||||||
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
|
trace(send, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
|
||||||
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
|
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
|
||||||
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]).
|
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]).
|
||||||
|
|
||||||
|
@ -282,10 +283,10 @@ redeliver({?PUBREL, PacketId}, State) ->
|
||||||
shutdown(duplicate_id, _State) ->
|
shutdown(duplicate_id, _State) ->
|
||||||
quiet; %%
|
quiet; %%
|
||||||
|
|
||||||
shutdown(_, #proto_state{clientid = undefined}) ->
|
shutdown(_, #proto_state{client_id = undefined}) ->
|
||||||
ignore;
|
ignore;
|
||||||
|
|
||||||
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
|
shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
|
||||||
lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
|
lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
|
||||||
[ClientId, emqttd_net:format(Peername), Error]),
|
[ClientId, emqttd_net:format(Peername), Error]),
|
||||||
send_willmsg(ClientId, WillMsg),
|
send_willmsg(ClientId, WillMsg),
|
||||||
|
@ -333,16 +334,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
|
||||||
validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
|
validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
|
||||||
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
|
||||||
|
|
||||||
validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen})
|
validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen})
|
||||||
when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) ->
|
when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) ->
|
||||||
true;
|
true;
|
||||||
|
|
||||||
%% MQTT3.1.1 allow null clientId.
|
%% MQTT3.1.1 allow null clientId.
|
||||||
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState)
|
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState)
|
||||||
when size(ClientId) =:= 0 ->
|
when size(ClientId) =:= 0 ->
|
||||||
true;
|
true;
|
||||||
|
|
||||||
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) ->
|
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) ->
|
||||||
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
|
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
|
|
@ -157,19 +157,18 @@ cast(Msg) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec publish(Msg :: mqtt_message()) -> ok.
|
-spec publish(Msg :: mqtt_message()) -> ok.
|
||||||
publish(#mqtt_message{topic=Topic, from = From} = Msg) ->
|
publish(#mqtt_message{from = From} = Msg) ->
|
||||||
trace(publish, From, Msg),
|
trace(publish, From, Msg),
|
||||||
|
|
||||||
%%TODO:call hooks here...
|
Msg1 = #mqtt_message{topic = Topic} = emqttd_broker:foldl_hooks('client.publish', [], Msg),
|
||||||
%%Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
|
|
||||||
|
|
||||||
%% Retain message first. Don't create retained topic.
|
%% Retain message first. Don't create retained topic.
|
||||||
case emqttd_msg_store:retain(Msg) of
|
case emqttd_retained:retain(Msg1) of
|
||||||
ok ->
|
ok ->
|
||||||
%TODO: why unset 'retain' flag?
|
%TODO: why unset 'retain' flag?
|
||||||
publish(Topic, emqttd_message:unset_flag(Msg));
|
publish(Topic, emqttd_message:unset_flag(Msg1));
|
||||||
ignore ->
|
ignore ->
|
||||||
publish(Topic, Msg)
|
publish(Topic, Msg1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_msg_store).
|
-module(emqttd_retained).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
|
||||||
|
@ -37,21 +37,23 @@
|
||||||
-copy_mnesia({mnesia, [copy]}).
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
%% API Function Exports
|
%% API Function Exports
|
||||||
-export([retain/1, redeliver/2]).
|
-export([retain/1, dispatch/2]).
|
||||||
|
|
||||||
|
-record(mqtt_retained, {topic, message}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% Mnesia callbacks
|
%%% Mnesia callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
ok = emqttd_mnesia:create_table(message, [
|
ok = emqttd_mnesia:create_table(retained, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, mqtt_message},
|
{record_name, mqtt_retained},
|
||||||
{attributes, record_info(fields, mqtt_message)}]);
|
{attributes, record_info(fields, mqtt_retained)}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
ok = emqttd_mnesia:copy_table(message).
|
ok = emqttd_mnesia:copy_table(retained).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -66,7 +68,7 @@ retain(#mqtt_message{retain = false}) -> ignore;
|
||||||
|
|
||||||
%% RETAIN flag set to 1 and payload containing zero bytes
|
%% RETAIN flag set to 1 and payload containing zero bytes
|
||||||
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
|
||||||
mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]);
|
mnesia:async_dirty(fun mnesia:delete/1, [{retained, Topic}]);
|
||||||
|
|
||||||
retain(Msg = #mqtt_message{topic = Topic,
|
retain(Msg = #mqtt_message{topic = Topic,
|
||||||
retain = true,
|
retain = true,
|
||||||
|
@ -74,10 +76,10 @@ retain(Msg = #mqtt_message{topic = Topic,
|
||||||
TabSize = mnesia:table_info(message, size),
|
TabSize = mnesia:table_info(message, size),
|
||||||
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
case {TabSize < limit(table), size(Payload) < limit(payload)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
|
Retained = #mqtt_retained{topic = Topic, message = Msg},
|
||||||
lager:debug("Retained ~s", [emqttd_message:format(Msg)]),
|
lager:debug("Retained ~s", [emqttd_message:format(Msg)]),
|
||||||
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
|
mnesia:async_dirty(fun mnesia:write/3, [retained, Retained, write]),
|
||||||
emqttd_metrics:set('messages/retained/count',
|
emqttd_metrics:set('messages/retained', mnesia:table_info(retained, size));
|
||||||
mnesia:table_info(message, size));
|
|
||||||
{false, _}->
|
{false, _}->
|
||||||
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
|
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
|
||||||
{_, false}->
|
{_, false}->
|
||||||
|
@ -99,31 +101,25 @@ env() ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%% @doc Redeliver retained messages to subscribed client
|
%% @doc Deliver retained messages to subscribed client
|
||||||
%% @end
|
%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-spec redeliver(Topic, CPid) -> any() when
|
-spec dispatch(Topic, CPid) -> any() when
|
||||||
Topic :: binary(),
|
Topic :: binary(),
|
||||||
CPid :: pid().
|
CPid :: pid().
|
||||||
redeliver(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
|
dispatch(Topic, CPid) when is_binary(Topic) andalso is_pid(CPid) ->
|
||||||
|
Msgs =
|
||||||
case emqttd_topic:wildcard(Topic) of
|
case emqttd_topic:wildcard(Topic) of
|
||||||
false ->
|
false ->
|
||||||
dispatch(CPid, mnesia:dirty_read(message, Topic));
|
[Msg || #mqtt_retained{message = Msg} <- mnesia:dirty_read(retained, Topic)];
|
||||||
true ->
|
true ->
|
||||||
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
|
Fun = fun(#mqtt_retained{topic = Name, message = Msg}, Acc) ->
|
||||||
case emqttd_topic:match(Name, Topic) of
|
case emqttd_topic:match(Name, Topic) of
|
||||||
true -> [Msg|Acc];
|
true -> [Msg|Acc];
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]),
|
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained])
|
||||||
dispatch(CPid, lists:reverse(Msgs))
|
end,
|
||||||
end.
|
[CPid ! {dispatch, Msg} || Msg <- Msgs].
|
||||||
|
|
||||||
dispatch(_CPid, []) ->
|
|
||||||
ignore;
|
|
||||||
dispatch(CPid, Msgs) when is_list(Msgs) ->
|
|
||||||
[CPid ! {dispatch, Msg} || Msg <- Msgs];
|
|
||||||
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
|
|
||||||
CPid ! {dispatch, Msg}.
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type,
|
||||||
VariableBin/binary,
|
VariableBin/binary,
|
||||||
PayloadBin/binary>>.
|
PayloadBin/binary>>.
|
||||||
|
|
||||||
serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId,
|
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
|
||||||
proto_ver = ProtoVer,
|
proto_ver = ProtoVer,
|
||||||
proto_name = ProtoName,
|
proto_name = ProtoName,
|
||||||
will_retain = WillRetain,
|
will_retain = WillRetain,
|
||||||
|
|
|
@ -73,13 +73,13 @@
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
|
|
||||||
%% ClientId: Identifier of Session
|
%% ClientId: Identifier of Session
|
||||||
clientid :: binary(),
|
client_id :: binary(),
|
||||||
|
|
||||||
%% Client Pid linked with session
|
%% Client Pid linked with session
|
||||||
client_pid :: pid(),
|
client_pid :: pid(),
|
||||||
|
|
||||||
%% Last message id of the session
|
%% Last packet id of the session
|
||||||
message_id = 1,
|
packet_id = 1,
|
||||||
|
|
||||||
%% Client’s subscriptions.
|
%% Client’s subscriptions.
|
||||||
subscriptions :: list(),
|
subscriptions :: list(),
|
||||||
|
@ -133,7 +133,7 @@
|
||||||
%% @doc Start a session.
|
%% @doc Start a session.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}.
|
-spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
|
||||||
start_link(CleanSess, ClientId, ClientPid) ->
|
start_link(CleanSess, ClientId, ClientPid) ->
|
||||||
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) ->
|
||||||
%% @doc Resume a session.
|
%% @doc Resume a session.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec resume(pid(), mqtt_clientid(), pid()) -> ok.
|
-spec resume(pid(), mqtt_client_id(), pid()) -> ok.
|
||||||
resume(Session, ClientId, ClientPid) ->
|
resume(Session, ClientId, ClientPid) ->
|
||||||
gen_server:cast(Session, {resume, ClientId, ClientPid}).
|
gen_server:cast(Session, {resume, ClientId, ClientPid}).
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ resume(Session, ClientId, ClientPid) ->
|
||||||
%% @doc Destroy a session.
|
%% @doc Destroy a session.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec destroy(pid(), mqtt_clientid()) -> ok.
|
-spec destroy(pid(), mqtt_client_id()) -> ok.
|
||||||
destroy(Session, ClientId) ->
|
destroy(Session, ClientId) ->
|
||||||
gen_server:call(Session, {destroy, ClientId}).
|
gen_server:call(Session, {destroy, ClientId}).
|
||||||
|
|
||||||
|
@ -182,21 +182,21 @@ publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
|
||||||
%% @doc PubAck message
|
%% @doc PubAck message
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec puback(pid(), mqtt_msgid()) -> ok.
|
-spec puback(pid(), mqtt_packet_id()) -> ok.
|
||||||
puback(Session, MsgId) ->
|
puback(Session, PktId) ->
|
||||||
gen_server:cast(Session, {puback, MsgId}).
|
gen_server:cast(Session, {puback, PktId}).
|
||||||
|
|
||||||
-spec pubrec(pid(), mqtt_msgid()) -> ok.
|
-spec pubrec(pid(), mqtt_packet_id()) -> ok.
|
||||||
pubrec(Session, MsgId) ->
|
pubrec(Session, PktId) ->
|
||||||
gen_server:cast(Session, {pubrec, MsgId}).
|
gen_server:cast(Session, {pubrec, PktId}).
|
||||||
|
|
||||||
-spec pubrel(pid(), mqtt_msgid()) -> ok.
|
-spec pubrel(pid(), mqtt_packet_id()) -> ok.
|
||||||
pubrel(Session, MsgId) ->
|
pubrel(Session, PktId) ->
|
||||||
gen_server:cast(Session, {pubrel, MsgId}).
|
gen_server:cast(Session, {pubrel, PktId}).
|
||||||
|
|
||||||
-spec pubcomp(pid(), mqtt_msgid()) -> ok.
|
-spec pubcomp(pid(), mqtt_packet_id()) -> ok.
|
||||||
pubcomp(Session, MsgId) ->
|
pubcomp(Session, PktId) ->
|
||||||
gen_server:cast(Session, {pubcomp, MsgId}).
|
gen_server:cast(Session, {pubcomp, PktId}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Unsubscribe Topics
|
%% @doc Unsubscribe Topics
|
||||||
|
@ -217,7 +217,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
SessEnv = emqttd:env(mqtt, session),
|
SessEnv = emqttd:env(mqtt, session),
|
||||||
Session = #session{
|
Session = #session{
|
||||||
clean_sess = CleanSess,
|
clean_sess = CleanSess,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
subscriptions = [],
|
subscriptions = [],
|
||||||
inflight_queue = [],
|
inflight_queue = [],
|
||||||
|
@ -234,7 +234,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
timestamp = os:timestamp()},
|
timestamp = os:timestamp()},
|
||||||
{ok, Session, hibernate}.
|
{ok, Session, hibernate}.
|
||||||
|
|
||||||
handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
|
handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId,
|
||||||
subscriptions = Subscriptions}) ->
|
subscriptions = Subscriptions}) ->
|
||||||
|
|
||||||
%% subscribe first and don't care if the subscriptions have been existed
|
%% subscribe first and don't care if the subscriptions have been existed
|
||||||
|
@ -258,13 +258,13 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
|
||||||
%% <MQTT V3.1.1>: 3.8.4
|
%% <MQTT V3.1.1>: 3.8.4
|
||||||
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||||
%% a new Subscription is created and all matching retained messages are sent.
|
%% a new Subscription is created and all matching retained messages are sent.
|
||||||
emqttd_msg_store:redeliver(Topic, self()),
|
emqttd_retained:dispatch(Topic, self()),
|
||||||
[{Topic, Qos} | Acc]
|
[{Topic, Qos} | Acc]
|
||||||
end
|
end
|
||||||
end, Subscriptions, Topics),
|
end, Subscriptions, Topics),
|
||||||
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
|
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
|
||||||
|
|
||||||
handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId,
|
handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId,
|
||||||
subscriptions = Subscriptions}) ->
|
subscriptions = Subscriptions}) ->
|
||||||
|
|
||||||
%% unsubscribe from topic tree
|
%% unsubscribe from topic tree
|
||||||
|
@ -284,22 +284,22 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId
|
||||||
|
|
||||||
{reply, ok, Session#session{subscriptions = Subscriptions1}};
|
{reply, ok, Session#session{subscriptions = Subscriptions1}};
|
||||||
|
|
||||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
|
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
|
||||||
Session = #session{clientid = ClientId,
|
Session = #session{client_id = ClientId,
|
||||||
awaiting_rel = AwaitingRel,
|
awaiting_rel = AwaitingRel,
|
||||||
await_rel_timeout = Timeout}) ->
|
await_rel_timeout = Timeout}) ->
|
||||||
case check_awaiting_rel(Session) of
|
case check_awaiting_rel(Session) of
|
||||||
true ->
|
true ->
|
||||||
TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
|
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
|
||||||
AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel),
|
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
|
||||||
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
|
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
|
||||||
false ->
|
false ->
|
||||||
lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message "
|
lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message "
|
||||||
"for too many awaiting_rel: ~p", [ClientId, Msg]),
|
"for too many awaiting_rel: ~p", [ClientId, Msg]),
|
||||||
{reply, {error, dropped}, Session}
|
{reply, {error, dropped}, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) ->
|
handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) ->
|
||||||
lager:warning("Session ~s destroyed", [ClientId]),
|
lager:warning("Session ~s destroyed", [ClientId]),
|
||||||
{stop, {shutdown, destroy}, ok, Session};
|
{stop, {shutdown, destroy}, ok, Session};
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ handle_call(Req, _From, State) ->
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, Session) ->
|
handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
|
|
||||||
#session{clientid = ClientId,
|
#session{client_id = ClientId,
|
||||||
client_pid = OldClientPid,
|
client_pid = OldClientPid,
|
||||||
inflight_queue = InflightQ,
|
inflight_queue = InflightQ,
|
||||||
awaiting_ack = AwaitingAck,
|
awaiting_ack = AwaitingAck,
|
||||||
|
@ -326,7 +326,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
true = link(ClientPid),
|
true = link(ClientPid),
|
||||||
|
|
||||||
%% Redeliver PUBREL
|
%% Redeliver PUBREL
|
||||||
[ClientPid ! {redeliver, {?PUBREL, MsgId}} || MsgId <- maps:keys(AwaitingComp)],
|
[ClientPid ! {redeliver, {?PUBREL, PktId}} || PktId <- maps:keys(AwaitingComp)],
|
||||||
|
|
||||||
%% Clear awaiting_ack timers
|
%% Clear awaiting_ack timers
|
||||||
[cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)],
|
[cancel_timer(TRef) || {_, TRef} <- maps:values(AwaitingAck)],
|
||||||
|
@ -349,54 +349,54 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
|
||||||
{noreply, dequeue(Session2), hibernate};
|
{noreply, dequeue(Session2), hibernate};
|
||||||
|
|
||||||
%% PUBRAC
|
%% PUBRAC
|
||||||
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) ->
|
handle_cast({puback, PktId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
|
||||||
case maps:find(MsgId, Awaiting) of
|
case maps:find(PktId, Awaiting) of
|
||||||
{ok, {_, TRef}} ->
|
{ok, {_, TRef}} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
Session1 = acked(MsgId, Session),
|
Session1 = acked(PktId, Session),
|
||||||
{noreply, dequeue(Session1)};
|
{noreply, dequeue(Session1)};
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, MsgId]),
|
lager:error("Session ~s cannot find PUBACK '~p'!", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% PUBREC
|
%% PUBREC
|
||||||
handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId,
|
handle_cast({pubrec, PktId}, Session = #session{client_id = ClientId,
|
||||||
awaiting_ack = AwaitingAck,
|
awaiting_ack = AwaitingAck,
|
||||||
awaiting_comp = AwaitingComp,
|
awaiting_comp = AwaitingComp,
|
||||||
await_rel_timeout = Timeout}) ->
|
await_rel_timeout = Timeout}) ->
|
||||||
case maps:find(MsgId, AwaitingAck) of
|
case maps:find(PktId, AwaitingAck) of
|
||||||
{ok, {_, TRef}} ->
|
{ok, {_, TRef}} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
TRef1 = timer(Timeout, {timeout, awaiting_comp, MsgId}),
|
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
|
||||||
Session1 = acked(MsgId, Session#session{awaiting_comp = maps:put(MsgId, TRef1, AwaitingComp)}),
|
Session1 = acked(PktId, Session#session{awaiting_comp = maps:put(PktId, TRef1, AwaitingComp)}),
|
||||||
{noreply, dequeue(Session1)};
|
{noreply, dequeue(Session1)};
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, MsgId]),
|
lager:error("Session ~s cannot find PUBREC '~p'!", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% PUBREL
|
%% PUBREL
|
||||||
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId,
|
handle_cast({pubrel, PktId}, Session = #session{client_id = ClientId,
|
||||||
awaiting_rel = AwaitingRel}) ->
|
awaiting_rel = AwaitingRel}) ->
|
||||||
case maps:find(MsgId, AwaitingRel) of
|
case maps:find(PktId, AwaitingRel) of
|
||||||
{ok, {Msg, TRef}} ->
|
{ok, {Msg, TRef}} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
emqttd_pubsub:publish(Msg),
|
emqttd_pubsub:publish(Msg),
|
||||||
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
|
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}};
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBREL: msgid=~p!", [ClientId, MsgId]),
|
lager:error("Session ~s cannot find PUBREL: pktid=~p!", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% PUBCOMP
|
%% PUBCOMP
|
||||||
handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) ->
|
handle_cast({pubcomp, PktId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) ->
|
||||||
case maps:find(MsgId, AwaitingComp) of
|
case maps:find(PktId, AwaitingComp) of
|
||||||
{ok, TRef} ->
|
{ok, TRef} ->
|
||||||
cancel_timer(TRef),
|
cancel_timer(TRef),
|
||||||
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, AwaitingComp)}};
|
{noreply, Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)}};
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBCOMP: MsgId=~p", [ClientId, MsgId]),
|
lager:error("Session ~s cannot find PUBCOMP: PktId=~p", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -417,7 +417,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
|
||||||
{noreply, Session};
|
{noreply, Session};
|
||||||
|
|
||||||
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
||||||
Session = #session{clientid = ClientId, message_queue = MsgQ})
|
Session = #session{client_id = ClientId, message_queue = MsgQ})
|
||||||
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
||||||
|
|
||||||
case check_inflight(Session) of
|
case check_inflight(Session) of
|
||||||
|
@ -428,51 +428,51 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
||||||
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
|
{noreply, Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = undefined,
|
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
|
||||||
awaiting_ack = AwaitingAck}) ->
|
awaiting_ack = AwaitingAck}) ->
|
||||||
%% just remove awaiting
|
%% just remove awaiting
|
||||||
{noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}};
|
{noreply, Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)}};
|
||||||
|
|
||||||
handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId,
|
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_id = ClientId,
|
||||||
inflight_queue = InflightQ,
|
inflight_queue = InflightQ,
|
||||||
awaiting_ack = AwaitingAck}) ->
|
awaiting_ack = AwaitingAck}) ->
|
||||||
case maps:find(MsgId, AwaitingAck) of
|
case maps:find(PktId, AwaitingAck) of
|
||||||
{ok, {{0, _Timeout}, _TRef}} ->
|
{ok, {{0, _Timeout}, _TRef}} ->
|
||||||
Session1 = Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
|
Session1 = Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
||||||
awaiting_ack = maps:remove(MsgId, AwaitingAck)},
|
awaiting_ack = maps:remove(PktId, AwaitingAck)},
|
||||||
{noreply, dequeue(Session1)};
|
{noreply, dequeue(Session1)};
|
||||||
{ok, {{Retries, Timeout}, _TRef}} ->
|
{ok, {{Retries, Timeout}, _TRef}} ->
|
||||||
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
|
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
|
||||||
AwaitingAck1 = maps:put(MsgId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
|
AwaitingAck1 = maps:put(PktId, {{Retries-1, Timeout*2}, TRef}, AwaitingAck),
|
||||||
{noreply, Session#session{awaiting_ack = AwaitingAck1}};
|
{noreply, Session#session{awaiting_ack = AwaitingAck1}};
|
||||||
error ->
|
error ->
|
||||||
lager:error([{client, ClientId}], "Session ~s "
|
lager:error([{client, ClientId}], "Session ~s "
|
||||||
"cannot find Awaiting Ack:~p", [ClientId, MsgId]),
|
"cannot find Awaiting Ack:~p", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId,
|
handle_info({timeout, awaiting_rel, PktId}, Session = #session{client_id = ClientId,
|
||||||
awaiting_rel = AwaitingRel}) ->
|
awaiting_rel = AwaitingRel}) ->
|
||||||
case maps:find(MsgId, AwaitingRel) of
|
case maps:find(PktId, AwaitingRel) of
|
||||||
{ok, {Msg, _TRef}} ->
|
{ok, {Msg, _TRef}} ->
|
||||||
lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n"
|
lager:error([{client, ClientId}], "Session ~s AwaitingRel Timout!~n"
|
||||||
"Drop Message:~p", [ClientId, Msg]),
|
"Drop Message:~p", [ClientId, Msg]),
|
||||||
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
|
{noreply, Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)}};
|
||||||
error ->
|
error ->
|
||||||
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: MsgId=~p", [ClientId, MsgId]),
|
lager:error([{client, ClientId}], "Session ~s Cannot find AwaitingRel: PktId=~p", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId,
|
handle_info({timeout, awaiting_comp, PktId}, Session = #session{client_id = ClientId,
|
||||||
awaiting_comp = Awaiting}) ->
|
awaiting_comp = Awaiting}) ->
|
||||||
case maps:find(MsgId, Awaiting) of
|
case maps:find(PktId, Awaiting) of
|
||||||
{ok, _TRef} ->
|
{ok, _TRef} ->
|
||||||
lager:error([{client, ClientId}], "Session ~s "
|
lager:error([{client, ClientId}], "Session ~s "
|
||||||
"Awaiting PUBCOMP Timout: MsgId=~p!", [ClientId, MsgId]),
|
"Awaiting PUBCOMP Timout: PktId=~p!", [ClientId, PktId]),
|
||||||
{noreply, Session#session{awaiting_comp = maps:remove(MsgId, Awaiting)}};
|
{noreply, Session#session{awaiting_comp = maps:remove(PktId, Awaiting)}};
|
||||||
error ->
|
error ->
|
||||||
lager:error([{client, ClientId}], "Session ~s "
|
lager:error([{client, ClientId}], "Session ~s "
|
||||||
"Cannot find Awaiting PUBCOMP: MsgId=~p", [ClientId, MsgId]),
|
"Cannot find Awaiting PUBCOMP: PktId=~p", [ClientId, PktId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -481,25 +481,25 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
||||||
{stop, normal, Session};
|
{stop, normal, Session};
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
|
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
|
||||||
clientid = ClientId,
|
client_id = ClientId,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
expired_after = Expires}) ->
|
expired_after = Expires}) ->
|
||||||
lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]),
|
lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]),
|
||||||
TRef = timer(Expires, session_expired),
|
TRef = timer(Expires, session_expired),
|
||||||
{noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate};
|
{noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate};
|
||||||
|
|
||||||
handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId,
|
handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId,
|
||||||
client_pid = ClientPid}) ->
|
client_pid = ClientPid}) ->
|
||||||
|
|
||||||
lager:error("Session ~s received unexpected EXIT:"
|
lager:error("Session ~s received unexpected EXIT:"
|
||||||
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]),
|
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]),
|
||||||
{noreply, Session};
|
{noreply, Session};
|
||||||
|
|
||||||
handle_info(session_expired, Session = #session{clientid = ClientId}) ->
|
handle_info(session_expired, Session = #session{client_id = ClientId}) ->
|
||||||
lager:error("Session ~s expired, shutdown now!", [ClientId]),
|
lager:error("Session ~s expired, shutdown now!", [ClientId]),
|
||||||
{stop, {shutdown, expired}, Session};
|
{stop, {shutdown, expired}, Session};
|
||||||
|
|
||||||
handle_info(Info, Session = #session{clientid = ClientId}) ->
|
handle_info(Info, Session = #session{client_id = ClientId}) ->
|
||||||
lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
|
lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
|
||||||
{noreply, Session}.
|
{noreply, Session}.
|
||||||
|
|
||||||
|
@ -566,13 +566,13 @@ dequeue2(Session = #session{message_queue = Q}) ->
|
||||||
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
|
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
|
||||||
ClientPid ! {deliver, Msg}, Session;
|
ClientPid ! {deliver, Msg}, Session;
|
||||||
|
|
||||||
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{message_id = MsgId,
|
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId,
|
||||||
client_pid = ClientPid,
|
client_pid = ClientPid,
|
||||||
inflight_queue = InflightQ})
|
inflight_queue = InflightQ})
|
||||||
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
||||||
Msg1 = Msg#mqtt_message{msgid = MsgId, dup = false},
|
Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
|
||||||
ClientPid ! {deliver, Msg1},
|
ClientPid ! {deliver, Msg1},
|
||||||
await(Msg1, next_msgid(Session#session{inflight_queue = [{MsgId, Msg1}|InflightQ]})).
|
await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).
|
||||||
|
|
||||||
redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
|
redeliver(Msg = #mqtt_message{qos = ?QOS_0}, Session) ->
|
||||||
deliver(Msg, Session);
|
deliver(Msg, Session);
|
||||||
|
@ -585,23 +585,23 @@ redeliver(Msg = #mqtt_message{qos = QoS}, Session = #session{client_pid = Client
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Awaiting ack for qos1, qos2 message
|
%% Awaiting ack for qos1, qos2 message
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
await(#mqtt_message{msgid = MsgId}, Session = #session{awaiting_ack = Awaiting,
|
await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting,
|
||||||
unack_retries = Retries,
|
unack_retries = Retries,
|
||||||
unack_timeout = Timeout}) ->
|
unack_timeout = Timeout}) ->
|
||||||
TRef = timer(Timeout, {timeout, awaiting_ack, MsgId}),
|
TRef = timer(Timeout, {timeout, awaiting_ack, PktId}),
|
||||||
Awaiting1 = maps:put(MsgId, {{Retries, Timeout}, TRef}, Awaiting),
|
Awaiting1 = maps:put(PktId, {{Retries, Timeout}, TRef}, Awaiting),
|
||||||
Session#session{awaiting_ack = Awaiting1}.
|
Session#session{awaiting_ack = Awaiting1}.
|
||||||
|
|
||||||
acked(MsgId, Session = #session{inflight_queue = InflightQ,
|
acked(PktId, Session = #session{inflight_queue = InflightQ,
|
||||||
awaiting_ack = Awaiting}) ->
|
awaiting_ack = Awaiting}) ->
|
||||||
Session#session{inflight_queue = lists:keydelete(MsgId, 1, InflightQ),
|
Session#session{inflight_queue = lists:keydelete(PktId, 1, InflightQ),
|
||||||
awaiting_ack = maps:remove(MsgId, Awaiting)}.
|
awaiting_ack = maps:remove(PktId, Awaiting)}.
|
||||||
|
|
||||||
next_msgid(Session = #session{message_id = 16#ffff}) ->
|
next_packet_id(Session = #session{packet_id = 16#ffff}) ->
|
||||||
Session#session{message_id = 1};
|
Session#session{packet_id = 1};
|
||||||
|
|
||||||
next_msgid(Session = #session{message_id = MsgId}) ->
|
next_packet_id(Session = #session{packet_id = Id}) ->
|
||||||
Session#session{message_id = MsgId + 1}.
|
Session#session{packet_id = Id + 1}.
|
||||||
|
|
||||||
timer(Timeout, TimeoutMsg) ->
|
timer(Timeout, TimeoutMsg) ->
|
||||||
erlang:send_after(Timeout * 1000, self(), TimeoutMsg).
|
erlang:send_after(Timeout * 1000, self(), TimeoutMsg).
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqttd_sm).
|
-module(emqttd_sm).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
@ -67,12 +68,11 @@
|
||||||
%% @doc Start a session manager
|
%% @doc Start a session manager
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec start_link(Id, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
-spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
|
||||||
Id :: pos_integer(),
|
Id :: pos_integer(),
|
||||||
%ClientStatsFun :: fun(),
|
StatsFun :: {fun(), fun()}.
|
||||||
SessStatsFun :: fun().
|
start_link(Id, StatsFun) ->
|
||||||
start_link(Id, SessStatsFun) ->
|
gen_server:start_link(?MODULE, [Id, StatsFun], []).
|
||||||
gen_server:start_link(?MODULE, [Id, SessStatsFun], []).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Pool name.
|
%% @doc Pool name.
|
||||||
|
@ -103,7 +103,7 @@ start_session(CleanSess, ClientId) ->
|
||||||
-spec lookup_session(binary()) -> pid() | undefined.
|
-spec lookup_session(binary()) -> pid() | undefined.
|
||||||
lookup_session(ClientId) ->
|
lookup_session(ClientId) ->
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, _}] -> SessPid;
|
[{_Clean, _, SessPid, _}] -> SessPid;
|
||||||
[] -> undefined
|
[] -> undefined
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ init([Id, StatsFun]) ->
|
||||||
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
|
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
|
||||||
Reply =
|
Reply =
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, _MRef}] ->
|
[{_Clean, _, SessPid, _MRef}] ->
|
||||||
emqttd_session:resume(SessPid, ClientId, ClientPid),
|
emqttd_session:resume(SessPid, ClientId, ClientPid),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -139,7 +139,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
|
||||||
|
|
||||||
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
|
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, MRef}] ->
|
[{_Clean, _, SessPid, MRef}] ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
emqttd_session:destroy(SessPid, ClientId);
|
emqttd_session:destroy(SessPid, ClientId);
|
||||||
[] ->
|
[] ->
|
||||||
|
@ -149,7 +149,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
|
||||||
|
|
||||||
handle_call({destroy_session, ClientId}, _From, State) ->
|
handle_call({destroy_session, ClientId}, _From, State) ->
|
||||||
case ets:lookup(?SESSION_TAB, ClientId) of
|
case ets:lookup(?SESSION_TAB, ClientId) of
|
||||||
[{_, SessPid, MRef}] ->
|
[{_Clean, _, SessPid, MRef}] ->
|
||||||
emqttd_session:destroy(SessPid, ClientId),
|
emqttd_session:destroy(SessPid, ClientId),
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
ets:delete(?SESSION_TAB, ClientId);
|
ets:delete(?SESSION_TAB, ClientId);
|
||||||
|
@ -165,7 +165,7 @@ handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
|
||||||
ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}),
|
ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}),
|
||||||
{noreply, setstats(State)};
|
{noreply, setstats(State)};
|
||||||
|
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -184,13 +184,14 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
new_session(CleanSess, ClientId, ClientPid) ->
|
new_session(CleanSess, ClientId, ClientPid) ->
|
||||||
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
|
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
|
||||||
{ok, SessPid} ->
|
{ok, SessPid} ->
|
||||||
ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}),
|
MRef = erlang:monitor(process, SessPid),
|
||||||
|
ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}),
|
||||||
{ok, SessPid};
|
{ok, SessPid};
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
setstats(State = #state{statsfun = StatsFun}) ->
|
setstats(State = #state{statsfun = {CFun, SFun}}) ->
|
||||||
StatsFun(ets:info(?SESSION_TAB, size)), State.
|
CFun(ets:info(?SESSION_TAB, size)),
|
||||||
|
SFun(ets:select_count(?SESSION_TAB, [{{true, '_', '_', '_'}, [], [true]}])),
|
||||||
|
State.
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqttd_sm_sup).
|
-module(emqttd_sm_sup).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
@ -42,19 +43,20 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
ets:new(emqttd_sm:table(), [set, named_table, public,
|
ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2},
|
||||||
{write_concurrency, true}]),
|
{write_concurrency, true}]),
|
||||||
Schedulers = erlang:system_info(schedulers),
|
Schedulers = erlang:system_info(schedulers),
|
||||||
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
|
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
|
||||||
%%ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
|
|
||||||
SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
|
|
||||||
Children = lists:map(
|
Children = lists:map(
|
||||||
fun(I) ->
|
fun(I) ->
|
||||||
Name = {emqttd_sm, I},
|
Name = {emqttd_sm, I},
|
||||||
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
|
gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
|
||||||
{Name, {emqttd_sm, start_link, [I, SessStatsFun]},
|
{Name, {emqttd_sm, start_link, [I, statsfun()]},
|
||||||
permanent, 10000, worker, [emqttd_sm]}
|
permanent, 10000, worker, [emqttd_sm]}
|
||||||
end, lists:seq(1, Schedulers)),
|
end, lists:seq(1, Schedulers)),
|
||||||
{ok, {{one_for_all, 10, 100}, Children}}.
|
{ok, {{one_for_all, 10, 100}, Children}}.
|
||||||
|
|
||||||
|
statsfun() ->
|
||||||
|
{emqttd_stats:statsfun('clients/count', 'clients/max'),
|
||||||
|
emqttd_stats:statsfun('sessions/count', 'sessions/max')}.
|
||||||
|
|
||||||
|
|
|
@ -124,6 +124,7 @@ setstat(Stat, Val) ->
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Set stats with max
|
%% @doc Set stats with max
|
||||||
|
%% TODO: this is wrong...
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
|
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().
|
||||||
|
@ -174,9 +175,9 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
publish(Stat, Val) ->
|
publish(Stat, Val) ->
|
||||||
emqttd_pubsub:publish(#mqtt_message{from = stats,
|
Msg = emqttd_message:make(stats, stats_topic(Stat),
|
||||||
topic = stats_topic(Stat),
|
emqttd_util:integer_to_binary(Val)),
|
||||||
payload = emqttd_util:integer_to_binary(Val)}).
|
emqttd_pubsub:publish(Msg).
|
||||||
|
|
||||||
stats_topic(Stat) ->
|
stats_topic(Stat) ->
|
||||||
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
||||||
|
|
|
@ -71,8 +71,8 @@ unregister_mod_test() ->
|
||||||
check_acl_test() ->
|
check_acl_test() ->
|
||||||
with_acl(
|
with_acl(
|
||||||
fun() ->
|
fun() ->
|
||||||
User1 = #mqtt_client{clientid = <<"client1">>, username = <<"testuser">>},
|
User1 = #mqtt_client{client_id = <<"client1">>, username = <<"testuser">>},
|
||||||
User2 = #mqtt_client{clientid = <<"client2">>, username = <<"xyz">>},
|
User2 = #mqtt_client{client_id = <<"client2">>, username = <<"xyz">>},
|
||||||
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
|
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"users/testuser/1">>)),
|
||||||
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1">>)),
|
?assertEqual(allow, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1">>)),
|
||||||
?assertEqual(deny, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1/x/y">>)),
|
?assertEqual(deny, emqttd_access_control:check_acl(User1, subscribe, <<"clients/client1/x/y">>)),
|
||||||
|
|
|
@ -53,8 +53,8 @@ compile_test() ->
|
||||||
?assertEqual({deny, all}, compile({deny, all})).
|
?assertEqual({deny, all}, compile({deny, all})).
|
||||||
|
|
||||||
match_test() ->
|
match_test() ->
|
||||||
User = #mqtt_client{ipaddress = {127,0,0,1}, clientid = <<"testClient">>, username = <<"TestUser">>},
|
User = #mqtt_client{ipaddress = {127,0,0,1}, client_id = <<"testClient">>, username = <<"TestUser">>},
|
||||||
User2 = #mqtt_client{ipaddress = {192,168,0,10}, clientid = <<"testClient">>, username = <<"TestUser">>},
|
User2 = #mqtt_client{ipaddress = {192,168,0,10}, client_id = <<"testClient">>, username = <<"TestUser">>},
|
||||||
|
|
||||||
?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})),
|
?assertEqual({matched, allow}, match(User, <<"Test/Topic">>, {allow, all})),
|
||||||
?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})),
|
?assertEqual({matched, deny}, match(User, <<"Test/Topic">>, {deny, all})),
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
%%% @Copyright (C) 2012-2015, Feng Lee <feng@emqtt.io>
|
||||||
|
%%%
|
||||||
|
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
%%% of this software and associated documentation files (the "Software"), to deal
|
||||||
|
%%% in the Software without restriction, including without limitation the rights
|
||||||
|
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
%%% copies of the Software, and to permit persons to whom the Software is
|
||||||
|
%%% furnished to do so, subject to the following conditions:
|
||||||
|
%%%
|
||||||
|
%%% The above copyright notice and this permission notice shall be included in all
|
||||||
|
%%% copies or substantial portions of the Software.
|
||||||
|
%%%
|
||||||
|
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
%%% SOFTWARE.
|
||||||
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqttd_guid_tests).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
gen_test() ->
|
||||||
|
Guid1 = emqttd_guid:gen(),
|
||||||
|
Guid2 = emqttd_guid:gen(),
|
||||||
|
?assertMatch(<<_:128>>, Guid1),
|
||||||
|
?assertEqual(true, Guid2 >= Guid1).
|
||||||
|
|
||||||
|
-endif.
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,10 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
-module(emqttd_parser_tests).
|
-module(emqttd_parser_tests).
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
-include("emqttd_protocol.hrl").
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
parse_connect_test() ->
|
parse_connect_test() ->
|
||||||
|
@ -43,7 +43,7 @@ parse_connect_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_connect{proto_ver = 3,
|
variable = #mqtt_packet_connect{proto_ver = 3,
|
||||||
proto_name = <<"MQIsdp">>,
|
proto_name = <<"MQIsdp">>,
|
||||||
clientid = <<"mosqpub/10451-iMac.loca">>,
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)),
|
keep_alive = 60}}, <<>>}, emqttd_parser:parse(V31ConnBin, State)),
|
||||||
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
%% CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10451-iMac.loca, ProtoName=MQTT, ProtoVsn=4, CleanSess=true, KeepAlive=60, Username=undefined, Password=undefined)
|
||||||
|
@ -55,7 +55,7 @@ parse_connect_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_connect{proto_ver = 4,
|
variable = #mqtt_packet_connect{proto_ver = 4,
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
clientid = <<"mosqpub/10451-iMac.loca">>,
|
client_id = <<"mosqpub/10451-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)),
|
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnBin, State)),
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ parse_connect_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_connect{proto_ver = 4,
|
variable = #mqtt_packet_connect{proto_ver = 4,
|
||||||
proto_name = <<"MQTT">>,
|
proto_name = <<"MQTT">>,
|
||||||
clientid = <<>>,
|
client_id = <<>>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)),
|
keep_alive = 60 } }, <<>>}, emqttd_parser:parse(V311ConnWithoutClientId, State)),
|
||||||
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
%%CONNECT(Qos=0, Retain=false, Dup=false, ClientId=mosqpub/10452-iMac.loca, ProtoName=MQIsdp, ProtoVsn=3, CleanSess=true, KeepAlive=60, Username=test, Password=******, Will(Qos=1, Retain=false, Topic=/will, Msg=willmsg))
|
||||||
|
@ -80,7 +80,7 @@ parse_connect_test() ->
|
||||||
retain = false},
|
retain = false},
|
||||||
variable = #mqtt_packet_connect{proto_ver = 3,
|
variable = #mqtt_packet_connect{proto_ver = 3,
|
||||||
proto_name = <<"MQIsdp">>,
|
proto_name = <<"MQIsdp">>,
|
||||||
clientid = <<"mosqpub/10452-iMac.loca">>,
|
client_id = <<"mosqpub/10452-iMac.loca">>,
|
||||||
clean_sess = true,
|
clean_sess = true,
|
||||||
keep_alive = 60,
|
keep_alive = 60,
|
||||||
will_retain = false,
|
will_retain = false,
|
||||||
|
|
Loading…
Reference in New Issue