Merge pull request #13441 from lafirest/feat/coap

feat(coap): use content-sensitive udp proxy for coap
This commit is contained in:
lafirest 2024-07-17 10:01:51 +08:00 committed by GitHub
commit 6697035812
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 206 additions and 62 deletions

View File

@ -27,7 +27,7 @@
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.43.1"}}},

View File

@ -57,7 +57,7 @@
-record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: {esockd_transport, esockd:socket()} | {udp, _, _},
socket :: {esockd_transport, esockd:socket()} | {udp, _, _} | {esockd_udp_proxy, _, _},
%% Peername of the connection
peername :: emqx_types:peername(),
%% Sockname of the connection
@ -122,6 +122,9 @@ start_link(Socket = {udp, _SockPid, _Sock}, Peername, Options) ->
start_link(esockd_transport, Sock, Options) ->
Socket = {esockd_transport, Sock},
Args = [self(), Socket, undefined, Options] ++ callback_modules(Options),
{ok, proc_lib:spawn_link(?MODULE, init, Args)};
start_link(Socket = {esockd_udp_proxy, _ProxyId, _Sock}, Peername, Options) ->
Args = [self(), Socket, Peername, Options] ++ callback_modules(Options),
{ok, proc_lib:spawn_link(?MODULE, init, Args)}.
callback_modules(Options) ->
@ -196,10 +199,14 @@ esockd_peername({udp, _SockPid, _Sock}, Peername) ->
Peername;
esockd_peername({esockd_transport, Sock}, _Peername) ->
{ok, Peername} = esockd_transport:ensure_ok_or_exit(peername, [Sock]),
Peername;
esockd_peername({esockd_udp_proxy, _ProxyId, _Sock}, Peername) ->
Peername.
esockd_wait(Socket = {udp, _SockPid, _Sock}) ->
{ok, Socket};
esockd_wait(Socket = {esockd_udp_proxy, _ProxyId, _Sock}) ->
{ok, Socket};
esockd_wait({esockd_transport, Sock}) ->
case esockd_transport:wait(Sock) of
{ok, NSock} -> {ok, {esockd_transport, NSock}};
@ -211,29 +218,41 @@ esockd_close({udp, _SockPid, _Sock}) ->
%%gen_udp:close(Sock);
ok;
esockd_close({esockd_transport, Sock}) ->
esockd_transport:fast_close(Sock).
esockd_transport:fast_close(Sock);
esockd_close({esockd_udp_proxy, ProxyId, _Sock}) ->
esockd_udp_proxy:close(ProxyId).
esockd_ensure_ok_or_exit(peercert, {udp, _SockPid, _Sock}) ->
nossl;
esockd_ensure_ok_or_exit(Fun, {udp, _SockPid, Sock}) ->
esockd_transport:ensure_ok_or_exit(Fun, [Sock]);
esockd_ensure_ok_or_exit(Fun, {esockd_transport, Socket}) ->
esockd_transport:ensure_ok_or_exit(Fun, [Socket]).
esockd_transport:ensure_ok_or_exit(Fun, [Socket]);
esockd_ensure_ok_or_exit(Fun, {esockd_udp_proxy, _ProxyId, Sock}) ->
esockd_transport:ensure_ok_or_exit(Fun, [Sock]).
esockd_type({udp, _, _}) ->
udp;
esockd_type({esockd_transport, Socket}) ->
esockd_transport:type(Socket).
esockd_transport:type(Socket);
esockd_type({esockd_udp_proxy, _ProxyId, Sock}) when is_port(Sock) ->
udp;
esockd_type({esockd_udp_proxy, _ProxyId, _Sock}) ->
ssl.
esockd_setopts({udp, _, _}, _) ->
ok;
esockd_setopts({esockd_transport, Socket}, Opts) ->
%% FIXME: DTLS works??
esockd_transport:setopts(Socket, Opts);
esockd_setopts({esockd_udp_proxy, _ProxyId, Socket}, Opts) ->
esockd_transport:setopts(Socket, Opts).
esockd_getstat({udp, _SockPid, Sock}, Stats) ->
inet:getstat(Sock, Stats);
esockd_getstat({esockd_transport, Sock}, Stats) ->
esockd_transport:getstat(Sock, Stats);
esockd_getstat({esockd_udp_proxy, _ProxyId, Sock}, Stats) ->
esockd_transport:getstat(Sock, Stats).
esockd_send(Data, #state{
@ -242,7 +261,9 @@ esockd_send(Data, #state{
}) ->
gen_udp:send(Sock, Ip, Port, Data);
esockd_send(Data, #state{socket = {esockd_transport, Sock}}) ->
esockd_transport:send(Sock, Data).
esockd_transport:send(Sock, Data);
esockd_send(Data, #state{socket = {esockd_udp_proxy, ProxyId, _Sock}}) ->
esockd_udp_proxy:send(ProxyId, Data).
keepalive_stats(recv) ->
emqx_pd:get_counter(recv_pkt);
@ -250,7 +271,8 @@ keepalive_stats(send) ->
emqx_pd:get_counter(send_pkt).
is_datadram_socket({esockd_transport, _}) -> false;
is_datadram_socket({udp, _, _}) -> true.
is_datadram_socket({udp, _, _}) -> true;
is_datadram_socket({esockd_udp_proxy, _ProxyId, Sock}) -> erlang:is_port(Sock).
%%--------------------------------------------------------------------
%% callbacks
@ -461,6 +483,21 @@ handle_msg({'$gen_cast', Req}, State) ->
with_channel(handle_cast, [Req], State);
handle_msg({datagram, _SockPid, Data}, State) ->
parse_incoming(Data, State);
handle_msg(
{{esockd_udp_proxy, _ProxyId, _Socket} = NSock, Data, Packets},
State = #state{
chann_mod = ChannMod,
channel = Channel
}
) ->
?SLOG(debug, #{msg => "RECV_data", data => Data}),
Oct = iolist_size(Data),
inc_counter(incoming_bytes, Oct),
Ctx = ChannMod:info(ctx, Channel),
ok = emqx_gateway_ctx:metrics_inc(Ctx, 'bytes.received', Oct),
NState = State#state{socket = NSock},
{ok, next_incoming_msgs(Packets), NState};
handle_msg({Inet, _Sock, Data}, State) when
Inet == tcp;
Inet == ssl
@ -508,6 +545,9 @@ handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_msg({close, Reason}, State) ->
?tp(debug, force_socket_close, #{reason => Reason}),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg(udp_proxy_closed, State) ->
?tp(debug, udp_proxy_closed, #{reason => normal}),
handle_info({sock_closed, normal}, close_socket(State));
handle_msg(
{event, connected},
State = #state{

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.33"},
{vsn, "0.1.34"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [

View File

@ -139,6 +139,16 @@ fields(websocket) ->
fields(udp_listener) ->
[
%% some special configs for udp listener
{health_check,
sc(
ref(udp_health_check),
#{
desc => ?DESC(
udp_health_check
),
required => false
}
)}
] ++
udp_opts() ++
common_listener_opts();
@ -175,7 +185,12 @@ fields(dtls_opts) ->
versions => dtls_all_available
},
_IsRanchListener = false
).
);
fields(udp_health_check) ->
[
{request, sc(binary(), #{desc => ?DESC(udp_health_check_request), required => false})},
{reply, sc(binary(), #{desc => ?DESC(udp_health_check_reply), required => false})}
].
desc(gateway) ->
"EMQX Gateway configuration root.";
@ -201,6 +216,8 @@ desc(dtls_opts) ->
"Settings for DTLS protocol.";
desc(websocket) ->
"Websocket options";
desc(udp_health_check) ->
"UDP health check";
desc(_) ->
undefined.

View File

@ -151,7 +151,12 @@ find_sup_child(Sup, ChildId) ->
{ok, [pid()]}
| {error, term()}
when
ModCfg :: #{frame_mod := atom(), chann_mod := atom(), connection_mod => atom()}.
ModCfg :: #{
frame_mod := atom(),
chann_mod := atom(),
connection_mod => atom(),
esockd_proxy_opts => map()
}.
start_listeners(Listeners, GwName, Ctx, ModCfg) ->
start_listeners(Listeners, GwName, Ctx, ModCfg, []).
@ -519,7 +524,8 @@ esockd_opts(Type, Opts0) when ?IS_ESOCKD_LISTENER(Type) ->
max_connections,
max_conn_rate,
proxy_protocol,
proxy_protocol_timeout
proxy_protocol_timeout,
health_check
],
Opts0
),

View File

@ -91,6 +91,7 @@ end_per_suite(Config) ->
%%------------------------------------------------------------------------------
t_case_coap(_) ->
emqx_coap_SUITE:restart_coap_with_connection_mode(false),
Login = fun(URI, Checker) ->
Action = fun(Channel) ->
Req = emqx_coap_SUITE:make_req(post),

View File

@ -97,7 +97,8 @@ t_case_coap_publish(_) ->
end,
Case = fun(Channel, Token) ->
Fun(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})),
Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized}))
Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized})),
true
end,
Mod:with_connection(Case).
@ -113,7 +114,8 @@ t_case_coap_subscribe(_) ->
end,
Case = fun(Channel, Token) ->
Fun(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})),
Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized}))
Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized})),
true
end,
Mod:with_connection(Case).

