From fae3e1e32f5fa252099624df889f41ee7f761fde Mon Sep 17 00:00:00 2001 From: Feng Date: Thu, 15 Oct 2015 18:59:51 +0800 Subject: [PATCH 1/3] AckFun --- src/emqttd_session.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index 4cb927ef4..d2163d3f4 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -170,9 +170,9 @@ destroy(SessPid, ClientId) -> subscribe(SessPid, TopicTable) -> subscribe(SessPid, TopicTable, fun(_) -> ok end). --spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok. -subscribe(SessPid, TopicTable, Callback) -> - gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}). +-spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok. +subscribe(SessPid, TopicTable, AckFun) -> + gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}). %%------------------------------------------------------------------------------ %% @doc Publish message @@ -296,20 +296,20 @@ handle_call(Req, _From, State) -> lager:error("Unexpected Request: ~p", [Req]), {reply, ok, State}. -handle_cast({subscribe, TopicTable0, Callback}, Session = #session{ +handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{ client_id = ClientId, subscriptions = Subscriptions}) -> TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0), case TopicTable -- Subscriptions of [] -> - catch Callback([Qos || {_, Qos} <- TopicTable]), + catch AckFun([Qos || {_, Qos} <- TopicTable]), noreply(Session); _ -> %% subscribe first and don't care if the subscriptions have been existed {ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable), - catch Callback(GrantedQos), + catch AckFun(GrantedQos), emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]), From 61a68187672adb44ee9d0da1f0af9adc611fb950 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 16 Oct 2015 14:19:20 +0800 Subject: [PATCH 2/3] fix spelling error --- rel/files/emqttd.config.development | 4 ++-- rel/files/emqttd.config.production | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rel/files/emqttd.config.development b/rel/files/emqttd.config.development index 9b8eedb2d..4989110f9 100644 --- a/rel/files/emqttd.config.development +++ b/rel/files/emqttd.config.development @@ -115,10 +115,10 @@ %% or inflight window is full. {max_length, 100}, - %% Low-water mark of queued messsages + %% Low-water mark of queued messages {low_watermark, 0.2}, - %% High-water mark of queued messsages + %% High-water mark of queued messages {high_watermark, 0.6}, %% Queue Qos0 messages? diff --git a/rel/files/emqttd.config.production b/rel/files/emqttd.config.production index 639630672..53b1ea3a0 100644 --- a/rel/files/emqttd.config.production +++ b/rel/files/emqttd.config.production @@ -107,10 +107,10 @@ %% or inflight window is full. {max_length, 100}, - %% Low-water mark of queued messsages + %% Low-water mark of queued messages {low_watermark, 0.2}, - %% High-water mark of queued messsages + %% High-water mark of queued messages {high_watermark, 0.6}, %% Queue Qos0 messages? From a35d51ea24ee7da490ae4408033f34974b1627a5 Mon Sep 17 00:00:00 2001 From: Feng Date: Fri, 16 Oct 2015 14:58:51 +0800 Subject: [PATCH 3/3] fix issue #346 --- src/emqttd_retained.erl | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/emqttd_retained.erl b/src/emqttd_retained.erl index b82570800..8e797255b 100644 --- a/src/emqttd_retained.erl +++ b/src/emqttd_retained.erl @@ -25,6 +25,7 @@ %%% @end %%%----------------------------------------------------------------------------- +%% TODO: should match topic tree -module(emqttd_retained). -author("Feng Lee "). @@ -150,9 +151,9 @@ dispatch(Topic, CPid) when is_binary(Topic) -> init([]) -> StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'), %% One second - {ok, StatsTimer} = timer:send_interval(1000, stats), + {ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats), %% Five minutes - {ok, ExpireTimer} = timer:send_interval(300 * 1000, expire), + {ok, ExpireTimer} = timer:send_interval(timer:minutes(5), expire), {ok, #state{stats_fun = StatsFun, expired_after = env(expired_after), stats_timer = StatsTimer, @@ -169,6 +170,10 @@ handle_info(stats, State = #state{stats_fun = StatsFun}) -> StatsFun(mnesia:table_info(retained, size)), {noreply, State, hibernate}; +handle_info(expire, State = #state{expired_after = Never}) + when Never =:= 0 orelse Never =:= undefined -> + {noreply, State, hibernate}; + handle_info(expire, State = #state{expired_after = ExpiredAfter}) -> expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter), {noreply, State, hibernate};