diff --git a/include/emqttd.hrl b/include/emqttd.hrl index b10fc0475..1e5ed2d3a 100644 --- a/include/emqttd.hrl +++ b/include/emqttd.hrl @@ -28,9 +28,15 @@ -define(ERTS_MINIMUM, "7.0"). --define(SYSTOP, <<"$SYS">>). %% System Topic --define(QUEUE, <<"$queue">>). %% Queue --define(SHARE, <<"$share">>). %% Shared +%%-------------------------------------------------------------------- +%% Sys/Queue/Share Topics' Prefix +%%-------------------------------------------------------------------- + +-define(SYSTOP, <<"$SYS/">>). %% System Topic + +-define(QUEUE, <<"$queue/">>). %% Queue Topic + +-define(SHARE, <<"$share/">>). %% Shared Topic %%-------------------------------------------------------------------- %% PubSub @@ -38,7 +44,7 @@ -type pubsub() :: publish | subscribe. --define(IS_PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). +-define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)). %%-------------------------------------------------------------------- %% MQTT Topic @@ -69,24 +75,24 @@ qos = 0 :: 0 | 1 | 2 }). --type mqtt_subscription() :: #mqtt_subscription{}. +-type(mqtt_subscription() :: #mqtt_subscription{}). %% {<<"a/b/c">>, '$queue', <<"client1">>} %% {<<"a/b/c">>, undefined, <0.31.0>} %% {<<"a/b/c">>, <<"group1">>, <<"client2">>} --record(mqtt_subscription, {topic, share, destination :: pid() | binary()}). +%% -record(mqtt_subscription, {topic, share, destination :: pid() | binary()}). %%-------------------------------------------------------------------- -%<<"group1">>, <<"client2">>}% MQTT Credential +%% MQTT Credential %%-------------------------------------------------------------------- -record(mqtt_credential, { clientid :: binary() | undefined, %% ClientId username :: binary() | undefined, %% Username - cookie :: binary() | undefined, - token :: binary() | undefined + token :: binary() | undefined, + cookie :: binary() | undefined }). --type(mqtt_credential() || #mqtt_credential{}). +-type(mqtt_credential() :: #mqtt_credential{}). %%-------------------------------------------------------------------- %% MQTT Client @@ -108,7 +114,7 @@ connected_at :: erlang:timestamp() }). --type mqtt_client() :: #mqtt_client{}. +-type(mqtt_client() :: #mqtt_client{}). %%-------------------------------------------------------------------- %% MQTT Session @@ -119,26 +125,29 @@ persistent :: boolean() }). --type mqtt_session() :: #mqtt_session{}. +-type(mqtt_session() :: #mqtt_session{}). %%-------------------------------------------------------------------- %% MQTT Message %%-------------------------------------------------------------------- --type mqtt_msgid() :: binary() | undefined. --type mqtt_pktid() :: 1..16#ffff | undefined. +-type(mqtt_msgid() :: binary() | undefined). +-type(mqtt_pktid() :: 1..16#ffff | undefined). -record(mqtt_message, { - msgid :: mqtt_msgid(), %% Global unique message ID - pktid :: mqtt_pktid(), %% PacketId - topic :: binary(), %% Topic that the message is published to - qos = 0 :: 0 | 1 | 2, %% Message QoS - flags = [] :: [retain | dup | sys], %% Message Flags - retain = false :: boolean(), %% Retain flag - dup = false :: boolean(), %% Dup flag - sys = false :: boolean(), %% $SYS flag - payload :: binary(), %% Payload - timestamp :: erlang:timestamp(), %% os:timestamp - extra = [] :: list() + msgid :: mqtt_msgid(), %% Global unique message ID + pktid :: mqtt_pktid(), %% PacketId + topic :: binary(), %% Topic that the message is published to + sender :: pid(), %% Pid of the sender/publisher + from, + credential :: mqtt_credential(), %% Credential of the sender/publisher + qos = 0 :: 0 | 1 | 2, %% Message QoS + flags = [] :: [retain | dup | sys], %% Message Flags + retain = false :: boolean(), %% Retain flag + dup = false :: boolean(), %% Dup flag + sys = false :: boolean(), %% $SYS flag + payload :: binary(), %% Payload + timestamp :: erlang:timestamp(), %% os:timestamp + extra = [] :: list() }). -type(mqtt_message() :: #mqtt_message{}). @@ -147,10 +156,9 @@ %% MQTT Delivery %%-------------------------------------------------------------------- -record(mqtt_delivery, { - sender :: pid(), %% Pid of the sender/publisher - credential :: mqtt_credential(), %% Credential of the sender/publisher - message :: mqtt_message(), %% Message - flow_through :: [node()] + message :: mqtt_message(), %% Message + dispatched = [] :: list(), + flow_through :: [node()] }). %%-------------------------------------------------------------------- @@ -164,7 +172,7 @@ timestamp :: erlang:timestamp() %% Timestamp }). --type mqtt_alarm() :: #mqtt_alarm{}. +-type(mqtt_alarm() :: #mqtt_alarm{}). %%-------------------------------------------------------------------- %% MQTT Plugin @@ -176,7 +184,7 @@ active = false }). --type mqtt_plugin() :: #mqtt_plugin{}. +-type(mqtt_plugin() :: #mqtt_plugin{}). %%-------------------------------------------------------------------- %% MQTT CLI Command @@ -191,5 +199,5 @@ descr }). --type mqtt_cli() :: #mqtt_cli{}. +-type(mqtt_cli() :: #mqtt_cli{}). diff --git a/src/emqttd.erl b/src/emqttd.erl index c5b697105..70b1459a3 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -26,6 +26,9 @@ -export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3, unsubscribe/1, unsubscribe/3]). +%% Route and Forward API +%% -export([route/2, forward/2]). + %% Hooks API -export([hook/4, hook/3, unhook/2, run_hooks/3]). diff --git a/src/emqttd_access_control.erl b/src/emqttd_access_control.erl index 079bdbec3..c2c8fb9b3 100644 --- a/src/emqttd_access_control.erl +++ b/src/emqttd_access_control.erl @@ -70,7 +70,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) -> Client :: mqtt_client(), PubSub :: pubsub(), Topic :: binary()). -check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) -> +check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) -> case lookup_mods(acl) of [] -> allow; AclMods -> check_acl(Client, PubSub, Topic, AclMods) diff --git a/src/emqttd_backend.erl b/src/emqttd_backend.erl deleted file mode 100644 index ead4c59d5..000000000 --- a/src/emqttd_backend.erl +++ /dev/null @@ -1,153 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_backend). - --include("emqttd.hrl"). - --include_lib("stdlib/include/ms_transform.hrl"). - -%% Mnesia Callbacks --export([mnesia/1]). - --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). - -%% Retained Message API --export([retain_message/1, read_messages/1, match_messages/1, delete_message/1, - expire_messages/1, retained_count/0]). - -%% Static Subscription API --export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1, - del_subscription/2]). - --record(retained_message, {topic, msg}). - -%%-------------------------------------------------------------------- -%% Mnesia callbacks -%%-------------------------------------------------------------------- - -mnesia(boot) -> - ok = emqttd_mnesia:create_table(retained_message, [ - {type, ordered_set}, - {disc_copies, [node()]}, - {record_name, retained_message}, - {attributes, record_info(fields, retained_message)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 1000}]}]}]), - ok = emqttd_mnesia:create_table(backend_subscription, [ - {type, bag}, - {disc_copies, [node()]}, - {record_name, mqtt_subscription}, - {attributes, record_info(fields, mqtt_subscription)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 5000}]}]}]); - -mnesia(copy) -> - ok = emqttd_mnesia:copy_table(retained_message), - ok = emqttd_mnesia:copy_table(backend_subscription). - -%%-------------------------------------------------------------------- -%% Retained Message -%%-------------------------------------------------------------------- - --spec(retain_message(mqtt_message()) -> ok). -retain_message(Msg = #mqtt_message{topic = Topic}) -> - mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}). - --spec(read_messages(binary()) -> [mqtt_message()]). -read_messages(Topic) -> - [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. - --spec(match_messages(binary()) -> [mqtt_message()]). -match_messages(Filter) -> - %% TODO: optimize later... - Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> - case emqttd_topic:match(Name, Filter) of - true -> [Msg|Acc]; - false -> Acc - end - end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). - --spec(delete_message(binary()) -> ok). -delete_message(Topic) -> - mnesia:dirty_delete(retained_message, Topic). - --spec(expire_messages(pos_integer()) -> any()). -expire_messages(Time) when is_integer(Time) -> - mnesia:transaction( - fun() -> - Match = ets:fun2ms( - fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}}) - when Time > (MegaSecs * 1000000 + Secs) -> Topic - end), - Topics = mnesia:select(retained_message, Match, write), - lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages - (Topic) -> mnesia:delete({retained_message, Topic}) - end, Topics) - end). - --spec(retained_count() -> non_neg_integer()). -retained_count() -> - mnesia:table_info(retained_message, size). - -%%-------------------------------------------------------------------- -%% Static Subscriptions -%%-------------------------------------------------------------------- - -%% @doc Add a static subscription manually. --spec(add_subscription(mqtt_subscription()) -> ok | {error, already_existed}). -add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) -> - Pattern = match_pattern(SubId, Topic), - return(mnesia:transaction(fun() -> - case mnesia:match_object(backend_subscription, Pattern, write) of - [] -> - mnesia:write(backend_subscription, Subscription, write); - [Subscription] -> - mnesia:abort(already_existed); - [Subscription1] -> %% QoS is different - mnesia:delete_object(backend_subscription, Subscription1, write), - mnesia:write(backend_subscription, Subscription, write) - end - end)). - -%% @doc Lookup static subscriptions. --spec(lookup_subscriptions(binary()) -> list(mqtt_subscription())). -lookup_subscriptions(ClientId) when is_binary(ClientId) -> - mnesia:dirty_read(backend_subscription, ClientId). - -%% @doc Delete static subscriptions by ClientId manually. --spec(del_subscriptions(binary()) -> ok). -del_subscriptions(ClientId) when is_binary(ClientId) -> - return(mnesia:transaction(fun mnesia:delete/1, [{backend_subscription, ClientId}])). - -%% @doc Delete a static subscription manually. --spec(del_subscription(binary(), binary()) -> ok). -del_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) -> - return(mnesia:transaction(fun del_subscription_/1, [match_pattern(ClientId, Topic)])). - -del_subscription_(Pattern) -> - lists:foreach(fun(Subscription) -> - mnesia:delete_object(backend_subscription, Subscription, write) - end, mnesia:match_object(backend_subscription, Pattern, write)). - -match_pattern(SubId, Topic) -> - #mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}. - -return({atomic, ok}) -> ok; -return({aborted, Reason}) -> {error, Reason}. - diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl index c23ab6848..a545dcfaf 100644 --- a/src/emqttd_mod_subscription.erl +++ b/src/emqttd_mod_subscription.erl @@ -25,32 +25,22 @@ -export([load/1, on_client_connected/3, unload/1]). --record(state, {topics, backend = false}). - load(Opts) -> Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], - State = #state{topics = Topics, backend = lists:member(backend, Opts)}, - emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [State]). + emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId, client_pid = ClientPid, - username = Username}, - #state{topics = Topics, backend = Backend}) -> + username = Username}, Topics) -> Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end, - TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)], + TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], emqttd_client:subscribe(ClientPid, TopicTable), {ok, Client}; on_client_connected(_ConnAck, _Client, _State) -> ok. -with_backend(false, _ClientId, TopicTable) -> - TopicTable; -with_backend(true, ClientId, TopicTable) -> - Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end, - emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable). - unload(_Opts) -> emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3).