emqttd:publish

This commit is contained in:
Feng 2016-03-08 13:21:17 +08:00
parent 67f1e15d4a
commit 73b7de7cb0
1 changed files with 5 additions and 5 deletions

View File

@ -157,7 +157,7 @@ init([]) ->
emqttd_time:seed(), emqttd_time:seed(),
ets:new(?BROKER_TAB, [set, public, named_table]), ets:new(?BROKER_TAB, [set, public, named_table]),
% Create $SYS Topics % Create $SYS Topics
emqttd_pubsub:create(topic, <<"$SYS/brokers">>), emqttd:create(topic, <<"$SYS/brokers">>),
[ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS], [ok = create_topic(Topic) || Topic <- ?SYSTOP_BROKERS],
% Tick % Tick
{ok, #state{started_at = os:timestamp(), {ok, #state{started_at = os:timestamp(),
@ -228,21 +228,21 @@ insert_hooks(Key, Hooks) ->
ets:insert(?BROKER_TAB, {Key, Hooks}), ok. ets:insert(?BROKER_TAB, {Key, Hooks}), ok.
create_topic(Topic) -> create_topic(Topic) ->
emqttd_pubsub:create(topic, emqttd_topic:systop(Topic)). emqttd:create(topic, emqttd_topic:systop(Topic)).
retain(brokers) -> retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- emqttd_mnesia:running_nodes()], ",")), N <- emqttd_mnesia:running_nodes()], ",")),
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload), Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(sys, Msg)). emqttd:publish(emqttd_message:set_flag(sys, Msg)).
retain(Topic, Payload) when is_binary(Payload) -> retain(Topic, Payload) when is_binary(Payload) ->
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd_pubsub:publish(emqttd_message:set_flag(retain, Msg)). emqttd:publish(emqttd_message:set_flag(retain, Msg)).
publish(Topic, Payload) when is_binary(Payload) -> publish(Topic, Payload) when is_binary(Payload) ->
Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload), Msg = emqttd_message:make(broker, emqttd_topic:systop(Topic), Payload),
emqttd_pubsub:publish(Msg). emqttd:publish(Msg).
uptime(#state{started_at = Ts}) -> uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,