rm emqttd_backend.erl

This commit is contained in:
Feng 2016-08-01 15:07:38 +08:00
parent 4d29b2e486
commit 285421f073
5 changed files with 47 additions and 199 deletions

View File

@ -28,9 +28,15 @@
-define(ERTS_MINIMUM, "7.0").
-define(SYSTOP, <<"$SYS">>). %% System Topic
-define(QUEUE, <<"$queue">>). %% Queue
-define(SHARE, <<"$share">>). %% Shared
%%--------------------------------------------------------------------
%% Sys/Queue/Share Topics' Prefix
%%--------------------------------------------------------------------
-define(SYSTOP, <<"$SYS/">>). %% System Topic
-define(QUEUE, <<"$queue/">>). %% Queue Topic
-define(SHARE, <<"$share/">>). %% Shared Topic
%%--------------------------------------------------------------------
%% PubSub
@ -38,7 +44,7 @@
-type pubsub() :: publish | subscribe.
-define(IS_PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)).
-define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)).
%%--------------------------------------------------------------------
%% MQTT Topic
@ -69,24 +75,24 @@
qos = 0 :: 0 | 1 | 2
}).
-type mqtt_subscription() :: #mqtt_subscription{}.
-type(mqtt_subscription() :: #mqtt_subscription{}).
%% {<<"a/b/c">>, '$queue', <<"client1">>}
%% {<<"a/b/c">>, undefined, <0.31.0>}
%% {<<"a/b/c">>, <<"group1">>, <<"client2">>}
-record(mqtt_subscription, {topic, share, destination :: pid() | binary()}).
%% -record(mqtt_subscription, {topic, share, destination :: pid() | binary()}).
%%--------------------------------------------------------------------
%<<"group1">>, <<"client2">>}% MQTT Credential
%% MQTT Credential
%%--------------------------------------------------------------------
-record(mqtt_credential, {
clientid :: binary() | undefined, %% ClientId
username :: binary() | undefined, %% Username
cookie :: binary() | undefined,
token :: binary() | undefined
token :: binary() | undefined,
cookie :: binary() | undefined
}).
-type(mqtt_credential() || #mqtt_credential{}).
-type(mqtt_credential() :: #mqtt_credential{}).
%%--------------------------------------------------------------------
%% MQTT Client
@ -108,7 +114,7 @@
connected_at :: erlang:timestamp()
}).
-type mqtt_client() :: #mqtt_client{}.
-type(mqtt_client() :: #mqtt_client{}).
%%--------------------------------------------------------------------
%% MQTT Session
@ -119,26 +125,29 @@
persistent :: boolean()
}).
-type mqtt_session() :: #mqtt_session{}.
-type(mqtt_session() :: #mqtt_session{}).
%%--------------------------------------------------------------------
%% MQTT Message
%%--------------------------------------------------------------------
-type mqtt_msgid() :: binary() | undefined.
-type mqtt_pktid() :: 1..16#ffff | undefined.
-type(mqtt_msgid() :: binary() | undefined).
-type(mqtt_pktid() :: 1..16#ffff | undefined).
-record(mqtt_message, {
msgid :: mqtt_msgid(), %% Global unique message ID
pktid :: mqtt_pktid(), %% PacketId
topic :: binary(), %% Topic that the message is published to
qos = 0 :: 0 | 1 | 2, %% Message QoS
flags = [] :: [retain | dup | sys], %% Message Flags
retain = false :: boolean(), %% Retain flag
dup = false :: boolean(), %% Dup flag
sys = false :: boolean(), %% $SYS flag
payload :: binary(), %% Payload
timestamp :: erlang:timestamp(), %% os:timestamp
extra = [] :: list()
msgid :: mqtt_msgid(), %% Global unique message ID
pktid :: mqtt_pktid(), %% PacketId
topic :: binary(), %% Topic that the message is published to
sender :: pid(), %% Pid of the sender/publisher
from,
credential :: mqtt_credential(), %% Credential of the sender/publisher
qos = 0 :: 0 | 1 | 2, %% Message QoS
flags = [] :: [retain | dup | sys], %% Message Flags
retain = false :: boolean(), %% Retain flag
dup = false :: boolean(), %% Dup flag
sys = false :: boolean(), %% $SYS flag
payload :: binary(), %% Payload
timestamp :: erlang:timestamp(), %% os:timestamp
extra = [] :: list()
}).
-type(mqtt_message() :: #mqtt_message{}).
@ -147,10 +156,9 @@
%% MQTT Delivery
%%--------------------------------------------------------------------
-record(mqtt_delivery, {
sender :: pid(), %% Pid of the sender/publisher
credential :: mqtt_credential(), %% Credential of the sender/publisher
message :: mqtt_message(), %% Message
flow_through :: [node()]
message :: mqtt_message(), %% Message
dispatched = [] :: list(),
flow_through :: [node()]
}).
%%--------------------------------------------------------------------
@ -164,7 +172,7 @@
timestamp :: erlang:timestamp() %% Timestamp
}).
-type mqtt_alarm() :: #mqtt_alarm{}.
-type(mqtt_alarm() :: #mqtt_alarm{}).
%%--------------------------------------------------------------------
%% MQTT Plugin
@ -176,7 +184,7 @@
active = false
}).
-type mqtt_plugin() :: #mqtt_plugin{}.
-type(mqtt_plugin() :: #mqtt_plugin{}).
%%--------------------------------------------------------------------
%% MQTT CLI Command
@ -191,5 +199,5 @@
descr
}).
-type mqtt_cli() :: #mqtt_cli{}.
-type(mqtt_cli() :: #mqtt_cli{}).

View File

@ -26,6 +26,9 @@
-export([create/2, lookup/2, publish/1, subscribe/1, subscribe/3,
unsubscribe/1, unsubscribe/3]).
%% Route and Forward API
%% -export([route/2, forward/2]).
%% Hooks API
-export([hook/4, hook/3, unhook/2, run_hooks/3]).

View File

@ -70,7 +70,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
Client :: mqtt_client(),
PubSub :: pubsub(),
Topic :: binary()).
check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) ->
check_acl(Client, PubSub, Topic) when ?PUBSUB(PubSub) ->
case lookup_mods(acl) of
[] -> allow;
AclMods -> check_acl(Client, PubSub, Topic, AclMods)

