fix retained

This commit is contained in:
Feng Lee 2015-01-18 12:12:52 +08:00
parent f16d56c8b9
commit 52abcef341
4 changed files with 20 additions and 121 deletions

View File

@ -32,6 +32,8 @@
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]).
-export([dump/1]).
%%---------------------------------------------------------------------------- %%----------------------------------------------------------------------------
-ifdef(use_specs). -ifdef(use_specs).
@ -120,3 +122,12 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
Msg#mqtt_message{retain = false}; Msg#mqtt_message{retain = false};
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%%
%% @doc dump message
%%
dump(#mqtt_message{msgid= MsgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic}) ->
io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)",
[ MsgId, Qos, Retain, Dup, Topic ]).

View File

@ -1,114 +0,0 @@
%%-----------------------------------------------------------------------------
%% Copyright (c) 2012-2015, Feng Lee <feng@emqtt.io>
%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in all
%% copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
%% SOFTWARE.
%%------------------------------------------------------------------------------
-module(emqtt_retained).
-author('feng@emqtt.io').
%%TODO: FIXME Later...
%%
%% <<MQTT_V3.1_Protocol_Specific>>
%% RETAIN
%% Position: byte 1, bit 0.
%% This flag is only used on PUBLISH messages. When a client sends a PUBLISH to a server, if the Retain flag is set (1), the server should hold on to the message after it has been delivered to the current subscribers.
%% When a new subscription is established on a topic, the last retained message on that topic should be sent to the subscriber with the Retain flag set. If there is no retained message, nothing is sent
%% This is useful where publishers send messages on a "report by exception" basis, where it might be some time between messages. This allows new subscribers to instantly receive data with the retained, or Last Known Good, value.
%% When a server sends a PUBLISH to a client as a result of a subscription that already existed when the original PUBLISH arrived, the Retain flag should not be set, regardless of the Retain flag of the original PUBLISH. This allows a client to distinguish messages that are being received because they were retained and those that are being received "live".
%% Retained messages should be kept over restarts of the server.
%% A server may delete a retained message if it receives a message with a zero-length payload and the Retain flag set on the same topic.
-include("emqtt.hrl").
-export([start_link/0,
retain/1,
lookup/1,
insert/2,
delete/1,
send/2]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
-record(state, {}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
retain(Msg = #mqtt_message{retain = true}) ->
Msg;
retain(Msg) -> Msg.
lookup(Topic) ->
ets:lookup(retained_msg, Topic).
insert(Topic, Msg) ->
gen_server:cast(?MODULE, {insert, Topic, Msg}).
delete(Topic) ->
gen_server:cast(?MODULE, {delete, Topic}).
send(Topic, Client) ->
[Client ! {dispatch, {self(), Msg}} ||{_, Msg} <- lookup(Topic)].
init([]) ->
ets:new(retained_msg, [set, protected, named_table]),
{ok, #state{}}.
handle_call(Req, _From, State) ->
{stop, {badreq,Req}, State}.
handle_cast({insert, Topic, Msg}, State) ->
ets:insert(retained_msg, {Topic, Msg}),
{noreply, State};
handle_cast({delete, Topic}, State) ->
ets:delete(retained_msg, Topic),
{noreply, State};
handle_cast(Msg, State) ->
{stop, {badmsg, Msg}, State}.
handle_info(Info, State) ->
{stop, {badinfo, Info}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -65,8 +65,9 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
route(Msg) -> route(Msg) ->
lager:info("Route message: ~s", [emqtt_message:dump(Msg)]),
% need to retain? % need to retain?
emqtt_retained:retain(Msg), emqtt_server:retain(Msg),
% unset flag and pubsub % unset flag and pubsub
emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ). emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ).

View File

@ -25,6 +25,7 @@
-author('feng@slimpp.io'). -author('feng@slimpp.io').
-include("emqtt.hrl"). -include("emqtt.hrl").
-include("emqtt_topic.hrl"). -include("emqtt_topic.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@ -91,8 +92,8 @@ init([RetainOpts]) ->
Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT), Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT),
{ok, #state{store_limit = Limit}}. {ok, #state{store_limit = Limit}}.
handle_call(_Request, _From, State) -> handle_call(Req, _From, State) ->
{reply, ok, State}. {stop, {badreq, Req}, State}.
handle_cast({retain, Msg = #mqtt_message{ qos = Qos, handle_cast({retain, Msg = #mqtt_message{ qos = Qos,
topic = Topic, topic = Topic,
@ -108,11 +109,11 @@ handle_cast({retain, Msg = #mqtt_message{ qos = Qos,
end, end,
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
{noreply, State}. {stop, {badmsg, Msg}, State}.
handle_info(_Info, State) -> handle_info(Info, State) ->
{noreply, State}. {stop, {badinfo, Info}, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.