route with From

This commit is contained in:
Feng Lee 2015-04-15 21:15:09 +08:00
parent 95652c77d0
commit 4fe72445fd
9 changed files with 34 additions and 37 deletions

View File

@ -214,13 +214,13 @@ create(Topic) ->
emqttd_pubsub:create(Topic). emqttd_pubsub:create(Topic).
retain(Topic, Payload) when is_binary(Payload) -> retain(Topic, Payload) when is_binary(Payload) ->
emqttd_router:route(#mqtt_message{retain = true, emqttd_router:route(broker, #mqtt_message{retain = true,
topic = Topic, topic = Topic,
payload = Payload}). payload = Payload}).
publish(Topic, Payload) when is_binary(Payload) -> publish(Topic, Payload) when is_binary(Payload) ->
emqttd_router:route(#mqtt_message{topic = Topic, emqttd_router:route(broker, #mqtt_message{topic = Topic,
payload = Payload}). payload = Payload}).
uptime(#state{started_at = Ts}) -> uptime(#state{started_at = Ts}) ->
Secs = timer:now_diff(os:timestamp(), Ts) div 1000000, Secs = timer:now_diff(os:timestamp(), Ts) div 1000000,

View File

@ -43,8 +43,6 @@
-include_lib("emqtt/include/emqtt_packet.hrl"). -include_lib("emqtt/include/emqtt_packet.hrl").
-include("emqttd.hrl").
%%Client State... %%Client State...
-record(state, {transport, -record(state, {transport,
socket, socket,

View File

@ -75,13 +75,13 @@ init([]) ->
handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) -> handle_event({connected, ClientId, Params}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/connected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)}, Msg = #mqtt_message{topic = Topic, payload = payload(connected, Params)},
emqttd_router:route(Msg), emqttd_router:route(event, Msg),
{ok, State}; {ok, State};
handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) -> handle_event({disconnectd, ClientId, Reason}, State = #state{systop = SysTop}) ->
Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>, Topic = <<SysTop/binary, "clients/", ClientId/binary, "/disconnected">>,
Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)}, Msg = #mqtt_message{topic = Topic, payload = payload(disconnected, Reason)},
emqttd_router:route(Msg), emqttd_router:route(event, Msg),
{ok, State}; {ok, State};
handle_event({subscribed, ClientId, TopicTable}, State) -> handle_event({subscribed, ClientId, TopicTable}, State) ->

View File

@ -55,10 +55,10 @@ handle('POST', "/mqtt/publish", Req) ->
Message = list_to_binary(get_value("message", Params)), Message = list_to_binary(get_value("message", Params)),
case {validate(qos, Qos), validate(topic, Topic)} of case {validate(qos, Qos), validate(topic, Topic)} of
{true, true} -> {true, true} ->
emqttd_router:route(#mqtt_message{qos = Qos, emqttd_router:route(http, #mqtt_message{qos = Qos,
retain = Retain, retain = Retain,
topic = Topic, topic = Topic,
payload = Message}), payload = Message}),
Req:ok({"text/plan", <<"ok\n">>}); Req:ok({"text/plan", <<"ok\n">>});
{false, _} -> {false, _} ->
Req:respond({400, [], <<"Bad QoS">>}); Req:respond({400, [], <<"Bad QoS">>});

View File

@ -220,7 +220,7 @@ systop(Name) when is_atom(Name) ->
list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])). list_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])).
publish(Topic, Payload) -> publish(Topic, Payload) ->
emqttd_router:route(#mqtt_message{topic = Topic, payload = Payload}). emqttd_router:route(metrics, #mqtt_message{topic = Topic, payload = Payload}).
new_metric({gauge, Name}) -> new_metric({gauge, Name}) ->
ets:insert(?METRIC_TABLE, {{Name, 0}, 0}); ets:insert(?METRIC_TABLE, {{Name, 0}, 0});

View File

@ -249,7 +249,7 @@ redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State). send(?PUBREL_PACKET(PacketId), State).
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) ->
send_willmsg(WillMsg), send_willmsg(ClientId, WillMsg),
try_unregister(ClientId, self()), try_unregister(ClientId, self()),
lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]), lager:debug("Protocol ~s@~s Shutdown: ~p", [ClientId, emqttd_net:format(Peername), Error]),
ok. ok.
@ -262,9 +262,11 @@ clientid(<<>>, #proto_state{peername = Peername}) ->
clientid(ClientId, _State) -> ClientId. clientid(ClientId, _State) -> ClientId.
send_willmsg(undefined) -> ignore; send_willmsg(_ClientId, undefined) ->
ignore;
%%TODO:should call session... %%TODO:should call session...
send_willmsg(WillMsg) -> emqttd_router:route(WillMsg). send_willmsg(ClientId, WillMsg) ->
emqttd_router:route(ClientId, WillMsg).
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;
start_keepalive(Sec) when Sec > 0 -> start_keepalive(Sec) when Sec > 0 ->

View File

@ -62,7 +62,6 @@ retain(#mqtt_message{retain = true, topic = Topic, payload = <<>>}) ->
retain(Msg = #mqtt_message{topic = Topic, retain(Msg = #mqtt_message{topic = Topic,
retain = true, retain = true,
qos = Qos,
payload = Payload}) -> payload = Payload}) ->
TabSize = mnesia:table_info(message, size), TabSize = mnesia:table_info(message, size),
case {TabSize < limit(table), size(Payload) < limit(payload)} of case {TabSize < limit(table), size(Payload) < limit(payload)} of

