diff --git a/apps/emqttd/src/emqttd_broker.erl b/apps/emqttd/src/emqttd_broker.erl index ebff43718..bff331d9c 100644 --- a/apps/emqttd/src/emqttd_broker.erl +++ b/apps/emqttd/src/emqttd_broker.erl @@ -173,7 +173,7 @@ foldl_hooks(Hook, Args, Acc0) -> apply(M, F, [Acc, Args++A]) end, Acc0, Hooks); [] -> - ok + Acc0 end. %%------------------------------------------------------------------------------ diff --git a/apps/emqttd/src/emqttd_mod_rewrite.erl b/apps/emqttd/src/emqttd_mod_rewrite.erl index 4131afcf6..fc038d95f 100644 --- a/apps/emqttd/src/emqttd_mod_rewrite.erl +++ b/apps/emqttd/src/emqttd_mod_rewrite.erl @@ -48,14 +48,16 @@ load(Opts) -> emqttd_broker:hook(client_subscribe, {?MODULE, rewrite_subscribe}, {?MODULE, rewrite, [subscribe, Sections]}), emqttd_broker:hook(client_unsubscribe, {?MODULE, rewrite_unsubscribe}, - {?MODULE, rewrite_unsubscribe, [unsubscribe, Sections]}), + {?MODULE, rewrite, [unsubscribe, Sections]}), emqttd_broker:hook(client_publish, {?MODULE, rewrite_publish}, - {?MODULE, rewrite_publish, [publish, Sections]}). + {?MODULE, rewrite, [publish, Sections]}). rewrite(TopicTable, [subscribe, Sections]) -> + lager:info("rewrite subscribe: ~p", [TopicTable]), [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]; rewrite(Topics, [unsubscribe, Sections]) -> + lager:info("rewrite unsubscribe: ~p", [Topics]), [match_topic(Topic, Sections) || Topic <- Topics]; rewrite(Message=#mqtt_message{topic = Topic}, [publish, Sections]) -> @@ -91,14 +93,17 @@ unload(_) -> compile(Sections) -> C = fun({rewrite, Re, Dest}) -> - {ok, MP} = re:compile(Re), - {rewrite, MP, Dest} + {ok, MP} = re:compile(Re), + {rewrite, MP, Dest} end, - [{topic, Topic, [C(R) || R <- Rules]} || {topic, Topic, Rules} <- Sections]. + F = fun({topic, Topic, Rules}) -> + {topic, list_to_binary(Topic), [C(R) || R <- Rules]} + end, + [F(Section) || Section <- Sections]. match_topic(Topic, []) -> Topic; -match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> +match_topic(Topic, [{topic, Filter, Rules} | Sections]) -> case emqtt_topic:match(Topic, Filter) of true -> match_rule(Topic, Rules); @@ -108,8 +113,8 @@ match_topic(Topic, [{topic, Filter, Rules}|Sections]) -> match_rule(Topic, []) -> Topic; -match_rule(Topic, [{rewrite, MP, Dest}|Rules]) -> - case re:run(Topic, MP, [{captrue, all_but_first, list}]) of +match_rule(Topic, [{rewrite, MP, Dest} | Rules]) -> + case re:run(Topic, MP, [{capture, all_but_first, list}]) of {match, Captured} -> %%TODO: stupid??? how to replace $1, $2? Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured), diff --git a/apps/emqttd/src/emqttd_protocol.erl b/apps/emqttd/src/emqttd_protocol.erl index 71c33bd72..08b24674f 100644 --- a/apps/emqttd/src/emqttd_protocol.erl +++ b/apps/emqttd/src/emqttd_protocol.erl @@ -158,7 +158,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - emqttd_session:publish(Session, ClientId, {?QOS_0, emqtt_message:from_packet(Packet)}); + do_publish(Session, ClientId, ?QOS_0, Packet); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]) end, @@ -168,7 +168,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - emqttd_session:publish(Session, ClientId, {?QOS_1, emqtt_message:from_packet(Packet)}), + do_publish(Session, ClientId, ?QOS_1, Packet), send(?PUBACK_PACKET(?PUBACK, PacketId), State); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -179,7 +179,7 @@ handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), State = #proto_state{clientid = ClientId, session = Session}) -> case check_acl(publish, Topic, State) of allow -> - NewSession = emqttd_session:publish(Session, ClientId, {?QOS_2, emqtt_message:from_packet(Packet)}), + NewSession = do_publish(Session, ClientId, ?QOS_2, Packet), send(?PUBACK_PACKET(?PUBREC, PacketId), State#proto_state{session = NewSession}); deny -> lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]), @@ -239,6 +239,10 @@ handle(?PACKET(?DISCONNECT), State) -> % clean willmsg {stop, normal, State#proto_state{will_msg = undefined}}. +do_publish(Session, ClientId, Qos, Packet) -> + Message = emqttd_broker:foldl_hooks(client_publish, [], emqtt_message:from_packet(Packet)), + emqttd_session:publish(Session, ClientId, {Qos, Message}). + -spec send({pid() | tuple(), mqtt_message()} | mqtt_packet(), proto_state()) -> {ok, proto_state()}. %% qos0 message send({_From, Message = #mqtt_message{qos = ?QOS_0}}, State) ->