diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 760dba041..46b48189f 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -169,15 +169,17 @@ retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- emqttd_mnesia:running_nodes()], ",")), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), - emqttd:publish(emqttd_message:set_flag(sys, Msg)). + Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)), + emqttd:publish(Msg1). retain(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd:publish(emqttd_message:set_flag(retain, Msg)). + Msg1 = emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg)), + emqttd:publish(Msg1). publish(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd:publish(Msg). + emqttd:publish(emqttd_message:set_flag(sys, Msg)). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 04f0014d6..fc1f300b9 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -523,7 +523,7 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "message_queue=~w, message_dropped=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " "created_at=~w)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). + [ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]). format(created_at, Val) -> emqttd_time:now_to_secs(Val); diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index b49511000..2db444e8d 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -41,14 +41,16 @@ on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, {protocol, ProtoVer}, {connack, ConnAck}, {ts, emqttd_time:now_to_secs()}]), - emqttd:publish(message(qos(Opts), topic(connected, ClientId), Json)), + Msg = message(qos(Opts), topic(connected, ClientId), Json), + emqttd:publish(emqttd_message:set_flag(sys, Msg)), {ok, Client}. on_client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, {reason, reason(Reason)}, {ts, emqttd_time:now_to_secs()}]), - emqttd:publish(message(qos(Opts), topic(disconnected, ClientId), Json)). + Msg = message(qos(Opts), topic(disconnected, ClientId), Json), + emqttd:publish(emqttd_message:set_flag(sys, Msg)). unload(_Opts) -> emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index b95494f8d..46686bb28 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -704,8 +704,8 @@ sess_info(#session{clean_sess = CleanSess, [{clean_sess, CleanSess}, {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, - {message_queue, proplists:get_value(len, Stats)}, - {message_dropped,proplists:get_value(dropped, Stats)}, + {message_queue, get_value(len, Stats)}, + {message_dropped,get_value(dropped, Stats)}, {awaiting_rel, maps:size(AwaitingRel)}, {awaiting_ack, maps:size(AwaitingAck)}, {awaiting_comp, maps:size(AwaitingComp)}, diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index 755cdb515..3dbc0e96c 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -165,7 +165,8 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- publish(Stat, Val) -> - emqttd:publish(emqttd_message:make(stats, stats_topic(Stat), bin(Val))). + Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)), + emqttd:publish(emqttd_message:set_flag(sys, Msg)). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index 7e0d0b6ac..e2e70b33e 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -16,8 +16,6 @@ -module(emqttd_topic). --import(lists, [reverse/1]). - -export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([join/1, feed_var/3, is_queue/1, systop/1]). @@ -113,7 +111,7 @@ triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). triples([], _Parent, Acc) -> - reverse(Acc); + lists:reverse(Acc); triples([W|Words], Parent, Acc) -> Node = join(Parent, W), diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 9deccac52..c98e87de9 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -77,7 +77,8 @@ publish_log(Message, State = #state{formatter = Formatter, format_config = FormatConfig}) -> Severity = lager_msg:severity(Message), Payload = Formatter:format(Message, FormatConfig), - emqttd:publish(emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload))), + Msg = emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload)), + emqttd:publish(emqttd_message:set_flag(sys, Msg)), {ok, State}. topic(Severity) ->