View File

@ -430,7 +430,6 @@ check_token(
clientinfo = ClientInfo
} = Channel
) ->
IsDeleteConn = is_delete_connection_request(Msg),
#{clientid := ClientId} = ClientInfo,
case emqx_coap_message:extract_uri_query(Msg) of
#{
@ -438,39 +437,18 @@ check_token(
<<"token">> := Token
} ->
call_session(handle_request, Msg, Channel);
#{<<"clientid">> := ReqClientId, <<"token">> := ReqToken} ->
case emqx_gateway_cm:call(coap, ReqClientId, {check_token, ReqToken}) of
undefined when IsDeleteConn ->
Any ->
%% This channel is create by this DELETE command, so here can safely close this channel
case Token =:= undefined andalso is_delete_connection_request(Msg) of
true ->
Reply = emqx_coap_message:piggyback({ok, deleted}, Msg),
{shutdown, normal, Reply, Channel};
undefined ->
?SLOG(info, #{
msg => "remote_connection_not_found",
clientid => ReqClientId,
token => ReqToken
}),
Reply = emqx_coap_message:reset(Msg),
{shutdown, normal, Reply, Channel};
false ->
?SLOG(info, #{
msg => "request_token_invalid", clientid => ReqClientId, token => ReqToken
}),
Reply = emqx_coap_message:piggyback({error, unauthorized}, Msg),
{shutdown, normal, Reply, Channel};
true ->
%% hack: since each message request can spawn a new connection
%% process, we can't rely on the `inc_incoming_stats' call in
%% `emqx_gateway_conn:handle_incoming' to properly keep track of
%% bumping incoming requests for an existing channel. Since this
%% number is used by keepalive, we have to bump it inside the
%% requested channel/connection pid so heartbeats actually work.
emqx_gateway_cm:cast(coap, ReqClientId, inc_recv_pkt),
call_session(handle_request, Msg, Channel)
end;
_ ->
io:format(">>> C1:~p, T1:~p~nC2:~p~n", [ClientId, Token, Any]),
ErrMsg = <<"Missing token or clientid in connection mode">>,
Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
{shutdown, normal, Reply, Channel}
{ok, {outgoing, Reply}, Channel}
end
end.
run_conn_hooks(
@ -785,6 +763,7 @@ process_connection(
) when
ConnState == connected
->
%% TODO should take over the session here
Queries = emqx_coap_message:extract_uri_query(Req),
ErrMsg0 =
case Queries of

View File

@ -0,0 +1,67 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_coap_proxy_conn).
-behaviour(esockd_udp_proxy_connection).
-include("emqx_coap.hrl").
-export([initialize/1, find_or_create/4, get_connection_id/4, dispatch/3, close/2]).
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
initialize(_Opts) ->
emqx_coap_frame:initial_parse_state(#{}).
find_or_create(CId, Transport, Peer, Opts) ->
case emqx_gateway_cm_registry:lookup_channels(coap, CId) of
[Pid] ->
{ok, Pid};
[] ->
emqx_gateway_conn:start_link(Transport, Peer, Opts)
end.
get_connection_id(_Transport, _Peer, State, Data) ->
case parse_incoming(Data, [], State) of
{[Msg | _] = Packets, NState} ->
case emqx_coap_message:extract_uri_query(Msg) of
#{
<<"clientid">> := ClientId
} ->
{ok, ClientId, Packets, NState};
_ ->
ErrMsg = <<"Missing token or clientid in connection mode">>,
Reply = emqx_coap_message:piggyback({error, bad_request}, ErrMsg, Msg),
Bin = emqx_coap_frame:serialize_pkt(Reply, emqx_coap_frame:serialize_opts()),
{error, Bin}
end;
_Error ->
invalid
end.
dispatch(Pid, _State, Packet) ->
erlang:send(Pid, Packet).
close(Pid, _State) ->
erlang:send(Pid, udp_proxy_closed).
parse_incoming(<<>>, Packets, State) ->
{Packets, State};
parse_incoming(Data, Packets, State) ->
{ok, Packet, Rest, NParseState} = emqx_coap_frame:parse(Data, State),
parse_incoming(Rest, [Packet | Packets], NParseState).

View File

@ -20,7 +20,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx_gateway/include/emqx_gateway.hrl").
%% define a gateway named stomp
%% define a gateway named coap
-gateway(#{
name => coap,
callback_module => ?MODULE,
@ -58,10 +58,11 @@ on_gateway_load(
Ctx
) ->
Listeners = normalize_config(Config),
ModCfg = #{
ModCfg = maps:merge(connection_opts(Config), #{
frame_mod => emqx_coap_frame,
chann_mod => emqx_coap_channel
},
}),
case
start_listeners(
Listeners, GwName, Ctx, ModCfg
@ -105,3 +106,13 @@ on_gateway_unload(
) ->
Listeners = normalize_config(Config),
stop_listeners(GwName, Listeners).
connection_opts(#{connection_required := false}) ->
#{};
connection_opts(_) ->
#{
connection_mod => esockd_udp_proxy,
esockd_proxy_opts => #{
connection_mod => emqx_coap_proxy_conn
}
}.

View File

@ -165,7 +165,8 @@ t_connection(_) ->
emqx_gateway_cm_registry:lookup_channels(coap, <<"client1">>)
)
end,
do(Action).
do(Action),
ok.
t_connection_with_short_param_name(_) ->
Action = fun(Channel) ->
@ -330,7 +331,8 @@ t_publish(_) ->
?assertEqual(Payload, Msg#message.payload)
after 500 ->
?assert(false)
end
end,
true
end,
with_connection(Topics, Action).
@ -360,7 +362,9 @@ t_publish_with_retain_qos_expiry(_) ->
?assertEqual(Payload, Msg#message.payload)
after 500 ->
?assert(false)
end
end,
true
end,
with_connection(Topics, Action),
@ -392,7 +396,8 @@ t_subscribe(_) ->
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
?assertEqual(Payload, PayloadRecv),
true
end,
with_connection(Topics, Fun),
@ -431,7 +436,8 @@ t_subscribe_with_qos_opt(_) ->
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
?assertEqual(Payload, PayloadRecv),
true
end,
with_connection(Topics, Fun),
@ -468,7 +474,8 @@ t_un_subscribe(_) ->
{ok, nocontent, _} = do_request(Channel, URI, UnReq),
?LOGT("un observer topic:~ts~n", [Topic]),
timer:sleep(100),
?assertEqual([], emqx:subscribers(Topic))
?assertEqual([], emqx:subscribers(Topic)),
true
end,
with_connection(Topics, Fun).
@ -497,7 +504,8 @@ t_observe_wildcard(_) ->
#coap_content{payload = PayloadRecv} = Notify,
?assertEqual(Payload, PayloadRecv)
?assertEqual(Payload, PayloadRecv),
true
end,
with_connection(Fun).
@ -530,7 +538,8 @@ t_clients_api(_) ->
{204, _} =
request(delete, "/gateways/coap/clients/client1"),
timer:sleep(200),
{200, #{data := []}} = request(get, "/gateways/coap/clients")
{200, #{data := []}} = request(get, "/gateways/coap/clients"),
false
end,
with_connection(Fun).
@ -560,7 +569,8 @@ t_clients_subscription_api(_) ->
{204, _} = request(delete, Path ++ "/tx"),
{200, []} = request(get, Path)
{200, []} = request(get, Path),
true
end,
with_connection(Fun).
@ -578,7 +588,8 @@ t_clients_get_subscription_api(_) ->
observe(Channel, Token, false),
{200, []} = request(get, Path)
{200, []} = request(get, Path),
true
end,
with_connection(Fun).
@ -773,8 +784,7 @@ with_connection(Action) ->
Fun = fun(Channel) ->
Token = connection(Channel),
timer:sleep(100),
Action(Channel, Token),
disconnection(Channel, Token),
_ = Action(Channel, Token) andalso disconnection(Channel, Token),
timer:sleep(100)
end,
do(Fun).

View File

@ -207,7 +207,8 @@ test_recv_coap_request(UdpSock) ->
test_send_coap_response(UdpSock, Host, Port, Code, Content, Request) ->
is_list(Host) orelse error("Host is not a string"),
{ok, IpAddr} = inet:getaddr(Host, inet),
Response = emqx_coap_message:piggyback(Code, Content, Request),
Response0 = emqx_coap_message:piggyback(Code, Content, Request),
Response = Response0#coap_message{options = #{uri_query => [<<"clientid=client1">>]}},
?LOGT("test_send_coap_response Response=~p", [Response]),
Binary = emqx_coap_frame:serialize_pkt(Response, undefined),
ok = gen_udp:send(UdpSock, IpAddr, Port, Binary).

View File

@ -0,0 +1 @@
Enhanced CoAP gateway connection mode, UDP connection will always be bound to the corresponding gateway connection through the `clientid`.

View File

@ -175,7 +175,7 @@ defmodule EMQXUmbrella.MixProject do
end
def common_dep(:ekka), do: {:ekka, github: "emqx/ekka", tag: "0.19.5", override: true}
def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.2", override: true}
def common_dep(:esockd), do: {:esockd, github: "emqx/esockd", tag: "5.11.3", override: true}
def common_dep(:gproc), do: {:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true}
def common_dep(:hocon), do: {:hocon, github: "emqx/hocon", tag: "0.43.1", override: true}
def common_dep(:lc), do: {:lc, github: "emqx/lc", tag: "0.3.2", override: true}

View File

@ -81,7 +81,7 @@
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.6"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.11.3"}}},
{rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.8.0-emqx-5"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.19.5"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "3.3.1"}}},

View File

@ -195,4 +195,13 @@ Relevant when the EMQX cluster is deployed behind a load-balancer."""
fields_ws_opts_proxy_address_header.label:
"""Proxy address header"""
udp_health_check.desc:
"""Some Cloud platform use a `request-reply` mechanism to check whether a UDP port is healthy, here can configure this pair."""
udp_health_check_request.desc:
"""The content of the request."""
udp_health_check_reply.desc:
"""The content to reply."""
}