This commit is contained in:
Feng Lee 2015-06-15 19:55:59 +08:00
parent c487348c2a
commit d255a98c09
17 changed files with 35 additions and 101 deletions

View File

@ -280,20 +280,24 @@ create_topic(Topic) ->
retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")),
publish(#mqtt_message{retain = true, topic = <<"$SYS/brokers">>, payload = Payload}).
publish(#mqtt_message{from = broker,
retain = true,
topic = <<"$SYS/brokers">>,
payload = Payload}).
retain(Topic, Payload) when is_binary(Payload) ->
publish(#mqtt_message{retain = true,
publish(#mqtt_message{from = broker,
retain = true,
topic = emqtt_topic:systop(Topic),
payload = Payload}).
publish(Topic, Payload) when is_binary(Payload) ->
publish( #mqtt_message{topic = emqtt_topic:systop(Topic),
publish( #mqtt_message{from = broker,
topic = emqtt_topic:systop(Topic),
payload = Payload}).
publish(Msg) ->
emqttd_pubsub:publish(broker, Msg).
emqttd_pubsub:publish(Msg).
uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,

View File

@ -114,8 +114,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} =
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState),
handle_info({subscribe, TopicTable}, #state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
handle_info({inet_reply, _Ref, ok}, State) ->

View File

@ -53,7 +53,8 @@ handle_request('POST', "/mqtt/publish", Req) ->
Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} ->
emqttd_pubsub:publish(http, #mqtt_message{qos = Qos,
emqttd_pubsub:publish(#mqtt_message{from = http,
qos = Qos,
retain = Retain,
topic = Topic,
payload = Message}),

View File

@ -192,7 +192,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%=============================================================================
publish(Metric, Val) ->
emqttd_pubsub:publish(metrics, #mqtt_message{topic = metric_topic(Metric),
emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric),
from = metrics,
payload = emqttd_util:integer_to_binary(Val)}).
create_metric({gauge, Name}) ->

View File

@ -49,7 +49,7 @@ load(Opts) ->
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) ->
F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end,
[ClientPid ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics];
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};
client_connected(_ConnAck, _Client, _Topics) ->
ignore.

View File

@ -56,14 +56,16 @@ client_connected(ConnAck, #mqtt_client{clientid = ClientId,
{protocol, ProtoVer},
{connack, ConnAck},
{ts, emqttd_vm:timestamp()}]),
Message = #mqtt_message{qos = proplists:get_value(qos, Opts, 0),
Message = #mqtt_message{from = presence,
qos = proplists:get_value(qos, Opts, 0),
topic = topic(connected, ClientId),
payload = iolist_to_binary(Json)},
emqttd_pubsub:publish(presence, Message).
emqttd_pubsub:publish(Message).
client_disconnected(Reason, ClientId, Opts) ->
Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]),
emqttd_pubsub:publish(presence, #mqtt_message{qos = proplists:get_value(qos, Opts, 0),
emqttd_pubsub:publish(#mqtt_message{from = presence,
qos = proplists:get_value(qos, Opts, 0),
topic = topic(disconnected, ClientId),
payload = iolist_to_binary(Json)}).

View File

@ -299,7 +299,7 @@ shutdown(normal, #proto_state{peername = Peername, clientid = ClientId, will_msg
lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown",
[ClientId, emqttd_net:format(Peername)]),
try_unregister(ClientId),
send_willmsg(ClientId, WillMsg);
send_willmsg(ClientId, WillMsg),
emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]);
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->

View File

@ -55,7 +55,7 @@ start_session(CleanSess, ClientId, ClientPid) ->
%%%=============================================================================
init([]) ->
{ok, {{simple_one_for_one, 0, 1},
{ok, {{simple_one_for_one, 10, 10},
[{session, {emqttd_session, start_link, []},
transient, 10000, worker, [emqttd_session]}]}}.

View File

@ -140,7 +140,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] ->
erlang:demonitor(MRef, [flush]),
emqttd_session:destroy_session(SessPid, ClientId);
emqttd_session:destroy(SessPid, ClientId);
[] ->
ok
end,

View File

@ -154,7 +154,8 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%=============================================================================
publish(Stat, Val) ->
emqttd_pubsub:publish(stats, #mqtt_message{topic = stats_topic(Stat),
emqttd_pubsub:publish(#mqtt_message{from = stats,
topic = stats_topic(Stat),
payload = emqttd_util:integer_to_binary(Val)}).
stats_topic(Stat) ->

View File

@ -133,9 +133,9 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState})
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) ->
handle_info({deliver, Message}, #client_state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState),
{noreply, State#state{proto_state = ProtoState1}};
{noreply, State#client_state{proto_state = ProtoState1}};
handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) ->
{ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState),

View File

@ -1,14 +0,0 @@
## Overview
ZenMQ is the core architecture of distributed pubsub messaging queue written in Erlang.
## Responsibilties
* Topic Trie Tree
* Message Route
* Queue Management
* Broker Cluster
* Distributed Broker
**Notice that this is an experimental design**

View File

@ -1,12 +0,0 @@
{application, zenmq,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { zenmq_app, []}},
{env, []}
]}.

View File

@ -1,2 +0,0 @@
-module(zenmq).

View File

@ -1,16 +0,0 @@
-module(zenmq_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
zenmq_sup:start_link().
stop(_State) ->
ok.

View File

@ -1,4 +0,0 @@
-module(zenmq_router).

View File

@ -1,27 +0,0 @@
-module(zenmq_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.