View File

@ -20,11 +20,10 @@
%% SOFTWARE. %% SOFTWARE.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%%route chain... statistics %%TODO: route chain... statistics
-module(emqttd_router). -module(emqttd_router).
-include_lib("emqtt/include/emqtt.hrl"). -include_lib("emqtt/include/emqtt.hrl").
%-include("emqttd.hrl").
-behaviour(gen_server). -behaviour(gen_server).
@ -34,7 +33,7 @@
-export([start_link/0]). -export([start_link/0]).
%%Router Chain--> --->In Out<--- %%Router Chain--> --->In Out<---
-export([route/1]). -export([route/2]).
%% gen_server Function Exports %% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -58,22 +57,21 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc %% @doc
%% Route mqtt message. %% Route mqtt message. From is clienid or module.
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec route(mqtt_message()) -> ok. -spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok.
route(Msg) -> route(From, Msg) ->
lager:debug("Route ~p", [emqtt_message:format(Msg)]), lager:debug("Route from ~s: ~p", [From, emqtt_message:format(Msg)]),
% TODO: need to retain? % TODO: retained message should be stored in emqttd_pubsub...
emqttd_retained:retain(Msg), % emqttd_retained:retain(Msg),
% unset flag and pubsub % unset flag and pubsub
emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)). emqttd_pubsub:publish(Msg).
%%%============================================================================= %%%=============================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%============================================================================= %%%=============================================================================
init([]) -> init([]) ->
{ok, #state{}, hibernate}. {ok, #state{}, hibernate}.

View File

@ -102,19 +102,19 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session(). -spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session, {?QOS_0, Message}) -> publish(Session = #session_state{clientid = ClientId}, {?QOS_0, Message}) ->
emqttd_router:route(Message), Session; emqttd_router:route(ClientId, Message), Session;
publish(Session, {?QOS_1, Message}) -> publish(Session = #session_state{clientid = ClientId}, {?QOS_1, Message}) ->
emqttd_router:route(Message), Session; emqttd_router:route(ClientId, Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel}, publish(SessState = #session_state{awaiting_rel = AwaitingRel},
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
%% store in awaiting_rel %% store in awaiting_rel
SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)};
publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, ?QOS_2, Message}), gen_server:cast(SessPid, {publish, {?QOS_2, Message}}),
SessPid. SessPid.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -151,7 +151,7 @@ puback(SessPid, {?PUBREC, PacketId}) when is_pid(SessPid) ->
puback(SessState = #session_state{clientid = ClientId, puback(SessState = #session_state{clientid = ClientId,
awaiting_rel = Awaiting}, {?PUBREL, PacketId}) -> awaiting_rel = Awaiting}, {?PUBREL, PacketId}) ->
case maps:find(PacketId, Awaiting) of case maps:find(PacketId, Awaiting) of
{ok, Msg} -> emqttd_router:route(Msg); {ok, Msg} -> emqttd_router:route(ClientId, Msg);
error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId]) error -> lager:warning("Session ~s: PUBREL PacketId '~p' not found!", [ClientId, PacketId])
end, end,
SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)}; SessState#session_state{awaiting_rel = maps:remove(PacketId, Awaiting)};
@ -310,7 +310,7 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
msg_queue = emqttd_queue:clear(Queue), msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate}; expire_timer = undefined}, hibernate};
handle_cast({publish, ?QOS_2, Message}, State) -> handle_cast({publish, {?QOS_2, Message}}, State) ->
NewState = publish(State, {?QOS_2, Message}), NewState = publish(State, {?QOS_2, Message}),
{noreply, NewState}; {noreply, NewState};