From 611ca0dca4470908edd04abebc72f889ab12e4ed Mon Sep 17 00:00:00 2001 From: Feng Date: Mon, 31 Oct 2016 10:46:22 +0800 Subject: [PATCH] move modules to plugins --- src/emqttd_mod_presence.erl | 75 ------------ src/emqttd_mod_retainer.erl | 202 -------------------------------- src/emqttd_mod_subscription.erl | 52 -------- 3 files changed, 329 deletions(-) delete mode 100644 src/emqttd_mod_presence.erl delete mode 100644 src/emqttd_mod_retainer.erl delete mode 100644 src/emqttd_mod_subscription.erl diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl deleted file mode 100644 index 29fe2915d..000000000 --- a/src/emqttd_mod_presence.erl +++ /dev/null @@ -1,75 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mod_presence). - --behaviour(emqttd_gen_mod). - --include("emqttd.hrl"). - --export([load/1, unload/1]). - --export([on_client_connected/3, on_client_disconnected/3]). - -load(Opts) -> - emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Opts]), - emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Opts]). - -on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, - username = Username, - peername = {IpAddr, _}, - clean_sess = CleanSess, - proto_ver = ProtoVer}, Opts) -> - Json = mochijson2:encode([{clientid, ClientId}, - {username, Username}, - {ipaddress, list_to_binary(emqttd_net:ntoa(IpAddr))}, - {session, sess(CleanSess)}, - {protocol, ProtoVer}, - {connack, ConnAck}, - {ts, emqttd_time:now_to_secs()}]), - Msg = message(qos(Opts), topic(connected, ClientId), Json), - emqttd:publish(emqttd_message:set_flag(sys, Msg)), - {ok, Client}. - -on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, Opts) -> - Json = mochijson2:encode([{clientid, ClientId}, - {reason, reason(Reason)}, - {ts, emqttd_time:now_to_secs()}]), - Msg = message(qos(Opts), topic(disconnected, ClientId), Json), - emqttd:publish(emqttd_message:set_flag(sys, Msg)), - ok. - -unload(_Opts) -> - emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), - emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). - -sess(false) -> true; -sess(true) -> false. - -qos(Opts) -> proplists:get_value(qos, Opts, 0). - -message(Qos, Topic, Json) -> - emqttd_message:make(presence, Qos, Topic, iolist_to_binary(Json)). - -topic(connected, ClientId) -> - emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); -topic(disconnected, ClientId) -> - emqttd_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). - -reason(Reason) when is_atom(Reason) -> Reason; -reason({Error, _}) when is_atom(Error) -> Error; -reason(_) -> internal_error. - diff --git a/src/emqttd_mod_retainer.erl b/src/emqttd_mod_retainer.erl deleted file mode 100644 index 5742a264c..000000000 --- a/src/emqttd_mod_retainer.erl +++ /dev/null @@ -1,202 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mod_retainer). - --behaviour(gen_server). - --behaviour(emqttd_gen_mod). - --include("emqttd.hrl"). - --include("emqttd_internal.hrl"). - --include_lib("stdlib/include/ms_transform.hrl"). - -%% gen_mod Callbacks --export([load/1, unload/1]). - -%% Hook Callbacks --export([on_session_subscribed/4, on_message_publish/2]). - -%% API Function Exports --export([start_link/1]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(mqtt_retained, {topic, msg}). - --record(state, {stats_fun, expired_after, stats_timer, expire_timer}). - -%%-------------------------------------------------------------------- -%% Load/Unload -%%-------------------------------------------------------------------- - -load(Env) -> - emqttd_mod_sup:start_child(spec(Env)), - emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), - emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]). - -on_session_subscribed(_ClientId, _Username, {Topic, _Opts}, _Env) -> - SessPid = self(), - Msgs = case emqttd_topic:wildcard(Topic) of - false -> read_messages(Topic); - true -> match_messages(Topic) - end, - lists:foreach(fun(Msg) -> SessPid ! {dispatch, Topic, Msg} end, lists:reverse(Msgs)). - -on_message_publish(Msg = #mqtt_message{retain = false}, _Env) -> - {ok, Msg}; - -%% RETAIN flag set to 1 and payload containing zero bytes -on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <<>>}, _Env) -> - mnesia:dirty_delete(mqtt_retained, Topic), - {stop, Msg}; - -on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) -> - case {is_table_full(Env), is_too_big(size(Payload), Env)} of - {false, false} -> - mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}), - emqttd_metrics:set('messages/retained', retained_count()); - {true, _}-> - lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); - {_, true}-> - lager:error("Cannot retain message(topic=~s, payload_size=~p)" - " for payload is too big!", [Topic, size(Payload)]) - end, - {ok, Msg#mqtt_message{retain = false}}. - -is_table_full(Env) -> - Limit = proplists:get_value(max_message_num, Env, 0), - Limit > 0 andalso (retained_count() > Limit). - -is_too_big(Size, Env) -> - Limit = proplists:get_value(max_payload_size, Env, 0), - Limit > 0 andalso (Size > Limit). - -unload(_Env) -> - emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4), - emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2), - emqttd_mod_sup:stop_child(?MODULE). - -spec(Env) -> - {?MODULE, {?MODULE, start_link, [Env]}, permanent, 5000, worker, [?MODULE]}. - -%%-------------------------------------------------------------------- -%% API -%%-------------------------------------------------------------------- - -%% @doc Start the retainer --spec(start_link(Env :: list()) -> {ok, pid()} | ignore | {error, any()}). -start_link(Env) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []). - -%%-------------------------------------------------------------------- -%% gen_server Callbacks -%%-------------------------------------------------------------------- - -init([Env]) -> - Copy = case proplists:get_value(storage_type, Env, disc) of - disc -> disc_copies; - ram -> ram_copies - end, - ok = emqttd_mnesia:create_table(mqtt_retained, [ - {type, ordered_set}, - {Copy, [node()]}, - {record_name, mqtt_retained}, - {attributes, record_info(fields, mqtt_retained)}, - {storage_properties, [{ets, [compressed]}, - {dets, [{auto_save, 1000}]}]}]), - ok = emqttd_mnesia:copy_table(mqtt_retained), - StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'), - {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), - State = #state{stats_fun = StatsFun, stats_timer = StatsTimer}, - {ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}. - -init_expire_timer(0, State) -> - State; -init_expire_timer(undefined, State) -> - State; -init_expire_timer(Secs, State) -> - {ok, Timer} = timer:send_interval(timer:seconds(Secs), expire), - State#state{expired_after = Secs, expire_timer = Timer}. - -handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). - -handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). - -handle_info(stats, State = #state{stats_fun = StatsFun}) -> - StatsFun(retained_count()), - {noreply, State, hibernate}; - -handle_info(expire, State = #state{expired_after = Never}) - when Never =:= 0 orelse Never =:= undefined -> - {noreply, State, hibernate}; - -handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> - expire_messages(emqttd_time:now_to_secs() - ExpiredAfter), - {noreply, State, hibernate}; - -handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). - -terminate(_Reason, _State = #state{stats_timer = TRef1, expire_timer = TRef2}) -> - timer:cancel(TRef1), - timer:cancel(TRef2). - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal Functions -%%-------------------------------------------------------------------- - --spec(read_messages(binary()) -> [mqtt_message()]). -read_messages(Topic) -> - [Msg || #mqtt_retained{msg = Msg} <- mnesia:dirty_read(mqtt_retained, Topic)]. - --spec(match_messages(binary()) -> [mqtt_message()]). -match_messages(Filter) -> - %% TODO: optimize later... - Fun = fun(#mqtt_retained{topic = Name, msg = Msg}, Acc) -> - case emqttd_topic:match(Name, Filter) of - true -> [Msg|Acc]; - false -> Acc - end - end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], mqtt_retained]). - --spec(expire_messages(pos_integer()) -> any()). -expire_messages(Time) when is_integer(Time) -> - mnesia:transaction( - fun() -> - Match = ets:fun2ms( - fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}}) - when Time > Ts -> Topic - end), - Topics = mnesia:select(mqtt_retained, Match, write), - lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages - (Topic) -> mnesia:delete({mqtt_retained, Topic}) - end, Topics) - end). - --spec(retained_count() -> non_neg_integer()). -retained_count() -> mnesia:table_info(mqtt_retained, size). - diff --git a/src/emqttd_mod_subscription.erl b/src/emqttd_mod_subscription.erl deleted file mode 100644 index 904e29084..000000000 --- a/src/emqttd_mod_subscription.erl +++ /dev/null @@ -1,52 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2012-2016 Feng Lee . -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqttd_mod_subscription). - --behaviour(emqttd_gen_mod). - --include("emqttd.hrl"). - --include("emqttd_protocol.hrl"). - --export([load/1, on_client_connected/3, unload/1]). - -load(Opts) -> - Topics = [{iolist_to_binary(Topic), QoS} || {Topic, QoS} <- Opts, ?IS_QOS(QoS)], - emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). - -on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId, - client_pid = ClientPid, - username = Username}, Topics) -> - - Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, - TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], - emqttd_client:subscribe(ClientPid, TopicTable), - {ok, Client}; - -on_client_connected(_ConnAck, _Client, _State) -> - ok. - -unload(_Opts) -> - emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3). - -rep(<<"%c">>, ClientId, Topic) -> - emqttd_topic:feed_var(<<"%c">>, ClientId, Topic); -rep(<<"%u">>, undefined, Topic) -> - Topic; -rep(<<"%u">>, Username, Topic) -> - emqttd_topic:feed_var(<<"%u">>, Username, Topic). -