diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index 7295035cd..2aa9ae8f7 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -214,13 +214,13 @@ create(Topic) -> emqttd_pubsub:create(Topic). retain(Topic, Payload) when is_binary(Payload) -> - emqttd_router:route(#mqtt_message{retain = true, - topic = Topic, - payload = Payload}). + emqttd_router:route(broker, #mqtt_message{retain = true, + topic = Topic, + payload = Payload}). publish(Topic, Payload) when is_binary(Payload) -> - emqttd_router:route(#mqtt_message{topic = Topic, - payload = Payload}). + emqttd_router:route(broker, #mqtt_message{topic = Topic, + payload = Payload}). uptime(#state{started_at = Ts}) -> Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, diff --git a/apps/emqttd/src/emqttd_client.erl b/apps/emqttd/src/emqttd_client.erl index 7b1c68a63..236847000 100644 --- a/apps/emqttd/src/emqttd_client.erl +++ b/apps/emqttd/src/emqttd_client.erl @@ -43,8 +43,6 @@ -include_lib("emqtt/include/emqtt_packet.hrl"). --include("emqttd.hrl"). - %%Client State... -record(state, {transport, socket, diff --git a/apps/emqttd/src/emqttd_event.erl b/apps/emqttd/src/emqttd_event.erl index 701de9e63..b551655e9 100644 --- a/apps/emqttd/src/emqttd_event.erl +++ b/apps/emqttd/src/emqttd_event.erl @@ -75,13 +75,13 @@ init([]) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, - emqttd_router:route(Msg), + emqttd_router:route(event, Msg), {ok, State}; handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> Topic = <>, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, - emqttd_router:route(Msg), + emqttd_router:route(event, Msg), {ok, State}; handle_event({subscribed, ClientId, TopicTable}, State) -> diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index ad7c42bc6..889bc10a9 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) -> Message = list_to_binary(get_value("message", Params)), case {validate(qos, Qos), validate(topic, Topic)} of {true, true} -> - emqttd_router:route(#mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), + emqttd_router:route(http, #mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), Req:ok({"text/plan", <<"ok\n">>}); {false, _} -> Req:respond({400, [], <<"Bad QoS">>}); diff --git a/apps/emqttd/src/emqttd_metrics.erl b/apps/emqttd/src/emqttd_metrics.erl index 9c4e1e1bf..fac00e034 100644 --- a/apps/emqttd/src/emqttd_metrics.erl +++ b/apps/emqttd/src/emqttd_metrics.erl @@ -220,7 +220,7 @@ systop(Name) when is_atom(Name) -> list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). publish(Topic, Payload) -> - emqttd_router:route(#mqtt_message{topic = Topic, payload = Payload}). + emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}). new_metric({gauge, Name}) -> ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 6390954e6..c25104fa5 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -249,7 +249,7 @@ redeliver({?PUBREL, PacketId}, State) -> send(?PUBREL_PACKET(PacketId), State). shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> - send_willmsg(WillMsg), + send_willmsg(ClientId, WillMsg), try_unregister(ClientId, self()), lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]), ok. @@ -262,9 +262,11 @@ clientid(<<>>, #proto_state{peername = Peername}) -> clientid(ClientId, _State) -> ClientId. -send_willmsg(undefined) -> ignore; +send_willmsg(_ClientId, undefined) -> + ignore; %%TODO:should call session... -send_willmsg(WillMsg) -> emqttd_router:route(WillMsg). +send_willmsg(ClientId, WillMsg) -> + emqttd_router:route(ClientId, WillMsg). start_keepalive(0) -> ignore; start_keepalive(Sec) when Sec > 0 -> diff --git a/apps/emqttd/src/emqttd_retained.erl b/apps/emqttd/src/emqttd_retained.erl index a77cab166..5f48fce8f 100644 --- a/apps/emqttd/src/emqttd_retained.erl +++ b/apps/emqttd/src/emqttd_retained.erl @@ -62,7 +62,6 @@ retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) -> retain(Msg = #mqtt_message{topic = Topic, retain = true, - qos = Qos, payload = Payload}) -> TabSize = mnesia:table_info(message, size), case {TabSize < limit(table), size(Payload) < limit(payload)} of diff --git a/apps/emqttd/src/emqttd_router.erl b/apps/emqttd/src/emqttd_router.erl index 3a442daaf..8b88233dc 100644 --- a/apps/emqttd/src/emqttd_router.erl +++ b/apps/emqttd/src/emqttd_router.erl @@ -20,11 +20,10 @@ %% SOFTWARE. %%------------------------------------------------------------------------------ -%%route chain... statistics +%%TODO: route chain... statistics -module(emqttd_router). -include_lib("emqtt/include/emqtt.hrl"). -%-include("emqttd.hrl"). -behaviour(gen_server). @@ -34,7 +33,7 @@ -export([start_link/0]). %%Router Chain--> --->In Out<--- --export([route/1]). +-export([route/2]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -58,22 +57,21 @@ start_link() -> %%------------------------------------------------------------------------------ %% @doc -%% Route mqtt message. +%% Route mqtt message. From is clienid or module. %% %% @end %%------------------------------------------------------------------------------ --spec route(mqtt_message()) -> ok. -route(Msg) -> - lager:debug("Route ~p", [emqtt_message:format(Msg)]), - % TODO: need to retain? - emqttd_retained:retain(Msg), +-spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok. +route(From, Msg) -> + lager:debug("Route from ~s: ~p", [From, emqtt_message:format(Msg)]), + % TODO: retained message should be stored in emqttd_pubsub... + % emqttd_retained:retain(Msg), % unset flag and pubsub - emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)). + emqttd_pubsub:publish(Msg). %%%============================================================================= %%% gen_server callbacks %%%============================================================================= - init([]) -> {ok, #state{}, hibernate}. diff --git a/apps/emqttd/src/emqttd_session.erl b/apps/emqttd/src/emqttd_session.erl index 806736f84..970d68695 100644 --- a/apps/emqttd/src/emqttd_session.erl +++ b/apps/emqttd/src/emqttd_session.erl @@ -102,19 +102,19 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) -> %% @end %%------------------------------------------------------------------------------ -spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session(). -publish(Session, {?QOS_0, Message}) -> - emqttd_router:route(Message), Session; +publish(Session = #session_state{clientid = ClientId}, {?QOS_0, Message}) -> + emqttd_router:route(ClientId, Message), Session; -publish(Session, {?QOS_1, Message}) -> - emqttd_router:route(Message), Session; +publish(Session = #session_state{clientid = ClientId}, {?QOS_1, Message}) -> + emqttd_router:route(ClientId, Message), Session; publish(SessState = #session_state{awaiting_rel = AwaitingRel}, - {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> + {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> %% store in awaiting_rel SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> - gen_server:cast(SessPid, {publish, ?QOS_2, Message}), + gen_server:cast(SessPid, {publish, {?QOS_2, Message}}), SessPid. %%------------------------------------------------------------------------------ @@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) -> puback(SessState = #session_state{clientid = ClientId, awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> case maps:find(PacketId, Awaiting) of - {ok, Msg} -> emqttd_router:route(Msg); + {ok, Msg} -> emqttd_router:route(ClientId, Msg); error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) end, SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; @@ -310,7 +310,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{ msg_queue = emqttd_queue:clear(Queue), expire_timer = undefined}, hibernate}; -handle_cast({publish, ?QOS_2, Message}, State) -> +handle_cast({publish, {?QOS_2, Message}}, State) -> NewState = publish(State, {?QOS_2, Message}), {noreply, NewState};