diff --git a/CHANGELOG.md b/CHANGELOG.md index 311009faa..870c3c18a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,20 @@ emqttd ChangeLog ================== +0.9.3-alpha (2015-07-25) +------------------------- + +Wiki: [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge) + +Improve: emqttd_protocol.hrl to define 'QOS_I' + +Improve: emqttd_pubsub to add subscribe/2 API + +Improve: ./bin/emqttd_ctl to support new bridges command + +Bugfix: issue #206 - Cannot bridge two nodes + + 0.9.2-alpha (2015-07-18) ------------------------- diff --git a/README.md b/README.md index bdbd58112..a6951e315 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ emqttd is aimed to provide a solid, enterprise grade, extensible open-source MQT * Client Authentication with username, password. * Client ACL control with ipaddress, clientid, username. * Cluster brokers on several servers. -* Bridge brokers locally or remotelly +* [Bridge](https://github.com/emqtt/emqttd/wiki/Bridge) brokers locally or remotelly * 500K+ concurrent clients connections per server * Extensible architecture with Hooks, Modules and Plugins * Passed eclipse paho interoperability tests diff --git a/rel/files/emqttd_ctl b/rel/files/emqttd_ctl index d88f9ff03..f98a3da66 100755 --- a/rel/files/emqttd_ctl +++ b/rel/files/emqttd_ctl @@ -225,13 +225,19 @@ case "$1" in fi if [[ $# -eq 2 ]] && [[ $2 = "list" ]]; then $NODETOOL rpc emqttd_ctl bridges list - elif [ $# -eq 4 ]; then + elif [[ $# -eq 2 ]] && [[ $2 = "options" ]]; then + $NODETOOL rpc emqttd_ctl bridges options + elif [[ $# -eq 4 ]] && [[ $2 = "stop" ]]; then + shift + $NODETOOL rpc emqttd_ctl bridges $@ + elif [[ $# -ge 4 ]] && [[ $2 = "start" ]]; then shift $NODETOOL rpc emqttd_ctl bridges $@ else echo "Usage: " echo "$SCRIPT bridges list" echo "$SCRIPT bridges start " + echo "$SCRIPT bridges start " echo "$SCRIPT bridges stop " exit 1 fi @@ -308,8 +314,10 @@ case "$1" in echo " plugins unload #unload plugin" echo " ----------------------------------------------------------------" echo " bridges list #query bridges" + echo " bridges options #bridge options" echo " bridges start #start bridge" - echo " bridges stop #stop bridge" + echo " bridges start #start bridge with options" + echo " bridges stop #stop bridge" echo " ----------------------------------------------------------------" echo " useradd #add user" echo " userdel #delete user" diff --git a/src/emqttd.app.src b/src/emqttd.app.src index fb21ec2f5..9aae391fb 100644 --- a/src/emqttd.app.src +++ b/src/emqttd.app.src @@ -1,7 +1,7 @@ {application, emqttd, [ {description, "Erlang MQTT Broker"}, - {vsn, "0.9.2"}, + {vsn, "0.9.3"}, {modules, []}, {registered, []}, {applications, [kernel, diff --git a/src/emqttd_bridge.erl b/src/emqttd_bridge.erl index d59f187cd..dfb68add3 100644 --- a/src/emqttd_bridge.erl +++ b/src/emqttd_bridge.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_bridge). -author("Feng Lee "). @@ -44,18 +45,18 @@ -define(PING_DOWN_INTERVAL, 1000). -record(state, {node, subtopic, - qos, + qos = ?QOS_2, topic_suffix = <<>>, topic_prefix = <<>>, - mqueue = emqttd_mqueue:mqueue(), - max_queue_len = 0, + mqueue :: emqttd_mqueue:mqueue(), + max_queue_len = 10000, ping_down_interval = ?PING_DOWN_INTERVAL, status = up}). --type option() :: {max_queue_len, pos_integer()} | - {qos, mqtt_qos()} | +-type option() :: {qos, mqtt_qos()} | {topic_suffix, binary()} | {topic_prefix, binary()} | + {max_queue_len, pos_integer()} | {ping_down_interval, pos_integer()}. -export_type([option/0]). @@ -85,7 +86,7 @@ init([Node, SubTopic, Options]) -> MQueue = emqttd_mqueue:new(qname(Node, SubTopic), [{max_len, State#state.max_queue_len}], emqttd_alarm:alarm_fun()), - emqttd_pubsub:subscribe({SubTopic, State#state.qos}), + emqttd_pubsub:subscribe(SubTopic, State#state.qos), {ok, State#state{mqueue = MQueue}}; false -> {stop, {cannot_connect, Node}} @@ -102,7 +103,9 @@ parse_opts([{topic_prefix, Prefix} | Opts], State) -> parse_opts([{max_queue_len, Len} | Opts], State) -> parse_opts(Opts, State#state{max_queue_len = Len}); parse_opts([{ping_down_interval, Interval} | Opts], State) -> - parse_opts(Opts, State#state{ping_down_interval = Interval*1000}). + parse_opts(Opts, State#state{ping_down_interval = Interval*1000}); +parse_opts([_Opt | Opts], State) -> + parse_opts(Opts, State). qname(Node, SubTopic) when is_atom(Node) -> qname(atom_to_list(Node), SubTopic); diff --git a/src/emqttd_bridge_sup.erl b/src/emqttd_bridge_sup.erl index d7ffc0f80..0df4b1343 100644 --- a/src/emqttd_bridge_sup.erl +++ b/src/emqttd_bridge_sup.erl @@ -24,6 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- + -module(emqttd_bridge_sup). -author("Feng Lee "). @@ -63,8 +64,13 @@ start_bridge(Node, SubTopic) when is_atom(Node) and is_binary(SubTopic) -> -spec start_bridge(atom(), binary(), [emqttd_bridge:option()]) -> {ok, pid()} | {error, any()}. start_bridge(Node, SubTopic, Options) when is_atom(Node) and is_binary(SubTopic) -> - Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options), - supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)). + case Node =:= node() of + true -> + {error, bridge_to_self}; + false -> + Options1 = emqttd_opts:merge(emqttd_broker:env(bridge), Options), + supervisor:start_child(?MODULE, bridge_spec(Node, SubTopic, Options1)) + end. %%------------------------------------------------------------------------------ %% @doc Stop a bridge diff --git a/src/emqttd_ctl.erl b/src/emqttd_ctl.erl index 0a3debac6..acccb5b5a 100644 --- a/src/emqttd_ctl.erl +++ b/src/emqttd_ctl.erl @@ -147,13 +147,29 @@ listeners([]) -> end, esockd:listeners()). bridges(["list"]) -> - lists:foreach(fun({{Node, Topic}, _Pid}) -> - ?PRINT("bridge: ~s ~s~n", [Node, Topic]) + lists:foreach(fun({{Node, Topic}, _Pid}) -> + ?PRINT("bridge: ~s ~s~n", [Node, Topic]) end, emqttd_bridge_sup:bridges()); +bridges(["options"]) -> + ?PRINT_MSG("Options:~n"), + ?PRINT_MSG(" qos = 0 | 1 | 2~n"), + ?PRINT_MSG(" prefix = string~n"), + ?PRINT_MSG(" suffix = string~n"), + ?PRINT_MSG(" queue = integer~n"), + ?PRINT_MSG("Example:~n"), + ?PRINT_MSG(" qos=2,prefix=abc/,suffix=/yxz,queue=1000~n"); + bridges(["start", SNode, Topic]) -> case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic)) of - {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); + {error, Error} -> ?PRINT("error: ~p~n", [Error]) + end; + +bridges(["start", SNode, Topic, OptStr]) -> + Opts = parse_opts(bridge, OptStr), + case emqttd_bridge_sup:start_bridge(list_to_atom(SNode), bin(Topic), Opts) of + {ok, _} -> ?PRINT_MSG("bridge is started.~n"); {error, Error} -> ?PRINT("error: ~p~n", [Error]) end; @@ -229,3 +245,19 @@ node_name(SNode) -> bin(S) when is_list(S) -> list_to_binary(S); bin(B) when is_binary(B) -> B. +parse_opts(Cmd, OptStr) -> + Tokens = string:tokens(OptStr, ","), + [parse_opt(Cmd, list_to_atom(Opt), Val) + || [Opt, Val] <- [string:tokens(S, "=") || S <- Tokens]]. + +parse_opt(bridge, qos, Qos) -> + {qos, list_to_integer(Qos)}; +parse_opt(bridge, suffix, Suffix) -> + {topic_suffix, list_to_binary(Suffix)}; +parse_opt(bridge, prefix, Prefix) -> + {topic_prefix, list_to_binary(Prefix)}; +parse_opt(bridge, queue, Len) -> + {max_queue_len, list_to_integer(Len)}; +parse_opt(_Cmd, Opt, _Val) -> + ?PRINT("Bad Option: ~s~n", [Opt]). + diff --git a/src/emqttd_pubsub.erl b/src/emqttd_pubsub.erl index 6deda1950..b2604d3cc 100644 --- a/src/emqttd_pubsub.erl +++ b/src/emqttd_pubsub.erl @@ -43,7 +43,7 @@ -export([start_link/2]). -export([create/1, - subscribe/1, + subscribe/1, subscribe/2, unsubscribe/1, publish/1]). @@ -128,15 +128,21 @@ create(Topic) when is_binary(Topic) -> %% @doc Subscribe topic %% @end %%------------------------------------------------------------------------------ --spec subscribe({Topic, Qos} | list({Topic, Qos})) -> +-spec subscribe({Topic, Qos} | list({Topic, Qos})) -> {ok, Qos | list(Qos)} | {error, any()} when Topic :: binary(), - Qos :: mqtt_qos(). -subscribe({Topic, Qos}) when is_binary(Topic) andalso ?IS_QOS(Qos) -> - call({subscribe, self(), Topic, Qos}); + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe({Topic, Qos}) when is_binary(Topic) andalso (?IS_QOS(Qos) orelse is_atom(Qos)) -> + call({subscribe, self(), Topic, ?QOS_I(Qos)}); subscribe(Topics = [{_Topic, _Qos} | _]) -> - call({subscribe, self(), Topics}). + call({subscribe, self(), [{Topic, ?QOS_I(Qos)} || {Topic, Qos} <- Topics]}). + +-spec subscribe(Topic, Qos) -> {ok, Qos} when + Topic :: binary(), + Qos :: mqtt_qos() | mqtt_qos_name(). +subscribe(Topic, Qos) -> + subscribe({Topic, Qos}). %%------------------------------------------------------------------------------ %% @doc Unsubscribe Topic or Topics