From 8c28bbcc7a76b47e0e2822f4f2663a1c927f0718 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Wed, 15 Apr 2015 12:37:44 +0800 Subject: [PATCH] refactor records --- apps/emqtt/include/emqtt_packet.hrl | 3 +- apps/emqtt/src/emqtt_packet.erl | 2 +- apps/emqtt/src/emqtt_parser.erl | 2 +- apps/emqtt/src/emqtt_serialiser.erl | 2 +- apps/emqttd/include/emqttd.hrl | 12 +++-- apps/emqttd/src/emqttd_access_rule.erl | 43 +++++++++-------- apps/emqttd/src/emqttd_acl.erl | 16 +++---- apps/emqttd/src/emqttd_acl_internal.erl | 16 +++---- apps/emqttd/src/emqttd_auth_clientid.erl | 10 ++-- apps/emqttd/src/emqttd_auth_username.erl | 4 +- apps/emqttd/src/emqttd_bridge.erl | 2 + apps/emqttd/src/emqttd_client.erl | 6 +-- apps/emqttd/src/emqttd_http.erl | 2 + apps/emqttd/src/emqttd_mnesia.erl | 11 ++--- apps/emqttd/src/emqttd_protocol.erl | 61 ++++++++++++------------ apps/emqttd/src/emqttd_pubsub.erl | 44 ++++++++--------- apps/emqttd/src/emqttd_retained.erl | 51 ++++++++++++-------- apps/emqttd/src/emqttd_session.erl | 38 ++++++++------- 18 files changed, 175 insertions(+), 150 deletions(-) diff --git a/apps/emqtt/include/emqtt_packet.hrl b/apps/emqtt/include/emqtt_packet.hrl index 408fccb60..729eafc65 100644 --- a/apps/emqtt/include/emqtt_packet.hrl +++ b/apps/emqtt/include/emqtt_packet.hrl @@ -128,7 +128,7 @@ -type mqtt_packet_id() :: 1..16#ffff | undefined. -record(mqtt_packet_connect, { - client_id = <<>> :: binary(), + clientid = <<>> :: binary(), proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), proto_name = <<"MQTT">> :: binary(), will_retain = false :: boolean(), @@ -225,3 +225,4 @@ -define(PACKET(Type), #mqtt_packet{header = #mqtt_packet_header{type = Type}}). + diff --git a/apps/emqtt/src/emqtt_packet.erl b/apps/emqtt/src/emqtt_packet.erl index d5d86cc54..64ead9f8e 100644 --- a/apps/emqtt/src/emqtt_packet.erl +++ b/apps/emqtt/src/emqtt_packet.erl @@ -104,7 +104,7 @@ dump_variable(#mqtt_packet_connect{ will_flag = WillFlag, clean_sess = CleanSess, keep_alive = KeepAlive, - client_id = ClientId, + clientid = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = Username, diff --git a/apps/emqtt/src/emqtt_parser.erl b/apps/emqtt/src/emqtt_parser.erl index 098942f42..defef9e6b 100644 --- a/apps/emqtt/src/emqtt_parser.erl +++ b/apps/emqtt/src/emqtt_parser.erl @@ -114,7 +114,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) will_flag = bool(WillFlag), clean_sess = bool(CleanSession), keep_alive = KeepAlive, - client_id = ClientId, + clientid = ClientId, will_topic = WillTopic, will_msg = WillMsg, username = UserName, diff --git a/apps/emqtt/src/emqtt_serialiser.erl b/apps/emqtt/src/emqtt_serialiser.erl index cc4b1f0d0..db729aee2 100644 --- a/apps/emqtt/src/emqtt_serialiser.erl +++ b/apps/emqtt/src/emqtt_serialiser.erl @@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type, VariableBin/binary, PayloadBin/binary>>. -serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, +serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId, proto_ver = ProtoVer, proto_name = ProtoName, will_retain = WillRetain, diff --git a/apps/emqttd/include/emqttd.hrl b/apps/emqttd/include/emqttd.hrl index c96f59d5f..2d488bbdd 100644 --- a/apps/emqttd/include/emqttd.hrl +++ b/apps/emqttd/include/emqttd.hrl @@ -52,7 +52,7 @@ -type mqtt_topic() :: #mqtt_topic{}. %%------------------------------------------------------------------------------ -%% MQTT Topic Subscriber +%% MQTT Subscriber %%------------------------------------------------------------------------------ -record(mqtt_subscriber, { topic :: binary(), @@ -99,13 +99,19 @@ retain = false :: boolean(), dup = false :: boolean(), msgid :: mqtt_msgid(), - payload :: binary()}). + payload :: binary() +}). -type mqtt_message() :: #mqtt_message{}. %%------------------------------------------------------------------------------ %% MQTT Plugin %%------------------------------------------------------------------------------ +-record(mqtt_plugin, { + name, + version, + attrs, + description +}). --record(mqtt_plugin, {name, version, attrs, description}). diff --git a/apps/emqttd/src/emqttd_access_rule.erl b/apps/emqttd/src/emqttd_access_rule.erl index ac226a54e..46524691e 100644 --- a/apps/emqttd/src/emqttd_access_rule.erl +++ b/apps/emqttd/src/emqttd_access_rule.erl @@ -94,29 +94,29 @@ bin(B) when is_binary(B) -> %% %% @end %%%----------------------------------------------------------------------------- --spec match(mqtt_user(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch. -match(_User, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> +-spec match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch. +match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> {matched, AllowDeny}; -match(User, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) +match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> - case match_who(User, Who) andalso match_topics(User, Topic, TopicFilters) of + case match_who(Client, Who) andalso match_topics(Client, Topic, TopicFilters) of true -> {matched, AllowDeny}; false -> nomatch end. -match_who(_User, all) -> +match_who(_Client, all) -> true; -match_who(_User, {user, all}) -> +match_who(_Client, {user, all}) -> true; -match_who(_User, {client, all}) -> +match_who(_Client, {client, all}) -> true; -match_who(#mqtt_user{clientid = ClientId}, {client, ClientId}) -> +match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> true; -match_who(#mqtt_user{username = Username}, {user, Username}) -> +match_who(#mqtt_client{username = Username}, {user, Username}) -> true; -match_who(#mqtt_user{ipaddr = undefined}, {ipaddr, _Tup}) -> +match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) -> false; -match_who(#mqtt_user{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) -> +match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) -> I = esockd_access:atoi(IP), I >= Start andalso I =< End; match_who(_User, _Who) -> @@ -143,14 +143,15 @@ feed_var(User, Pattern) -> feed_var(User, Pattern, []). feed_var(_User, [], Acc) -> lists:reverse(Acc); -feed_var(User = #mqtt_user{clientid = undefined}, [<<"$c">>|Words], Acc) -> - feed_var(User, Words, [<<"$c">>|Acc]); -feed_var(User = #mqtt_user{clientid = ClientId}, [<<"$c">>|Words], Acc) -> - feed_var(User, Words, [ClientId |Acc]); -feed_var(User = #mqtt_user{username = undefined}, [<<"$u">>|Words], Acc) -> - feed_var(User, Words, [<<"$u">>|Acc]); -feed_var(User = #mqtt_user{username = Username}, [<<"$u">>|Words], Acc) -> - feed_var(User, Words, [Username|Acc]); -feed_var(User, [W|Words], Acc) -> - feed_var(User, Words, [W|Acc]). +feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) -> + feed_var(Client, Words, [<<"$c">>|Acc]); +feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) -> + feed_var(Client, Words, [ClientId |Acc]); +feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) -> + feed_var(Client, Words, [<<"$u">>|Acc]); +feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) -> + feed_var(Client, Words, [Username|Acc]); +feed_var(Client, [W|Words], Acc) -> + feed_var(Client, Words, [W|Acc]). + diff --git a/apps/emqttd/src/emqttd_acl.erl b/apps/emqttd/src/emqttd_acl.erl index 399f3d9d2..4e0d3540e 100644 --- a/apps/emqttd/src/emqttd_acl.erl +++ b/apps/emqttd/src/emqttd_acl.erl @@ -83,25 +83,25 @@ start_link(AclMods) -> gen_server:start_link({local, ?SERVER}, ?MODULE, [AclMods], []). %% @doc Check ACL. --spec check({User, PubSub, Topic}) -> allow | deny when - User :: mqtt_user(), +-spec check({Client, PubSub, Topic}) -> allow | deny when + Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary(). -check({User, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe -> +check({Client, PubSub, Topic}) when PubSub =:= publish orelse PubSub =:= subscribe -> case ets:lookup(?ACL_TABLE, acl_modules) of [] -> allow; - [{_, AclMods}] -> check({User, PubSub, Topic}, AclMods) + [{_, AclMods}] -> check({Client, PubSub, Topic}, AclMods) end. -check({#mqtt_user{clientid = ClientId}, PubSub, Topic}, []) -> +check({#mqtt_client{clientid = ClientId}, PubSub, Topic}, []) -> lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), allow; -check({User, PubSub, Topic}, [{M, State}|AclMods]) -> - case M:check_acl({User, PubSub, Topic}, State) of +check({Client, PubSub, Topic}, [{M, State}|AclMods]) -> + case M:check_acl({Client, PubSub, Topic}, State) of allow -> allow; deny -> deny; - ignore -> check({User, PubSub, Topic}, AclMods) + ignore -> check({Client, PubSub, Topic}, AclMods) end. %% @doc Reload ACL. diff --git a/apps/emqttd/src/emqttd_acl_internal.erl b/apps/emqttd/src/emqttd_acl_internal.erl index 106345bb1..8863248b1 100644 --- a/apps/emqttd/src/emqttd_acl_internal.erl +++ b/apps/emqttd/src/emqttd_acl_internal.erl @@ -90,13 +90,13 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) -> false. %% @doc Check ACL. --spec check_acl({User, PubSub, Topic}, State) -> allow | deny | ignore when - User :: mqtt_user(), +-spec check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when + Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary(), State :: #state{}. -check_acl({User, PubSub, Topic}, #state{nomatch = Default}) -> - case match(User, Topic, lookup(PubSub)) of +check_acl({Client, PubSub, Topic}, #state{nomatch = Default}) -> + case match(Client, Topic, lookup(PubSub)) of {matched, allow} -> allow; {matched, deny} -> deny; nomatch -> Default @@ -108,12 +108,12 @@ lookup(PubSub) -> [{PubSub, Rules}] -> Rules end. -match(_User, _Topic, []) -> +match(_Client, _Topic, []) -> nomatch; -match(User, Topic, [Rule|Rules]) -> - case emqttd_access_rule:match(User, Topic, Rule) of - nomatch -> match(User, Topic, Rules); +match(Client, Topic, [Rule|Rules]) -> + case emqttd_access_rule:match(Client, Topic, Rule) of + nomatch -> match(Client, Topic, Rules); {matched, AllowDeny} -> {matched, AllowDeny} end. diff --git a/apps/emqttd/src/emqttd_auth_clientid.erl b/apps/emqttd/src/emqttd_auth_clientid.erl index 4f45d02b2..bde3ce398 100644 --- a/apps/emqttd/src/emqttd_auth_clientid.erl +++ b/apps/emqttd/src/emqttd_auth_clientid.erl @@ -71,15 +71,15 @@ init(Opts) -> end, {ok, Opts}. -check(#mqtt_user{clientid = undefined}, _Password, []) -> +check(#mqtt_client{clientid = undefined}, _Password, []) -> {error, "ClientId undefined"}; -check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, []) -> +check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, []) -> check_clientid_only(ClientId, IpAddr); -check(#mqtt_user{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) -> +check(#mqtt_client{clientid = ClientId, ipaddr = IpAddr}, _Password, [{password, no}|_]) -> check_clientid_only(ClientId, IpAddr); -check(_User, undefined, [{password, yes}|_]) -> +check(_Client, undefined, [{password, yes}|_]) -> {error, "Password undefined"}; -check(#mqtt_user{clientid = ClientId}, Password, [{password, yes}|_]) -> +check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> case mnesia:dirty_read(?AUTH_CLIENTID_TABLE, ClientId) of [] -> {error, "ClientId Not Found"}; [#?AUTH_CLIENTID_TABLE{password = Password}] -> ok; %% TODO: plaintext?? diff --git a/apps/emqttd/src/emqttd_auth_username.erl b/apps/emqttd/src/emqttd_auth_username.erl index 187578b66..884c66bc4 100644 --- a/apps/emqttd/src/emqttd_auth_username.erl +++ b/apps/emqttd/src/emqttd_auth_username.erl @@ -67,11 +67,11 @@ init(Opts) -> mnesia:add_table_copy(?AUTH_USERNAME_TABLE, node(), ram_copies), {ok, Opts}. -check(#mqtt_user{username = undefined}, _Password, _Opts) -> +check(#mqtt_client{username = undefined}, _Password, _Opts) -> {error, "Username undefined"}; check(_User, undefined, _Opts) -> {error, "Password undefined"}; -check(#mqtt_user{username = Username}, Password, _Opts) -> +check(#mqtt_client{username = Username}, Password, _Opts) -> case mnesia:dirty_read(?AUTH_USERNAME_TABLE, Username) of [] -> {error, "Username Not Found"}; diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 946928e30..21cfea37b 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -32,6 +32,8 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). +-include("emqttd.hrl"). + %% API Function Exports -export([start_link/3]). diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index b1f6c837e..5b40edd18 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -103,7 +103,7 @@ handle_info({stop, duplicate_id, _NewPid}, State=#state{proto_state = ProtoState %% need transfer data??? %% emqttd_client:transfer(NewPid, Data), lager:error("Shutdown for duplicate clientid: ~s, conn:~s", - [emqttd_protocol:client_id(ProtoState), ConnName]), + [emqttd_protocol:clientid(ProtoState), ConnName]), stop({shutdown, duplicate_id}, State); %%TODO: ok?? @@ -255,7 +255,7 @@ inc(_) -> notify(disconnected, _Reason, undefined) -> ingore; notify(disconnected, {shutdown, Reason}, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}); + emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}); notify(disconnected, Reason, ProtoState) -> - emqttd_event:notify({disconnected, emqttd_protocol:client_id(ProtoState), [{reason, Reason}]}). + emqttd_event:notify({disconnected, emqttd_protocol:clientid(ProtoState), [{reason, Reason}]}). diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index f4da0d8b3..a3d4c70a2 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -28,6 +28,8 @@ -author('feng@emqtt.io'). +-include_lib("emqtt/include/emqtt_packet.hrl"). + -include("emqttd.hrl"). -import(proplists, [get_value/2, get_value/3]). diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index 890210292..a06cd1b64 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -91,10 +91,7 @@ create_tables() -> ok = emqttd_trie:mnesia(create), ok = emqttd_pubsub:mnesia(create), %% TODO: retained messages, this table should not be copied... - ok = create_table(message_retained, [ - {type, ordered_set}, - {ram_copies, [node()]}, - {attributes, record_info(fields, message_retained)}]). + ok = emqttd_retained:mnesia(create). create_table(Table, Attrs) -> case mnesia:create_table(Table, Attrs) of @@ -111,9 +108,9 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - ok = emqttd_trie:mnesia(create), - ok = emqttd_pubsub:mnesia(create), - ok = copy_table(message_retained). + ok = emqttd_trie:mnesia(replicate), + ok = emqttd_pubsub:mnesia(replicate), + ok = emqttd_retained:mnesia(replicate). copy_table(Table) -> case mnesia:add_table_copy(Table, node(), ram_copies) of diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 4cf33dd12..e76b4eaa5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -31,7 +31,7 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). %% API --export([init/2, client_id/1]). +-export([init/2, clientid/1]). -export([received/2, send/2, redeliver/2, shutdown/2]). @@ -47,7 +47,7 @@ proto_name, %packet_id, username, - client_id, + clientid, clean_sess, session, %% session state or session pid will_msg, @@ -63,17 +63,20 @@ init({Transport, Socket, Peername}, Opts) -> peername = Peername, max_clientid_len = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN)}. -client_id(#proto_state{client_id = ClientId}) -> ClientId. +clientid(#proto_state{clientid = ClientId}) -> ClientId. + +client(#proto_state{peername = {Addr, _Port}, clientid = ClientId, username = Username}) -> + #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}. %%SHOULD be registered in emqttd_cm info(#proto_state{proto_ver = ProtoVer, proto_name = ProtoName, - client_id = ClientId, + clientid = ClientId, clean_sess = CleanSess, will_msg = WillMsg}) -> [{proto_ver, ProtoVer}, {proto_name, ProtoName}, - {client_id, ClientId}, + {clientid, ClientId}, {clean_sess, CleanSess}, {will_msg, WillMsg}]. @@ -92,7 +95,7 @@ received(_Packet, State = #proto_state{connected = false}) -> {error, protocol_not_connected, State}; received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, - client_id = ClientId}) -> + clientid = ClientId}) -> lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), case validate_packet(Packet) of ok -> @@ -108,24 +111,24 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = password = Password, clean_sess = CleanSess, keep_alive = KeepAlive, - client_id = ClientId} = Var, + clientid = ClientId} = Var, lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), State1 = State#proto_state{proto_ver = ProtoVer, username = Username, - client_id = ClientId, + clientid = ClientId, clean_sess = CleanSess}, {ReturnCode1, State2} = case validate_connect(Var, State) of ?CONNACK_ACCEPT -> - User = #mqtt_user{username = Username, ipaddr = Addr, clientid = ClientId}, - case emqttd_auth:login(User, Password) of + Client = #mqtt_client{clientid = ClientId, username = Username, ipaddr = Addr}, + case emqttd_auth:login(Client, Password) of ok -> - ClientId1 = clientid(ClientId, State), + ClientId1 = clientid(ClientId, State), start_keepalive(KeepAlive), emqttd_cm:register(ClientId1), - {?CONNACK_ACCEPT, State1#proto_state{client_id = ClientId1, + {?CONNACK_ACCEPT, State1#proto_state{clientid = ClientId1, will_msg = willmsg(Var)}}; {error, Reason}-> lager:error("~s@~s: username '~s' login failed - ~s", [ClientId, emqttd_net:format(Peername), Username, Reason]), @@ -142,8 +145,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername = {ok, State2#proto_state{session = Session}}; handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), - State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + State = #proto_state{clientid = ClientId, session = Session}) -> + case emqttd_acl:check({client(State), publish, Topic}) of allow -> emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)}); deny -> @@ -152,8 +155,8 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), {ok, State}; handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), - State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + State = #proto_state{clientid = ClientId, session = Session}) -> + case emqttd_acl:check({client(State), publish, Topic}) of allow -> emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBACK, PacketId), State); @@ -163,8 +166,8 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), end; handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), - State = #proto_state{client_id = ClientId, session = Session}) -> - case emqttd_acl:check({mqtt_user(State), publish, Topic}) of + State = #proto_state{clientid = ClientId, session = Session}) -> + case emqttd_acl:check({client(State), publish, Topic}) of allow -> NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}), send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); @@ -187,11 +190,12 @@ handle(?PUBACK_PACKET(Type, PacketId), State = #proto_state{session = Session}) end, {ok, NewState}; -handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> - AllowDenies = [emqttd_acl:check({mqtt_user(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable], +handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) -> + AllowDenies = [emqttd_acl:check({client(State), subscribe, Topic}) || {Topic, _Qos} <- TopicTable], case lists:member(deny, AllowDenies) of true -> %%TODO: return 128 QoS when deny... + lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]), {ok, State}; false -> {ok, NewSession, GrantedQos} = emqttd_session:subscribe(Session, TopicTable), @@ -225,7 +229,7 @@ send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = {Message1, NewSession} = emqttd_session:store(Session, Message), send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession}); -send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, client_id = ClientId}) when is_record(Packet, mqtt_packet) -> +send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, clientid = ClientId}) when is_record(Packet, mqtt_packet) -> lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]), sent_stats(Packet), Data = emqttd_serialiser:serialise(Packet), @@ -238,7 +242,7 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). -shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) -> +shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> send_willmsg(WillMsg), try_unregister(ClientId, self()), lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]), @@ -248,13 +252,10 @@ willmsg(Packet) when is_record(Packet, mqtt_packet_connect) -> emqttd_message:from_packet(Packet). clientid(<<>>, #proto_state{peername = Peername}) -> - <<"eMQTT/", (base64:encode(emqttd_net:format(Peername)))/binary>>; + <<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>; clientid(ClientId, _State) -> ClientId. -mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) -> - #mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}. - send_willmsg(undefined) -> ignore; %%TODO:should call session... send_willmsg(WillMsg) -> emqttd_router:route(WillMsg). @@ -282,16 +283,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) -> validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> lists:member({Ver, Name}, ?PROTOCOL_NAMES). -validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen}) +validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen}) when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) -> true; %% MQTT3.1.1 allow null clientId. -validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) +validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState) when size(ClientId) =:= 0 -> true; -validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> +validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) -> lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), false. @@ -353,7 +354,7 @@ inc(_) -> notify(connected, ReturnCode, #proto_state{peername = Peername, proto_ver = ProtoVer, - client_id = ClientId, + clientid = ClientId, clean_sess = CleanSess}) -> Sess = case CleanSess of true -> false; diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 082800939..f2533bb8a 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -128,7 +128,7 @@ subscribe(Topics = [{_Topic, _Qos}|_]) -> -spec subscribe(Topic :: binary(), Qos :: mqtt_qos()) -> {ok, Qos :: mqtt_qos()}. subscribe(Topic, Qos) when is_binary(Topic) andalso ?IS_QOS(Qos) -> TopicRecord = #mqtt_topic{topic = Topic, node = node()}, - Subscriber = #topic_subscriber{topic = Topic, qos = Qos, subpid = self()}, + Subscriber = #mqtt_subscriber{topic = Topic, qos = Qos, subpid = self()}, F = fun() -> case insert_topic(TopicRecord) of ok -> insert_subscriber(Subscriber); @@ -188,7 +188,7 @@ publish(Topic, Msg) when is_binary(Topic) -> %%------------------------------------------------------------------------------ -spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer(). dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) -> - case mnesia:dirty_read(topic_subscriber, Topic) of + case mnesia:dirty_read(subscriber, Topic) of [] -> %%TODO: not right when clusted... setstats(dropped); @@ -224,7 +224,7 @@ init([]) -> process_flag(priority, high), process_flag(min_heap_size, 1024*1024), mnesia:subscribe({table, topic, simple}), - mnesia:subscribe({table, topic_subscriber, simple}), + mnesia:subscribe({table, subscriber, simple}), {ok, #state{submap = maps:new()}}. handle_call(Req, _From, State) -> @@ -265,10 +265,10 @@ handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State = #state{submap = SubMa true -> Node = node(), F = fun() -> - Subscribers = mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid), - lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) -> - mnesia:delete_object(Sub), - try_remove_topic(#topic{name = Topic, node = Node}) + Subscribers = mnesia:index_read(subscriber, DownPid, #mqtt_subscriber.subpid), + lists:foreach(fun(Sub = #mqtt_subscriber{topic = Topic}) -> + mnesia:delete_object(subscriber, Sub, write), + try_remove_topic(#mqtt_topic{topic = Topic, node = Node}) end, Subscribers) end, NewState = @@ -292,7 +292,7 @@ handle_info(Info, State) -> terminate(_Reason, _State) -> mnesia:unsubscribe({table, topic, simple}), - mnesia:unsubscribe({table, topic_subscriber, simple}), + mnesia:unsubscribe({table, subscriber, simple}), %%TODO: clear topics belongs to this node??? ok. @@ -302,27 +302,27 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= %%% Internal functions %%%============================================================================= -insert_topic(Topic = #topic{name = Name}) -> - case mnesia:wread({topic, Name}) of +insert_topic(Record = #mqtt_topic{topic = Topic}) -> + case mnesia:wread({topic, Topic}) of [] -> - ok = emqttd_trie:insert(Name), - mnesia:write(Topic); - Topics -> - case lists:member(Topic, Topics) of + ok = emqttd_trie:insert(Topic), + mnesia:write(topic, Record, write); + Records -> + case lists:member(Record, Records) of true -> ok; - false -> mnesia:write(Topic) + false -> mnesia:write(topic, Record, write) end end. insert_subscriber(Subscriber) -> - mnesia:write(Subscriber). + mnesia:write(subscriber, Subscriber, write). -try_remove_topic(Topic = #topic{name = Name}) -> - case mnesia:read({topic_subscriber, Name}) of +try_remove_topic(Record = #mqtt_topic{topic = Topic}) -> + case mnesia:read({subscriber, Topic}) of [] -> - mnesia:delete_object(Topic), - case mnesia:read(topic, Name) of - [] -> emqttd_trie:delete(Name); + mnesia:delete_object(topic, Record, write), + case mnesia:read(topic, Topic) of + [] -> emqttd_trie:delete(Topic); _ -> ok end; _ -> @@ -335,7 +335,7 @@ setstats(topics) -> setstats(subscribers) -> emqttd_broker:setstats('subscribers/count', 'subscribers/max', - mnesia:table_info(topic_subscriber, size)); + mnesia:table_info(subscriber, size)); setstats(dropped) -> emqttd_metrics:inc('messages/dropped'). diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl index 8bf03e915..c34bd0dde 100644 --- a/apps/emqttd/src/emqttd_retained.erl +++ b/apps/emqttd/src/emqttd_retained.erl @@ -22,6 +22,8 @@ %%% @doc %%% emqttd retained messages. %%% +%%% TODO: need to redesign later. +%%% %%% @end %%%----------------------------------------------------------------------------- -module(emqttd_retained). @@ -30,31 +32,45 @@ -include("emqttd.hrl"). --define(RETAINED_TABLE, message_retained). +%% Mnesia callbacks +-export([mnesia/1]). + +-mnesia_create({mnesia, [create]}). +-mnesia_replicate({mnesia, [replicate]}). %% API Function Exports -export([retain/1, redeliver/2]). +mnesia(create) -> + ok = emqtt_mnesia:create_table(message, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {record_name, mqtt_message}, + {attributes, record_info(fields, mqtt_message)}]); +mnesia(replicate) -> + ok = emqtt_mnesia:copy_table(message). + + + %% @doc retain message. -spec retain(mqtt_message()) -> ok | ignore. retain(#mqtt_message{retain = false}) -> ignore; %% RETAIN flag set to 1 and payload containing zero bytes retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> - mnesia:async_dirty(fun mnesia:delete/1, [{?RETAINED_TABLE, Topic}]); + mnesia:async_dirty(fun mnesia:delete/1, [{message, Topic}]); -retain(Msg = #mqtt_message{retain = true, - topic = Topic, +retain(Msg = #mqtt_message{topic = Topic, + retain = true, qos = Qos, payload = Payload}) -> - TabSize = mnesia:table_info(?RETAINED_TABLE, size), + TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of {true, true} -> lager:debug("Retained: store message: ~p", [Msg]), - RetainedMsg = #message_retained{topic = Topic, qos = Qos, payload = Payload}, - mnesia:async_dirty(fun mnesia:write/1, [RetainedMsg]), + mnesia:async_dirty(fun mnesia:write/3, [message, Msg, write]), emqttd_metrics:set('messages/retained/count', - mnesia:table_info(?RETAINED_TABLE, size)); + mnesia:table_info(message, size)); {false, _}-> lager:error("Dropped retained message(topic=~s) for table is full!", [Topic]); {_, false}-> @@ -83,26 +99,23 @@ redeliver(Topics, CPid) when is_pid(CPid) -> lists:foreach(fun(Topic) -> case emqtt_topic:wildcard(Topic) of false -> - dispatch(CPid, mnesia:dirty_read(message_retained, Topic)); + dispatch(CPid, mnesia:dirty_read(message, Topic)); true -> - Fun = fun(Msg = #message_retained{topic = Name}, Acc) -> + Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> case emqtt_topic:match(Name, Topic) of true -> [Msg|Acc]; false -> Acc end end, - RetainedMsgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], ?RETAINED_TABLE]), - dispatch(CPid, lists:reverse(RetainedMsgs)) + Msgs = mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], message]), + dispatch(CPid, lists:reverse(Msgs)) end end, Topics). dispatch(_CPid, []) -> ignore; -dispatch(CPid, RetainedMsgs) when is_list(RetainedMsgs) -> - CPid ! {dispatch, {self(), [mqtt_msg(Msg) || Msg <- RetainedMsgs]}}; -dispatch(CPid, RetainedMsg) when is_record(RetainedMsg, message_retained) -> - CPid ! {dispatch, {self(), mqtt_msg(RetainedMsg)}}. - -mqtt_msg(#message_retained{topic = Topic, qos = Qos, payload = Payload}) -> - #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. +dispatch(CPid, Msgs) when is_list(Msgs) -> + CPid ! {dispatch, {self(), [Msg || Msg <- Msgs]}}; +dispatch(CPid, Msg) when is_record(Msg, mqtt_message) -> + CPid ! {dispatch, {self(), Msg}}. diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 757f96a21..5547faf21 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -28,6 +28,8 @@ -include("emqttd.hrl"). +-include_lib("emqtt/include/emqtt_packet.hrl"). + %% API Function Exports -export([start/1, resume/3, @@ -47,7 +49,7 @@ terminate/2, code_change/3]). -record(session_state, { - client_id :: binary(), + clientid :: binary(), client_pid :: pid(), message_id = 1, submap :: map(), @@ -122,7 +124,7 @@ publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec puback(session(), {mqtt_packet_type(), mqtt_packet_id()}) -> session(). -puback(SessState = #session_state{client_id = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> +puback(SessState = #session_state{clientid = ClientId, awaiting_ack = Awaiting}, {?PUBACK, PacketId}) -> case maps:is_key(PacketId, Awaiting) of true -> ok; false -> lager:warning("Session ~s: PUBACK PacketId '~p' not found!", [ClientId, PacketId]) @@ -132,7 +134,7 @@ puback(SessPid, {?PUBACK, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {puback, PacketId}), SessPid; %% PUBREC -puback(SessState = #session_state{client_id = ClientId, +puback(SessState = #session_state{clientid = ClientId, awaiting_ack = AwaitingAck, awaiting_comp = AwaitingComp}, {?PUBREC, PacketId}) -> case maps:is_key(PacketId, AwaitingAck) of @@ -146,7 +148,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubrec, PacketId}), SessPid; %% PUBREL -puback(SessState = #session_state{client_id = ClientId, +puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of {ok, Msg} -> emqttd_router:route(Msg); @@ -158,7 +160,7 @@ puback(SessPid, {?PUBREL, PacketId}) when is_pid(SessPid) -> gen_server:cast(SessPid, {pubrel, PacketId}), SessPid; %% PUBCOMP -puback(SessState = #session_state{client_id = ClientId, +puback(SessState = #session_state{clientid = ClientId, awaiting_comp = AwaitingComp}, {?PUBCOMP, PacketId}) -> case maps:is_key(PacketId, AwaitingComp) of true -> ok; @@ -176,7 +178,7 @@ puback(SessPid, {?PUBCOMP, PacketId}) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec subscribe(session(), [{binary(), mqtt_qos()}]) -> {ok, session(), [mqtt_qos()]}. -subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> +subscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> Resubs = [Topic || {Name, _Qos} = Topic <- Topics, maps:is_key(Name, SubMap)], case Resubs of [] -> ok; @@ -199,7 +201,7 @@ subscribe(SessPid, Topics) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec unsubscribe(session(), [binary()]) -> {ok, session()}. -unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Topics) -> +unsubscribe(SessState = #session_state{clientid = ClientId, submap = SubMap}, Topics) -> %%TODO: refactor later. case Topics -- maps:keys(SubMap) of [] -> ok; @@ -238,7 +240,7 @@ store(SessState = #session_state{message_id = MsgId, awaiting_ack = Awaiting}, {Message1, next_msg_id(SessState#session_state{awaiting_ack = Awaiting1})}. initial_state(ClientId) -> - #session_state{client_id = ClientId, + #session_state{clientid = ClientId, submap = #{}, awaiting_ack = #{}, awaiting_rel = #{}, @@ -278,12 +280,12 @@ handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. handle_cast({resume, ClientId, ClientPid}, State = #session_state{ - client_id = ClientId, - client_pid = undefined, - msg_queue = Queue, - awaiting_ack = AwaitingAck, - awaiting_comp = AwaitingComp, - expire_timer = ETimer}) -> + clientid = ClientId, + client_pid = undefined, + msg_queue = Queue, + awaiting_ack = AwaitingAck, + awaiting_comp = AwaitingComp, + expire_timer = ETimer}) -> lager:info("Session ~s resumed by ~p", [ClientId, ClientPid]), %cancel timeout timer erlang:cancel_timer(ETimer), @@ -328,7 +330,7 @@ handle_cast({pubcomp, PacketId}, State) -> NewState = puback(State, {?PUBCOMP, PacketId}), {noreply, NewState}; -handle_cast({destroy, ClientId}, State = #session_state{client_id = ClientId}) -> +handle_cast({destroy, ClientId}, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s destroyed", [ClientId]), {stop, normal, State}; @@ -342,14 +344,14 @@ handle_info({dispatch, {_From, Messages}}, State) when is_list(Messages) -> handle_info({dispatch, {_From, Message}}, State) -> {noreply, dispatch(Message, State)}; -handle_info({'EXIT', ClientPid, Reason}, State = #session_state{client_id = ClientId, +handle_info({'EXIT', ClientPid, Reason}, State = #session_state{clientid = ClientId, client_pid = ClientPid, expires = Expires}) -> lager:warning("Session: client ~s@~p exited, caused by ~p", [ClientId, ClientPid, Reason]), Timer = erlang:send_after(Expires * 1000, self(), session_expired), {noreply, State#session_state{client_pid = undefined, expire_timer = Timer}}; -handle_info(session_expired, State = #session_state{client_id = ClientId}) -> +handle_info(session_expired, State = #session_state{clientid = ClientId}) -> lager:warning("Session ~s expired!", [ClientId]), {stop, {shutdown, expired}, State}; @@ -366,7 +368,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -dispatch(Message, State = #session_state{client_id = ClientId, +dispatch(Message, State = #session_state{clientid = ClientId, client_pid = undefined}) -> queue(ClientId, Message, State);