Merge pull request #1062 from emqtt/emq22

Version 2.2-beta.2
This commit is contained in:
turtleDeng 2017-05-20 11:03:40 +08:00 committed by GitHub
commit fbe4ee4eed
3 changed files with 35 additions and 9 deletions

View File

@ -119,6 +119,9 @@ mqtt.max_clientid_len = 1024
## Max Packet Size Allowed, 64K by default.
mqtt.max_packet_size = 64KB
## Check Websocket Protocol Header. Enum: on, off
mqtt.websocket_protocol_header = on
##--------------------------------------------------------------------
## MQTT Connection
##--------------------------------------------------------------------

View File

@ -345,6 +345,11 @@ end}.
{max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}]
end}.
{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [
{default, on},
{datatype, flag}
]}.
%%--------------------------------------------------------------------
%% MQTT Connection
%%--------------------------------------------------------------------

View File

@ -59,7 +59,7 @@ handle_request('POST', "/mqtt/publish", Req) ->
handle_request('GET', "/mqtt", Req) ->
lager:info("WebSocket Connection from: ~s", [Req:get(peer)]),
Upgrade = Req:get_header_value("Upgrade"),
Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
Proto = check_protocol_header(Req),
case {is_websocket(Upgrade), Proto} of
{true, "mqtt" ++ _Vsn} ->
emqttd_ws:handle_request(Req);
@ -83,18 +83,30 @@ handle_request(Method, Path, Req) ->
lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]),
Req:not_found().
check_protocol_header(Req) ->
case emqttd:env(websocket_protocol_header, false) of
true -> get_protocol_header(Req);
false -> "mqtt-v3.1.1"
end.
get_protocol_header(Req) ->
case Req:get_header_value("EMQ-WebSocket-Protocol") of
undefined -> Req:get_header_value("Sec-WebSocket-Protocol");
Proto -> Proto
end.
%%--------------------------------------------------------------------
%% HTTP Publish
%%--------------------------------------------------------------------
http_publish(Req) ->
Params = mochiweb_request:parse_post(Req),
Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)],
lager:info("HTTP Publish: ~p", [Params]),
Topics = topics(Params),
ClientId = get_value("client", Params, http),
Qos = int(get_value("qos", Params, "0")),
Retain = bool(get_value("retain", Params, "0")),
Payload = list_to_binary(get_value("message", Params)),
ClientId = get_value(<<"client">>, Params, http),
Qos = int(get_value(<<"qos">>, Params, "0")),
Retain = bool(get_value(<<"retain">>, Params, "0")),
Payload = iolist_to_binary(get_value(<<"message">>, Params)),
case {validate(qos, Qos), validate(topics, Topics)} of
{true, true} ->
lists:foreach(fun(Topic) ->
@ -109,8 +121,8 @@ http_publish(Req) ->
end.
topics(Params) ->
Tokens = [get_value("topic", Params) | string:tokens(get_value("topics", Params, ""), ",")],
[list_to_binary(Token) || Token <- Tokens, Token =/= undefined].
Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")],
[iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined].
validate(qos, Qos) ->
(Qos >= ?QOS_0) and (Qos =< ?QOS_2);
@ -151,10 +163,16 @@ authorized(Req) ->
user_passwd(BasicAuth) ->
list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)).
int(I) when is_integer(I)-> I;
int(B) when is_binary(B)-> binary_to_integer(B);
int(S) -> list_to_integer(S).
bool(0) -> false;
bool(1) -> true;
bool("0") -> false;
bool("1") -> true.
bool("1") -> true;
bool(<<"0">>) -> false;
bool(<<"1">>) -> true.
is_websocket(Upgrade) ->
Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".