refactor records

This commit is contained in:
Ery Lee 2015-04-15 12:37:44 +08:00
parent 23d774ce1a
commit 8c28bbcc7a
18 changed files with 175 additions and 150 deletions

View File

@ -128,7 +128,7 @@
-type mqtt_packet_id() :: 1..16#ffff | undefined.
-record(mqtt_packet_connect, {
client_id = <<>> :: binary(),
clientid = <<>> :: binary(),
proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(),
proto_name = <<"MQTT">> :: binary(),
will_retain = false :: boolean(),
@ -225,3 +225,4 @@
-define(PACKET(Type),
#mqtt_packet{header = #mqtt_packet_header{type = Type}}).

View File

@ -104,7 +104,7 @@ dump_variable(#mqtt_packet_connect{
will_flag = WillFlag,
clean_sess = CleanSess,
keep_alive = KeepAlive,
client_id = ClientId,
clientid = ClientId,
will_topic = WillTopic,
will_msg = WillMsg,
username = Username,

View File

@ -114,7 +114,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
will_flag = bool(WillFlag),
clean_sess = bool(CleanSession),
keep_alive = KeepAlive,
client_id = ClientId,
clientid = ClientId,
will_topic = WillTopic,
will_msg = WillMsg,
username = UserName,

View File

@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type,
VariableBin/binary,
PayloadBin/binary>>.
serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
will_retain = WillRetain,

View File

@ -52,7 +52,7 @@
-type mqtt_topic() :: #mqtt_topic{}.
%%------------------------------------------------------------------------------
%% MQTT Topic Subscriber
%% MQTT Subscriber
%%------------------------------------------------------------------------------
-record(mqtt_subscriber, {
topic :: binary(),
@ -99,13 +99,19 @@
retain = false :: boolean(),
dup = false :: boolean(),
msgid :: mqtt_msgid(),
payload :: binary()}).
payload :: binary()
}).
-type mqtt_message() :: #mqtt_message{}.
%%------------------------------------------------------------------------------
%% MQTT Plugin
%%------------------------------------------------------------------------------
-record(mqtt_plugin, {
name,
version,
attrs,
description
}).
-record(mqtt_plugin, {name, version, attrs, description}).

View File

@ -94,29 +94,29 @@ bin(B) when is_binary(B) ->
%%
%% @end
%%%-----------------------------------------------------------------------------
-spec match(mqtt_user(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch.
match(_User, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
-spec match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch.
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
{matched, AllowDeny};
match(User, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
case match_who(User, Who) andalso match_topics(User, Topic, TopicFilters) of
case match_who(Client, Who) andalso match_topics(Client, Topic, TopicFilters) of
true -> {matched, AllowDeny};
false -> nomatch
end.
match_who(_User, all) ->
match_who(_Client, all) ->
true;
match_who(_User, {user, all}) ->
match_who(_Client, {user, all}) ->
true;
match_who(_User, {client, all}) ->
match_who(_Client, {client, all}) ->
true;
match_who(#mqtt_user{clientid = ClientId}, {client, ClientId}) ->
match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) ->
true;
match_who(#mqtt_user{username = Username}, {user, Username}) ->
match_who(#mqtt_client{username = Username}, {user, Username}) ->
true;
match_who(#mqtt_user{ipaddr = undefined}, {ipaddr, _Tup}) ->
match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) ->
false;
match_who(#mqtt_user{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) ->
match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) ->
I = esockd_access:atoi(IP),
I >= Start andalso I =< End;
match_who(_User, _Who) ->
@ -143,14 +143,15 @@ feed_var(User, Pattern) ->
feed_var(User, Pattern, []).
feed_var(_User, [], Acc) ->
lists:reverse(Acc);
feed_var(User = #mqtt_user{clientid = undefined}, [<<"$c">>|Words], Acc) ->
feed_var(User, Words, [<<"$c">>|Acc]);
feed_var(User = #mqtt_user{clientid = ClientId}, [<<"$c">>|Words], Acc) ->
feed_var(User, Words, [ClientId |Acc]);
feed_var(User = #mqtt_user{username = undefined}, [<<"$u">>|Words], Acc) ->
feed_var(User, Words, [<<"$u">>|Acc]);
feed_var(User = #mqtt_user{username = Username}, [<<"$u">>|Words], Acc) ->
feed_var(User, Words, [Username|Acc]);
feed_var(User, [W|Words], Acc) ->
feed_var(User, Words, [W|Acc]).
feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [<<"$c">>|Acc]);
feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) ->
feed_var(Client, Words, [<<"$u">>|Acc]);
feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) ->
feed_var(Client, Words, [Username|Acc]);
feed_var(Client, [W|Words], Acc) ->
feed_var(Client, Words, [W|Acc]).

View File

@ -83,25 +83,25 @@ start_link(AclMods) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [AclMods], []).
%% @doc Check ACL.
-spec check({User, PubSub, Topic}) -> allow | deny when
User :: mqtt_user(),
-spec check({Client, PubSub, Topic}) -> allow | deny when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary().
check({User, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe ->
check({Client, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe ->
case ets:lookup(?ACL_TABLE, acl_modules) of
[] -> allow;
[{_, AclMods}] -> check({User, PubSub, Topic}, AclMods)
[{_, AclMods}] -> check({Client, PubSub, Topic}, AclMods)
end.
check({#mqtt_user{clientid = ClientId}, PubSub, Topic}, []) ->
check({#mqtt_client{clientid = ClientId}, PubSub, Topic}, []) ->
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
allow;
check({User, PubSub, Topic}, [{M, State}|AclMods]) ->
case M:check_acl({User, PubSub, Topic}, State) of
check({Client, PubSub, Topic}, [{M, State}|AclMods]) ->
case M:check_acl({Client, PubSub, Topic}, State) of
allow -> allow;
deny -> deny;
ignore -> check({User, PubSub, Topic}, AclMods)
ignore -> check({Client, PubSub, Topic}, AclMods)
end.
%% @doc Reload ACL.

View File

@ -90,13 +90,13 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
false.
%% @doc Check ACL.
-spec check_acl({User, PubSub, Topic}, State) -> allow | deny | ignore when
User :: mqtt_user(),
-spec check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary(),
State :: #state{}.
check_acl({User, PubSub, Topic}, #state{nomatch = Default}) ->
case match(User, Topic, lookup(PubSub)) of
check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) ->
case match(Client, Topic, lookup(PubSub)) of
{matched, allow} -> allow;
{matched, deny} -> deny;
nomatch -> Default
@ -108,12 +108,12 @@ lookup(PubSub) ->
[{PubSub, Rules}] -> Rules
end.
match(_User, _Topic, []) ->
match(_Client, _Topic, []) ->
nomatch;
match(User, Topic, [Rule|Rules]) ->
case emqttd_access_rule:match(User, Topic, Rule) of
nomatch -> match(User, Topic, Rules);
match(Client, Topic, [Rule|Rules]) ->
case emqttd_access_rule:match(Client, Topic, Rule) of
nomatch -> match(Client, Topic, Rules);
{matched, AllowDeny} -> {matched, AllowDeny}
end.

View File

@ -71,15 +71,15 @@ init(Opts) ->
end,
{ok, Opts}.
check(#mqtt_user{clientid = undefined}, _Password, []) ->
check(#mqtt_client{clientid = undefined}, _Password, []) ->
{error, "ClientId undefined"};
check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, []) ->
check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, []) ->
check_clientid_only(ClientId, IpAddr);
check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) ->
check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddr);
check(_User, undefined, [{password, yes}|_]) ->
check(_Client, undefined, [{password, yes}|_]) ->
{error, "Password undefined"};
check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) ->
check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of
[] -> {error, "ClientId Not Found"};
[#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext??

View File

@ -67,11 +67,11 @@ init(Opts) ->
mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies),
{ok, Opts}.
check(#mqtt_user{username = undefined}, _Password, _Opts) ->
check(#mqtt_client{username = undefined}, _Password, _Opts) ->
{error, "Username undefined"};
check(_User, undefined, _Opts) ->
{error, "Password undefined"};
check(#mqtt_user{username = Username}, Password, _Opts) ->
check(#mqtt_client{username = Username}, Password, _Opts) ->
case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of
[] ->
{error, "Username Not Found"};

View File

@ -32,6 +32,8 @@
-include_lib("emqtt/include/emqtt_packet.hrl").
-include("emqttd.hrl").
%% API Function Exports
-export([start_link/3]).

View File

@ -103,7 +103,7 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState
%% need transfer data???
%% emqttd_client:transfer(NewPid, Data),
lager:error("Shutdown for duplicate clientid: ~s, conn:~s",
[emqttd_protocol:client_id(ProtoState), ConnName]),
[emqttd_protocol:clientid(ProtoState), ConnName]),
stop({shutdown, duplicate_id}, State);
%%TODO: ok??
@ -255,7 +255,7 @@ inc(_) ->
notify(disconnected, _Reason, undefined) -> ingore;
notify(disconnected, {shutdown, Reason}, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]});
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]});
notify(disconnected, Reason, ProtoState) ->
emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}).
emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}).

View File

@ -28,6 +28,8 @@
-author('feng@emqtt.io').
-include_lib("emqtt/include/emqtt_packet.hrl").
-include("emqttd.hrl").
-import(proplists, [get_value/2, get_value/3]).

View File

@ -91,10 +91,7 @@ create_tables() ->
ok = emqttd_trie:mnesia(create),
ok = emqttd_pubsub:mnesia(create),
%% TODO: retained messages, this table should not be copied...
ok = create_table(message_retained, [
{type, ordered_set},
{ram_copies, [node()]},
{attributes, record_info(fields, message_retained)}]).
ok = emqttd_retained:mnesia(create).
create_table(Table, Attrs) ->
case mnesia:create_table(Table, Attrs) of
@ -111,9 +108,9 @@ create_table(Table, Attrs) ->
%% @end
%%------------------------------------------------------------------------------
copy_tables() ->
ok = emqttd_trie:mnesia(create),
ok = emqttd_pubsub:mnesia(create),
ok = copy_table(message_retained).
ok = emqttd_trie:mnesia(replicate),
ok = emqttd_pubsub:mnesia(replicate),
ok = emqttd_retained:mnesia(replicate).
copy_table(Table) ->
case mnesia:add_table_copy(Table, node(), ram_copies) of

View File

@ -31,7 +31,7 @@
-include_lib("emqtt/include/emqtt_packet.hrl").
%% API
-export([init/2, client_id/1]).
-export([init/2, clientid/1]).
-export([received/2, send/2, redeliver/2, shutdown/2]).
@ -47,7 +47,7 @@
proto_name,
%packet_id,
username,
client_id,
clientid,
clean_sess,
session, %% session state or session pid
will_msg,
@ -63,17 +63,20 @@ init({Transport, Socket, Peername}, Opts) ->
peername = Peername,
max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}.
client_id(#proto_state{client_id = ClientId}) -> ClientId.
clientid(#proto_state{clientid = ClientId}) -> ClientId.
client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) ->
#mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}.
%%SHOULD be registered in emqttd_cm
info(#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName,
client_id = ClientId,
clientid = ClientId,
clean_sess = CleanSess,
will_msg = WillMsg}) ->
[{proto_ver, ProtoVer},
{proto_name, ProtoName},
{client_id, ClientId},
{clientid, ClientId},
{clean_sess, CleanSess},
{will_msg, WillMsg}].
@ -92,7 +95,7 @@ received(_Packet, State = #proto_state{connected = false}) ->
{error, protocol_not_connected, State};
received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
client_id = ClientId}) ->
clientid = ClientId}) ->
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
case validate_packet(Packet) of
ok ->
@ -108,24 +111,24 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
password = Password,
clean_sess = CleanSess,
keep_alive = KeepAlive,
client_id = ClientId} = Var,
clientid = ClientId} = Var,
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
State1 = State#proto_state{proto_ver = ProtoVer,
username = Username,
client_id = ClientId,
clientid = ClientId,
clean_sess = CleanSess},
{ReturnCode1, State2} =
case validate_connect(Var, State) of
?CONNACK_ACCEPT ->
User = #mqtt_user{username = Username, ipaddr = Addr, clientid = ClientId},
case emqttd_auth:login(User, Password) of
Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr},
case emqttd_auth:login(Client, Password) of
ok ->
ClientId1 = clientid(ClientId, State),
start_keepalive(KeepAlive),
emqttd_cm:register(ClientId1),
{?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1,
{?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1,
will_msg = willmsg(Var)}};
{error, Reason}->
lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]),
@ -142,8 +145,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
{ok, State2#proto_state{session = Session}};
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)});
deny ->
@ -152,8 +155,8 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
{ok, State};
handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
@ -163,8 +166,8 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
end;
handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_acl:check({mqtt_user(State), publish, Topic}) of
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
@ -187,11 +190,12 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session})
end,
{ok, NewState};
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
AllowDenies = [emqttd_acl:check({mqtt_user(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable],
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) ->
AllowDenies = [emqttd_acl:check({client(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable],
case lists:member(deny, AllowDenies) of
true ->
%%TODO: return 128 QoS when deny...
lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
{ok, State};
false ->
{ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable),
@ -225,7 +229,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session =
{Message1, NewSession} = emqttd_session:store(Session, Message),
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) ->
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, clientid = ClientId}) when is_record(Packet, mqtt_packet) ->
lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
sent_stats(Packet),
Data = emqttd_serialiser:serialise(Packet),
@ -238,7 +242,7 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername
redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State).
shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
send_willmsg(WillMsg),
try_unregister(ClientId, self()),
lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]),
@ -248,13 +252,10 @@ willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqttd_message:from_packet(Packet).
clientid(<<>>, #proto_state{peername = Peername}) ->
<<"eMQTT/", (base64:encode(emqttd_net:format(Peername)))/binary>>;
<<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>;
clientid(ClientId, _State) -> ClientId.
mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) ->
#mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}.
send_willmsg(undefined) -> ignore;
%%TODO:should call session...
send_willmsg(WillMsg) -> emqttd_router:route(WillMsg).
@ -282,16 +283,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen})
validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen})
when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) ->
true;
%% MQTT3.1.1 allow null clientId.
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState)
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState)
when size(ClientId) =:= 0 ->
true;
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) ->
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) ->
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
false.
@ -353,7 +354,7 @@ inc(_) ->
notify(connected, ReturnCode, #proto_state{peername = Peername,
proto_ver = ProtoVer,
client_id = ClientId,
clientid = ClientId,
clean_sess = CleanSess}) ->
Sess = case CleanSess of
true -> false;

