diff --git a/etc/emqx.conf b/etc/emqx.conf index deb702211..3604f2efd 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1207,6 +1207,11 @@ listener.ssl.external.reuseaddr = true ## Examples: 8083, 127.0.0.1:8083, ::1:8083 listener.ws.external = 8083 +## The path of WebSocket MQTT endpoint +## +## Value: URL Path +listener.ws.external.mqtt_path = /mqtt + ## The acceptor pool for external MQTT/WebSocket listener. ## ## Value: Number @@ -1346,6 +1351,11 @@ listener.ws.external.nodelay = true ## Examples: 8084, 127.0.0.1:8084, ::1:8084 listener.wss.external = 8084 +## The path of WebSocket MQTT endpoint +## +## Value: URL Path +listener.wss.external.mqtt_path = /mqtt + ## The acceptor pool for external MQTT/WebSocket/SSL listener. ## ## Value: Number @@ -1829,7 +1839,7 @@ module.rewrite = off ## The etc dir for plugins' config. ## ## Value: Folder -plugins.etc_dir ={{ platform_etc_dir }}/plugins/ +plugins.etc_dir = {{ platform_etc_dir }}/plugins/ ## The file to store loaded plugin names. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index e9f4932c4..a8ed316c2 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1094,6 +1094,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.ws.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + {mapping, "listener.ws.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1195,6 +1200,11 @@ end}. {datatype, [integer, ip]} ]}. +{mapping, "listener.wss.$name.mqtt_path", "emqx.listeners", [ + {default, "/mqtt"}, + {datatype, string} +]}. + {mapping, "listener.wss.$name.acceptors", "emqx.listeners", [ {default, 8}, {datatype, integer} @@ -1365,7 +1375,8 @@ end}. end, LisOpts = fun(Prefix) -> - Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, + {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)}, {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)}, {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)}, {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 421304f3a..d3ef43069 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -51,12 +51,12 @@ start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls -> %% Start MQTT/WS listener start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch); %% Start MQTT/WSS listener start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss -> - Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]), + Dispatch = cowboy_router:compile([{'_', [{mqtt_path(Options), emqx_ws_connection, Options}]}]), start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch). start_mqtt_listener(Name, ListenOn, Options) -> @@ -67,6 +67,9 @@ start_mqtt_listener(Name, ListenOn, Options) -> start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) -> Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}). +mqtt_path(Options) -> + proplists:get_value(mqtt_path, Options, "/mqtt"). + ranch_opts(Options) -> NumAcceptors = proplists:get_value(acceptors, Options, 4), MaxConnections = proplists:get_value(max_connections, Options, 1024), diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 311bd58dd..91e5d4d59 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -22,7 +22,7 @@ -export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]). -export([set_headers/2]). -export([get_header/2, get_header/3, set_header/3]). --export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]). +-export([is_expired/1, update_expiry/1]). -export([format/1]). -type(flag() :: atom()). @@ -100,21 +100,6 @@ is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestam is_expired(_Msg) -> false. --spec(check_expiry(emqx_types:message()) -> {ok, pos_integer()} | expired | false). -check_expiry(Msg = #message{timestamp = CreatedAt}) -> - check_expiry(Msg, CreatedAt); -check_expiry(_Msg) -> - false. - --spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false). -check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) -> - case Interval - (elapsed(Since) div 1000) of - Timeout when Timeout > 0 -> {ok, Timeout}; - _ -> expired - end; -check_expiry(_Msg, _Since) -> - false. - update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> @@ -138,4 +123,3 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). - diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index c2ca0f0aa..37207e40c 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -29,7 +29,7 @@ all() -> message_flag, message_header, message_format, - message_expired + message_expired ]. message_make(_) -> @@ -53,7 +53,7 @@ message_flag(_) -> ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)). -message_header(_) -> +message_header(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), Msg2 = emqx_message:set_header(c, 3, Msg1), @@ -68,11 +68,8 @@ message_expired(_) -> Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), - {ok, 1} = emqx_message:check_expiry(Msg1), timer:sleep(600), ?assert(emqx_message:is_expired(Msg1)), - expired = emqx_message:check_expiry(Msg1), timer:sleep(1000), Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). -