From ca947e3e702ec57d62b95dcc7441910bdc6679a7 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Mar 2023 14:17:23 +0100 Subject: [PATCH 1/5] fix: lost messages when HTTP connection times out When using async mode with the webhook bridge, queued messages that are not fully processed when the connection times out could be lost. This commit fixes this by letting the bridge return a recoverable_error when this happen. The message send will then be retried in sync mode by the emqx_resource_buffer_worker. Fixes: https://emqx.atlassian.net/browse/EMQX-8974 --- .../test/emqx_bridge_webhook_SUITE.erl | 252 ++++++++++++++++++ .../src/emqx_connector_http.erl | 4 +- 2 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl new file mode 100644 index 000000000..9446b0ffe --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -0,0 +1,252 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_webhook_SUITE). + +%% This suite should contains testcases that are specific for the webhook +%% bridge. There are also some test cases that implicitly tests the webhook +%% bridge in emqx_bridge_api_SUITE + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [request/3, uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(_Config) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + snabbkaffe:fix_ct_logging(), + []. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_ee_connector), + _ = application:stop(emqx_bridge), + ok. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%%------------------------------------------------------------------------------ +%% HTTP server for testing +%% (Orginally copied from emqx_bridge_api_SUITE) +%%------------------------------------------------------------------------------ +start_http_server(HTTPServerConfig) -> + ct:pal("Start server\n"), + process_flag(trap_exit, true), + Parent = self(), + {Port, Sock} = listen_on_random_port(), + Acceptor = spawn(fun() -> + accept_loop(Sock, Parent, HTTPServerConfig) + end), + timer:sleep(100), + #{port => Port, sock => Sock, acceptor => Acceptor}. + +stop_http_server(#{sock := Sock, acceptor := Acceptor}) -> + ct:pal("Stop server\n"), + exit(Acceptor, kill), + gen_tcp:close(Sock). + +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +accept_loop(Sock, Parent, HTTPServerConfig) -> + process_flag(trap_exit, true), + {ok, Conn} = gen_tcp:accept(Sock), + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig). + +make_response(CodeStr, Str) -> + B = iolist_to_binary(Str), + iolist_to_binary( + io_lib:fwrite( + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", + [CodeStr, size(B), B] + ) + ). + +handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> + ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), + ct:pal("Waiting for request~n"), + case gen_tcp:recv(Conn, 0) of + {ok, ReqStr} -> + ct:pal("The http handler got request: ~p", [ReqStr]), + Req = parse_http_request(ReqStr), + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig); + {error, closed} -> + ct:pal("http connection closed"); + {error, Reason} -> + ct:pal("the http handler recv error: ~p", [Reason]), + timer:sleep(100), + gen_tcp:close(Conn) + end. + +parse_http_request(ReqStr0) -> + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), + #{method => Method, path => Path, body => Body}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, <<"webhook">>), + Name = maps:get(name, Config, atom_to_binary(?MODULE)), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, 1), + RequestTimeout = maps:get(request_timeout, Config, 10000), + ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"), + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://localhost:~p\"\n" + " connect_timeout = \"~ps\"\n" + " enable = true\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~ps\"\n" + " body = \"${id}\"" + " resource_opts {\n" + " async_inflight_window = 100\n" + " auto_restart_interval = \"60s\"\n" + " health_check_interval = \"15s\"\n" + " max_queue_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_timeout = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Port, + ConnectTimeout, + PoolSize, + RequestTimeout, + QueryMode, + ResourceRequestTimeout + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +make_bridge(Config) -> + Type = <<"webhook">>, + Name = atom_to_binary(?MODULE), + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. +%% When the connection time out all the queued requests where dropped in +t_send_async_connection_timeout(_Config) -> + ResponseDelayMS = 90, + #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), + % Port = 9000, + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "async", + connect_timeout => ResponseDelayMS * 2, + request_timeout => 10000, + resouce_request_timeout => "infinity" + }), + NumberOfMessagesToSend = 10, + [ + emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) + || Id <- lists:seq(1, NumberOfMessagesToSend) + ], + %% Make sure server recive all messages + ct:pal("Sent messages\n"), + MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), + receive_request_notifications(MessageIDs, ResponseDelayMS), + stop_http_server(Server), + ok. + +receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 -> + ok; +receive_request_notifications(MessageIDs, ResponseDelay) -> + receive + {http_server, received, Req} -> + RemainingMessageIDs = remove_message_id(MessageIDs, Req), + receive_request_notifications(RemainingMessageIDs, ResponseDelay) + after (30 * 1000) -> + ct:pal("Waited to long time but did not get any message\n"), + ct:fail("All requests did not reach server at least once") + end. + +remove_message_id(MessageIDs, #{body := IDBin}) -> + try + ID = erlang:binary_to_integer(IDBin), + maps:remove(ID, MessageIDs) + catch + _:_ -> + %% It is acceptable to get the same message more than once + MessageIDs + end. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7d91e18b9..09fa988d3 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -564,7 +564,9 @@ bin(Atom) when is_atom(Atom) -> reply_delegator(ReplyFunAndArgs, Result) -> case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + %% The normal reason happens when the HTTP connection times out before + %% the request has been fully processed + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> Result1 = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); _ -> From 163b33ab287b3b5463a0064251a18384fdf0c767 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 Mar 2023 15:20:34 +0100 Subject: [PATCH 2/5] test: remove unnecessary dependencies of ee apps --- .../test/emqx_bridge_webhook_SUITE.erl | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 9446b0ffe..8c688a0ec 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -38,16 +38,13 @@ init_per_suite(_Config) -> ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), - {ok, _} = application:ensure_all_started(emqx_ee_connector), - {ok, _} = application:ensure_all_started(emqx_ee_bridge), snabbkaffe:fix_ct_logging(), []. end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), - _ = application:stop(emqx_ee_connector), _ = application:stop(emqx_bridge), ok. @@ -242,11 +239,6 @@ receive_request_notifications(MessageIDs, ResponseDelay) -> end. remove_message_id(MessageIDs, #{body := IDBin}) -> - try - ID = erlang:binary_to_integer(IDBin), - maps:remove(ID, MessageIDs) - catch - _:_ -> - %% It is acceptable to get the same message more than once - MessageIDs - end. + ID = erlang:binary_to_integer(IDBin), + %% It is acceptable to get the same message more than once + maps:without([ID], MessageIDs). From 93ebd59fb24b78e28e8567532001cb6490202a9b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 20:53:15 +0100 Subject: [PATCH 3/5] docs: add changelogs for PR 10076 --- changes/ce/fix-10076.en.md | 2 ++ changes/ce/fix-10076.zh.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 changes/ce/fix-10076.en.md create mode 100644 changes/ce/fix-10076.zh.md diff --git a/changes/ce/fix-10076.en.md b/changes/ce/fix-10076.en.md new file mode 100644 index 000000000..e01df765b --- /dev/null +++ b/changes/ce/fix-10076.en.md @@ -0,0 +1,2 @@ +Fix webhook bridge error handling: connection timeout should be a retriable error. +Prior to this fix, connection timeout was clasified as unrecoverable error hence lead to request being dropped. diff --git a/changes/ce/fix-10076.zh.md b/changes/ce/fix-10076.zh.md new file mode 100644 index 000000000..516345f92 --- /dev/null +++ b/changes/ce/fix-10076.zh.md @@ -0,0 +1,2 @@ +修复 HTTP 桥接的一个异常处理:连接超时错误发生后,发生错误的请求可以被重试。 +在此修复前,连接超时后,被当作不可重试类型的错误处理,导致请求被丢弃。 From 26b29185b2ce9b7d049bd692a0c6f28e11027d88 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 20:53:37 +0100 Subject: [PATCH 4/5] test(emqx_bridge_webhook_SUITE): fix flakyness in test web server --- .../test/emqx_bridge_webhook_SUITE.erl | 55 +++++++++++++------ 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 8c688a0ec..4c349c7a0 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -59,7 +59,7 @@ start_http_server(HTTPServerConfig) -> ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), - {Port, Sock} = listen_on_random_port(), + {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), @@ -76,17 +76,22 @@ listen_on_random_port() -> case gen_tcp:listen(0, SockOpts) of {ok, Sock} -> {ok, Port} = inet:port(Sock), - {Port, Sock}; - {error, Reason} when Reason /= eaddrinuse -> + {ok, {Port, Sock}}; + {error, Reason} when Reason =/= eaddrinuse -> {error, Reason} end. accept_loop(Sock, Parent, HTTPServerConfig) -> process_flag(trap_exit, true), - {ok, Conn} = gen_tcp:accept(Sock), - spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end), - %%gen_tcp:controlling_process(Conn, Handler), - accept_loop(Sock, Parent, HTTPServerConfig). + case gen_tcp:accept(Sock) of + {ok, Conn} -> + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, <<>>) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig); + {error, closed} -> + %% socket owner died + ok + end. make_response(CodeStr, Str) -> B = iolist_to_binary(Str), @@ -97,17 +102,21 @@ make_response(CodeStr, Str) -> ) ). -handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> +handle_fun_200_ok(Conn, Parent, HTTPServerConfig, Acc) -> ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), ct:pal("Waiting for request~n"), case gen_tcp:recv(Conn, 0) of {ok, ReqStr} -> ct:pal("The http handler got request: ~p", [ReqStr]), - Req = parse_http_request(ReqStr), - timer:sleep(ResponseDelayMS), - Parent ! {http_server, received, Req}, - gen_tcp:send(Conn, make_response("200 OK", "Request OK")), - handle_fun_200_ok(Conn, Parent, HTTPServerConfig); + case parse_http_request(<>) of + {ok, incomplete, NewAcc} -> + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc); + {ok, Req, NewAcc} -> + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc) + end; {error, closed} -> ct:pal("http connection closed"); {error, Reason} -> @@ -116,12 +125,26 @@ handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> gen_tcp:close(Conn) end. -parse_http_request(ReqStr0) -> +parse_http_request(ReqStr) -> + try + parse_http_request_assertive(ReqStr) + catch + _:_ -> + {ok, incomplete, ReqStr} + end. + +parse_http_request_assertive(ReqStr0) -> + %% find body length + [_, LengthStr0] = string:split(ReqStr0, "content-length:"), + [LengthStr, _] = string:split(LengthStr0, "\r\n"), + Length = binary_to_integer(string:trim(LengthStr, both)), + %% split between multiple requests [Method, ReqStr1] = string:split(ReqStr0, " ", leading), [Path, ReqStr2] = string:split(ReqStr1, " ", leading), [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), - [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), - #{method => Method, path => Path, body => Body}. + [_HeaderStr, Rest] = string:split(ReqStr3, "\r\n\r\n", leading), + <> = Rest, + {ok, #{method => Method, path => Path, body => Body}, Remain}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Helper functions From 116137a447d8be7993794108d41ca33b5a5da1ee Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Tue, 7 Mar 2023 21:58:05 +0100 Subject: [PATCH 5/5] docs: fix typos in change log Co-authored-by: Ivan Dyachkov --- changes/ce/fix-10076.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/fix-10076.en.md b/changes/ce/fix-10076.en.md index e01df765b..5bbbffa32 100644 --- a/changes/ce/fix-10076.en.md +++ b/changes/ce/fix-10076.en.md @@ -1,2 +1,2 @@ Fix webhook bridge error handling: connection timeout should be a retriable error. -Prior to this fix, connection timeout was clasified as unrecoverable error hence lead to request being dropped. +Prior to this fix, connection timeout was classified as unrecoverable error and led to request being dropped.