Merge pull request #347 from emqtt/dev-feng
fix issue #346, and spelling errors
This commit is contained in:
commit
8e90707d63
|
@ -115,10 +115,10 @@
|
||||||
%% or inflight window is full.
|
%% or inflight window is full.
|
||||||
{max_length, 100},
|
{max_length, 100},
|
||||||
|
|
||||||
%% Low-water mark of queued messsages
|
%% Low-water mark of queued messages
|
||||||
{low_watermark, 0.2},
|
{low_watermark, 0.2},
|
||||||
|
|
||||||
%% High-water mark of queued messsages
|
%% High-water mark of queued messages
|
||||||
{high_watermark, 0.6},
|
{high_watermark, 0.6},
|
||||||
|
|
||||||
%% Queue Qos0 messages?
|
%% Queue Qos0 messages?
|
||||||
|
|
|
@ -107,10 +107,10 @@
|
||||||
%% or inflight window is full.
|
%% or inflight window is full.
|
||||||
{max_length, 100},
|
{max_length, 100},
|
||||||
|
|
||||||
%% Low-water mark of queued messsages
|
%% Low-water mark of queued messages
|
||||||
{low_watermark, 0.2},
|
{low_watermark, 0.2},
|
||||||
|
|
||||||
%% High-water mark of queued messsages
|
%% High-water mark of queued messages
|
||||||
{high_watermark, 0.6},
|
{high_watermark, 0.6},
|
||||||
|
|
||||||
%% Queue Qos0 messages?
|
%% Queue Qos0 messages?
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
%%% @end
|
%%% @end
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% TODO: should match topic tree
|
||||||
-module(emqttd_retained).
|
-module(emqttd_retained).
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
-author("Feng Lee <feng@emqtt.io>").
|
||||||
|
@ -150,9 +151,9 @@ dispatch(Topic, CPid) when is_binary(Topic) ->
|
||||||
init([]) ->
|
init([]) ->
|
||||||
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
|
StatsFun = emqttd_stats:statsfun('retained/count', 'retained/max'),
|
||||||
%% One second
|
%% One second
|
||||||
{ok, StatsTimer} = timer:send_interval(1000, stats),
|
{ok, StatsTimer} = timer:send_interval(timer:seconds(1), stats),
|
||||||
%% Five minutes
|
%% Five minutes
|
||||||
{ok, ExpireTimer} = timer:send_interval(300 * 1000, expire),
|
{ok, ExpireTimer} = timer:send_interval(timer:minutes(5), expire),
|
||||||
{ok, #state{stats_fun = StatsFun,
|
{ok, #state{stats_fun = StatsFun,
|
||||||
expired_after = env(expired_after),
|
expired_after = env(expired_after),
|
||||||
stats_timer = StatsTimer,
|
stats_timer = StatsTimer,
|
||||||
|
@ -169,6 +170,10 @@ handle_info(stats, State = #state{stats_fun = StatsFun}) ->
|
||||||
StatsFun(mnesia:table_info(retained, size)),
|
StatsFun(mnesia:table_info(retained, size)),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
|
handle_info(expire, State = #state{expired_after = Never})
|
||||||
|
when Never =:= 0 orelse Never =:= undefined ->
|
||||||
|
{noreply, State, hibernate};
|
||||||
|
|
||||||
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
handle_info(expire, State = #state{expired_after = ExpiredAfter}) ->
|
||||||
expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter),
|
expire(emqttd_util:now_to_secs(os:timestamp()) - ExpiredAfter),
|
||||||
{noreply, State, hibernate};
|
{noreply, State, hibernate};
|
||||||
|
|
|
@ -170,9 +170,9 @@ destroy(SessPid, ClientId) ->
|
||||||
subscribe(SessPid, TopicTable) ->
|
subscribe(SessPid, TopicTable) ->
|
||||||
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
||||||
|
|
||||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}], Callback :: fun()) -> ok.
|
-spec subscribe(pid(), [{binary(), mqtt_qos()}], AckFun :: fun()) -> ok.
|
||||||
subscribe(SessPid, TopicTable, Callback) ->
|
subscribe(SessPid, TopicTable, AckFun) ->
|
||||||
gen_server2:cast(SessPid, {subscribe, TopicTable, Callback}).
|
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Publish message
|
%% @doc Publish message
|
||||||
|
@ -296,20 +296,20 @@ handle_call(Req, _From, State) ->
|
||||||
lager:error("Unexpected Request: ~p", [Req]),
|
lager:error("Unexpected Request: ~p", [Req]),
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
handle_cast({subscribe, TopicTable0, Callback}, Session = #session{
|
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{
|
||||||
client_id = ClientId, subscriptions = Subscriptions}) ->
|
client_id = ClientId, subscriptions = Subscriptions}) ->
|
||||||
|
|
||||||
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
TopicTable = emqttd_broker:foldl_hooks('client.subscribe', [ClientId], TopicTable0),
|
||||||
|
|
||||||
case TopicTable -- Subscriptions of
|
case TopicTable -- Subscriptions of
|
||||||
[] ->
|
[] ->
|
||||||
catch Callback([Qos || {_, Qos} <- TopicTable]),
|
catch AckFun([Qos || {_, Qos} <- TopicTable]),
|
||||||
noreply(Session);
|
noreply(Session);
|
||||||
_ ->
|
_ ->
|
||||||
%% subscribe first and don't care if the subscriptions have been existed
|
%% subscribe first and don't care if the subscriptions have been existed
|
||||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
||||||
|
|
||||||
catch Callback(GrantedQos),
|
catch AckFun(GrantedQos),
|
||||||
|
|
||||||
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
emqttd_broker:foreach_hooks('client.subscribe.after', [ClientId, TopicTable]),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue