From d6b4ff654a3867f30b6f00ca40788a6dc7fe5253 Mon Sep 17 00:00:00 2001 From: Frank Feng Date: Sat, 5 Mar 2016 21:16:16 +0800 Subject: [PATCH 1/2] add 'sys' flag to message --- src/emqttd_broker.erl | 5 +++-- src/emqttd_mod_presence.erl | 4 ++-- src/emqttd_stats.erl | 2 +- src/lager_emqtt_backend.erl | 5 ++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 5a26fd291..dc93543c5 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -238,11 +238,12 @@ retain(brokers) -> retain(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)). + Msg1 = emqttd_message:set_flag(sys, Msg), + emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg1)). publish(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 57a02c1bf..e83886c16 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -52,7 +52,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId, proplists:get_value(qos, Opts, 0), topic(connected, ClientId), iolist_to_binary(Json)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, @@ -62,7 +62,7 @@ client_disconnected(Reason, ClientId, Opts) -> proplists:get_value(qos, Opts, 0), topic(disconnected, ClientId), iolist_to_binary(Json)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). unload(_Opts) -> emqttd_broker:unhook('client.connected', {?MODULE, client_connected}), diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index c6a156d81..672c0e19b 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -166,7 +166,7 @@ code_change(_OldVsn, State, _Extra) -> publish(Stat, Val) -> Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 9c28090bf..01fabece0 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -77,9 +77,8 @@ publish_log(Message, State = #state{formatter = Formatter, format_config = FormatConfig}) -> Severity = lager_msg:severity(Message), Payload = Formatter:format(Message, FormatConfig), - emqttd_pubsub:publish( - emqttd_message:make( - log, topic(Severity), iolist_to_binary(Payload))), + Msg = emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload)), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)), {ok, State}. topic(Severity) -> From 9c9633ab277a8f314e1c755b5f8c450299afdd3e Mon Sep 17 00:00:00 2001 From: Frank Feng Date: Sat, 5 Mar 2016 23:12:25 +0800 Subject: [PATCH 2/2] just to make it more readable --- src/emqttd_cli.erl | 2 +- src/emqttd_session.erl | 4 ++-- src/emqttd_topic.erl | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/emqttd_cli.erl b/src/emqttd_cli.erl index 645556044..64ebdebd3 100644 --- a/src/emqttd_cli.erl +++ b/src/emqttd_cli.erl @@ -471,7 +471,7 @@ print({{ClientId, _ClientPid}, SessInfo}) -> "message_queue=~w, message_dropped=~w, " "awaiting_rel=~w, awaiting_ack=~w, awaiting_comp=~w, " "created_at=~w)~n", - [ClientId | [format(Key, proplists:get_value(Key, SessInfo)) || Key <- InfoKeys]]). + [ClientId | [format(Key, get_value(Key, SessInfo)) || Key <- InfoKeys]]). print(topic, Topic, Records) -> Nodes = [Node || #mqtt_topic{node = Node} <- Records], diff --git a/src/emqttd_session.erl b/src/emqttd_session.erl index f9779509f..c48853099 100644 --- a/src/emqttd_session.erl +++ b/src/emqttd_session.erl @@ -700,8 +700,8 @@ sess_info(#session{clean_sess = CleanSess, [{clean_sess, CleanSess}, {max_inflight, MaxInflight}, {inflight_queue, length(InflightQueue)}, - {message_queue, proplists:get_value(len, Stats)}, - {message_dropped,proplists:get_value(dropped, Stats)}, + {message_queue, get_value(len, Stats)}, + {message_dropped,get_value(dropped, Stats)}, {awaiting_rel, maps:size(AwaitingRel)}, {awaiting_ack, maps:size(AwaitingAck)}, {awaiting_comp, maps:size(AwaitingComp)}, diff --git a/src/emqttd_topic.erl b/src/emqttd_topic.erl index f3c1f1a33..b58d3b8d5 100644 --- a/src/emqttd_topic.erl +++ b/src/emqttd_topic.erl @@ -16,8 +16,6 @@ -module(emqttd_topic). --import(lists, [reverse/1]). - -export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([join/1, feed_var/3, is_queue/1, systop/1]). @@ -113,7 +111,7 @@ triples(Topic) when is_binary(Topic) -> triples(words(Topic), root, []). triples([], _Parent, Acc) -> - reverse(Acc); + lists:reverse(Acc); triples([W|Words], Parent, Acc) -> Node = join(Parent, W),