From 8fa63244bb85a4224e9c04129a7e44656bef4941 Mon Sep 17 00:00:00 2001 From: erylee Date: Sun, 23 Dec 2012 16:53:38 +0800 Subject: [PATCH] fix issue#3 'PUBLISH' RETAIN --- docs/keepalive.md | 1 + src/emqtt_client.erl | 10 +++++ src/emqtt_retained.erl | 97 ++++++++++++++++++++++++++++++++++++++++++ src/emqtt_router.erl | 5 ++- src/emqtt_sup.erl | 3 +- 5 files changed, 112 insertions(+), 4 deletions(-) create mode 100644 docs/keepalive.md create mode 100644 src/emqtt_retained.erl diff --git a/docs/keepalive.md b/docs/keepalive.md new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/docs/keepalive.md @@ -0,0 +1 @@ + diff --git a/src/emqtt_client.erl b/src/emqtt_client.erl index f615a0e68..d76617f10 100644 --- a/src/emqtt_client.erl +++ b/src/emqtt_client.erl @@ -238,7 +238,11 @@ process_request(?PUBLISH, dup = Dup, message_id = MessageId, payload = Payload }, + emqtt_router:publish(Topic, Msg), + + %Retained? + retained(Retain, Topic, Msg), send_frame(Sock, #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, @@ -357,4 +361,10 @@ valid_client_id(ClientId) -> ClientIdLen = size(ClientId), 1 =< ClientIdLen andalso ClientIdLen =< ?CLIENT_ID_MAXLEN. +retained(false, _Topic, _Msg) -> + ignore; +retained(true, Topic, #mqtt_msg{payload = <<>>}) -> + emqtt_retained:delete(Topic); +retained(true, Topic, Msg) -> + emqtt_retained:insert(Topic, Msg). diff --git a/src/emqtt_retained.erl b/src/emqtt_retained.erl new file mode 100644 index 000000000..6d4009e4a --- /dev/null +++ b/src/emqtt_retained.erl @@ -0,0 +1,97 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% Developer of the eMQTT Code is +%% Copyright (c) 2012 Ery Lee. All rights reserved. +%% + +-module(emqtt_retained). + +%% +%% <> + +%% RETAIN +%% Position: byte 1, bit 0. + +%% This flag is only used on PUBLISH messages. When a client sends a PUBLISH to a server, if the Retain flag is set (1), the server should hold on to the message after it has been delivered to the current subscribers. + +%% When a new subscription is established on a topic, the last retained message on that topic should be sent to the subscriber with the Retain flag set. If there is no retained message, nothing is sent + +%% This is useful where publishers send messages on a "report by exception" basis, where it might be some time between messages. This allows new subscribers to instantly receive data with the retained, or Last Known Good, value. + +%% When a server sends a PUBLISH to a client as a result of a subscription that already existed when the original PUBLISH arrived, the Retain flag should not be set, regardless of the Retain flag of the original PUBLISH. This allows a client to distinguish messages that are being received because they were retained and those that are being received "live". + +%% Retained messages should be kept over restarts of the server. + +%% A server may delete a retained message if it receives a message with a zero-length payload and the Retain flag set on the same topic. + +-include("emqtt.hrl"). + +-export([start_link/0, + lookup/1, + insert/2, + delete/1, + send/2]). + +-behaviour(gen_server). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {}). + +start_link() -> + gen_server2:start_link({local, ?MODULE}, ?MODULE, [], []). + +lookup(Topic) -> + ets:lookup(retained_msg, Topic). + +insert(Topic, Msg) -> + gen_server2:cast(?MODULE, {insert, Topic, Msg}). + +delete(Topic) -> + gen_server2:cast(?MODULE, {delete, Topic}). + +send(Topic, Client) -> + [Client ! {route, Msg} ||{_, Msg} <- lookup(Topic)]. + +init([]) -> + ets:new(retained_msg, [set, protected, named_table]), + ?INFO("~p is started.", [?MODULE]), + {ok, #state{}}. + +handle_call(Req, _From, State) -> + {stop, {badreq,Req}, State}. + +handle_cast({insert, Topic, Msg}, State) -> + ets:insert(retained_msg, {Topic, Msg}), + {noreply, State}; + +handle_cast({delete, Topic}, State) -> + ets:delete(retained_msg, Topic), + {noreply, State}; + +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. + +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + diff --git a/src/emqtt_router.erl b/src/emqtt_router.erl index 34d98b83b..4a743a3d7 100644 --- a/src/emqtt_router.erl +++ b/src/emqtt_router.erl @@ -73,6 +73,7 @@ handle_call({subscribe, Topic, Client}, _From, State) -> end, Ref = erlang:monitor(process, Client), ets:insert(subscriber, #subscriber{topic=Topic, client=Client, monref=Ref}), + emqtt_retained:send(Topic, Client), {reply, ok, State}; handle_call(Req, _From, State) -> @@ -104,8 +105,8 @@ handle_info(Info, State) -> terminate(_Reason, _State) -> ok. -code_change(_OldVsn, _State, _Extra) -> - ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %-------------------------------------- % internal functions diff --git a/src/emqtt_sup.erl b/src/emqtt_sup.erl index ed9f44148..59ec7fdd7 100644 --- a/src/emqtt_sup.erl +++ b/src/emqtt_sup.erl @@ -27,6 +27,7 @@ start_link(Listeners) -> init([Listeners]) -> {ok, { {one_for_all, 5, 10}, [ ?CHILD(emqtt_auth, worker), + ?CHILD(emqtt_retained, worker), ?CHILD(emqtt_router, worker), ?CHILD(emqtt_client_sup, supervisor) | listener_children(Listeners) ]} @@ -37,5 +38,3 @@ listener_children(Listeners) -> {emqtt_client_sup, start_client, []}) || Listener <- Listeners]). - -