This commit is contained in:
Ery Lee 2015-04-15 20:47:28 +08:00
parent 9325c31be6
commit 95652c77d0
4 changed files with 31 additions and 32 deletions

View File

@ -119,29 +119,29 @@ match_who(#mqtt_client{ipaddr = undefined}, {ipaddr, _Tup}) ->
match_who(#mqtt_client{ipaddr = IP}, {ipaddr, {_CDIR, Start, End}}) ->
I = esockd_access:atoi(IP),
I >= Start andalso I =< End;
match_who(_User, _Who) ->
match_who(_Client, _Who) ->
false.
match_topics(_User, _Topic, []) ->
match_topics(_Client, _Topic, []) ->
false;
match_topics(User, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(User, PatternFilter),
match_topics(Client, Topic, [{pattern, PatternFilter}|Filters]) ->
TopicFilter = feed_var(Client, PatternFilter),
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
true -> true;
false -> match_topics(User, Topic, Filters)
false -> match_topics(Client, Topic, Filters)
end;
match_topics(User, Topic, [TopicFilter|Filters]) ->
match_topics(Client, Topic, [TopicFilter|Filters]) ->
case match_topic(emqtt_topic:words(Topic), TopicFilter) of
true -> true;
false -> match_topics(User, Topic, Filters)
false -> match_topics(Client, Topic, Filters)
end.
match_topic(Topic, TopicFilter) ->
emqtt_topic:match(Topic, TopicFilter).
feed_var(User, Pattern) ->
feed_var(User, Pattern, []).
feed_var(_User, [], Acc) ->
feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) ->
lists:reverse(Acc);
feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [<<"$c">>|Acc]);
@ -154,4 +154,3 @@ feed_var(Client = #mqtt_client{username = Username}, [<<"$u">>|Words], Acc) ->
feed_var(Client, [W|Words], Acc) ->
feed_var(Client, Words, [W|Acc]).

View File

@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/0, allow/3]).
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -46,11 +46,6 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
allow(subscribe, User, Topic) ->
true;
allow(publish, User, Topic) ->
true.
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------

View File

@ -95,9 +95,8 @@ received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
received(_Packet, State = #proto_state{connected = false}) ->
{error, protocol_not_connected, State};
received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername,
clientid = ClientId}) ->
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
received(Packet = ?PACKET(_Type), State) ->
trace(recv, Packet, State),
case validate_packet(Packet) of
ok ->
handle(Packet, State);
@ -114,7 +113,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State = #proto_state{peername = Peername =
keep_alive = KeepAlive,
clientid = ClientId} = Var,
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
trace(recv, Packet, State),
State1 = State#proto_state{proto_ver = ProtoVer,
username = Username,
@ -149,7 +148,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_0, emqttd_message:from_packet(Packet)});
emqttd_session:publish(Session, {?QOS_0, emqtt_message:from_packet(Packet)});
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
end,
@ -159,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
emqttd_session:publish(Session, {?QOS_1, emqttd_message:from_packet(Packet)}),
emqttd_session:publish(Session, {?QOS_1, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -170,7 +169,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) ->
case emqttd_acl:check({client(State), publish, Topic}) of
allow ->
NewSession = emqttd_session:publish(Session, {?QOS_2, emqttd_message:from_packet(Packet)}),
NewSession = emqttd_session:publish(Session, {?QOS_2, emqtt_message:from_packet(Packet)}),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -218,20 +217,20 @@ handle(?PACKET(?DISCONNECT), State) ->
-spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
%% qos0 message
send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->
send(emqttd_message:to_packet(Message), State);
send(emqtt_message:to_packet(Message), State);
%% message from session
send({_From = SessPid, Message}, State = #proto_state{session = SessPid}) when is_pid(SessPid) ->
send(emqttd_message:to_packet(Message), State);
send(emqtt_message:to_packet(Message), State);
%% message(qos1, qos2) not from session
send({_From, Message = #mqtt_message{qos = Qos}}, State = #proto_state{session = Session})
when (Qos =:= ?QOS_1) orelse (Qos =:= ?QOS_2) ->
{Message1, NewSession} = emqttd_session:store(Session, Message),
send(emqttd_message:to_packet(Message1), State#proto_state{session = NewSession});
send(emqtt_message:to_packet(Message1), State#proto_state{session = NewSession});
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername, clientid = ClientId}) when is_record(Packet, mqtt_packet) ->
lager:debug("SENT to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:dump(Packet)]),
send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername = Peername}) when is_record(Packet, mqtt_packet) ->
trace(send, Packet, State),
sent_stats(Packet),
Data = emqttd_serialiser:serialise(Packet),
lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
@ -239,6 +238,12 @@ send(Packet, State = #proto_state{transport = Transport, socket = Sock, peername
Transport:send(Sock, Data),
{ok, State}.
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]);
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) ->
lager:debug("SEND to ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqtt_packet:format(Packet)]).
%% @doc redeliver PUBREL PacketId
redeliver({?PUBREL, PacketId}, State) ->
send(?PUBREL_PACKET(PacketId), State).
@ -250,7 +255,7 @@ shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg
ok.
willmsg(Packet) when is_record(Packet, mqtt_packet_connect) ->
emqttd_message:from_packet(Packet).
emqtt_message:from_packet(Packet).
clientid(<<>>, #proto_state{peername = Peername}) ->
<<"eMQTT_", (base64:encode(emqttd_net:format(Peername)))/binary>>;

View File

@ -64,11 +64,11 @@ start_link() ->
%%------------------------------------------------------------------------------
-spec route(mqtt_message()) -> ok.
route(Msg) ->
lager:debug("Route ~s", [emqttd_message:dump(Msg)]),
lager:debug("Route ~p", [emqtt_message:format(Msg)]),
% TODO: need to retain?
emqttd_retained:retain(Msg),
% unset flag and pubsub
emqttd_pubsub:publish(emqttd_message:unset_flag(Msg)).
emqttd_pubsub:publish(emqtt_message:unset_flag(Msg)).
%%%=============================================================================
%%% gen_server callbacks