From b466b8778b8ff8dcd907249901a17333e0598718 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 11 Mar 2015 00:15:27 +0800 Subject: [PATCH] pub_interval = 0 --- apps/emqttd/src/emqttd.erl | 9 ++++- apps/emqttd/src/emqttd_app.erl | 5 ++- apps/emqttd/src/emqttd_bridge.erl | 56 ++++++++++++++++++++++----- apps/emqttd/src/emqttd_bridge_sup.erl | 8 +--- apps/emqttd/src/emqttd_broker.erl | 9 ++++- apps/emqttd/src/emqttd_cm.erl | 2 +- apps/emqttd/src/emqttd_metrics.erl | 12 ++++-- apps/emqttd/src/emqttd_plugin.erl | 2 +- apps/emqttd/src/emqttd_protocol.erl | 2 +- apps/emqttd/src/emqttd_pubsub.erl | 2 +- apps/emqttd/src/emqttd_serialiser.erl | 2 +- apps/emqttd/src/emqttd_server.erl | 2 +- apps/emqttd/src/emqttd_session.erl | 2 +- 13 files changed, 82 insertions(+), 31 deletions(-) diff --git a/apps/emqttd/src/emqttd.erl b/apps/emqttd/src/emqttd.erl index d63f7cecb..435408f47 100644 --- a/apps/emqttd/src/emqttd.erl +++ b/apps/emqttd/src/emqttd.erl @@ -28,7 +28,7 @@ -author('feng@emqtt.io'). --export([start/0, open/1]). +-export([start/0, open/1, is_running/1]). -define(MQTT_SOCKOPTS, [ binary, @@ -69,4 +69,11 @@ open({http, Port, Options}) -> MFArgs = {emqttd_http, handle, []}, mochiweb:start_http(Port, Options, MFArgs). +is_running(Node) -> + case rpc:call(Node, erlang, whereis, [emqttd]) of + {badrpc, _} -> false; + undefined -> false; + Pid when is_pid(Pid) -> true + end. + diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index 6f9c1c0b1..e7dad3684 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -64,7 +64,7 @@ start(_StartType, _StartArgs) -> ok = emqttd_mnesia:wait(), {ok, Listeners} = application:get_env(listen), emqttd:open(Listeners), - register(emqtt, self()), + register(emqttd, self()), print_vsn(), {ok, Sup}. @@ -103,8 +103,9 @@ start_servers(Sup) -> {"emqttd auth", emqttd_auth}, {"emqttd pubsub", emqttd_pubsub}, {"emqttd router", emqttd_router}, - {"emqttd broker", emqttd_broker, BrokerOpts}, + {"emqttd broker", emqttd_broker, BrokerOpts}, {"emqttd metrics", emqttd_metrics, MetricOpts}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, {"emqttd monitor", emqttd_monitor} ]). diff --git a/apps/emqttd/src/emqttd_bridge.erl b/apps/emqttd/src/emqttd_bridge.erl index 086e9f4a8..ba2736090 100644 --- a/apps/emqttd/src/emqttd_bridge.erl +++ b/apps/emqttd/src/emqttd_bridge.erl @@ -39,7 +39,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {node, local_topic, status = running}). +-define(PING_INTERVAL, 1000). + +-record(state, {node, local_topic, status = up}). %%%============================================================================= %%% API @@ -53,9 +55,15 @@ start_link(Node, LocalTopic) -> %%%============================================================================= init([Node, LocalTopic]) -> - emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()), - %%TODO: monitor nodes... - {ok, #state{node = Node, local_topic = LocalTopic}}. + process_flag(trap_exit, true), + case net_kernel:connect_node(Node) of + true -> + true = erlang:monitor_node(Node, true), + emqttd_pubsub:subscribe({LocalTopic, ?QOS_0}, self()), + {ok, #state{node = Node, local_topic = LocalTopic}}; + false -> + {stop, {cannot_connect, Node}} + end. handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -63,13 +71,43 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. +handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = down}) -> + lager:warning("Bridge Dropped Msg for ~p Down:~n~p", [Node, Msg]), + {noreply, State}; + +handle_info({dispatch, {_From, Msg}}, State = #state{node = Node, status = up}) -> + rpc:cast(Node, emqttd_router, route, [Msg]), + {noreply, State}; + handle_info({nodedown, Node}, State = #state{node = Node}) -> - %%.... + lager:warning("Bridge Node Down: ~p", [Node]), + erlang:send_after(?PING_INTERVAL, self(), ping_down_node), {noreply, State#state{status = down}}; -handle_info({dispatch, {_From, Msg}}, State = #state{node = Node}) -> - %%TODO: CAST - rpc:call(Node, emqttd_router, route, [Msg]), +handle_info({nodeup, Node}, State = #state{node = Node}) -> + %% TODO: Really fast?? + case emqttd:is_running(Node) of + true -> + lager:warning("Bridge Node Up: ~p", [Node]), + {noreply, State#state{status = up}}; + false -> + self() ! {nodedown, Node}, + {noreply, State#state{status = down}} + end; + +handle_info(ping_down_node, State = #state{node = Node}) -> + Self = self(), + spawn_link(fun() -> + case net_kernel:connect_node(Node) of + true -> %%TODO: this is not right... fixme later + Self ! {nodeup, Node}; + false -> + erlang:send_after(?PING_INTERVAL, Self, ping_down_node) + end + end), + {noreply, State}; + +handle_info({'EXIT', _Pid, normal}, State) -> {noreply, State}; handle_info(Info, State) -> @@ -86,5 +124,3 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= - - diff --git a/apps/emqttd/src/emqttd_bridge_sup.erl b/apps/emqttd/src/emqttd_bridge_sup.erl index e1b85c2be..ecdddc5e4 100644 --- a/apps/emqttd/src/emqttd_bridge_sup.erl +++ b/apps/emqttd/src/emqttd_bridge_sup.erl @@ -55,13 +55,7 @@ start_link() -> %%------------------------------------------------------------------------------ -spec start_bridge(atom(), binary()) -> {ok, pid()} | {error, any()}. start_bridge(Node, LocalTopic) when is_atom(Node) and is_binary(LocalTopic) -> - %%TODO: mv this code to emqttd_bridge??? - case net_kernel:connect_node(Node) of - true -> - supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic)); - false -> - {error, {cannot_connect, Node}} - end. + supervisor:start_child(?MODULE, bridge_spec(Node, LocalTopic)). %%------------------------------------------------------------------------------ %% @doc diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 34b2da602..ea73efa3e 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -154,7 +154,11 @@ init([Options]) -> [{atomic, _} = create(systop(Topic)) || Topic <- Topics], SysInterval = proplists:get_value(sys_interval, Options, 60), State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, - {ok, tick(random:uniform(SysInterval), State), hibernate}. + Delay = if + SysInterval == 0 -> 0; + true -> random:uniform(SysInterval) + end, + {ok, tick(Delay, State), hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -224,9 +228,12 @@ uptime(days, D) -> tick(State = #state{sys_interval = SysInterval}) -> tick(SysInterval, State). +tick(0, State) -> + State; tick(Delay, State) -> State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. i2b(I) when is_integer(I) -> list_to_binary(integer_to_list(I)). + diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 352e2977e..c2f7cc079 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt client manager. +%%% emqttd client manager. %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 5017cac54..ad1b4b85d 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -56,7 +56,7 @@ %%------------------------------------------------------------------------------ %% @doc -%% Start emqtt metrics. +%% Start emqttd metrics. %% %% @end %%------------------------------------------------------------------------------ @@ -174,7 +174,7 @@ key(counter, Metric) -> %%% gen_server callbacks %%%============================================================================= -init(Options) -> +init([Options]) -> random:seed(now()), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table @@ -184,7 +184,11 @@ init(Options) -> % $SYS Topics for metrics [{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], PubInterval = proplists:get_value(pub_interval, Options, 60), - {ok, tick(random:uniform(PubInterval), #state{pub_interval = PubInterval}), hibernate}. + Delay = if + PubInterval == 0 -> 0; + true -> random:uniform(PubInterval) + end, + {ok, tick(Delay, #state{pub_interval = PubInterval}), hibernate}. handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. @@ -226,6 +230,8 @@ new_metric({counter, Name}) -> tick(State = #state{pub_interval = PubInterval}) -> tick(PubInterval, State). +tick(0, State) -> + State; tick(Delay, State) -> State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. diff --git a/apps/emqttd/src/emqttd_plugin.erl b/apps/emqttd/src/emqttd_plugin.erl index 38c01f4a3..099966362 100644 --- a/apps/emqttd/src/emqttd_plugin.erl +++ b/apps/emqttd/src/emqttd_plugin.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt plugin framework. +%%% emqttd plugin framework. %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 867694c9c..941f0635e 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt protocol. +%%% emqttd protocol. %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 20d99c187..67b841dd8 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt core pubsub. +%%% emqttd core pubsub. %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_serialiser.erl b/apps/emqttd/src/emqttd_serialiser.erl index 839510ea7..ab7d81bd2 100644 --- a/apps/emqttd/src/emqttd_serialiser.erl +++ b/apps/emqttd/src/emqttd_serialiser.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt packet serialiser. +%%% emqttd packet serialiser. %%% %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_server.erl b/apps/emqttd/src/emqttd_server.erl index 238e32c02..d70cebca1 100644 --- a/apps/emqttd/src/emqttd_server.erl +++ b/apps/emqttd/src/emqttd_server.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt server. retain messages??? +%%% emqttd server. retain messages??? %%% TODO: redesign... %%% @end %%%----------------------------------------------------------------------------- diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 714fac6fe..9754f8bd7 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqtt session. +%%% emqttd session. %%% %%% @end %%%-----------------------------------------------------------------------------