merge PR#462

This commit is contained in:
Feng 2016-03-12 15:55:17 +08:00
commit ba5fcfdfae
7 changed files with 17 additions and 13 deletions

View File

@ -169,15 +169,17 @@ retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- emqttd_mnesia:running_nodes()], ",")), N <- emqttd_mnesia:running_nodes()], ",")),
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
emqttd:publish(emqttd_message:set_flag(sys, Msg)). Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)),
emqttd:publish(Msg1).
retain(Topic, Payload) when is_binary(Payload) -> retain(Topic, Payload) when is_binary(Payload) ->
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd:publish(emqttd_message:set_flag(retain, Msg)). Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)),
emqttd:publish(Msg1).
publish(Topic, Payload) when is_binary(Payload) -> publish(Topic, Payload) when is_binary(Payload) ->
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd:publish(Msg). emqttd:publish(emqttd_message:set_flag(sys, Msg)).
uptime(#state{started_at = Ts}) -> uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,

View File

@ -523,7 +523,7 @@ print({{ClientId, _ClientPid}, SessInfo}) ->
"message_queue=~w, message_dropped=~w, " "message_queue=~w, message_dropped=~w, "
"awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, "
"created_at=~w)~n", "created_at=~w)~n",
[ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). [ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]).
format(created_at, Val) -> format(created_at, Val) ->
emqttd_time:now_to_secs(Val); emqttd_time:now_to_secs(Val);

View File

@ -41,14 +41,16 @@ on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId,
{protocol, ProtoVer}, {protocol, ProtoVer},
{connack, ConnAck}, {connack, ConnAck},
{ts, emqttd_time:now_to_secs()}]), {ts, emqttd_time:now_to_secs()}]),
emqttd:publish(message(qos(Opts), topic(connected, ClientId), Json)), Msg = message(qos(Opts), topic(connected, ClientId), Json),
emqttd:publish(emqttd_message:set_flag(sys, Msg)),
{ok, Client}. {ok, Client}.
on_client_disconnected(Reason, ClientId, Opts) -> on_client_disconnected(Reason, ClientId, Opts) ->
Json = mochijson2:encode([{clientid, ClientId}, Json = mochijson2:encode([{clientid, ClientId},
{reason, reason(Reason)}, {reason, reason(Reason)},
{ts, emqttd_time:now_to_secs()}]), {ts, emqttd_time:now_to_secs()}]),
emqttd:publish(message(qos(Opts), topic(disconnected, ClientId), Json)). Msg = message(qos(Opts), topic(disconnected, ClientId), Json),
emqttd:publish(emqttd_message:set_flag(sys, Msg)).
unload(_Opts) -> unload(_Opts) ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),

View File

@ -704,8 +704,8 @@ sess_info(#session{clean_sess = CleanSess,
[{clean_sess, CleanSess}, [{clean_sess, CleanSess},
{max_inflight, MaxInflight}, {max_inflight, MaxInflight},
{inflight_queue, length(InflightQueue)}, {inflight_queue, length(InflightQueue)},
{message_queue, proplists:get_value(len, Stats)}, {message_queue, get_value(len, Stats)},
{message_dropped,proplists:get_value(dropped, Stats)}, {message_dropped,get_value(dropped, Stats)},
{awaiting_rel, maps:size(AwaitingRel)}, {awaiting_rel, maps:size(AwaitingRel)},
{awaiting_ack, maps:size(AwaitingAck)}, {awaiting_ack, maps:size(AwaitingAck)},
{awaiting_comp, maps:size(AwaitingComp)}, {awaiting_comp, maps:size(AwaitingComp)},

View File

@ -165,7 +165,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
publish(Stat, Val) -> publish(Stat, Val) ->
emqttd:publish(emqttd_message:make(stats, stats_topic(Stat), bin(Val))). Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)),
emqttd:publish(emqttd_message:set_flag(sys, Msg)).
stats_topic(Stat) -> stats_topic(Stat) ->
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).

View File

@ -16,8 +16,6 @@
-module(emqttd_topic). -module(emqttd_topic).
-import(lists, [reverse/1]).
-export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([match/2, validate/1, triples/1, words/1, wildcard/1]).
-export([join/1, feed_var/3, is_queue/1, systop/1]). -export([join/1, feed_var/3, is_queue/1, systop/1]).
@ -113,7 +111,7 @@ triples(Topic) when is_binary(Topic) ->
triples(words(Topic), root, []). triples(words(Topic), root, []).
triples([], _Parent, Acc) -> triples([], _Parent, Acc) ->
reverse(Acc); lists:reverse(Acc);
triples([W|Words], Parent, Acc) -> triples([W|Words], Parent, Acc) ->
Node = join(Parent, W), Node = join(Parent, W),

View File

@ -77,7 +77,8 @@ publish_log(Message, State = #state{formatter = Formatter,
format_config = FormatConfig}) -> format_config = FormatConfig}) ->
Severity = lager_msg:severity(Message), Severity = lager_msg:severity(Message),
Payload = Formatter:format(Message, FormatConfig), Payload = Formatter:format(Message, FormatConfig),
emqttd:publish(emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload))), Msg = emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload)),
emqttd:publish(emqttd_message:set_flag(sys, Msg)),
{ok, State}. {ok, State}.
topic(Severity) -> topic(Severity) ->