diff --git a/src/emqttd_broker.erl b/src/emqttd_broker.erl index 5a26fd291..dc93543c5 100644 --- a/src/emqttd_broker.erl +++ b/src/emqttd_broker.erl @@ -238,11 +238,12 @@ retain(brokers) -> retain(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)). + Msg1 = emqttd_message:set_flag(sys, Msg), + emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg1)). publish(Topic, Payload) when is_binary(Payload) -> Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/src/emqttd_mod_presence.erl b/src/emqttd_mod_presence.erl index 57a02c1bf..e83886c16 100644 --- a/src/emqttd_mod_presence.erl +++ b/src/emqttd_mod_presence.erl @@ -52,7 +52,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId, proplists:get_value(qos, Opts, 0), topic(connected, ClientId), iolist_to_binary(Json)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{clientid, ClientId}, @@ -62,7 +62,7 @@ client_disconnected(Reason, ClientId, Opts) -> proplists:get_value(qos, Opts, 0), topic(disconnected, ClientId), iolist_to_binary(Json)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). unload(_Opts) -> emqttd_broker:unhook('client.connected', {?MODULE, client_connected}), diff --git a/src/emqttd_stats.erl b/src/emqttd_stats.erl index c6a156d81..672c0e19b 100644 --- a/src/emqttd_stats.erl +++ b/src/emqttd_stats.erl @@ -166,7 +166,7 @@ code_change(_OldVsn, State, _Extra) -> publish(Stat, Val) -> Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)), - emqttd_pubsub:publish(Msg). + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). stats_topic(Stat) -> emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/src/lager_emqtt_backend.erl b/src/lager_emqtt_backend.erl index 9c28090bf..01fabece0 100644 --- a/src/lager_emqtt_backend.erl +++ b/src/lager_emqtt_backend.erl @@ -77,9 +77,8 @@ publish_log(Message, State = #state{formatter = Formatter, format_config = FormatConfig}) -> Severity = lager_msg:severity(Message), Payload = Formatter:format(Message, FormatConfig), - emqttd_pubsub:publish( - emqttd_message:make( - log, topic(Severity), iolist_to_binary(Payload))), + Msg = emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload)), + emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)), {ok, State}. topic(Severity) ->