fix: lost messages when HTTP connection times out

When using async mode with the webhook bridge, queued messages that are
not fully processed when the connection times out could be lost. This
commit fixes this by letting the bridge return a recoverable_error when
this happen. The message send will then be retried in sync mode by the
emqx_resource_buffer_worker.

Fixes: https://emqx.atlassian.net/browse/EMQX-8974
This commit is contained in:
Kjell Winblad 2023-03-06 14:17:23 +01:00 committed by Zaiming (Stone) Shi
parent 58ee8ed35c
commit ca947e3e70
2 changed files with 255 additions and 1 deletions

View File

@ -0,0 +1,252 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_webhook_SUITE).
%% This suite should contains testcases that are specific for the webhook
%% bridge. There are also some test cases that implicitly tests the webhook
%% bridge in emqx_bridge_api_SUITE
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
{ok, _} = application:ensure_all_started(emqx_ee_connector),
{ok, _} = application:ensure_all_started(emqx_ee_bridge),
snabbkaffe:fix_ct_logging(),
[].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_ee_connector),
_ = application:stop(emqx_bridge),
ok.
suite() ->
[{timetrap, {seconds, 60}}].
%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Orginally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true),
Parent = self(),
{Port, Sock} = listen_on_random_port(),
Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig)
end),
timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}.
stop_http_server(#{sock := Sock, acceptor := Acceptor}) ->
ct:pal("Stop server\n"),
exit(Acceptor, kill),
gen_tcp:close(Sock).
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
case gen_tcp:listen(0, SockOpts) of
{ok, Sock} ->
{ok, Port} = inet:port(Sock),
{Port, Sock};
{error, Reason} when Reason /= eaddrinuse ->
{error, Reason}
end.
accept_loop(Sock, Parent, HTTPServerConfig) ->
process_flag(trap_exit, true),
{ok, Conn} = gen_tcp:accept(Sock),
spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end),
%%gen_tcp:controlling_process(Conn, Handler),
accept_loop(Sock, Parent, HTTPServerConfig).
make_response(CodeStr, Str) ->
B = iolist_to_binary(Str),
iolist_to_binary(
io_lib:fwrite(
"HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
[CodeStr, size(B), B]
)
).
handle_fun_200_ok(Conn, Parent, HTTPServerConfig) ->
ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0),
ct:pal("Waiting for request~n"),
case gen_tcp:recv(Conn, 0) of
{ok, ReqStr} ->
ct:pal("The http handler got request: ~p", [ReqStr]),
Req = parse_http_request(ReqStr),
timer:sleep(ResponseDelayMS),
Parent ! {http_server, received, Req},
gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
handle_fun_200_ok(Conn, Parent, HTTPServerConfig);
{error, closed} ->
ct:pal("http connection closed");
{error, Reason} ->
ct:pal("the http handler recv error: ~p", [Reason]),
timer:sleep(100),
gen_tcp:close(Conn)
end.
parse_http_request(ReqStr0) ->
[Method, ReqStr1] = string:split(ReqStr0, " ", leading),
[Path, ReqStr2] = string:split(ReqStr1, " ", leading),
[_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
[_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading),
#{method => Method, path => Path, body => Body}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>),
Name = maps:get(name, Config, atom_to_binary(?MODULE)),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
" connect_timeout = \"~ps\"\n"
" enable = true\n"
" enable_pipelining = 100\n"
" max_retries = 2\n"
" method = \"post\"\n"
" pool_size = ~p\n"
" pool_type = \"random\"\n"
" request_timeout = \"~ps\"\n"
" body = \"${id}\""
" resource_opts {\n"
" async_inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n"
" max_queue_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
" worker_pool_size = \"1\"\n"
" }\n"
" ssl {\n"
" enable = false\n"
" }\n"
"}\n",
[
Type,
Name,
Port,
ConnectTimeout,
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout
]
),
ct:pal(ConfigString),
parse_and_check(ConfigString, Type, Name).
parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
RetConfig.
make_bridge(Config) ->
Type = <<"webhook">>,
Name = atom_to_binary(?MODULE),
BridgeConfig = bridge_async_config(Config#{
name => Name,
type => Type
}),
{ok, _} = emqx_bridge:create(
Type,
Name,
BridgeConfig
),
emqx_bridge_resource:bridge_id(Type, Name).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(_Config) ->
ResponseDelayMS = 90,
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}),
% Port = 9000,
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resouce_request_timeout => "infinity"
}),
NumberOfMessagesToSend = 10,
[
emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend)
],
%% Make sure server recive all messages
ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok.
receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
ok;
receive_request_notifications(MessageIDs, ResponseDelay) ->
receive
{http_server, received, Req} ->
RemainingMessageIDs = remove_message_id(MessageIDs, Req),
receive_request_notifications(RemainingMessageIDs, ResponseDelay)
after (30 * 1000) ->
ct:pal("Waited to long time but did not get any message\n"),
ct:fail("All requests did not reach server at least once")
end.
remove_message_id(MessageIDs, #{body := IDBin}) ->
try
ID = erlang:binary_to_integer(IDBin),
maps:remove(ID, MessageIDs)
catch
_:_ ->
%% It is acceptable to get the same message more than once
MessageIDs
end.

View File

@ -564,7 +564,9 @@ bin(Atom) when is_atom(Atom) ->
reply_delegator(ReplyFunAndArgs, Result) -> reply_delegator(ReplyFunAndArgs, Result) ->
case Result of case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> %% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal ->
Result1 = {error, {recoverable_error, Reason}}, Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ -> _ ->