From edb45d7029b0535e0a113d97d875b1ba4e63d6c6 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 9 Sep 2016 19:38:42 +0800 Subject: [PATCH] retained_message -> mqtt_retained --- docs/source/changes.rst | 37 ++++++++++++++++++++++++++++++++++--- src/emqttd_mod_retainer.erl | 31 +++++++++++++++---------------- 2 files changed, 49 insertions(+), 19 deletions(-) diff --git a/docs/source/changes.rst b/docs/source/changes.rst index 33c400101..e2126807c 100644 --- a/docs/source/changes.rst +++ b/docs/source/changes.rst @@ -7,9 +7,40 @@ Changes .. _release_2.0: -------------------------------------- -Version 2.0-beta1 (West of West Lake) -------------------------------------- +------------------------------- +Version 2.0 (West of West Lake) +------------------------------- + +*Release Date: 2016-09-30* + +*Release Name: West of West Lake* + +TODO: ... + +----------------- +Version 2.0-beta2 +----------------- + +*Release Date: 2016-09-10* + +API Breaking Changes +-------------------- + +'$u', '$c' variables in emqttd.conf and modules/acl.conf changed to '%u', '%c' + +Improve the design of mqtt retained message, replace emqttd_retainer with emqttd_mod_retainer. + +Add 'session.subscribed', 'session.unsubscribed' hooks, remove 'client.subscribe.after' hook + +Tab 'retained_message' -> 'mqtt_retained' + +Bugfix +------ + + +----------------- +Version 2.0-beta1 +----------------- *Release Date: 2016-08-30* diff --git a/src/emqttd_mod_retainer.erl b/src/emqttd_mod_retainer.erl index 32887bab4..5742a264c 100644 --- a/src/emqttd_mod_retainer.erl +++ b/src/emqttd_mod_retainer.erl @@ -39,7 +39,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(retained_message, {topic, msg}). +-record(mqtt_retained, {topic, msg}). -record(state, {stats_fun, expired_after, stats_timer, expire_timer}). @@ -65,13 +65,13 @@ on_message_publish(Msg = #mqtt_message{retain = false}, _Env) -> %% RETAIN flag set to 1 and payload containing zero bytes on_message_publish(Msg = #mqtt_message{retain = true, topic = Topic, payload = <<>>}, _Env) -> - mnesia:dirty_delete(retained_message, Topic), + mnesia:dirty_delete(mqtt_retained, Topic), {stop, Msg}; on_message_publish(Msg = #mqtt_message{topic = Topic, retain = true, payload = Payload}, Env) -> case {is_table_full(Env), is_too_big(size(Payload), Env)} of {false, false} -> - mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}), + mnesia:dirty_write(#mqtt_retained{topic = Topic, msg = Msg}), emqttd_metrics:set('messages/retained', retained_count()); {true, _}-> lager:error("Cannot retain message(topic=~s) for table is full!", [Topic]); @@ -111,20 +111,19 @@ start_link(Env) -> %%-------------------------------------------------------------------- init([Env]) -> - Copy = case proplists:get_value(storage, Env, disc) of + Copy = case proplists:get_value(storage_type, Env, disc) of disc -> disc_copies; ram -> ram_copies end, - ok = emqttd_mnesia:create_table(retained_message, [ + ok = emqttd_mnesia:create_table(mqtt_retained, [ {type, ordered_set}, {Copy, [node()]}, - {record_name, retained_message}, - {attributes, record_info(fields, retained_message)}, + {record_name, mqtt_retained}, + {attributes, record_info(fields, mqtt_retained)}, {storage_properties, [{ets, [compressed]}, {dets, [{auto_save, 1000}]}]}]), - ok = emqttd_mnesia:copy_table(retained_message), + ok = emqttd_mnesia:copy_table(mqtt_retained), StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'), - %% One second {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), State = #state{stats_fun = StatsFun, stats_timer = StatsTimer}, {ok, init_expire_timer(proplists:get_value(expired_after, Env, 0), State)}. @@ -171,33 +170,33 @@ code_change(_OldVsn, State, _Extra) -> -spec(read_messages(binary()) -> [mqtt_message()]). read_messages(Topic) -> - [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)]. + [Msg || #mqtt_retained{msg = Msg} <- mnesia:dirty_read(mqtt_retained, Topic)]. -spec(match_messages(binary()) -> [mqtt_message()]). match_messages(Filter) -> %% TODO: optimize later... - Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) -> + Fun = fun(#mqtt_retained{topic = Name, msg = Msg}, Acc) -> case emqttd_topic:match(Name, Filter) of true -> [Msg|Acc]; false -> Acc end end, - mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], retained_message]). + mnesia:async_dirty(fun mnesia:foldl/3, [Fun, [], mqtt_retained]). -spec(expire_messages(pos_integer()) -> any()). expire_messages(Time) when is_integer(Time) -> mnesia:transaction( fun() -> Match = ets:fun2ms( - fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = Ts}}) + fun(#mqtt_retained{topic = Topic, msg = #mqtt_message{timestamp = Ts}}) when Time > Ts -> Topic end), - Topics = mnesia:select(retained_message, Match, write), + Topics = mnesia:select(mqtt_retained, Match, write), lists:foreach(fun(<<"$SYS/", _/binary>>) -> ok; %% ignore $SYS/# messages - (Topic) -> mnesia:delete({retained_message, Topic}) + (Topic) -> mnesia:delete({mqtt_retained, Topic}) end, Topics) end). -spec(retained_count() -> non_neg_integer()). -retained_count() -> mnesia:table_info(retained_message, size). +retained_count() -> mnesia:table_info(mqtt_retained, size).