From c65d047fda61dc58a404de96ec0936b1a82fbe7c Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 23 Jul 2015 16:04:21 +0800 Subject: [PATCH 1/8] QOS_I --- include/emqttd_protocol.hrl | 20 ++++++++++++++++ src/emqttd_message.erl | 6 ++--- src/emqttd_qos.erl | 47 +++++++++++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 src/emqttd_qos.erl diff --git a/include/emqttd_protocol.hrl b/include/emqttd_protocol.hrl index 22a32bc09..c5adb66b3 100644 --- a/include/emqttd_protocol.hrl +++ b/include/emqttd_protocol.hrl @@ -49,6 +49,26 @@ -type mqtt_qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2. +-type mqtt_qos_name() :: qos0 | at_most_once | + qos1 | at_least_once | + qos2 | exactly_once. + +-define(QOS_I(Name), + begin + (case Name of + ?QOS_0 -> ?QOS_0; + qos0 -> ?QOS_0; + at_most_once -> ?QOS_0; + ?QOS_1 -> ?QOS_1; + qos1 -> ?QOS_1; + at_least_once -> ?QOS_1; + ?QOS_2 -> ?QOS_2; + qos2 -> ?QOS_2; + exactly_once -> ?QOS_2 + end) + end). + + %%------------------------------------------------------------------------------ %% Max ClientId Length. Why 1024? NiDongDe! %%------------------------------------------------------------------------------ diff --git a/src/emqttd_message.erl b/src/emqttd_message.erl index 00608ea69..8f3d4ed40 100644 --- a/src/emqttd_message.erl +++ b/src/emqttd_message.erl @@ -55,14 +55,14 @@ make(From, Topic, Payload) -> -spec make(From, Qos, Topic, Payload) -> mqtt_message() when From :: atom() | binary(), - Qos :: mqtt_qos(), + Qos :: mqtt_qos() | mqtt_qos_name(), Topic :: binary(), Payload :: binary(). make(From, Qos, Topic, Payload) -> #mqtt_message{msgid = msgid(Qos), topic = Topic, from = From, - qos = Qos, + qos = ?QOS_I(Qos), payload = Payload, timestamp = os:timestamp()}. @@ -107,7 +107,7 @@ from_packet(ClientId, Packet) -> msgid(?QOS_0) -> undefined; -msgid(_Qos) -> +msgid(Qos) when Qos =:= ?QOS_1 orelse Qos =:= ?QOS_2 -> emqttd_guid:gen(). %%------------------------------------------------------------------------------ diff --git a/src/emqttd_qos.erl b/src/emqttd_qos.erl new file mode 100644 index 000000000..49e863e17 --- /dev/null +++ b/src/emqttd_qos.erl @@ -0,0 +1,47 @@ +%%%----------------------------------------------------------------------------- +%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved. +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd Qos Functions. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_qos). + +-include("emqttd_protocol.hrl"). + +-export([a/1, i/1]). + +a(?QOS_0) -> qos0; +a(?QOS_1) -> qos1; +a(?QOS_2) -> qos2; +a(qos0) -> qos0; +a(qos1) -> qos1; +a(qos2) -> qos2. + +i(?QOS_0) -> ?QOS_0; +i(?QOS_1) -> ?QOS_1; +i(?QOS_2) -> ?QOS_2; +i(qos0) -> ?QOS_0; +i(qos1) -> ?QOS_1; +i(qos2) -> ?QOS_2. + + From 268fcace65549037dae4084e6316447ad54ba0d7 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 10:47:06 +0800 Subject: [PATCH 2/8] mqueue --- src/emqttd_bridge.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index d59f187cd..74a21fadd 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -44,10 +44,10 @@ -define(PING_DOWN_INTERVAL, 1000). -record(state, {node, subtopic, - qos, + qos = ?QOS_2, topic_suffix = <<>>, topic_prefix = <<>>, - mqueue = emqttd_mqueue:mqueue(), + mqueue :: emqttd_mqueue:mqueue(), max_queue_len = 0, ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). From c6c3926962cd70f4e655e9a4ea6a25472424569c Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:15:37 +0800 Subject: [PATCH 3/8] bridges start --- rel/files/emqttd_ctl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index d88f9ff03..f98a3da66 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -225,13 +225,19 @@ case "$1" in fi if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then $NODETOOL rpc emqttd_ctl bridges list - elif [ $# -eq 4 ]; then + elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then + $NODETOOL rpc emqttd_ctl bridges options + elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then + shift + $NODETOOL rpc emqttd_ctl bridges $@ + elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then shift $NODETOOL rpc emqttd_ctl bridges $@ else echo "Usage: " echo "$SCRIPT bridges list" echo "$SCRIPT bridges start " + echo "$SCRIPT bridges start " echo "$SCRIPT bridges stop " exit 1 fi @@ -308,8 +314,10 @@ case "$1" in echo " plugins unload #unload plugin" echo " ----------------------------------------------------------------" echo " bridges list #query bridges" + echo " bridges options #bridge options" echo " bridges start #start bridge" - echo " bridges stop #stop bridge" + echo " bridges start #start bridge with options" + echo " bridges stop #stop bridge" echo " ----------------------------------------------------------------" echo " useradd #add user" echo " userdel #delete user" From 98250313ab916625a1ed3fc22d16d8d81dd25035 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:16:36 +0800 Subject: [PATCH 4/8] fix issue #206 --- src/emqttd_bridge.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index 74a21fadd..dfb68add3 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_bridge). -author("Feng Lee "). @@ -47,15 +48,15 @@ qos = ?QOS_2, topic_suffix = <<>>, topic_prefix = <<>>, - mqueue :: emqttd_mqueue:mqueue(), - max_queue_len = 0, + mqueue :: emqttd_mqueue:mqueue(), + max_queue_len = 10000, ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). --type option() :: {max_queue_len, pos_integer()} | - {qos, mqtt_qos()} | +-type option() :: {qos, mqtt_qos()} | {topic_suffix, binary()} | {topic_prefix, binary()} | + {max_queue_len, pos_integer()} | {ping_down_interval, pos_integer()}. -export_type([option/0]). @@ -85,7 +86,7 @@ init([Node, SubTopic, Options]) -> MQueue = emqttd_mqueue:new(qname(Node, SubTopic), [{max_len, State#state.max_queue_len}], emqttd_alarm:alarm_fun()), - emqttd_pubsub:subscribe({SubTopic, State#state.qos}), + emqttd_pubsub:subscribe(SubTopic, State#state.qos), {ok, State#state{mqueue = MQueue}}; false -> {stop, {cannot_connect, Node}} @@ -102,7 +103,9 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) -> parse_opts([{max_queue_len, Len} | Opts], State) -> parse_opts(Opts, State#state{max_queue_len = Len}); parse_opts([{ping_down_interval, Interval} | Opts], State) -> - parse_opts(Opts, State#state{ping_down_interval = Interval*1000}). + parse_opts(Opts, State#state{ping_down_interval = Interval*1000}); +parse_opts([_Opt | Opts], State) -> + parse_opts(Opts, State). qname(Node, SubTopic) when is_atom(Node) -> qname(atom_to_list(Node), SubTopic); From 8c7d2ebe1cf36693ce15180445f2c67f1f1aff59 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:34:02 +0800 Subject: [PATCH 5/8] 0.9.3 --- CHANGELOG.md | 14 ++++++++++++++ src/emqttd_bridge_sup.erl | 10 ++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 311009faa..870c3c18a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,20 @@ emqttd ChangeLog ================== +0.9.3-alpha (2015-07-25) +------------------------- + +Wiki: [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge) + +Improve: emqttd_protocol.hrl to define 'QOS_I' + +Improve: emqttd_pubsub to add subscribe/2 API + +Improve: ./bin/emqttd_ctl to support new bridges command + +Bugfix: issue #206 - Cannot bridge two nodes + + 0.9.2-alpha (2015-07-18) ------------------------- diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index d7ffc0f80..0df4b1343 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_bridge_sup). -author("Feng Lee "). @@ -63,8 +64,13 @@ start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) -> -spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}. start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) -> - Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)). + case Node =:= node() of + true -> + {error, bridge_to_self}; + false -> + Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options), + supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)) + end. %%------------------------------------------------------------------------------ %% @doc Stop a bridge From d81946b0d608f1024c603fe75c04882d2dba0800 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:34:11 +0800 Subject: [PATCH 6/8] 0.9.3 --- src/emqttd.app.src | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/emqttd.app.src b/src/emqttd.app.src index fb21ec2f5..9aae391fb 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.9.2"}, + {vsn, "0.9.3"}, {modules, []}, {registered, []}, {applications, [kernel, From 90b3344db04066cd8a1bed0ce5dde270e2ae750f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:34:25 +0800 Subject: [PATCH 7/8] bridges command --- src/emqttd_ctl.erl | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 0a3debac6..acccb5b5a 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -147,13 +147,29 @@ listeners([]) -> end, esockd:listeners()). bridges(["list"]) -> - lists:foreach(fun({{Node, Topic}, _Pid}) -> - ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + lists:foreach(fun({{Node, Topic}, _Pid}) -> + ?PRINT("bridge: ~s ~s~n", [Node, Topic]) end, emqttd_bridge_sup:bridges()); +bridges(["options"]) -> + ?PRINT_MSG("Options:~n"), + ?PRINT_MSG(" qos = 0 | 1 | 2~n"), + ?PRINT_MSG(" prefix = string~n"), + ?PRINT_MSG(" suffix = string~n"), + ?PRINT_MSG(" queue = integer~n"), + ?PRINT_MSG("Example:~n"), + ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + bridges(["start", SNode, Topic]) -> case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["start", SNode, Topic, OptStr]) -> + Opts = parse_opts(bridge, OptStr), + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; @@ -229,3 +245,19 @@ node_name(SNode) -> bin(S) when is_list(S) -> list_to_binary(S); bin(B) when is_binary(B) -> B. +parse_opts(Cmd, OptStr) -> + Tokens = string:tokens(OptStr, ","), + [parse_opt(Cmd, list_to_atom(Opt), Val) + || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. + +parse_opt(bridge, qos, Qos) -> + {qos, list_to_integer(Qos)}; +parse_opt(bridge, suffix, Suffix) -> + {topic_suffix, list_to_binary(Suffix)}; +parse_opt(bridge, prefix, Prefix) -> + {topic_prefix, list_to_binary(Prefix)}; +parse_opt(bridge, queue, Len) -> + {max_queue_len, list_to_integer(Len)}; +parse_opt(_Cmd, Opt, _Val) -> + ?PRINT("Bad Option: ~s~n", [Opt]). + From e370e18b199744a2ceade4c5c56383f76d056937 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 25 Jul 2015 14:34:44 +0800 Subject: [PATCH 8/8] subscribe/2 --- src/emqttd_pubsub.erl | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 6deda1950..b2604d3cc 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -43,7 +43,7 @@ -export([start_link/2]). -export([create/1, - subscribe/1, + subscribe/1, subscribe/2, unsubscribe/1, publish/1]). @@ -128,15 +128,21 @@ create(Topic) when is_binary(Topic) -> %% @doc Subscribe topic %% @end %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), - Qos :: mqtt_qos(). -subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - call({subscribe, self(), Topic, Qos}); + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) -> + call({subscribe, self(), Topic, ?QOS_I(Qos)}); subscribe(Topics = [{_Topic, _Qos} | _]) -> - call({subscribe, self(), Topics}). + call({subscribe, self(), [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- Topics]}). + +-spec subscribe(Topic, Qos) -> {ok, Qos} when + Topic :: binary(), + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe(Topic, Qos) -> + subscribe({Topic, Qos}). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics