From b4298f2b0538178aa335df7226ef5adb6d4fc6b3 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Apr 2015 12:02:44 +0800 Subject: [PATCH] fix options, stats --- apps/emqttd/src/emqttd_access_control.erl | 11 +- apps/emqttd/src/emqttd_app.erl | 33 +++--- apps/emqttd/src/emqttd_broker.erl | 103 ++++++++++-------- apps/emqttd/src/emqttd_cm.erl | 2 +- apps/emqttd/src/emqttd_ctl.erl | 2 +- apps/emqttd/src/emqttd_metrics.erl | 38 ++++--- apps/emqttd/src/emqttd_mnesia.erl | 4 +- apps/emqttd/src/emqttd_pubsub.erl | 4 +- apps/emqttd/src/emqttd_pubsub_sup.erl | 9 +- apps/emqttd/src/emqttd_session.erl | 6 +- apps/emqttd/src/emqttd_sm.erl | 2 +- apps/emqttd/src/emqttd_stats.erl | 20 ++-- .../src/{emqttd_utils.erl => emqttd_util.erl} | 2 +- rel/files/app.config | 9 +- 14 files changed, 127 insertions(+), 118 deletions(-) rename apps/emqttd/src/{emqttd_utils.erl => emqttd_util.erl} (99%) diff --git a/apps/emqttd/src/emqttd_access_control.erl b/apps/emqttd/src/emqttd_access_control.erl index 68ea96d69..994957a5d 100644 --- a/apps/emqttd/src/emqttd_access_control.erl +++ b/apps/emqttd/src/emqttd_access_control.erl @@ -35,7 +35,7 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/1, +-export([start_link/0, auth/2, % authentication check_acl/3, % acl check reload_acl/0, % reload acl @@ -58,9 +58,9 @@ %% @doc Start access control server %% @end %%------------------------------------------------------------------------------ --spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}. -start_link(AcOpts) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [AcOpts], []). +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%------------------------------------------------------------------------------ %% @doc Authenticate MQTT Client @@ -151,7 +151,8 @@ stop() -> %%% gen_server callbacks %%%============================================================================= -init([AcOpts]) -> +init([]) -> + {ok, AcOpts} = application:get_env(access_control), ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), ets:insert(?ACCESS_CONTROL_TAB, {auth_modules, init_mods(auth, proplists:get_value(auth, AcOpts))}), ets:insert(?ACCESS_CONTROL_TAB, {acl_modules, init_mods(acl, proplists:get_value(acl, AcOpts))}), diff --git a/apps/emqttd/src/emqttd_app.erl b/apps/emqttd/src/emqttd_app.erl index cdd35250c..fe29c0c1d 100644 --- a/apps/emqttd/src/emqttd_app.erl +++ b/apps/emqttd/src/emqttd_app.erl @@ -67,25 +67,20 @@ print_vsn() -> ?PRINT("~s ~s is running now~n", [Desc, Vsn]). start_servers(Sup) -> - {ok, PubSubOpts} = application:get_env(pubsub), - {ok, BrokerOpts} = application:get_env(broker), - {ok, MetricOpts} = application:get_env(metrics), - {ok, AccessOpts} = application:get_env(access_control), - Servers = [ - {"emqttd event", emqttd_event}, - {"emqttd trace", emqttd_trace}, - {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, - {"emqttd client manager", {supervisor, emqttd_cm_sup}}, - {"emqttd session manager", emqttd_sm}, - {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, - {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}, PubSubOpts}, - %{"emqttd router", emqttd_router}, - {"emqttd broker", emqttd_broker, BrokerOpts}, - {"emqttd stats", emqttd_stats}, - {"emqttd metrics", emqttd_metrics, MetricOpts}, - {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, - {"emqttd access control", emqttd_access_control, AccessOpts}, - {"emqttd system monitor", emqttd_sysmon}], + Servers = [{"emqttd event", emqttd_event}, + {"emqttd trace", emqttd_trace}, + {"emqttd pooler", {supervisor, emqttd_pooler_sup}}, + {"emqttd client manager", {supervisor, emqttd_cm_sup}}, + {"emqttd session manager", emqttd_sm}, + {"emqttd session supervisor", {supervisor, emqttd_session_sup}}, + {"emqttd pubsub", {supervisor, emqttd_pubsub_sup}}, + %{"emqttd router", emqttd_router}, + {"emqttd broker", emqttd_broker}, + {"emqttd stats", emqttd_stats}, + {"emqttd metrics", emqttd_metrics}, + {"emqttd bridge supervisor", {supervisor, emqttd_bridge_sup}}, + {"emqttd access control", emqttd_access_control}, + {"emqttd system monitor", emqttd_sysmon}], [start_server(Sup, Server) || Server <- Servers]. start_server(_Sup, {Name, F}) when is_function(F) -> diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 3fe16180c..811bef7e7 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -37,9 +37,13 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/1]). +-export([start_link/0]). --export([version/0, uptime/0, datetime/0, sysdescr/0]). +%% Event API +-export([subscribe/1, notify/2]). + +%% Broker API +-export([env/1, version/0, uptime/0, datetime/0, sysdescr/0]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -47,7 +51,7 @@ -define(BROKER_TAB, mqtt_broker). --record(state, {started_at, sys_interval, tick_timer}). +-record(state, {started_at, sys_interval, tick_tref}). %%%============================================================================= %%% API @@ -57,9 +61,33 @@ %% @doc Start emqttd broker %% @end %%------------------------------------------------------------------------------ --spec start_link([tuple()]) -> {ok, pid()} | ignore | {error, term()}. -start_link(Options) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [Options], []). +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% @doc Subscribe broker event +%% @end +%%------------------------------------------------------------------------------ +-spec subscribe(EventType :: any()) -> ok. +subscribe(EventType) -> + gproc:reg({p, l, {broker, EventType}}). + +%%------------------------------------------------------------------------------ +%% @doc Notify broker event +%% @end +%%------------------------------------------------------------------------------ +-spec notify(EventType :: any(), Event :: any()) -> ok. +notify(EventType, Event) -> + Key = {broker, EventType}, + gproc:send({p, l, Key}, {self(), Key, Event}). + +%%------------------------------------------------------------------------------ +%% @doc Get broker env +%% @end +%%------------------------------------------------------------------------------ +env(Name) -> + proplists:get_value(Name, application:get_env(emqttd, mqtt_broker, [])). %%------------------------------------------------------------------------------ %% @doc Get broker version @@ -100,19 +128,16 @@ datetime() -> %%% gen_server callbacks %%%============================================================================= -init([Options]) -> +init([]) -> random:seed(now()), - ets:new(?BROKER_TAB, [set, public, named_table, {write_concurrency, true}]), - [ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics], + ets:new(?BROKER_TAB, [set, public, named_table]), % Create $SYS Topics - [ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS], - SysInterval = proplists:get_value(sys_interval, Options, 60), - State = #state{started_at = os:timestamp(), sys_interval = SysInterval}, - Delay = if - SysInterval == 0 -> 0; - true -> random:uniform(SysInterval) - end, - {ok, tick(Delay, State), hibernate}. + [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], + % Tick + SysInterval = env(sys_interval), + {ok, TRef} = timer:send_interval(timer:seconds(SysInterval), tick), + State = #state{started_at = os:timestamp(), sys_interval = SysInterval, tick_tref = TRef}, + {ok, State, hibernate}. handle_call(uptime, _From, State) -> {reply, uptime(State), State}; @@ -124,13 +149,11 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(tick, State) -> - retain(systop(version), list_to_binary(version())), - retain(systop(sysdescr), list_to_binary(sysdescr())), - publish(systop(uptime), list_to_binary(uptime(State))), - publish(systop(datetime), list_to_binary(datetime())), - [publish(systop(Stat), i2b(Val)) - || {Stat, Val} <- ets:tab2list(?BROKER_TAB)], - {noreply, tick(State), hibernate}; + retain(version, list_to_binary(version())), + retain(sysdescr, list_to_binary(sysdescr())), + publish(uptime, list_to_binary(uptime(State))), + publish(datetime, list_to_binary(datetime())), + {noreply, State, hibernate}; handle_info(_Info, State) -> {noreply, State}. @@ -145,20 +168,21 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= -systop(Name) when is_atom(Name) -> - list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). - -create(Topic) -> - emqttd_pubsub:create(Topic). +create_topic(Topic) -> + emqttd_pubsub:create(emqtt_topic:systop(Topic)). retain(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(broker, #mqtt_message{retain = true, - topic = Topic, - payload = Payload}). + publish(#mqtt_message{retain = true, + topic = emqtt_topic:systop(Topic), + payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - emqttd_pubsub:publish(broker, #mqtt_message{topic = Topic, - payload = Payload}). + publish( #mqtt_message{topic = emqtt_topic:systop(Topic), + payload = Payload}). + +publish(Msg) -> + emqttd_pubsub:publish(broker, Msg). + uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, @@ -179,14 +203,3 @@ uptime(hours, H) -> uptime(days, D) -> [integer_to_list(D), " days,"]. -tick(State = #state{sys_interval = SysInterval}) -> - tick(SysInterval, State). - -tick(0, State) -> - State; -tick(Delay, State) -> - State#state{tick_timer = erlang:send_after(Delay * 1000, self(), tick)}. - -i2b(I) when is_integer(I) -> - list_to_binary(integer_to_list(I)). - diff --git a/apps/emqttd/src/emqttd_cm.erl b/apps/emqttd/src/emqttd_cm.erl index 809e88a12..f7916e562 100644 --- a/apps/emqttd/src/emqttd_cm.erl +++ b/apps/emqttd/src/emqttd_cm.erl @@ -94,7 +94,7 @@ unregister(ClientId) when is_binary(ClientId) -> init([Id, TabId]) -> gproc_pool:connect_worker(?POOL, {?MODULE, Id}), - StatsFun = emqttd_broker:statsfun('clients/count', 'clients/max'), + StatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'), {ok, #state{tab = TabId, statsfun = StatsFun}}. handle_call({register, ClientId, Pid}, _From, State = #state{tab = Tab}) -> diff --git a/apps/emqttd/src/emqttd_ctl.erl b/apps/emqttd/src/emqttd_ctl.erl index 0da5003b9..ff43ad7c4 100644 --- a/apps/emqttd/src/emqttd_ctl.erl +++ b/apps/emqttd/src/emqttd_ctl.erl @@ -124,7 +124,7 @@ broker([]) -> [?PRINT("~s: ~s~n", [Fun, emqttd_broker:Fun()]) || Fun <- Funs]. stats([]) -> - [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_broker:getstats()]. + [?PRINT("~s: ~p~n", [Stat, Val]) || {Stat, Val} <- emqttd_stats:getstats()]. metrics([]) -> [?PRINT("~s: ~p~n", [Metric, Val]) || {Metric, Val} <- emqttd_metrics:all()]. diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index df85e9a71..98d2ec81c 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -50,7 +50,7 @@ -define(METRIC_TAB, mqtt_metric). --record(state, {tick}). +-record(state, {tick_tref}). %%%============================================================================= %%% API @@ -60,7 +60,7 @@ %% @doc Start metrics server %% @end %%------------------------------------------------------------------------------ --spec start_link() -> {ok, pid()} | ignore | {error, term()}. +-spec start_link() -> {ok, pid()} | ignore | {error, any()}. start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -157,34 +157,33 @@ key(counter, Metric) -> init([]) -> random:seed(now()), - {ok, BrokerOpts} = application:get_env(mqtt_broker), Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), % Init metrics - [new_metric(Metric) || Metric <- Metrics], + [create_metric(Metric) || Metric <- Metrics], % $SYS Topics for metrics - [ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics], - % Tick to publish stats - Tick = emqttd_tick:new(proplists:get_value(sys_interval, BrokerOpts, 60)), - {ok, #state{tick = Tick}, hibernate}. + [ok = create_topic(Topic) || {_, Topic} <- Metrics], + % Tick to publish metrics + {ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick), + {ok, #state{tick_tref = TRef}, hibernate}. handle_call(_Req, _From, State) -> - {reply, {error, badreq}, State}. + {reply, error, State}. handle_cast(_Msg, State) -> {noreply, State}. -handle_info(tick, State = #state{tick = Tick}) -> +handle_info(tick, State) -> % publish metric message [publish(Metric, Val) || {Metric, Val} <- all()], - {noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate}; + {noreply, State, hibernate}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{tick_tref = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -194,14 +193,17 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= publish(Metric, Val) -> - emqttd_pubsub:publish(metrics, #mqtt_message{ - topic = emqtt_topic:systop(Metric), - payload = emqttd_utils:integer_to_binary(Val)}). + emqttd_pubsub:publish(metrics, #mqtt_message{topic = emqtt_topic:systop(Metric), + payload = emqttd_util:integer_to_binary(Val)}). -new_metric({gauge, Name}) -> +create_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); -new_metric({counter, Name}) -> +create_metric({counter, Name}) -> Schedulers = lists:seq(1, erlang:system_info(schedulers)), [ets:insert(?METRIC_TAB, {{Name, I}, 0}) || I <- Schedulers]. +create_topic(Topic) -> + emqttd_pubsub:create(emqtt_topic:systop(Topic)). + + diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index bea986ad1..3bfc36fb0 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -86,7 +86,7 @@ init_tables() -> %% @end %%------------------------------------------------------------------------------ create_tables() -> - emqttd_utils:apply_module_attributes(boot_mnesia). + emqttd_util:apply_module_attributes(boot_mnesia). create_table(Table, Attrs) -> case mnesia:create_table(Table, Attrs) of @@ -103,7 +103,7 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - emqttd_utils:apply_module_attributes(copy_mnesia). + emqttd_util:apply_module_attributes(copy_mnesia). copy_table(Table) -> case mnesia:add_table_copy(Table, node(), ram_copies) of diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 016d93584..92a46936c 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -355,10 +355,10 @@ setstats(all) -> setstats(topics), setstats(subscribers); setstats(topics) -> - emqttd_broker:setstat('topics/count', + emqttd_stats:setstat('topics/count', mnesia:table_info(topic, size)); setstats(subscribers) -> - emqttd_broker:setstats('subscribers/count', + emqttd_stats:setstats('subscribers/count', 'subscribers/max', mnesia:table_info(subscriber, size)). diff --git a/apps/emqttd/src/emqttd_pubsub_sup.erl b/apps/emqttd/src/emqttd_pubsub_sup.erl index 590615ae0..fc5a55cdb 100644 --- a/apps/emqttd/src/emqttd_pubsub_sup.erl +++ b/apps/emqttd/src/emqttd_pubsub_sup.erl @@ -33,15 +33,16 @@ -behaviour(supervisor). %% API --export([start_link/1]). +-export([start_link/0]). %% Supervisor callbacks -export([init/1]). -start_link(Opts) -> - supervisor:start_link({local, ?MODULE}, ?MODULE, [Opts]). +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). -init([Opts]) -> +init([]) -> + {ok, Opts} = application:get_env(mqtt_pubsub), Schedulers = erlang:system_info(schedulers), PoolSize = proplists:get_value(pool_size, Opts, Schedulers), gproc_pool:new(pubsub, hash, [{size, PoolSize}]), diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 28ba042e7..7e1696adb 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -46,7 +46,7 @@ -export([store/2]). %% Start gen_server --export([start_link/3]). +-export([start_link/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -301,7 +301,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{ end, %% cancel timeout timer - emqttd_utils:cancel_timer(ETimer), + emqttd_util:cancel_timer(ETimer), %% redelivery PUBREL lists:foreach(fun(PacketId) -> @@ -410,7 +410,7 @@ next_msg_id(State = #session_state{message_id = MsgId}) -> start_expire_timer(State = #session_state{expires = Expires, expire_timer = OldTimer}) -> - emqttd_utils:cancel_timer(OldTimer), + emqttd_util:cancel_timer(OldTimer), Timer = erlang:send_after(Expires * 1000, self(), session_expired), State#session_state{expire_timer = Timer}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index c44b9dd5d..70b265f0f 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -100,7 +100,7 @@ destroy_session(ClientId) -> init([]) -> process_flag(trap_exit, true), TabId = ets:new(?SESSION_TAB, [set, protected, named_table]), - StatsFun = emqttd_broker:statsfun('sessions/count', 'sessions/max'), + StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'), {ok, #state{tabid = TabId, statsfun = StatsFun}}. handle_call({start_session, ClientId, ClientPid}, _From, State = #state{tabid = Tab}) -> diff --git a/apps/emqttd/src/emqttd_stats.erl b/apps/emqttd/src/emqttd_stats.erl index 307ce2af6..550d7aa89 100644 --- a/apps/emqttd/src/emqttd_stats.erl +++ b/apps/emqttd/src/emqttd_stats.erl @@ -49,7 +49,7 @@ -define(STATS_TAB, mqtt_stats). --record(state, {tick}). +-record(state, {tick_tref}). %%%============================================================================= %%% API @@ -122,15 +122,14 @@ setstats(Stat, MaxStat, Val) -> init([]) -> random:seed(now()), - {ok, Options} = application:get_env(mqtt_broker), ets:new(?STATS_TAB, [set, public, named_table, {write_concurrency, true}]), Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB, [ets:insert(?STATS_TAB, {Topic, 0}) || Topic <- Topics], % Create $SYS Topics [ok = emqttd_pubsub:create(emqtt_topic:systop(Topic)) || Topic <- Topics], % Tick to publish stats - Tick = emqttd_tick:new(proplists:get_value(sys_interval, Options, 60)), - {ok, #state{tick = Tick}, hibernate}. + {ok, TRef} = timer:send_interval(timer:seconds(emqttd_broker:env(sys_interval)), tick), + {ok, #state{tick_tref = TRef}, hibernate}. handle_call(_Request, _From, State) -> {reply, error, State}. @@ -138,15 +137,15 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(tick, State = #state{tick = Tick}) -> +handle_info(tick, State) -> [publish(Stat, Val) || {Stat, Val} <- ets:tab2list(?STATS_TAB)], - {noreply, State#state{tick = emqttd_tick:tick(Tick)}, hibernate}; + {noreply, State, hibernate}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, #state{tick_tref = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -155,8 +154,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= publish(Stat, Val) -> - emqttd_pubsub:publish(stats, #mqtt_message{ - topic = emqtt_topic:systop(Stat), - payload = emqttd_utils:integer_to_binary(Val)}). + emqttd_pubsub:publish(stats, #mqtt_message{topic = emqtt_topic:systop(Stat), + payload = emqttd_util:integer_to_binary(Val)}). diff --git a/apps/emqttd/src/emqttd_utils.erl b/apps/emqttd/src/emqttd_util.erl similarity index 99% rename from apps/emqttd/src/emqttd_utils.erl rename to apps/emqttd/src/emqttd_util.erl index a5a94b931..db6baf21c 100644 --- a/apps/emqttd/src/emqttd_utils.erl +++ b/apps/emqttd/src/emqttd_util.erl @@ -24,7 +24,7 @@ %%% %%% @end %%%----------------------------------------------------------------------------- --module(emqttd_utils). +-module(emqttd_util). -author("Feng Lee "). diff --git a/rel/files/app.config b/rel/files/app.config index 01cb541e2..6b4867c33 100644 --- a/rel/files/app.config +++ b/rel/files/app.config @@ -77,13 +77,12 @@ {max_playload_size, 4096} ]}, %% PubSub - {mqtt_pubsub, []}, - %% Metrics - {metrics, [ - {pub_interval, 60} + {mqtt_pubsub, [ + %% default should be scheduler numbers + %% {pool_size, 4} ]}, %% Bridge - {bridge, [ + {mqtt_bridge, [ {max_queue_len, 1000}, %NO effect now {ping_down_interval, 1} %seconds ]},