This commit is contained in:
Feng Lee 2015-05-25 11:16:06 +08:00
parent 46545be9d0
commit 73dbc98068
3 changed files with 21 additions and 12 deletions

View File

@ -173,7 +173,7 @@ foldl_hooks(Hook, Args, Acc0) ->
apply(M, F, [Acc, Args++A]) apply(M, F, [Acc, Args++A])
end, Acc0, Hooks); end, Acc0, Hooks);
[] -> [] ->
ok Acc0
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -48,14 +48,16 @@ load(Opts) ->
emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe},
{?MODULE, rewrite, [subscribe, Sections]}), {?MODULE, rewrite, [subscribe, Sections]}),
emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe},
{?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}), {?MODULE, rewrite, [unsubscribe, Sections]}),
emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish},
{?MODULE, rewrite_publish, [publish, Sections]}). {?MODULE, rewrite, [publish, Sections]}).
rewrite(TopicTable, [subscribe, Sections]) -> rewrite(TopicTable, [subscribe, Sections]) ->
lager:info("rewrite subscribe: ~p", [TopicTable]),
[{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable];
rewrite(Topics, [unsubscribe, Sections]) -> rewrite(Topics, [unsubscribe, Sections]) ->
lager:info("rewrite unsubscribe: ~p", [Topics]),
[match_topic(Topic, Sections) || Topic <- Topics]; [match_topic(Topic, Sections) || Topic <- Topics];
rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) ->
@ -94,11 +96,14 @@ compile(Sections) ->
{ok, MP} = re:compile(Re), {ok, MP} = re:compile(Re),
{rewrite, MP, Dest} {rewrite, MP, Dest}
end, end,
[{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. F = fun({topic, Topic, Rules}) ->
{topic, list_to_binary(Topic), [C(R) || R <- Rules]}
end,
[F(Section) || Section <- Sections].
match_topic(Topic, []) -> match_topic(Topic, []) ->
Topic; Topic;
match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> match_topic(Topic, [{topic, Filter, Rules} | Sections]) ->
case emqtt_topic:match(Topic, Filter) of case emqtt_topic:match(Topic, Filter) of
true -> true ->
match_rule(Topic, Rules); match_rule(Topic, Rules);
@ -108,8 +113,8 @@ match_topic(Topic, [{topic, Filter, Rules}|Sections]) ->
match_rule(Topic, []) -> match_rule(Topic, []) ->
Topic; Topic;
match_rule(Topic, [{rewrite, MP, Dest}|Rules]) -> match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
case re:run(Topic, MP, [{captrue, all_but_first, list}]) of case re:run(Topic, MP, [{capture, all_but_first, list}]) of
{match, Captured} -> {match, Captured} ->
%%TODO: stupid??? how to replace $1, $2? %%TODO: stupid??? how to replace $1, $2?
Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),

View File

@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of case check_acl(publish, Topic, State) of
allow -> allow ->
emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)}); do_publish(Session, ClientId, ?QOS_0, Packet);
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
end, end,
@ -168,7 +168,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of case check_acl(publish, Topic, State) of
allow -> allow ->
emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}), do_publish(Session, ClientId, ?QOS_1, Packet),
send(?PUBACK_PACKET(?PUBACK, PacketId), State); send(?PUBACK_PACKET(?PUBACK, PacketId), State);
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -179,7 +179,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
State = #proto_state{clientid = ClientId, session = Session}) -> State = #proto_state{clientid = ClientId, session = Session}) ->
case check_acl(publish, Topic, State) of case check_acl(publish, Topic, State) of
allow -> allow ->
NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}), NewSession = do_publish(Session, ClientId, ?QOS_2, Packet),
send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession});
deny -> deny ->
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
@ -239,6 +239,10 @@ handle(?PACKET(?DISCONNECT), State) ->
% clean willmsg % clean willmsg
{stop, normal, State#proto_state{will_msg = undefined}}. {stop, normal, State#proto_state{will_msg = undefined}}.
do_publish(Session, ClientId, Qos, Packet) ->
Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)),
emqttd_session:publish(Session, ClientId, {Qos, Message}).
-spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}. -spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
%% qos0 message %% qos0 message
send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) -> send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->