From 9343a7c4195aaa52ee5754c3923127ea0a8aab06 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 27 Apr 2015 22:37:00 +0800 Subject: [PATCH] websocket --- apps/emqttd/priv/www/index.html | 59 ++++++++++++++++++++++++ apps/emqttd/src/emqttd_http.erl | 68 +++++++++++++++++----------- apps/emqttd/src/emqttd_websocket.erl | 42 +++++++++++++++++ 3 files changed, 143 insertions(+), 26 deletions(-) create mode 100644 apps/emqttd/priv/www/index.html create mode 100644 apps/emqttd/src/emqttd_websocket.erl diff --git a/apps/emqttd/priv/www/index.html b/apps/emqttd/priv/www/index.html new file mode 100644 index 000000000..7ac1fda17 --- /dev/null +++ b/apps/emqttd/priv/www/index.html @@ -0,0 +1,59 @@ + + + + MQTT Over Mochiweb websocket + + +

MQTT Over Mochiweb websocket

+ +
+ +   State: +
+
Protip: open your javascript error console, just in case..
+
+
+
+ + +
+
+
+
+ + + + + diff --git a/apps/emqttd/src/emqttd_http.erl b/apps/emqttd/src/emqttd_http.erl index 06146c8c2..7f6f66685 100644 --- a/apps/emqttd/src/emqttd_http.erl +++ b/apps/emqttd/src/emqttd_http.erl @@ -20,7 +20,7 @@ %%% SOFTWARE. %%%----------------------------------------------------------------------------- %%% @doc -%%% emqttd http handler. +%%% emqttd http publish API and websocket client. %%% %%% @end %%%----------------------------------------------------------------------------- @@ -37,35 +37,47 @@ -export([handle/1]). handle(Req) -> - case authorized(Req) of - true -> - Path = Req:get(path), - Method = Req:get(method), - handle(Method, Path, Req); - false -> - Req:respond({401, [], <<"Fobbiden">>}) - end. + handle(Req:get(method), Req:get(path), Req). handle('POST', "/mqtt/publish", Req) -> Params = mochiweb_request:parse_post(Req), - lager:info("HTTP Publish: ~p~n", [Params]), - Qos = int(get_value("qos", Params, "0")), - Retain = bool(get_value("retain", Params, "0")), - Topic = list_to_binary(get_value("topic", Params)), - Message = list_to_binary(get_value("message", Params)), - case {validate(qos, Qos), validate(topic, Topic)} of - {true, true} -> - emqttd_pubsub:publish(http, #mqtt_message{qos = Qos, - retain = Retain, - topic = Topic, - payload = Message}), - Req:ok({"text/plan", <<"ok\n">>}); - {false, _} -> - Req:respond({400, [], <<"Bad QoS">>}); - {_, false} -> - Req:respond({400, [], <<"Bad Topic">>}) + lager:info("HTTP Publish: ~p~n", [Params]), + case authorized(Req) of + true -> + Qos = int(get_value("qos", Params, "0")), + Retain = bool(get_value("retain", Params, "0")), + Topic = list_to_binary(get_value("topic", Params)), + Message = list_to_binary(get_value("message", Params)), + case {validate(qos, Qos), validate(topic, Topic)} of + {true, true} -> + emqttd_pubsub:publish(http, #mqtt_message{qos = Qos, + retain = Retain, + topic = Topic, + payload = Message}), + Req:ok({"text/plan", <<"ok\n">>}); + {false, _} -> + Req:respond({400, [], <<"Bad QoS">>}); + {_, false} -> + Req:respond({400, [], <<"Bad Topic">>}) + end; + false -> + Req:respond({401, [], <<"Fobbiden">>}) + end; + + +handle(_Method, "/mqtt/wsocket", Req) -> + lager:info("Websocket Headers: ~p~n", [Req:get(headers)]), + Up = Req:get_header_value("Upgrade"), + case Up =/= undefined andalso string:to_lower(Up) =:= "websocket" of + true -> + emqttd_websocket:init(Req); + false -> + Req:respond({400, [], <<"Bad Request">>}) end; - + +handle('GET', "/mqtt/" ++ File, Req) -> + mochiweb_request:serve_file(File, docroot(), Req); + handle(_Method, _Path, Req) -> Req:not_found(). @@ -101,4 +113,8 @@ int(S) -> list_to_integer(S). bool("0") -> false; bool("1") -> true. +docroot() -> + {file, Here} = code:is_loaded(?MODULE), + Dir = filename:dirname(filename:dirname(Here)), + filename:join([Dir, "priv", "www"]). diff --git a/apps/emqttd/src/emqttd_websocket.erl b/apps/emqttd/src/emqttd_websocket.erl new file mode 100644 index 000000000..71e59f418 --- /dev/null +++ b/apps/emqttd/src/emqttd_websocket.erl @@ -0,0 +1,42 @@ +%%%----------------------------------------------------------------------------- +%%% @Copyright (C) 2012-2015, Feng Lee +%%% +%%% Permission is hereby granted, free of charge, to any person obtaining a copy +%%% of this software and associated documentation files (the "Software"), to deal +%%% in the Software without restriction, including without limitation the rights +%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +%%% copies of the Software, and to permit persons to whom the Software is +%%% furnished to do so, subject to the following conditions: +%%% +%%% The above copyright notice and this permission notice shall be included in all +%%% copies or substantial portions of the Software. +%%% +%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +%%% SOFTWARE. +%%%----------------------------------------------------------------------------- +%%% @doc +%%% emqttd websocket client. +%%% +%%% @end +%%%----------------------------------------------------------------------------- +-module(emqttd_websocket). + +-export([init/1, loop/3]). + +-record(state, {}). + +init(Req) -> + {ReentryWs, _ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:loop/3), + ReentryWs(#state{}). + +loop(Payload, State, ReplyChannel) -> + io:format("Received data: ~p~n", [Payload]), + ReplyChannel(Payload), + State. + +