add 'sys' flag to message
This commit is contained in:
parent
4cfa07d774
commit
d6b4ff654a
|
@ -238,11 +238,12 @@ retain(brokers) ->
|
||||||
|
|
||||||
retain(Topic, Payload) when is_binary(Payload) ->
|
retain(Topic, Payload) when is_binary(Payload) ->
|
||||||
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
||||||
emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)).
|
Msg1 = emqttd_message:set_flag(sys, Msg),
|
||||||
|
emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg1)).
|
||||||
|
|
||||||
publish(Topic, Payload) when is_binary(Payload) ->
|
publish(Topic, Payload) when is_binary(Payload) ->
|
||||||
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
|
||||||
emqttd_pubsub:publish(Msg).
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
|
||||||
|
|
||||||
uptime(#state{started_at = Ts}) ->
|
uptime(#state{started_at = Ts}) ->
|
||||||
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
|
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,
|
||||||
|
|
|
@ -52,7 +52,7 @@ client_connected(ConnAck, #mqtt_client{client_id = ClientId,
|
||||||
proplists:get_value(qos, Opts, 0),
|
proplists:get_value(qos, Opts, 0),
|
||||||
topic(connected, ClientId),
|
topic(connected, ClientId),
|
||||||
iolist_to_binary(Json)),
|
iolist_to_binary(Json)),
|
||||||
emqttd_pubsub:publish(Msg).
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
|
||||||
|
|
||||||
client_disconnected(Reason, ClientId, Opts) ->
|
client_disconnected(Reason, ClientId, Opts) ->
|
||||||
Json = mochijson2:encode([{clientid, ClientId},
|
Json = mochijson2:encode([{clientid, ClientId},
|
||||||
|
@ -62,7 +62,7 @@ client_disconnected(Reason, ClientId, Opts) ->
|
||||||
proplists:get_value(qos, Opts, 0),
|
proplists:get_value(qos, Opts, 0),
|
||||||
topic(disconnected, ClientId),
|
topic(disconnected, ClientId),
|
||||||
iolist_to_binary(Json)),
|
iolist_to_binary(Json)),
|
||||||
emqttd_pubsub:publish(Msg).
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
|
||||||
|
|
||||||
unload(_Opts) ->
|
unload(_Opts) ->
|
||||||
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}),
|
emqttd_broker:unhook('client.connected', {?MODULE, client_connected}),
|
||||||
|
|
|
@ -166,7 +166,7 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
publish(Stat, Val) ->
|
publish(Stat, Val) ->
|
||||||
Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)),
|
Msg = emqttd_message:make(stats, stats_topic(Stat), bin(Val)),
|
||||||
emqttd_pubsub:publish(Msg).
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)).
|
||||||
|
|
||||||
stats_topic(Stat) ->
|
stats_topic(Stat) ->
|
||||||
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
emqttd_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))).
|
||||||
|
|
|
@ -77,9 +77,8 @@ publish_log(Message, State = #state{formatter = Formatter,
|
||||||
format_config = FormatConfig}) ->
|
format_config = FormatConfig}) ->
|
||||||
Severity = lager_msg:severity(Message),
|
Severity = lager_msg:severity(Message),
|
||||||
Payload = Formatter:format(Message, FormatConfig),
|
Payload = Formatter:format(Message, FormatConfig),
|
||||||
emqttd_pubsub:publish(
|
Msg = emqttd_message:make(log, topic(Severity), iolist_to_binary(Payload)),
|
||||||
emqttd_message:make(
|
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)),
|
||||||
log, topic(Severity), iolist_to_binary(Payload))),
|
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
topic(Severity) ->
|
topic(Severity) ->
|
||||||
|
|
Loading…
Reference in New Issue