From 8d0e95cc6ba2bce500d6881927e2ab2e75213668 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 24 Jun 2022 19:03:23 +0800 Subject: [PATCH] feat(exhook): expose tcp some options for grpc client --- apps/emqx_exhook/i18n/emqx_exhook_i18n.conf | 28 +++++++++++++++++++++ apps/emqx_exhook/src/emqx_exhook_schema.erl | 19 +++++++++++++- apps/emqx_exhook/src/emqx_exhook_server.erl | 11 +++++--- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exhook/i18n/emqx_exhook_i18n.conf b/apps/emqx_exhook/i18n/emqx_exhook_i18n.conf index e15766080..48754ced7 100644 --- a/apps/emqx_exhook/i18n/emqx_exhook_i18n.conf +++ b/apps/emqx_exhook/i18n/emqx_exhook_i18n.conf @@ -58,5 +58,33 @@ When gRPC is not available, Exhook tries to request the gRPC service at that int } } + keepalive { + desc { + en: """Enables/disables periodic transmission on a connected socket when no other data is exchanged. +If the other end does not respond, the connection is considered broken and an error message is sent to the controlling process.""" + zh: """当没有其他数据交换时,是否向连接的对端套接字定期的发送探测包。如果另一端没有响应,则认为连接断开,并向控制进程发送错误消息""" + } + } + nodelay { + desc { + en: """If true, option TCP_NODELAY is turned on for the socket, +which means that also small amounts of data are sent immediately""" + zh: "如果为 true,则为套接字设置 TCP_NODELAY 选项,这意味着会立即发送数据包" + } + } + + recbuf { + desc { + en: "The minimum size of the receive buffer to use for the socket" + zh: "套接字的最小接收缓冲区大小" + } + } + + sndbuf { + desc { + en: "The minimum size of the send buffer to use for the socket" + zh: "套接字的最小发送缓冲区大小" + } + } } diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 3454ab14c..dfd4e3ac2 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -68,6 +68,10 @@ fields(server) -> })}, {failed_action, failed_action()}, {ssl, ?HOCON(?R_REF(ssl_conf), #{})}, + {socket_options, + ?HOCON(?R_REF(socket_options), #{ + default => #{<<"keepalive">> => true, <<"nodelay">> => true} + })}, {auto_reconnect, ?HOCON(hoconsc:union([false, emqx_schema:duration()]), #{ default => "60s", @@ -81,7 +85,20 @@ fields(server) -> ]; fields(ssl_conf) -> Schema = emqx_schema:client_ssl_opts_schema(#{}), - lists:keydelete("user_lookup_fun", 1, Schema). + lists:keydelete("user_lookup_fun", 1, Schema); +fields(socket_options) -> + [ + {keepalive, ?HOCON(boolean(), #{default => true, desc => ?DESC(keepalive)})}, + {nodelay, ?HOCON(boolean(), #{default => true, desc => ?DESC(nodelay)})}, + {recbuf, + ?HOCON(emqx_schema:bytesize(), #{ + desc => ?DESC(recbuf), required => false, example => <<"64KB">> + })}, + {sndbuf, + ?HOCON(emqx_schema:bytesize(), #{ + desc => ?DESC(sndbuf), required => false, example => <<"16KB">> + })} + ]. desc(exhook) -> "External hook (exhook) configuration."; diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index b0c9b8454..286d062dd 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -130,14 +130,19 @@ load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) end. %% @private -channel_opts(Opts = #{url := URL}) -> +channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) -> ClientOpts = maps:merge( #{pool_size => erlang:system_info(schedulers)}, Opts ), + SockOpts = maps:to_list(SockOptsT), case uri_string:parse(URL) of #{scheme := <<"http">>, host := Host, port := Port} -> - {ok, {format_http_uri("http", Host, Port), ClientOpts}}; + NClientOpts = ClientOpts#{ + gun_opts => + #{transport_opts => SockOpts} + }, + {ok, {format_http_uri("http", Host, Port), NClientOpts}}; #{scheme := <<"https">>, host := Host, port := Port} -> SslOpts = case maps:get(ssl, Opts, undefined) of @@ -158,7 +163,7 @@ channel_opts(Opts = #{url := URL}) -> gun_opts => #{ transport => ssl, - transport_opts => SslOpts + transport_opts => SockOpts ++ SslOpts } }, {ok, {format_http_uri("https", Host, Port), NClientOpts}};