diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 0816b013e..3c6a0c516 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -27,7 +27,7 @@ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}}, diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 710148b94..e2aeef6fc 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -57,7 +57,7 @@ -record(state, { %% TCP/SSL/UDP/DTLS Wrapped Socket - socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, + socket :: {esockd_transport, esockd:socket()} | {udp, _, _} | {esockd_udp_proxy, _, _}, %% Peername of the connection peername :: emqx_types:peername(), %% Sockname of the connection @@ -122,6 +122,9 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) -> start_link(esockd_transport, Sock, Options) -> Socket = {esockd_transport, Sock}, Args = [self(), Socket, undefined, Options] ++ callback_modules(Options), + {ok, proc_lib:spawn_link(?MODULE, init, Args)}; +start_link(Socket = {esockd_udp_proxy, _ProxyId, _Sock}, Peername, Options) -> + Args = [self(), Socket, Peername, Options] ++ callback_modules(Options), {ok, proc_lib:spawn_link(?MODULE, init, Args)}. callback_modules(Options) -> @@ -196,10 +199,14 @@ esockd_peername({udp, _SockPid, _Sock}, Peername) -> Peername; esockd_peername({esockd_transport, Sock}, _Peername) -> {ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]), + Peername; +esockd_peername({esockd_udp_proxy, _ProxyId, _Sock}, Peername) -> Peername. esockd_wait(Socket = {udp, _SockPid, _Sock}) -> {ok, Socket}; +esockd_wait(Socket = {esockd_udp_proxy, _ProxyId, _Sock}) -> + {ok, Socket}; esockd_wait({esockd_transport, Sock}) -> case esockd_transport:wait(Sock) of {ok, NSock} -> {ok, {esockd_transport, NSock}}; @@ -211,29 +218,41 @@ esockd_close({udp, _SockPid, _Sock}) -> %%gen_udp:close(Sock); ok; esockd_close({esockd_transport, Sock}) -> - esockd_transport:fast_close(Sock). + esockd_transport:fast_close(Sock); +esockd_close({esockd_udp_proxy, ProxyId, _Sock}) -> + esockd_udp_proxy:close(ProxyId). esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) -> nossl; esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) -> esockd_transport:ensure_ok_or_exit(Fun, [Sock]); esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) -> - esockd_transport:ensure_ok_or_exit(Fun, [Socket]). + esockd_transport:ensure_ok_or_exit(Fun, [Socket]); +esockd_ensure_ok_or_exit(Fun, {esockd_udp_proxy, _ProxyId, Sock}) -> + esockd_transport:ensure_ok_or_exit(Fun, [Sock]). esockd_type({udp, _, _}) -> udp; esockd_type({esockd_transport, Socket}) -> - esockd_transport:type(Socket). + esockd_transport:type(Socket); +esockd_type({esockd_udp_proxy, _ProxyId, Sock}) when is_port(Sock) -> + udp; +esockd_type({esockd_udp_proxy, _ProxyId, _Sock}) -> + ssl. esockd_setopts({udp, _, _}, _) -> ok; esockd_setopts({esockd_transport, Socket}, Opts) -> %% FIXME: DTLS works?? + esockd_transport:setopts(Socket, Opts); +esockd_setopts({esockd_udp_proxy, _ProxyId, Socket}, Opts) -> esockd_transport:setopts(Socket, Opts). esockd_getstat({udp, _SockPid, Sock}, Stats) -> inet:getstat(Sock, Stats); esockd_getstat({esockd_transport, Sock}, Stats) -> + esockd_transport:getstat(Sock, Stats); +esockd_getstat({esockd_udp_proxy, _ProxyId, Sock}, Stats) -> esockd_transport:getstat(Sock, Stats). esockd_send(Data, #state{ @@ -242,7 +261,9 @@ esockd_send(Data, #state{ }) -> gen_udp:send(Sock, Ip, Port, Data); esockd_send(Data, #state{socket = {esockd_transport, Sock}}) -> - esockd_transport:send(Sock, Data). + esockd_transport:send(Sock, Data); +esockd_send(Data, #state{socket = {esockd_udp_proxy, ProxyId, _Sock}}) -> + esockd_udp_proxy:send(ProxyId, Data). keepalive_stats(recv) -> emqx_pd:get_counter(recv_pkt); @@ -250,7 +271,8 @@ keepalive_stats(send) -> emqx_pd:get_counter(send_pkt). is_datadram_socket({esockd_transport, _}) -> false; -is_datadram_socket({udp, _, _}) -> true. +is_datadram_socket({udp, _, _}) -> true; +is_datadram_socket({esockd_udp_proxy, _ProxyId, Sock}) -> erlang:is_port(Sock). %%-------------------------------------------------------------------- %% callbacks @@ -461,6 +483,21 @@ handle_msg({'$gen_cast', Req}, State) -> with_channel(handle_cast, [Req], State); handle_msg({datagram, _SockPid, Data}, State) -> parse_incoming(Data, State); +handle_msg( + {{esockd_udp_proxy, _ProxyId, _Socket} = NSock, Data, Packets}, + State = #state{ + chann_mod = ChannMod, + channel = Channel + } +) -> + ?SLOG(debug, #{msg => "RECV_data", data => Data}), + Oct = iolist_size(Data), + inc_counter(incoming_bytes, Oct), + Ctx = ChannMod:info(ctx, Channel), + ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct), + + NState = State#state{socket = NSock}, + {ok, next_incoming_msgs(Packets), NState}; handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl @@ -508,6 +545,9 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) -> handle_msg({close, Reason}, State) -> ?tp(debug, force_socket_close, #{reason => Reason}), handle_info({sock_closed, Reason}, close_socket(State)); +handle_msg(udp_proxy_closed, State) -> + ?tp(debug, udp_proxy_closed, #{reason => normal}), + handle_info({sock_closed, normal}, close_socket(State)); handle_msg( {event, connected}, State = #state{ diff --git a/apps/emqx_gateway/src/emqx_gateway.app.src b/apps/emqx_gateway/src/emqx_gateway.app.src index 25a81801b..6c1a3a8a6 100644 --- a/apps/emqx_gateway/src/emqx_gateway.app.src +++ b/apps/emqx_gateway/src/emqx_gateway.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_gateway, [ {description, "The Gateway management application"}, - {vsn, "0.1.33"}, + {vsn, "0.1.34"}, {registered, []}, {mod, {emqx_gateway_app, []}}, {applications, [ diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 00555c67a..11488d1a3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -139,6 +139,16 @@ fields(websocket) -> fields(udp_listener) -> [ %% some special configs for udp listener + {health_check, + sc( + ref(udp_health_check), + #{ + desc => ?DESC( + udp_health_check + ), + required => false + } + )} ] ++ udp_opts() ++ common_listener_opts(); @@ -175,7 +185,12 @@ fields(dtls_opts) -> versions => dtls_all_available }, _IsRanchListener = false - ). + ); +fields(udp_health_check) -> + [ + {request, sc(binary(), #{desc => ?DESC(udp_health_check_request), required => false})}, + {reply, sc(binary(), #{desc => ?DESC(udp_health_check_reply), required => false})} + ]. desc(gateway) -> "EMQX Gateway configuration root."; @@ -201,6 +216,8 @@ desc(dtls_opts) -> "Settings for DTLS protocol."; desc(websocket) -> "Websocket options"; +desc(udp_health_check) -> + "UDP health check"; desc(_) -> undefined. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index e6a5be8ab..88a537613 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -151,7 +151,12 @@ find_sup_child(Sup, ChildId) -> {ok, [pid()]} | {error, term()} when - ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}. + ModCfg :: #{ + frame_mod := atom(), + chann_mod := atom(), + connection_mod => atom(), + esockd_proxy_opts => map() + }. start_listeners(Listeners, GwName, Ctx, ModCfg) -> start_listeners(Listeners, GwName, Ctx, ModCfg, []). @@ -519,7 +524,8 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) -> max_connections, max_conn_rate, proxy_protocol, - proxy_protocol_timeout + proxy_protocol_timeout, + health_check ], Opts0 ), diff --git a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl index 936856c2e..fb7cc49d3 100644 --- a/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_authn_SUITE.erl @@ -91,6 +91,7 @@ end_per_suite(Config) -> %%------------------------------------------------------------------------------ t_case_coap(_) -> + emqx_coap_SUITE:restart_coap_with_connection_mode(false), Login = fun(URI, Checker) -> Action = fun(Channel) -> Req = emqx_coap_SUITE:make_req(post), diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 844677d12..a10fa04b2 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -410,19 +410,6 @@ is_create_connection_request(Msg = #coap_message{method = Method}) when is_create_connection_request(_Msg) -> false. -is_delete_connection_request(Msg = #coap_message{method = Method}) when - is_atom(Method) andalso Method =/= undefined --> - URIPath = emqx_coap_message:get_option(uri_path, Msg, []), - case URIPath of - [<<"mqtt">>, <<"connection">>] when Method == delete -> - true; - _ -> - false - end; -is_delete_connection_request(_Msg) -> - false. - check_token( Msg, #channel{ @@ -430,7 +417,6 @@ check_token( clientinfo = ClientInfo } = Channel ) -> - IsDeleteConn = is_delete_connection_request(Msg), #{clientid := ClientId} = ClientInfo, case emqx_coap_message:extract_uri_query(Msg) of #{ @@ -438,39 +424,10 @@ check_token( <<"token">> := Token } -> call_session(handle_request, Msg, Channel); - #{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} -> - case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of - undefined when IsDeleteConn -> - Reply = emqx_coap_message:piggyback({ok, deleted}, Msg), - {shutdown, normal, Reply, Channel}; - undefined -> - ?SLOG(info, #{ - msg => "remote_connection_not_found", - clientid => ReqClientId, - token => ReqToken - }), - Reply = emqx_coap_message:reset(Msg), - {shutdown, normal, Reply, Channel}; - false -> - ?SLOG(info, #{ - msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken - }), - Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg), - {shutdown, normal, Reply, Channel}; - true -> - %% hack: since each message request can spawn a new connection - %% process, we can't rely on the `inc_incoming_stats' call in - %% `emqx_gateway_conn:handle_incoming' to properly keep track of - %% bumping incoming requests for an existing channel. Since this - %% number is used by keepalive, we have to bump it inside the - %% requested channel/connection pid so heartbeats actually work. - emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt), - call_session(handle_request, Msg, Channel) - end; _ -> ErrMsg = <<"Missing token or clientid in connection mode">>, Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg), - {shutdown, normal, Reply, Channel} + {ok, {outgoing, Reply}, Channel} end. run_conn_hooks( @@ -785,6 +742,7 @@ process_connection( ) when ConnState == connected -> + %% TODO should take over the session here Queries = emqx_coap_message:extract_uri_query(Req), ErrMsg0 = case Queries of diff --git a/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl b/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl new file mode 100644 index 000000000..703450d7e --- /dev/null +++ b/apps/emqx_gateway_coap/src/emqx_coap_proxy_conn.erl @@ -0,0 +1,59 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_coap_proxy_conn). + +-behaviour(esockd_udp_proxy_connection). + +-include("emqx_coap.hrl"). + +-export([initialize/1, create/3, get_connection_id/4, dispatch/3, close/2]). + +%%-------------------------------------------------------------------- +%% Callbacks +%%-------------------------------------------------------------------- +initialize(_Opts) -> + emqx_coap_frame:initial_parse_state(#{}). + +create(Transport, Peer, Opts) -> + emqx_gateway_conn:start_link(Transport, Peer, Opts). + +get_connection_id(_Transport, _Peer, State, Data) -> + case parse_incoming(Data, [], State) of + {[Msg | _] = Packets, NState} -> + case emqx_coap_message:extract_uri_query(Msg) of + #{ + <<"clientid">> := ClientId + } -> + {ok, ClientId, Packets, NState}; + _ -> + invalid + end; + _Error -> + invalid + end. + +dispatch(Pid, _State, Packet) -> + erlang:send(Pid, Packet). + +close(Pid, _State) -> + erlang:send(Pid, udp_proxy_closed). + +parse_incoming(<<>>, Packets, State) -> + {Packets, State}; +parse_incoming(Data, Packets, State) -> + {ok, Packet, Rest, NParseState} = emqx_coap_frame:parse(Data, State), + parse_incoming(Rest, [Packet | Packets], NParseState). diff --git a/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl b/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl index c92103bfc..cc18c3351 100644 --- a/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl +++ b/apps/emqx_gateway_coap/src/emqx_gateway_coap.erl @@ -20,7 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx_gateway/include/emqx_gateway.hrl"). -%% define a gateway named stomp +%% define a gateway named coap -gateway(#{ name => coap, callback_module => ?MODULE, @@ -58,10 +58,11 @@ on_gateway_load( Ctx ) -> Listeners = normalize_config(Config), - ModCfg = #{ + ModCfg = maps:merge(connection_opts(Config), #{ frame_mod => emqx_coap_frame, chann_mod => emqx_coap_channel - }, + }), + case start_listeners( Listeners, GwName, Ctx, ModCfg @@ -105,3 +106,13 @@ on_gateway_unload( ) -> Listeners = normalize_config(Config), stop_listeners(GwName, Listeners). + +connection_opts(#{connection_required := false}) -> + #{}; +connection_opts(_) -> + #{ + connection_mod => esockd_udp_proxy, + esockd_proxy_opts => #{ + connection_mod => emqx_coap_proxy_conn + } + }. diff --git a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl index bd403a463..bb244c8d4 100644 --- a/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl +++ b/apps/emqx_gateway_coap/test/emqx_coap_SUITE.erl @@ -330,7 +330,8 @@ t_publish(_) -> ?assertEqual(Payload, Msg#message.payload) after 500 -> ?assert(false) - end + end, + true end, with_connection(Topics, Action). @@ -360,7 +361,9 @@ t_publish_with_retain_qos_expiry(_) -> ?assertEqual(Payload, Msg#message.payload) after 500 -> ?assert(false) - end + end, + + true end, with_connection(Topics, Action), @@ -392,7 +395,8 @@ t_subscribe(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Topics, Fun), @@ -431,7 +435,8 @@ t_subscribe_with_qos_opt(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Topics, Fun), @@ -468,7 +473,8 @@ t_un_subscribe(_) -> {ok, nocontent, _} = do_request(Channel, URI, UnReq), ?LOGT("un observer topic:~ts~n", [Topic]), timer:sleep(100), - ?assertEqual([], emqx:subscribers(Topic)) + ?assertEqual([], emqx:subscribers(Topic)), + true end, with_connection(Topics, Fun). @@ -497,7 +503,8 @@ t_observe_wildcard(_) -> #coap_content{payload = PayloadRecv} = Notify, - ?assertEqual(Payload, PayloadRecv) + ?assertEqual(Payload, PayloadRecv), + true end, with_connection(Fun). @@ -530,7 +537,8 @@ t_clients_api(_) -> {204, _} = request(delete, "/gateways/coap/clients/client1"), timer:sleep(200), - {200, #{data := []}} = request(get, "/gateways/coap/clients") + {200, #{data := []}} = request(get, "/gateways/coap/clients"), + false end, with_connection(Fun). @@ -560,7 +568,8 @@ t_clients_subscription_api(_) -> {204, _} = request(delete, Path ++ "/tx"), - {200, []} = request(get, Path) + {200, []} = request(get, Path), + true end, with_connection(Fun). @@ -578,7 +587,8 @@ t_clients_get_subscription_api(_) -> observe(Channel, Token, false), - {200, []} = request(get, Path) + {200, []} = request(get, Path), + true end, with_connection(Fun). @@ -773,8 +783,7 @@ with_connection(Action) -> Fun = fun(Channel) -> Token = connection(Channel), timer:sleep(100), - Action(Channel, Token), - disconnection(Channel, Token), + _ = Action(Channel, Token) andalso disconnection(Channel, Token), timer:sleep(100) end, do(Fun). diff --git a/mix.exs b/mix.exs index 313059bde..53e5b304f 100644 --- a/mix.exs +++ b/mix.exs @@ -175,7 +175,7 @@ defmodule EMQXUmbrella.MixProject do end def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true} - def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true} + def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true} def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true} def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.1", override: true} def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true} diff --git a/rebar.config b/rebar.config index 8f689ea3d..115bed3e9 100644 --- a/rebar.config +++ b/rebar.config @@ -81,7 +81,7 @@ {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}}, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}}, diff --git a/rel/i18n/emqx_gateway_schema.hocon b/rel/i18n/emqx_gateway_schema.hocon index 1a04d57ac..9f7bd8462 100644 --- a/rel/i18n/emqx_gateway_schema.hocon +++ b/rel/i18n/emqx_gateway_schema.hocon @@ -195,4 +195,13 @@ Relevant when the EMQX cluster is deployed behind a load-balancer.""" fields_ws_opts_proxy_address_header.label: """Proxy address header""" +udp_health_check.desc: +"""Some Cloud platform use a `request-reply` mechanism to check whether a UDP port is healthy, here can configure this pair.""" + +udp_health_check_request.desc: +"""The content of the request.""" + +udp_health_check_reply.desc: +"""The content to reply.""" + }