Merge branch 'emqx30' of github.com:emqx/emqx into emqx30

This commit is contained in:
Feng Lee 2018-09-16 20:50:35 +08:00
commit ec061f7a21
5 changed files with 31 additions and 26 deletions

View File

@ -1207,6 +1207,11 @@ listener.ssl.external.reuseaddr = true
## Examples: 8083, 127.0.0.1:8083, ::1:8083 ## Examples: 8083, 127.0.0.1:8083, ::1:8083
listener.ws.external = 8083 listener.ws.external = 8083
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.ws.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket listener. ## The acceptor pool for external MQTT/WebSocket listener.
## ##
## Value: Number ## Value: Number
@ -1346,6 +1351,11 @@ listener.ws.external.nodelay = true
## Examples: 8084, 127.0.0.1:8084, ::1:8084 ## Examples: 8084, 127.0.0.1:8084, ::1:8084
listener.wss.external = 8084 listener.wss.external = 8084
## The path of WebSocket MQTT endpoint
##
## Value: URL Path
listener.wss.external.mqtt_path = /mqtt
## The acceptor pool for external MQTT/WebSocket/SSL listener. ## The acceptor pool for external MQTT/WebSocket/SSL listener.
## ##
## Value: Number ## Value: Number
@ -1829,7 +1839,7 @@ module.rewrite = off
## The etc dir for plugins' config. ## The etc dir for plugins' config.
## ##
## Value: Folder ## Value: Folder
plugins.etc_dir ={{ platform_etc_dir }}/plugins/ plugins.etc_dir = {{ platform_etc_dir }}/plugins/
## The file to store loaded plugin names. ## The file to store loaded plugin names.
## ##

View File

@ -1094,6 +1094,11 @@ end}.
{datatype, [integer, ip]} {datatype, [integer, ip]}
]}. ]}.
{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [
{default, "/mqtt"},
{datatype, string}
]}.
{mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [
{default, 8}, {default, 8},
{datatype, integer} {datatype, integer}
@ -1195,6 +1200,11 @@ end}.
{datatype, [integer, ip]} {datatype, [integer, ip]}
]}. ]}.
{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [
{default, "/mqtt"},
{datatype, string}
]}.
{mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [
{default, 8}, {default, 8},
{datatype, integer} {datatype, integer}
@ -1365,7 +1375,8 @@ end}.
end, end,
LisOpts = fun(Prefix) -> LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)},
{max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)},
{max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)},
{tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},

View File

@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
%% Start MQTT/WS listener %% Start MQTT/WS listener
start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch);
%% Start MQTT/WSS listener %% Start MQTT/WSS listener
start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]),
start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch).
start_mqtt_listener(Name, ListenOn, Options) -> start_mqtt_listener(Name, ListenOn, Options) ->
@ -67,6 +67,9 @@ start_mqtt_listener(Name, ListenOn, Options) ->
start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) ->
Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
mqtt_path(Options) ->
proplists:get_value(mqtt_path, Options, "/mqtt").
ranch_opts(Options) -> ranch_opts(Options) ->
NumAcceptors = proplists:get_value(acceptors, Options, 4), NumAcceptors = proplists:get_value(acceptors, Options, 4),
MaxConnections = proplists:get_value(max_connections, Options, 1024), MaxConnections = proplists:get_value(max_connections, Options, 1024),

View File

@ -22,7 +22,7 @@
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
-export([set_headers/2]). -export([set_headers/2]).
-export([get_header/2, get_header/3, set_header/3]). -export([get_header/2, get_header/3, set_header/3]).
-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]). -export([is_expired/1, update_expiry/1]).
-export([format/1]). -export([format/1]).
-type(flag() :: atom()). -type(flag() :: atom()).
@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam
is_expired(_Msg) -> is_expired(_Msg) ->
false. false.
-spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false).
check_expiry(Msg = #message{timestamp = CreatedAt}) ->
check_expiry(Msg, CreatedAt);
check_expiry(_Msg) ->
false.
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
case Interval - (elapsed(Since) div 1000) of
Timeout when Timeout > 0 -> {ok, Timeout};
_ -> expired
end;
check_expiry(_Msg, _Since) ->
false.
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
case elapsed(CreatedAt) of case elapsed(CreatedAt) of
Elapsed when Elapsed > 0 -> Elapsed when Elapsed > 0 ->
@ -138,4 +123,3 @@ format(flags, Flags) ->
io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]);
format(headers, Headers) -> format(headers, Headers) ->
io_lib:format("~p", [Headers]). io_lib:format("~p", [Headers]).

View File

@ -68,11 +68,8 @@ message_expired(_) ->
Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg),
timer:sleep(500), timer:sleep(500),
?assertNot(emqx_message:is_expired(Msg1)), ?assertNot(emqx_message:is_expired(Msg1)),
{ok, 1} = emqx_message:check_expiry(Msg1),
timer:sleep(600), timer:sleep(600),
?assert(emqx_message:is_expired(Msg1)), ?assert(emqx_message:is_expired(Msg1)),
expired = emqx_message:check_expiry(Msg1),
timer:sleep(1000), timer:sleep(1000),
Msg2 = emqx_message:update_expiry(Msg1), Msg2 = emqx_message:update_expiry(Msg1),
?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)).