diff --git a/apps/emqtt/src/emqtt_metrics.erl b/apps/emqtt/src/emqtt_metrics.erl index 208782033..798abb6e0 100644 --- a/apps/emqtt/src/emqtt_metrics.erl +++ b/apps/emqtt/src/emqtt_metrics.erl @@ -156,7 +156,7 @@ dec(gauge, Metric, Val) -> set(Metric, Val) when is_atom(Metric) -> set(gauge, Metric, Val). set(gauge, Metric, Val) -> - ets:insert(?METRIC_TAB, key(gauge, Metric), Val). + ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}). %%------------------------------------------------------------------------------ %% @doc diff --git a/apps/emqtt/src/emqtt_server.erl b/apps/emqtt/src/emqtt_server.erl index e1e38b1bb..f5b883999 100644 --- a/apps/emqtt/src/emqtt_server.erl +++ b/apps/emqtt/src/emqtt_server.erl @@ -38,11 +38,11 @@ -include("emqtt_packet.hrl"). --record(mqtt_retained, {topic, qos, payload}). +-record(emqtt_retained, {topic, qos, payload}). -record(state, {store_limit}). --define(RETAINED_TAB, mqtt_retained). +-define(RETAINED_TAB, emqtt_retained). -define(STORE_LIMIT, 1000000). @@ -82,30 +82,31 @@ subscribe(Topics, CPid) when is_pid(CPid) -> %%%============================================================================= init([Opts]) -> - mnesia:create_table(mqtt_retained, [ - {type, ordered_set}, - {ram_copies, [node()]}, - {attributes, record_info(fields, mqtt_retained)}]), - mnesia:add_table_copy(mqtt_retained, node(), ram_copies), + mnesia:create_table(?RETAINED_TAB, [ + {type, ordered_set}, + {ram_copies, [node()]}, + {attributes, record_info(fields, emqtt_retained)}]), + mnesia:add_table_copy(?RETAINED_TAB, node(), ram_copies), Limit = proplists:get_value(store_limit, Opts, ?STORE_LIMIT), {ok, #state{store_limit = Limit}}. handle_call(Req, _From, State) -> {stop, {badreq, Req}, State}. -handle_cast({retain, Msg = #mqtt_message{qos = Qos, - topic = Topic, +handle_cast({retain, Msg = #mqtt_message{topic = Topic, + qos = Qos, payload = Payload}}, State = #state{store_limit = Limit}) -> case mnesia:table_info(?RETAINED_TAB, size) of Size when Size >= Limit -> lager:error("Dropped message(retain) for table is full: ~p", [Msg]); - Size -> + _ -> lager:debug("Retained message: ~p", [Msg]), - mnesia:dirty_write(#mqtt_retained{qos = Qos, - topic = Topic, - payload = Payload}), - emqtt_metrics:set('messages/retained/count', Size) + mnesia:dirty_write(#emqtt_retained{topic = Topic, + qos = Qos, + payload = Payload}), + emqtt_metrics:set('messages/retained/count', + mnesia:table_info(?RETAINED_TAB, size)) end, {noreply, State}; @@ -137,6 +138,7 @@ match(Topic, RetainedTopics) -> [T || T <- RetainedTopics, emqtt_topic:match(T, Topic)] end. -retained_msg(#mqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> +retained_msg(#emqtt_retained{topic = Topic, qos = Qos, payload = Payload}) -> #mqtt_message{qos = Qos, retain = true, topic = Topic, payload = Payload}. +