View File

@ -128,7 +128,7 @@ subscribe(Topics = [{_Topic, _Qos}|_]) ->
-spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}.
subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) ->
TopicRecord = #mqtt_topic{topic = Topic, node = node()},
Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()},
Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()},
F = fun() ->
case insert_topic(TopicRecord) of
ok -> insert_subscriber(Subscriber);
@ -188,7 +188,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
%%------------------------------------------------------------------------------
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
case mnesia:dirty_read(topic_subscriber, Topic) of
case mnesia:dirty_read(subscriber, Topic) of
[] ->
%%TODO: not right when clusted...
setstats(dropped);
@ -224,7 +224,7 @@ init([]) ->
process_flag(priority, high),
process_flag(min_heap_size, 1024*1024),
mnesia:subscribe({table, topic, simple}),
mnesia:subscribe({table, topic_subscriber, simple}),
mnesia:subscribe({table, subscriber, simple}),
{ok, #state{submap = maps:new()}}.
handle_call(Req, _From, State) ->
@ -265,10 +265,10 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa
true ->
Node = node(),
F = fun() ->
Subscribers = mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid),
lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) ->
mnesia:delete_object(Sub),
try_remove_topic(#topic{name = Topic, node = Node})
Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid),
lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) ->
mnesia:delete_object(subscriber, Sub, write),
try_remove_topic(#mqtt_topic{topic = Topic, node = Node})
end, Subscribers)
end,
NewState =
@ -292,7 +292,7 @@ handle_info(Info, State) ->
terminate(_Reason, _State) ->
mnesia:unsubscribe({table, topic, simple}),
mnesia:unsubscribe({table, topic_subscriber, simple}),
mnesia:unsubscribe({table, subscriber, simple}),
%%TODO: clear topics belongs to this node???
ok.
@ -302,27 +302,27 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
%%% Internal functions
%%%=============================================================================
insert_topic(Topic = #topic{name = Name}) ->
case mnesia:wread({topic, Name}) of
insert_topic(Record = #mqtt_topic{topic = Topic}) ->
case mnesia:wread({topic, Topic}) of
[] ->
ok = emqttd_trie:insert(Name),
mnesia:write(Topic);
Topics ->
case lists:member(Topic, Topics) of
ok = emqttd_trie:insert(Topic),
mnesia:write(topic, Record, write);
Records ->
case lists:member(Record, Records) of
true -> ok;
false -> mnesia:write(Topic)
false -> mnesia:write(topic, Record, write)
end
end.
insert_subscriber(Subscriber) ->
mnesia:write(Subscriber).
mnesia:write(subscriber, Subscriber, write).
try_remove_topic(Topic = #topic{name = Name}) ->
case mnesia:read({topic_subscriber, Name}) of
try_remove_topic(Record = #mqtt_topic{topic = Topic}) ->
case mnesia:read({subscriber, Topic}) of
[] ->
mnesia:delete_object(Topic),
case mnesia:read(topic, Name) of
[] -> emqttd_trie:delete(Name);
mnesia:delete_object(topic, Record, write),
case mnesia:read(topic, Topic) of
[] -> emqttd_trie:delete(Topic);
_ -> ok
end;
_ ->
@ -335,7 +335,7 @@ setstats(topics) ->
setstats(subscribers) ->
emqttd_broker:setstats('subscribers/count',
'subscribers/max',
mnesia:table_info(topic_subscriber, size));
mnesia:table_info(subscriber, size));
setstats(dropped) ->
emqttd_metrics:inc('messages/dropped').

View File

@ -22,6 +22,8 @@
%%% @doc
%%% emqttd retained messages.
%%%
%%% TODO: need to redesign later.
%%%
%%% @end
%%%-----------------------------------------------------------------------------
-module(emqttd_retained).
@ -30,31 +32,45 @@
-include("emqttd.hrl").
-define(RETAINED_TABLE, message_retained).
%% Mnesia callbacks
-export([mnesia/1]).
-mnesia_create({mnesia, [create]}).
-mnesia_replicate({mnesia, [replicate]}).
%% API Function Exports
-export([retain/1, redeliver/2]).
mnesia(create) ->
ok = emqtt_mnesia:create_table(message, [
{type, ordered_set},
{ram_copies, [node()]},
{record_name, mqtt_message},
{attributes, record_info(fields, mqtt_message)}]);
mnesia(replicate) ->
ok = emqtt_mnesia:copy_table(message).
%% @doc retain message.
-spec retain(mqtt_message()) -> ok | ignore.
retain(#mqtt_message{retain = false}) -> ignore;
%% RETAIN flag set to 1 and payload containing zero bytes
retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
mnesia:async_dirty(fun mnesia:delete/1, [{?RETAINED_TABLE, Topic}]);
mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]);
retain(Msg = #mqtt_message{retain = true,
topic = Topic,
retain(Msg = #mqtt_message{topic = Topic,
retain = true,
qos = Qos,
payload = Payload}) ->
TabSize = mnesia:table_info(?RETAINED_TABLE, size),
TabSize = mnesia:table_info(message, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of
{true, true} ->
lager:debug("Retained: store message: ~p", [Msg]),
RetainedMsg = #message_retained{topic = Topic, qos = Qos, payload = Payload},
mnesia:async_dirty(fun mnesia:write/1, [RetainedMsg]),
mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]),
emqttd_metrics:set('messages/retained/count',
mnesia:table_info(?RETAINED_TABLE, size));
mnesia:table_info(message, size));
{false, _}->
lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]);
{_, false}->
@ -83,26 +99,23 @@ redeliver(Topics, CPid) when is_pid(CPid) ->
lists:foreach(fun(Topic) ->
case emqtt_topic:wildcard(Topic) of
false ->
dispatch(CPid, mnesia:dirty_read(message_retained, Topic));
dispatch(CPid, mnesia:dirty_read(message, Topic));
true ->
Fun = fun(Msg = #message_retained{topic = Name}, Acc) ->
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) ->
case emqtt_topic:match(Name, Topic) of
true -> [Msg|Acc];
false -> Acc
end
end,
RetainedMsgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], ?RETAINED_TABLE]),
dispatch(CPid, lists:reverse(RetainedMsgs))
Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]),
dispatch(CPid, lists:reverse(Msgs))
end
end, Topics).
dispatch(_CPid, []) ->
ignore;
dispatch(CPid, RetainedMsgs) when is_list(RetainedMsgs) ->
CPid ! {dispatch, {self(), [mqtt_msg(Msg) || Msg <- RetainedMsgs]}};
dispatch(CPid, RetainedMsg) when is_record(RetainedMsg, message_retained) ->
CPid ! {dispatch, {self(), mqtt_msg(RetainedMsg)}}.
mqtt_msg(#message_retained{topic = Topic, qos = Qos, payload = Payload}) ->
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.
dispatch(CPid, Msgs) when is_list(Msgs) ->
CPid ! {dispatch, {self(), [Msg || Msg <- Msgs]}};
dispatch(CPid, Msg) when is_record(Msg, mqtt_message) ->
CPid ! {dispatch, {self(), Msg}}.

View File

@ -28,6 +28,8 @@
-include("emqttd.hrl").
-include_lib("emqtt/include/emqtt_packet.hrl").
%% API Function Exports
-export([start/1,
resume/3,
@ -47,7 +49,7 @@
terminate/2, code_change/3]).
-record(session_state, {
client_id :: binary(),
clientid :: binary(),
client_pid :: pid(),
message_id = 1,
submap :: map(),
@ -122,7 +124,7 @@ publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) ->
%% @end
%%------------------------------------------------------------------------------
-spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session().
puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
puback(SessState = #session_state{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) ->
case maps:is_key(PacketId, Awaiting) of
true -> ok;
false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId])
@ -132,7 +134,7 @@ puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {puback, PacketId}), SessPid;
%% PUBREC
puback(SessState = #session_state{client_id = ClientId,
puback(SessState = #session_state{clientid = ClientId,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) ->
case maps:is_key(PacketId, AwaitingAck) of
@ -146,7 +148,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrec, PacketId}), SessPid;
%% PUBREL
puback(SessState = #session_state{client_id = ClientId,
puback(SessState = #session_state{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqttd_router:route(Msg);
@ -158,7 +160,7 @@ puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {pubrel, PacketId}), SessPid;
%% PUBCOMP
puback(SessState = #session_state{client_id = ClientId,
puback(SessState = #session_state{clientid = ClientId,
awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) ->
case maps:is_key(PacketId, AwaitingComp) of
true -> ok;
@ -176,7 +178,7 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) ->
%% @end
%%------------------------------------------------------------------------------
-spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}.
subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)],
case Resubs of
[] -> ok;
@ -199,7 +201,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) ->
%% @end
%%------------------------------------------------------------------------------
-spec unsubscribe(session(), [binary()]) -> {ok, session()}.
unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) ->
unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) ->
%%TODO: refactor later.
case Topics -- maps:keys(SubMap) of
[] -> ok;
@ -238,7 +240,7 @@ store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting},
{Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}.
initial_state(ClientId) ->
#session_state{client_id = ClientId,
#session_state{clientid = ClientId,
submap = #{},
awaiting_ack = #{},
awaiting_rel = #{},
@ -278,12 +280,12 @@ handle_call(Req, _From, State) ->
{stop, {badreq, Req}, State}.
handle_cast({resume, ClientId, ClientPid}, State = #session_state{
client_id = ClientId,
client_pid = undefined,
msg_queue = Queue,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
clientid = ClientId,
client_pid = undefined,
msg_queue = Queue,
awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp,
expire_timer = ETimer}) ->
lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]),
%cancel timeout timer
erlang:cancel_timer(ETimer),
@ -328,7 +330,7 @@ handle_cast({pubcomp, PacketId}, State) ->
NewState = puback(State, {?PUBCOMP, PacketId}),
{noreply, NewState};
handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) ->
handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]),
{stop, normal, State};
@ -342,14 +344,14 @@ handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) ->
handle_info({dispatch, {_From, Message}}, State) ->
{noreply, dispatch(Message, State)};
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{client_id = ClientId,
handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId,
client_pid = ClientPid,
expires = Expires}) ->
lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]),
Timer = erlang:send_after(Expires * 1000, self(), session_expired),
{noreply, State#session_state{client_pid = undefined, expire_timer = Timer}};
handle_info(session_expired, State = #session_state{client_id = ClientId}) ->
handle_info(session_expired, State = #session_state{clientid = ClientId}) ->
lager:warning("Session ~s expired!", [ClientId]),
{stop, {shutdown, expired}, State};
@ -366,7 +368,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
dispatch(Message, State = #session_state{client_id = ClientId,
dispatch(Message, State = #session_state{clientid = ClientId,
client_pid = undefined}) ->
queue(ClientId, Message, State);