diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 36dc778d9..794193a5d 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -44,27 +44,9 @@ handle_request('GET', "/status", Req) -> %%-------------------------------------------------------------------- handle_request('POST', "/mqtt/publish", Req) -> - Params = mochiweb_request:parse_post(Req), - lager:info("HTTP Publish: ~p", [Params]), case authorized(Req) of - true -> - ClientId = get_value("client", Params, http), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Topic = list_to_binary(get_value("topic", Params)), - Payload = list_to_binary(get_value("message", Params)), - case {validate(qos, Qos), validate(topic, Topic)} of - {true, true} -> - Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), - emqttd:publish(Msg#mqtt_message{retain = Retain}), - Req:ok({"text/plain", <<"ok">>}); - {false, _} -> - Req:respond({400, [], <<"Bad QoS">>}); - {_, false} -> - Req:respond({400, [], <<"Bad Topic">>}) - end; - false -> - Req:respond({401, [], <<"Fobbiden">>}) + true -> http_publish(Req); + false -> Req:respond({401, [], <<"Fobbiden">>}) end; %%-------------------------------------------------------------------- @@ -97,9 +79,53 @@ handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). +%%-------------------------------------------------------------------- +%% HTTP Publish +%%-------------------------------------------------------------------- + +http_publish(Req) -> + Params = mochiweb_request:parse_post(Req), + lager:info("HTTP Publish: ~p", [Params]), + Topics = topics(Params), + ClientId = get_value("client", Params, http), + Qos = int(get_value("qos", Params, "0")), + Retain = bool(get_value("retain", Params, "0")), + Payload = list_to_binary(get_value("message", Params)), + case {validate(qos, Qos), validate(topics, Topics)} of + {true, true} -> + lists:foreach(fun(Topic) -> + Msg = emqttd_message:make(ClientId, Qos, Topic, Payload), + emqttd:publish(Msg#mqtt_message{retain = Retain}) + end, Topics), + Req:ok({"text/plain", <<"ok">>}); + {false, _} -> + Req:respond({400, [], <<"Bad QoS">>}); + {_, false} -> + Req:respond({400, [], <<"Bad Topics">>}) + end. + +topics(Params) -> + Tokens = [get_value("topic", Params) | string:tokens(get_value("topics", Params, ""), ",")], + [list_to_binary(Token) || Token <- Tokens, Token =/= undefined]. + +validate(qos, Qos) -> + (Qos >= ?QOS_0) and (Qos =< ?QOS_2); + +validate(topics, [Topic|Left]) -> + case validate(topic, Topic) of + true -> validate(topics, Left); + false -> false + end; +validate(topics, []) -> + true; + +validate(topic, Topic) -> + emqttd_topic:validate({name, Topic}). + %%-------------------------------------------------------------------- %% basic authorization %%-------------------------------------------------------------------- + authorized(Req) -> case Req:get_header_value("Authorization") of undefined -> @@ -118,11 +144,6 @@ authorized(Req) -> user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). -validate(qos, Qos) -> - (Qos >= ?QOS_0) and (Qos =< ?QOS_2); - -validate(topic, Topic) -> - emqttd_topic:validate({name, Topic}). int(S) -> list_to_integer(S).