retained_message

This commit is contained in:
Feng 2016-03-12 16:38:30 +08:00
parent ba5fcfdfae
commit b08bd5a578
1 changed files with 13 additions and 13 deletions

View File

@ -34,6 +34,8 @@
-export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1, -export([add_subscription/1, lookup_subscriptions/1, del_subscriptions/1,
del_subscription/2]). del_subscription/2]).
-record(retained_message, {topic, msg}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia callbacks %% Mnesia callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -42,9 +44,8 @@ mnesia(boot) ->
ok = emqttd_mnesia:create_table(retained_message, [ ok = emqttd_mnesia:create_table(retained_message, [
{type, ordered_set}, {type, ordered_set},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, mqtt_message}, {record_name, retained_message},
{index, [#mqtt_message.topic]}, {attributes, record_info(fields, retained_message)},
{attributes, record_info(fields, mqtt_message)},
{storage_properties, [{ets, [compressed]}, {storage_properties, [{ets, [compressed]},
{dets, [{auto_save, 1000}]}]}]), {dets, [{auto_save, 1000}]}]}]),
ok = emqttd_mnesia:create_table(backend_subscription, [ ok = emqttd_mnesia:create_table(backend_subscription, [
@ -64,17 +65,17 @@ mnesia(copy) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(retain_message(mqtt_message()) -> ok). -spec(retain_message(mqtt_message()) -> ok).
retain_message(Msg) when is_record(Msg, mqtt_message) -> retain_message(Msg = #mqtt_message{topic = Topic}) ->
mnesia:dirty_write(retained_message, Msg). mnesia:dirty_write(#retained_message{topic = Topic, msg = Msg}).
-spec(read_messages(binary()) -> [mqtt_message()]). -spec(read_messages(binary()) -> [mqtt_message()]).
read_messages(Topic) -> read_messages(Topic) ->
mnesia:dirty_index_read(retained_message, Topic, #mqtt_message.topic). [Msg || #retained_message{msg = Msg} <- mnesia:dirty_read(retained_message, Topic)].
-spec(match_messages(binary()) -> [mqtt_message()]). -spec(match_messages(binary()) -> [mqtt_message()]).
match_messages(Filter) -> match_messages(Filter) ->
%% TODO: optimize later... %% TODO: optimize later...
Fun = fun(Msg = #mqtt_message{topic = Name}, Acc) -> Fun = fun(#retained_message{topic = Name, msg = Msg}, Acc) ->
case emqttd_topic:match(Name, Filter) of case emqttd_topic:match(Name, Filter) of
true -> [Msg|Acc]; true -> [Msg|Acc];
false -> Acc false -> Acc
@ -84,19 +85,18 @@ match_messages(Filter) ->
-spec(delete_message(binary()) -> ok). -spec(delete_message(binary()) -> ok).
delete_message(Topic) -> delete_message(Topic) ->
%%TODO: no transaction??? mnesia:dirty_delete(retained_message, Topic).
[mnesia:dirty_delete_object(retained_message, Msg) || Msg <- read_messages(Topic)].
-spec(expire_messages(pos_integer()) -> any()). -spec(expire_messages(pos_integer()) -> any()).
expire_messages(Time) when is_integer(Time) -> expire_messages(Time) when is_integer(Time) ->
mnesia:transaction( mnesia:transaction(
fun() -> fun() ->
Match = ets:fun2ms( Match = ets:fun2ms(
fun(#mqtt_message{msgid = MsgId, timestamp = {MegaSecs, Secs, _}}) fun(#retained_message{topic = Topic, msg = #mqtt_message{timestamp = {MegaSecs, Secs, _}}})
when Time > (MegaSecs * 1000000 + Secs) -> MsgId when Time > (MegaSecs * 1000000 + Secs) -> Topic
end), end),
MsgIds = mnesia:select(retained_message, Match, write), Topics = mnesia:select(retained_message, Match, write),
lists:foreach(fun(MsgId) -> mnesia:delete({retained_message, MsgId}) end, MsgIds) lists:foreach(fun(Topic) -> mnesia:delete({retained_message, Topic}) end, Topics)
end). end).
-spec(retained_count() -> non_neg_integer()). -spec(retained_count() -> non_neg_integer()).