route with clientid

This commit is contained in:
Feng Lee 2015-04-15 22:20:19 +08:00
parent 50174901d7
commit b7a2b66db0
3 changed files with 15 additions and 15 deletions

View File

@ -148,7 +148,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of case emqttd_acl:check({client(State), publish, Topic}) of
allow -> allow ->
emqttd_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)}); emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)});
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
end, end,
@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of case emqttd_acl:check({client(State), publish, Topic}) of
allow -> allow ->
emqttd_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}), emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State); send(?PUBACK_PACKET(?PUBACK, PacketId), State);
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -169,7 +169,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of case emqttd_acl:check({client(State), publish, Topic}) of
allow -> allow ->
NewSession = emqttd_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}), NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -239,10 +239,10 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername
{ok, State}. {ok, State}.
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]); lager:info("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]);
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:debug("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]). lager:info("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]).
%% @doc redeliver PUBREL PacketId %% @doc redeliver PUBREL PacketId
redeliver({?PUBREL, PacketId}, State) -> redeliver({?PUBREL, PacketId}, State) ->

View File

@ -63,7 +63,7 @@ start_link() ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok. -spec route(From :: binary() | atom(), Msg :: mqtt_message()) -> ok.
route(From, Msg) -> route(From, Msg) ->
lager:debug("Route from ~s: ~p", [From, emqtt_message:format(Msg)]), lager:info("Route ~s from ~s", [emqtt_message:format(Msg), From]),
% TODO: retained message should be stored in emqttd_pubsub... % TODO: retained message should be stored in emqttd_pubsub...
% emqttd_retained:retain(Msg), % emqttd_retained:retain(Msg),
% unset flag and pubsub % unset flag and pubsub

View File

@ -33,7 +33,7 @@
%% API Function Exports %% API Function Exports
-export([start/1, -export([start/1,
resume/3, resume/3,
publish/2, publish/3,
puback/2, puback/2,
subscribe/2, subscribe/2,
unsubscribe/2, unsubscribe/2,
@ -101,20 +101,20 @@ resume(SessPid, ClientId, ClientPid) when is_pid(SessPid) ->
%% %%
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec publish(session(), {mqtt_qos(), mqtt_message()}) -> session(). -spec publish(session(), mqtt_clientid(), {mqtt_qos(), mqtt_message()}) -> session().
publish(Session = #session_state{clientid = ClientId}, {?QOS_0, Message}) -> publish(Session, ClientId, {?QOS_0, Message}) ->
emqttd_router:route(ClientId, Message), Session; emqttd_router:route(ClientId, Message), Session;
publish(Session = #session_state{clientid = ClientId}, {?QOS_1, Message}) -> publish(Session, ClientId, {?QOS_1, Message}) ->
emqttd_router:route(ClientId, Message), Session; emqttd_router:route(ClientId, Message), Session;
publish(SessState = #session_state{awaiting_rel = AwaitingRel}, publish(SessState = #session_state{awaiting_rel = AwaitingRel}, _ClientId,
{?QOS_2, Message = #mqtt_message{msgid = MsgId}}) -> {?QOS_2, Message = #mqtt_message{msgid = MsgId}}) ->
%% store in awaiting_rel %% store in awaiting_rel
SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)}; SessState#session_state{awaiting_rel = maps:put(MsgId, Message, AwaitingRel)};
publish(SessPid, {?QOS_2, Message}) when is_pid(SessPid) -> publish(SessPid, ClientId, {?QOS_2, Message}) when is_pid(SessPid) ->
gen_server:cast(SessPid, {publish, {?QOS_2, Message}}), gen_server:cast(SessPid, {publish, ClientId, {?QOS_2, Message}}),
SessPid. SessPid.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -310,8 +310,8 @@ handle_cast({resume, ClientId, ClientPid}, State = #session_state{
msg_queue = emqttd_queue:clear(Queue), msg_queue = emqttd_queue:clear(Queue),
expire_timer = undefined}, hibernate}; expire_timer = undefined}, hibernate};
handle_cast({publish, {?QOS_2, Message}}, State) -> handle_cast({publish, ClientId, {?QOS_2, Message}}, State) ->
NewState = publish(State, {?QOS_2, Message}), NewState = publish(State, ClientId, {?QOS_2, Message}),
{noreply, NewState}; {noreply, NewState};
handle_cast({puback, PacketId}, State) -> handle_cast({puback, PacketId}, State) ->