diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 526d824b4..20bd27b45 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -280,20 +280,24 @@ create_topic(Topic) -> retain(brokers) -> Payload = list_to_binary(string:join([atom_to_list(N) || N <- running_nodes()], ",")), - publish(#mqtt_message{retain = true, topic = <<"$SYS/brokers">>, payload = Payload}). + publish(#mqtt_message{from = broker, + retain = true, + topic = <<"$SYS/brokers">>, + payload = Payload}). retain(Topic, Payload) when is_binary(Payload) -> - publish(#mqtt_message{retain = true, + publish(#mqtt_message{from = broker, + retain = true, topic = emqtt_topic:systop(Topic), payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - publish( #mqtt_message{topic = emqtt_topic:systop(Topic), + publish( #mqtt_message{from = broker, + topic = emqtt_topic:systop(Topic), payload = Payload}). publish(Msg) -> - emqttd_pubsub:publish(broker, Msg). - + emqttd_pubsub:publish(Msg). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index a61b3373b..cbcfffb4b 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -114,8 +114,8 @@ handle_info({redeliver, {?PUBREL, PacketId}}, #state{proto_state = ProtoState} = {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; -handle_info({subscribe, Topic, Qos}, #state{proto_state = ProtoState} = State) -> - {ok, ProtoState1} = emqttd_protocol:handle({subscribe, Topic, Qos}, ProtoState), +handle_info({subscribe, TopicTable}, #state{proto_state = ProtoState} = State) -> + {ok, ProtoState1} = emqttd_protocol:handle({subscribe, TopicTable}, ProtoState), {noreply, State#state{proto_state = ProtoState1}}; handle_info({inet_reply, _Ref, ok}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 71c844ead..2c5c1a24f 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -53,10 +53,11 @@ handle_request('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_pubsub:publish(http, #mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_pubsub:publish(#mqtt_message{from = http, + qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 550d5fcd7..84350f939 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -192,8 +192,9 @@ code_change(_OldVsn, State, _Extra) -> %%%============================================================================= publish(Metric, Val) -> - emqttd_pubsub:publish(metrics, #mqtt_message{topic = metric_topic(Metric), - payload = emqttd_util:integer_to_binary(Val)}). + emqttd_pubsub:publish(#mqtt_message{topic = metric_topic(Metric), + from = metrics, + payload = emqttd_util:integer_to_binary(Val)}). create_metric({gauge, Name}) -> ets:insert(?METRIC_TAB, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_mod_autosub.erl b/apps/emqttd/src/emqttd_mod_autosub.erl index a28db1642..8f7af60ac 100644 --- a/apps/emqttd/src/emqttd_mod_autosub.erl +++ b/apps/emqttd/src/emqttd_mod_autosub.erl @@ -49,7 +49,7 @@ load(Opts) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> F = fun(Topic) -> emqtt_topic:feed_var(<<"$c">>, ClientId, Topic) end, - [ClientPid ! {subscribe, F(Topic), Qos} || {Topic, Qos} <- Topics]; + ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; client_connected(_ConnAck, _Client, _Topics) -> ignore. diff --git a/apps/emqttd/src/emqttd_mod_presence.erl b/apps/emqttd/src/emqttd_mod_presence.erl index 3e5dbccb2..74c94a1c9 100644 --- a/apps/emqttd/src/emqttd_mod_presence.erl +++ b/apps/emqttd/src/emqttd_mod_presence.erl @@ -56,16 +56,18 @@ client_connected(ConnAck, #mqtt_client{clientid = ClientId, {protocol, ProtoVer}, {connack, ConnAck}, {ts, emqttd_vm:timestamp()}]), - Message = #mqtt_message{qos = proplists:get_value(qos, Opts, 0), + Message = #mqtt_message{from = presence, + qos = proplists:get_value(qos, Opts, 0), topic = topic(connected, ClientId), payload = iolist_to_binary(Json)}, - emqttd_pubsub:publish(presence, Message). + emqttd_pubsub:publish(Message). client_disconnected(Reason, ClientId, Opts) -> Json = mochijson2:encode([{reason, reason(Reason)}, {ts, emqttd_vm:timestamp()}]), - emqttd_pubsub:publish(presence, #mqtt_message{qos = proplists:get_value(qos, Opts, 0), - topic = topic(disconnected, ClientId), - payload = iolist_to_binary(Json)}). + emqttd_pubsub:publish(#mqtt_message{from = presence, + qos = proplists:get_value(qos, Opts, 0), + topic = topic(disconnected, ClientId), + payload = iolist_to_binary(Json)}). unload(_Opts) -> emqttd_broker:unhook(client_connected, {?MODULE, client_connected}), diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index c8dff7bf3..086bd6be3 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -299,7 +299,7 @@ shutdown(normal, #proto_state{peername = Peername, clientid = ClientId, will_msg lager:info([{client, ClientId}], "Client ~s@~s: normal shutdown", [ClientId, emqttd_net:format(Peername)]), try_unregister(ClientId), - send_willmsg(ClientId, WillMsg); + send_willmsg(ClientId, WillMsg), emqttd_broker:foreach_hooks(client_disconnected, [normal, ClientId]); shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> diff --git a/apps/emqttd/src/emqttd_session_sup.erl b/apps/emqttd/src/emqttd_session_sup.erl index 9fe7f8ca3..ff12b5055 100644 --- a/apps/emqttd/src/emqttd_session_sup.erl +++ b/apps/emqttd/src/emqttd_session_sup.erl @@ -55,7 +55,7 @@ start_session(CleanSess, ClientId, ClientPid) -> %%%============================================================================= init([]) -> - {ok, {{simple_one_for_one, 0, 1}, + {ok, {{simple_one_for_one, 10, 10}, [{session, {emqttd_session, start_link, []}, transient, 10000, worker, [emqttd_session]}]}}. diff --git a/apps/emqttd/src/emqttd_sm.erl b/apps/emqttd/src/emqttd_sm.erl index f1e7041f2..bbebf3db5 100644 --- a/apps/emqttd/src/emqttd_sm.erl +++ b/apps/emqttd/src/emqttd_sm.erl @@ -140,7 +140,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> case ets:lookup(?SESSION_TAB, ClientId) of [{_, SessPid, MRef}] -> erlang:demonitor(MRef, [flush]), - emqttd_session:destroy_session(SessPid, ClientId); + emqttd_session:destroy(SessPid, ClientId); [] -> ok end, diff --git a/apps/emqttd/src/emqttd_stats.erl b/apps/emqttd/src/emqttd_stats.erl index 0b5e65522..45cec4ffe 100644 --- a/apps/emqttd/src/emqttd_stats.erl +++ b/apps/emqttd/src/emqttd_stats.erl @@ -154,8 +154,9 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= publish(Stat, Val) -> - emqttd_pubsub:publish(stats, #mqtt_message{topic = stats_topic(Stat), - payload = emqttd_util:integer_to_binary(Val)}). + emqttd_pubsub:publish(#mqtt_message{from = stats, + topic = stats_topic(Stat), + payload = emqttd_util:integer_to_binary(Val)}). stats_topic(Stat) -> emqtt_topic:systop(list_to_binary(lists:concat(['stats/', Stat]))). diff --git a/apps/emqttd/src/emqttd_ws_client.erl b/apps/emqttd/src/emqttd_ws_client.erl index aa4006b4f..25dc334c7 100644 --- a/apps/emqttd/src/emqttd_ws_client.erl +++ b/apps/emqttd/src/emqttd_ws_client.erl @@ -133,9 +133,9 @@ handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) handle_cast(_Msg, State) -> {noreply, State}. -handle_info({deliver, Message}, #state{proto_state = ProtoState} = State) -> +handle_info({deliver, Message}, #client_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:send(Message, ProtoState), - {noreply, State#state{proto_state = ProtoState1}}; + {noreply, State#client_state{proto_state = ProtoState1}}; handle_info({redeliver, {?PUBREL, PacketId}}, #client_state{proto_state = ProtoState} = State) -> {ok, ProtoState1} = emqttd_protocol:redeliver({?PUBREL, PacketId}, ProtoState), diff --git a/apps/zenmq/README b/apps/zenmq/README deleted file mode 100644 index e91132732..000000000 --- a/apps/zenmq/README +++ /dev/null @@ -1,14 +0,0 @@ -## Overview - -ZenMQ is the core architecture of distributed pubsub messaging queue written in Erlang. - -## Responsibilties - -* Topic Trie Tree -* Message Route -* Queue Management -* Broker Cluster -* Distributed Broker - -**Notice that this is an experimental design** - diff --git a/apps/zenmq/src/zenmq.app.src b/apps/zenmq/src/zenmq.app.src deleted file mode 100644 index 2d191d048..000000000 --- a/apps/zenmq/src/zenmq.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, zenmq, - [ - {description, ""}, - {vsn, "1"}, - {registered, []}, - {applications, [ - kernel, - stdlib - ]}, - {mod, { zenmq_app, []}}, - {env, []} - ]}. diff --git a/apps/zenmq/src/zenmq.erl b/apps/zenmq/src/zenmq.erl deleted file mode 100644 index 1e96b16f5..000000000 --- a/apps/zenmq/src/zenmq.erl +++ /dev/null @@ -1,2 +0,0 @@ --module(zenmq). - diff --git a/apps/zenmq/src/zenmq_app.erl b/apps/zenmq/src/zenmq_app.erl deleted file mode 100644 index 15200771a..000000000 --- a/apps/zenmq/src/zenmq_app.erl +++ /dev/null @@ -1,16 +0,0 @@ --module(zenmq_app). - --behaviour(application). - -%% Application callbacks --export([start/2, stop/1]). - -%% =================================================================== -%% Application callbacks -%% =================================================================== - -start(_StartType, _StartArgs) -> - zenmq_sup:start_link(). - -stop(_State) -> - ok. diff --git a/apps/zenmq/src/zenmq_router.erl b/apps/zenmq/src/zenmq_router.erl deleted file mode 100644 index ecd6511e0..000000000 --- a/apps/zenmq/src/zenmq_router.erl +++ /dev/null @@ -1,4 +0,0 @@ - --module(zenmq_router). - - diff --git a/apps/zenmq/src/zenmq_sup.erl b/apps/zenmq/src/zenmq_sup.erl deleted file mode 100644 index f626a12b1..000000000 --- a/apps/zenmq/src/zenmq_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ --module(zenmq_sup). - --behaviour(supervisor). - -%% API --export([start_link/0]). - -%% Supervisor callbacks --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). - -%% =================================================================== -%% API functions -%% =================================================================== - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -%% =================================================================== -%% Supervisor callbacks -%% =================================================================== - -init([]) -> - {ok, { {one_for_one, 5, 10}, []} }. -