diff --git a/etc/emq.conf b/etc/emq.conf index 41d871a2c..8c155882f 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -119,6 +119,9 @@ mqtt.max_clientid_len = 1024 ## Max Packet Size Allowed, 64K by default. mqtt.max_packet_size = 64KB +## Check Websocket Protocol Header. Enum: on, off +mqtt.websocket_protocol_header = on + ##-------------------------------------------------------------------- ## MQTT Connection ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index ddddfff24..b5e03a359 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -345,6 +345,11 @@ end}. {max_packet_size, cuttlefish:conf_get("mqtt.max_packet_size", Conf)}] end}. +{mapping, "mqtt.websocket_protocol_header", "emqttd.websocket_protocol_header", [ + {default, on}, + {datatype, flag} +]}. + %%-------------------------------------------------------------------- %% MQTT Connection %%-------------------------------------------------------------------- diff --git a/src/emqttd_http.erl b/src/emqttd_http.erl index 25b7c3d23..9b50cf34a 100644 --- a/src/emqttd_http.erl +++ b/src/emqttd_http.erl @@ -59,7 +59,7 @@ handle_request('POST', "/mqtt/publish", Req) -> handle_request('GET', "/mqtt", Req) -> lager:info("WebSocket Connection from: ~s", [Req:get(peer)]), Upgrade = Req:get_header_value("Upgrade"), - Proto = Req:get_header_value("Sec-WebSocket-Protocol"), + Proto = check_protocol_header(Req), case {is_websocket(Upgrade), Proto} of {true, "mqtt" ++ _Vsn} -> emqttd_ws:handle_request(Req); @@ -83,18 +83,30 @@ handle_request(Method, Path, Req) -> lager:error("Unexpected HTTP Request: ~s ~s", [Method, Path]), Req:not_found(). +check_protocol_header(Req) -> + case emqttd:env(websocket_protocol_header, false) of + true -> get_protocol_header(Req); + false -> "mqtt-v3.1.1" + end. + +get_protocol_header(Req) -> + case Req:get_header_value("EMQ-WebSocket-Protocol") of + undefined -> Req:get_header_value("Sec-WebSocket-Protocol"); + Proto -> Proto + end. + %%-------------------------------------------------------------------- %% HTTP Publish %%-------------------------------------------------------------------- http_publish(Req) -> - Params = mochiweb_request:parse_post(Req), + Params = [{iolist_to_binary(Key), Val} || {Key, Val} <- mochiweb_request:parse_post(Req)], lager:info("HTTP Publish: ~p", [Params]), Topics = topics(Params), - ClientId = get_value("client", Params, http), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Payload = list_to_binary(get_value("message", Params)), + ClientId = get_value(<<"client">>, Params, http), + Qos = int(get_value(<<"qos">>, Params, "0")), + Retain = bool(get_value(<<"retain">>, Params, "0")), + Payload = iolist_to_binary(get_value(<<"message">>, Params)), case {validate(qos, Qos), validate(topics, Topics)} of {true, true} -> lists:foreach(fun(Topic) -> @@ -109,8 +121,8 @@ http_publish(Req) -> end. topics(Params) -> - Tokens = [get_value("topic", Params) | string:tokens(get_value("topics", Params, ""), ",")], - [list_to_binary(Token) || Token <- Tokens, Token =/= undefined]. + Tokens = [get_value(<<"topic">>, Params) | string:tokens(get_value(<<"topics">>, Params, ""), ",")], + [iolist_to_binary(Token) || Token <- Tokens, Token =/= undefined]. validate(qos, Qos) -> (Qos >= ?QOS_0) and (Qos =< ?QOS_2); @@ -151,10 +163,16 @@ authorized(Req) -> user_passwd(BasicAuth) -> list_to_tuple(binary:split(base64:decode(BasicAuth), <<":">>)). +int(I) when is_integer(I)-> I; +int(B) when is_binary(B)-> binary_to_integer(B); int(S) -> list_to_integer(S). +bool(0) -> false; +bool(1) -> true; bool("0") -> false; -bool("1") -> true. +bool("1") -> true; +bool(<<"0">>) -> false; +bool(<<"1">>) -> true. is_websocket(Upgrade) -> Upgrade =/= undefined andalso string:to_lower(Upgrade) =:= "websocket".