retained messages statistics
This commit is contained in:
parent
a5573d0c50
commit
9a05181844
|
@ -156,7 +156,7 @@ dec(gauge, Metric, Val) ->
|
||||||
set(Metric, Val) when is_atom(Metric) ->
|
set(Metric, Val) when is_atom(Metric) ->
|
||||||
set(gauge, Metric, Val).
|
set(gauge, Metric, Val).
|
||||||
set(gauge, Metric, Val) ->
|
set(gauge, Metric, Val) ->
|
||||||
ets:insert(?METRIC_TAB, key(gauge, Metric), Val).
|
ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
|
|
@ -38,11 +38,11 @@
|
||||||
|
|
||||||
-include("emqtt_packet.hrl").
|
-include("emqtt_packet.hrl").
|
||||||
|
|
||||||
-record(mqtt_retained, {topic, qos, payload}).
|
-record(emqtt_retained, {topic, qos, payload}).
|
||||||
|
|
||||||
-record(state, {store_limit}).
|
-record(state, {store_limit}).
|
||||||
|
|
||||||
-define(RETAINED_TAB, mqtt_retained).
|
-define(RETAINED_TAB, emqtt_retained).
|
||||||
|
|
||||||
-define(STORE_LIMIT, 1000000).
|
-define(STORE_LIMIT, 1000000).
|
||||||
|
|
||||||
|
@ -82,30 +82,31 @@ subscribe(Topics, CPid) when is_pid(CPid) ->
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([Opts]) ->
|
init([Opts]) ->
|
||||||
mnesia:create_table(mqtt_retained, [
|
mnesia:create_table(?RETAINED_TAB, [
|
||||||
{type, ordered_set},
|
{type, ordered_set},
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{attributes, record_info(fields, mqtt_retained)}]),
|
{attributes, record_info(fields, emqtt_retained)}]),
|
||||||
mnesia:add_table_copy(mqtt_retained, node(), ram_copies),
|
mnesia:add_table_copy(?RETAINED_TAB, node(), ram_copies),
|
||||||
Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT),
|
Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT),
|
||||||
{ok, #state{store_limit = Limit}}.
|
{ok, #state{store_limit = Limit}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
{stop, {badreq, Req}, State}.
|
{stop, {badreq, Req}, State}.
|
||||||
|
|
||||||
handle_cast({retain, Msg = #mqtt_message{qos = Qos,
|
handle_cast({retain, Msg = #mqtt_message{topic = Topic,
|
||||||
topic = Topic,
|
qos = Qos,
|
||||||
payload = Payload}},
|
payload = Payload}},
|
||||||
State = #state{store_limit = Limit}) ->
|
State = #state{store_limit = Limit}) ->
|
||||||
case mnesia:table_info(?RETAINED_TAB, size) of
|
case mnesia:table_info(?RETAINED_TAB, size) of
|
||||||
Size when Size >= Limit ->
|
Size when Size >= Limit ->
|
||||||
lager:error("Dropped message(retain) for table is full: ~p", [Msg]);
|
lager:error("Dropped message(retain) for table is full: ~p", [Msg]);
|
||||||
Size ->
|
_ ->
|
||||||
lager:debug("Retained message: ~p", [Msg]),
|
lager:debug("Retained message: ~p", [Msg]),
|
||||||
mnesia:dirty_write(#mqtt_retained{qos = Qos,
|
mnesia:dirty_write(#emqtt_retained{topic = Topic,
|
||||||
topic = Topic,
|
qos = Qos,
|
||||||
payload = Payload}),
|
payload = Payload}),
|
||||||
emqtt_metrics:set('messages/retained/count', Size)
|
emqtt_metrics:set('messages/retained/count',
|
||||||
|
mnesia:table_info(?RETAINED_TAB, size))
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
@ -137,6 +138,7 @@ match(Topic, RetainedTopics) ->
|
||||||
[T || T <- RetainedTopics, emqtt_topic:match(T, Topic)]
|
[T || T <- RetainedTopics, emqtt_topic:match(T, Topic)]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
|
retained_msg(#emqtt_retained{topic = Topic, qos = Qos, payload = Payload}) ->
|
||||||
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.
|
#mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}.
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue