From 8ee34333152ae5559ca5b32ae10c5a153ccfe7d2 Mon Sep 17 00:00:00 2001 From: Ery Lee Date: Tue, 7 Apr 2015 23:05:27 +0800 Subject: [PATCH] syn with imac --- apps/emqttd/src/emqttd_acl.erl | 6 +++-- apps/emqttd/src/emqttd_protocol.erl | 40 ++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/apps/emqttd/src/emqttd_acl.erl b/apps/emqttd/src/emqttd_acl.erl index b2268e9b6..7dfef1e5d 100644 --- a/apps/emqttd/src/emqttd_acl.erl +++ b/apps/emqttd/src/emqttd_acl.erl @@ -35,7 +35,9 @@ -define(SERVER, ?MODULE). %% API Function Exports --export([start_link/1, check/3, reload/0, register_mod/1, unregister_mod/1, all_modules/0, stop/0]). +-export([start_link/1, check/3, reload/0, + register_mod/1, unregister_mod/1, all_modules/0, + stop/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -87,7 +89,7 @@ start_link(AclOpts) -> %% Check ACL. %% %% @end -%%------------------------------------------------------------------------------ +%%-------------------------------------------------------------------------- -spec check(User, PubSub, Topic) -> {ok, allow | deny} | {error, any()} when User :: mqtt_user(), PubSub :: publish | subscribe, diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 7629a46e3..e64504f21 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -96,7 +96,15 @@ received(Packet = ?PACKET(_Type), State = #proto_state{peername = Peername, lager:debug("RECV from ~s@~s: ~s", [ClientId, emqttd_net:format(Peername), emqttd_packet:dump(Packet)]), case validate_packet(Packet) of ok -> - handle(Packet, State); + case access_control(Packet, State) of + {ok, allow} -> + handle(Packet, State); + {ok, deny} -> + {error, acl_denied, State}; + {error, AclError} -> + lager:error("Client ~s@~s: acl error - ~p", [ClientId, emqttd_net:format(Peername), AclError]), + {error, acl_error, State} + end; {error, Reason} -> {error, Reason, State} end. @@ -308,6 +316,36 @@ validate_qos(undefined) -> true; validate_qos(Qos) when Qos =< ?QOS_2 -> true; validate_qos(_) -> false. +access_control(publish, Topic, State = #proto_state{client_id = ClientId}) -> + case emqttd_acl:check(mqtt_user(State), publish, Topic) of + {ok, allow} -> + allow; + {ok, deny} -> + lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), deny; + {error, AclError} -> + lager:error("ACL Error: ~p when ~s publish to ~s", [AclError, ClientId, Topic]), deny + end. + +access_control(?SUBSCRIBE_PACKET(_PacketId, TopicTable), State) -> + check_acl(mqtt_user(State), subscribe, [Topic || {Topic, _Qos} <- TopicTable]); + +mqtt_user(#proto_state{peername = {Addr, _Port}, client_id = ClientId, username = Username}) -> + #mqtt_user{username = Username, clientid = ClientId, ipaddr = Addr}. + +check_acl(_User, subscribe, []) -> + {ok, allow}; +check_acl(User = #mqtt_user{clientid=ClientId}, subscribe, [Topic|Topics]) -> + case emqttd_acl:check(User, subscribe, Topic) of + {ok, allow} -> + check_acl(User, subscribe, Topics); + {ok, deny} -> + lager:warning("ACL Deny: ~s cannnot subscribe ~s", [ClientId, Topic]), + {ok, deny}; + {error, Error} -> + {error, Error} + end. + + try_unregister(undefined, _) -> ok; try_unregister(ClientId, _) -> emqttd_cm:unregister(ClientId, self()).