View File

@ -1,153 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_backend).
-include("emqttd.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%% Mnesia Callbacks
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% Retained Message API
-export([retain_message/1, read_messages/1, match_messages/1, delete_message/1,
expire_messages/1, retained_count/0]).
%% Static Subscription API
-export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
del_subscription/2]).
-record(retained_message, {topic, msg}).
%%--------------------------------------------------------------------
%% Mnesia callbacks
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = emqttd_mnesia:create_table(retained_message, [
{type, ordered_set},
{disc_copies, [node()]},
{record_name, retained_message},
{attributes, record_info(fields, retained_message)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]),
ok = emqttd_mnesia:create_table(backend_subscription, [
{type, bag},
{disc_copies, [node()]},
{record_name, mqtt_subscription},
{attributes, record_info(fields, mqtt_subscription)},
{storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 5000}]}]}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(retained_message),
ok = emqttd_mnesia:copy_table(backend_subscription).
%%--------------------------------------------------------------------
%% Retained Message
%%--------------------------------------------------------------------
-spec(retain_message(mqtt_message()) -> ok).
retain_message(Msg = #mqtt_message{topic = Topic}) ->
mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
-spec(read_messages(binary()) -> [mqtt_message()]).
read_messages(Topic) ->
[Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
-spec(match_messages(binary()) -> [mqtt_message()]).
match_messages(Filter) ->
%% TODO: optimize later...
Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
case emqttd_topic:match(Name, Filter) of
true -> [Msg|Acc];
false -> Acc
end
end,
mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]).
-spec(delete_message(binary()) -> ok).
delete_message(Topic) ->
mnesia:dirty_delete(retained_message, Topic).
-spec(expire_messages(pos_integer()) -> any()).
expire_messages(Time) when is_integer(Time) ->
mnesia:transaction(
fun() ->
Match = ets:fun2ms(
fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
when Time > (MegaSecs * 1000000 + Secs) -> Topic
end),
Topics = mnesia:select(retained_message, Match, write),
lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages
(Topic) -> mnesia:delete({retained_message, Topic})
end, Topics)
end).
-spec(retained_count() -> non_neg_integer()).
retained_count() ->
mnesia:table_info(retained_message, size).
%%--------------------------------------------------------------------
%% Static Subscriptions
%%--------------------------------------------------------------------
%% @doc Add a static subscription manually.
-spec(add_subscription(mqtt_subscription()) -> ok | {error, already_existed}).
add_subscription(Subscription = #mqtt_subscription{subid = SubId, topic = Topic}) ->
Pattern = match_pattern(SubId, Topic),
return(mnesia:transaction(fun() ->
case mnesia:match_object(backend_subscription, Pattern, write) of
[] ->
mnesia:write(backend_subscription, Subscription, write);
[Subscription] ->
mnesia:abort(already_existed);
[Subscription1] -> %% QoS is different
mnesia:delete_object(backend_subscription, Subscription1, write),
mnesia:write(backend_subscription, Subscription, write)
end
end)).
%% @doc Lookup static subscriptions.
-spec(lookup_subscriptions(binary()) -> list(mqtt_subscription())).
lookup_subscriptions(ClientId) when is_binary(ClientId) ->
mnesia:dirty_read(backend_subscription, ClientId).
%% @doc Delete static subscriptions by ClientId manually.
-spec(del_subscriptions(binary()) -> ok).
del_subscriptions(ClientId) when is_binary(ClientId) ->
return(mnesia:transaction(fun mnesia:delete/1, [{backend_subscription, ClientId}])).
%% @doc Delete a static subscription manually.
-spec(del_subscription(binary(), binary()) -> ok).
del_subscription(ClientId, Topic) when is_binary(ClientId) andalso is_binary(Topic) ->
return(mnesia:transaction(fun del_subscription_/1, [match_pattern(ClientId, Topic)])).
del_subscription_(Pattern) ->
lists:foreach(fun(Subscription) ->
mnesia:delete_object(backend_subscription, Subscription, write)
end, mnesia:match_object(backend_subscription, Pattern, write)).
match_pattern(SubId, Topic) ->
#mqtt_subscription{subid = SubId, topic = Topic, qos = '_'}.
return({atomic, ok}) -> ok;
return({aborted, Reason}) -> {error, Reason}.

View File

@ -25,32 +25,22 @@
-export([load/1, on_client_connected/3, unload/1]).
-record(state, {topics, backend = false}).
load(Opts) ->
Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)],
State = #state{topics = Topics, backend = lists:member(backend, Opts)},
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [State]).
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]).
on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId,
client_pid = ClientPid,
username = Username},
#state{topics = Topics, backend = Backend}) ->
username = Username}, Topics) ->
Replace = fun(Topic) -> rep(<<"$u">>, Username, rep(<<"$c">>, ClientId, Topic)) end,
TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- with_backend(Backend, ClientId, Topics)],
TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics],
emqttd_client:subscribe(ClientPid, TopicTable),
{ok, Client};
on_client_connected(_ConnAck, _Client, _State) ->
ok.
with_backend(false, _ClientId, TopicTable) ->
TopicTable;
with_backend(true, ClientId, TopicTable) ->
Fun = fun(#mqtt_subscription{topic = Topic, qos = Qos}) -> {Topic, Qos} end,
emqttd_opts:merge([Fun(Sub) || Sub <- emqttd_backend:lookup_subscriptions(ClientId)], TopicTable).
unload(_Opts) ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3).