From f16d56c8b911e5af4881e376008e6d00918a5ee9 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 18 Jan 2015 11:36:21 +0800 Subject: [PATCH 1/4] retained messages --- apps/emqtt/src/emqtt_server.erl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index c464656e3..6cc58a14d 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -71,7 +71,9 @@ retain(Msg = #mqtt_message{retain = true}) -> %% subscribe(Topics, CPid) when is_pid(CPid) -> + lager:info("Retained Topics: ~p", [match(Topics)]), RetainedMsgs = lists:flatten([mnesia:dirty_read(?RETAINED_TAB, Topic) || Topic <- match(Topics)]), + lager:info("Retained Messages: ~p", [RetainedMsgs]), lists:foreach(fun(Msg) -> CPid ! {dispatch, {self(), retained_msg(Msg)}} end, RetainedMsgs). From 52abcef341b48caaeb6ed37519d9a6ceacc9edc7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 18 Jan 2015 12:12:52 +0800 Subject: [PATCH 2/4] fix retained --- apps/emqtt/src/emqtt_message.erl | 11 +++ apps/emqtt/src/emqtt_retained.erl | 114 ------------------------------ apps/emqtt/src/emqtt_router.erl | 3 +- apps/emqtt/src/emqtt_server.erl | 13 ++-- 4 files changed, 20 insertions(+), 121 deletions(-) delete mode 100644 apps/emqtt/src/emqtt_retained.erl diff --git a/apps/emqtt/src/emqtt_message.erl b/apps/emqtt/src/emqtt_message.erl index aa4c415f0..d8dde9814 100644 --- a/apps/emqtt/src/emqtt_message.erl +++ b/apps/emqtt/src/emqtt_message.erl @@ -32,6 +32,8 @@ -export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). +-export([dump/1]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -120,3 +122,12 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) -> Msg#mqtt_message{retain = false}; unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. + +%% +%% @doc dump message +%% +dump(#mqtt_message{msgid= MsgId, qos = Qos, retain = Retain, dup = Dup, topic = Topic}) -> + io_lib:format("Message(MsgId=~p, Qos=~p, Retain=~s, Dup=~s, Topic=~s)", + [ MsgId, Qos, Retain, Dup, Topic ]). + + diff --git a/apps/emqtt/src/emqtt_retained.erl b/apps/emqtt/src/emqtt_retained.erl deleted file mode 100644 index f7edd79f1..000000000 --- a/apps/emqtt/src/emqtt_retained.erl +++ /dev/null @@ -1,114 +0,0 @@ -%%----------------------------------------------------------------------------- -%% Copyright (c) 2012-2015, Feng Lee -%% -%% Permission is hereby granted, free of charge, to any person obtaining a copy -%% of this software and associated documentation files (the "Software"), to deal -%% in the Software without restriction, including without limitation the rights -%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -%% copies of the Software, and to permit persons to whom the Software is -%% furnished to do so, subject to the following conditions: -%% -%% The above copyright notice and this permission notice shall be included in all -%% copies or substantial portions of the Software. -%% -%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -%% SOFTWARE. -%%------------------------------------------------------------------------------ - --module(emqtt_retained). - --author('feng@emqtt.io'). - -%%TODO: FIXME Later... - -%% -%% <> - -%% RETAIN -%% Position: byte 1, bit 0. - -%% This flag is only used on PUBLISH messages. When a client sends a PUBLISH to a server, if the Retain flag is set (1), the server should hold on to the message after it has been delivered to the current subscribers. - -%% When a new subscription is established on a topic, the last retained message on that topic should be sent to the subscriber with the Retain flag set. If there is no retained message, nothing is sent - -%% This is useful where publishers send messages on a "report by exception" basis, where it might be some time between messages. This allows new subscribers to instantly receive data with the retained, or Last Known Good, value. - -%% When a server sends a PUBLISH to a client as a result of a subscription that already existed when the original PUBLISH arrived, the Retain flag should not be set, regardless of the Retain flag of the original PUBLISH. This allows a client to distinguish messages that are being received because they were retained and those that are being received "live". - -%% Retained messages should be kept over restarts of the server. - -%% A server may delete a retained message if it receives a message with a zero-length payload and the Retain flag set on the same topic. - --include("emqtt.hrl"). - --export([start_link/0, - retain/1, - lookup/1, - insert/2, - delete/1, - send/2]). - --behaviour(gen_server). - --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {}). - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -retain(Msg = #mqtt_message{retain = true}) -> - Msg; - -retain(Msg) -> Msg. - -lookup(Topic) -> - ets:lookup(retained_msg, Topic). - -insert(Topic, Msg) -> - gen_server:cast(?MODULE, {insert, Topic, Msg}). - -delete(Topic) -> - gen_server:cast(?MODULE, {delete, Topic}). - -send(Topic, Client) -> - [Client ! {dispatch, {self(), Msg}} ||{_, Msg} <- lookup(Topic)]. - -init([]) -> - ets:new(retained_msg, [set, protected, named_table]), - {ok, #state{}}. - -handle_call(Req, _From, State) -> - {stop, {badreq,Req}, State}. - -handle_cast({insert, Topic, Msg}, State) -> - ets:insert(retained_msg, {Topic, Msg}), - {noreply, State}; - -handle_cast({delete, Topic}, State) -> - ets:delete(retained_msg, Topic), - {noreply, State}; - -handle_cast(Msg, State) -> - {stop, {badmsg, Msg}, State}. - -handle_info(Info, State) -> - {stop, {badinfo, Info}, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - diff --git a/apps/emqtt/src/emqtt_router.erl b/apps/emqtt/src/emqtt_router.erl index 3026f98fa..43e6bd419 100644 --- a/apps/emqtt/src/emqtt_router.erl +++ b/apps/emqtt/src/emqtt_router.erl @@ -65,8 +65,9 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). route(Msg) -> + lager:info("Route message: ~s", [emqtt_message:dump(Msg)]), % need to retain? - emqtt_retained:retain(Msg), + emqtt_server:retain(Msg), % unset flag and pubsub emqtt_pubsub:publish( emqtt_message:unset_flag(Msg) ). diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index 6cc58a14d..9360eaa8f 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -25,6 +25,7 @@ -author('feng@slimpp.io'). -include("emqtt.hrl"). + -include("emqtt_topic.hrl"). -behaviour(gen_server). @@ -91,8 +92,8 @@ init([RetainOpts]) -> Limit = proplists:get_value(store_limit, RetainOpts, ?STORE_LIMIT), {ok, #state{store_limit = Limit}}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call(Req, _From, State) -> + {stop, {badreq, Req}, State}. handle_cast({retain, Msg = #mqtt_message{ qos = Qos, topic = Topic, @@ -108,11 +109,11 @@ handle_cast({retain, Msg = #mqtt_message{ qos = Qos, end, {noreply, State}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(Msg, State) -> + {stop, {badmsg, Msg}, State}. -handle_info(_Info, State) -> - {noreply, State}. +handle_info(Info, State) -> + {stop, {badinfo, Info}, State}. terminate(_Reason, _State) -> ok. From f3057c08e4ac75cf2eaf4dcb970482b923629d38 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 18 Jan 2015 12:57:33 +0800 Subject: [PATCH 3/4] 0.3.0 changes --- CHANGELOG.md | 28 +++++++++++++++++++++++ apps/emqtt/src/x.erl | 50 ------------------------------------------ scripts/mosquitto_test | 19 ++++++++++++++++ 3 files changed, 47 insertions(+), 50 deletions(-) delete mode 100644 apps/emqtt/src/x.erl create mode 100755 scripts/mosquitto_test diff --git a/CHANGELOG.md b/CHANGELOG.md index 9011e9dc0..d582ef4f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,34 @@ eMQTT ChangeLog ================== +v0.3.0-alpha (2015-01-18) +------------------------ + +NOTICE: Full MQTT 3.1.1 support now! + +Feature: Passed org.eclipse.paho.mqtt.testing/interoperability tests + +Feature: Qos0, Qos1 and Qos2 publish and suscribe + +Feature: session(clean_sess=false) management and offline messages + +Feature: redeliver awaiting puback/pubrec messages(doc: Chapter 4.4) + +Feature: retain messages, add emqtt_server module + +Feature: MQTT 3.1.1 null client_id support + +Bugfix: keepalive timeout to send will message + +Improve: overlapping subscription support + +Improve: add emqtt_packet:dump to dump packets + +Test: passed org.eclipse.paho.mqtt.testing/interoperability + +Test: simple cluster test + + v0.2.1-beta (2015-01-08) ------------------------ diff --git a/apps/emqtt/src/x.erl b/apps/emqtt/src/x.erl deleted file mode 100644 index 06e3e515d..000000000 --- a/apps/emqtt/src/x.erl +++ /dev/null @@ -1,50 +0,0 @@ --module(x). --behaviour(gen_server). --define(SERVER, ?MODULE). - -%% ------------------------------------------------------------------ -%% API Function Exports -%% ------------------------------------------------------------------ - --export([start_link/0]). - -%% ------------------------------------------------------------------ -%% gen_server Function Exports -%% ------------------------------------------------------------------ - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%% ------------------------------------------------------------------ -%% API Function Definitions -%% ------------------------------------------------------------------ - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%% ------------------------------------------------------------------ -%% gen_server Function Definitions -%% ------------------------------------------------------------------ - -init(Args) -> - {ok, Args}. - -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ - diff --git a/scripts/mosquitto_test b/scripts/mosquitto_test new file mode 100755 index 000000000..d5a504134 --- /dev/null +++ b/scripts/mosquitto_test @@ -0,0 +1,19 @@ +#!/bin/sh +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et + +# slimple publish +mosquitto_pub -t xxx/yyy -m hello +if [ "$?" == 0 ]; then + echo "[Success]: slimple publish" +else + echo "[Failure]: slimple publish" +fi + +# publish will willmsg +mosquitto_pub -q 1 -t a/b/c -m hahah -u test -P public --will-topic /will --will-payload willmsg --will-qos 1 +if [ "$?" == 0 ]; then + echo "[Success]: publish with willmsg" +else + echo "[Failure]: publish with willmsg" +fi From ed3048232ad0142c9c09eb67cc569a293e31c040 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sun, 18 Jan 2015 13:25:02 +0800 Subject: [PATCH 4/4] name, setcookie max processes --- rel/files/vm.args | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rel/files/vm.args b/rel/files/vm.args index cf869a472..1c790f3a1 100644 --- a/rel/files/vm.args +++ b/rel/files/vm.args @@ -1,8 +1,8 @@ ## Name of the node --sname emqtt +-name emqtt@127.0.0.1 ## Cookie for distributed erlang --setcookie emqtt +-setcookie emqttsecretcookie ## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive ## (Disabled by default..use with caution!) @@ -14,7 +14,7 @@ +A 32 ## max process numbers -+P 100000 ++P 1000000 ## Increase number of concurrent ports/sockets -env ERL_MAX_PORTS 4096 @@ -25,3 +25,4 @@ ## Tweak GC to run more often ##-env ERL_FULLSWEEP_AFTER 10 +#