diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl new file mode 100644 index 000000000..9446b0ffe --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -0,0 +1,252 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_webhook_SUITE). + +%% This suite should contains testcases that are specific for the webhook +%% bridge. There are also some test cases that implicitly tests the webhook +%% bridge in emqx_bridge_api_SUITE + +-compile(nowarn_export_all). +-compile(export_all). + +-import(emqx_mgmt_api_test_util, [request/3, uri/1]). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(_Config) -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + snabbkaffe:fix_ct_logging(), + []. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_ee_connector), + _ = application:stop(emqx_bridge), + ok. + +suite() -> + [{timetrap, {seconds, 60}}]. + +%%------------------------------------------------------------------------------ +%% HTTP server for testing +%% (Orginally copied from emqx_bridge_api_SUITE) +%%------------------------------------------------------------------------------ +start_http_server(HTTPServerConfig) -> + ct:pal("Start server\n"), + process_flag(trap_exit, true), + Parent = self(), + {Port, Sock} = listen_on_random_port(), + Acceptor = spawn(fun() -> + accept_loop(Sock, Parent, HTTPServerConfig) + end), + timer:sleep(100), + #{port => Port, sock => Sock, acceptor => Acceptor}. + +stop_http_server(#{sock := Sock, acceptor := Acceptor}) -> + ct:pal("Stop server\n"), + exit(Acceptor, kill), + gen_tcp:close(Sock). + +listen_on_random_port() -> + SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], + case gen_tcp:listen(0, SockOpts) of + {ok, Sock} -> + {ok, Port} = inet:port(Sock), + {Port, Sock}; + {error, Reason} when Reason /= eaddrinuse -> + {error, Reason} + end. + +accept_loop(Sock, Parent, HTTPServerConfig) -> + process_flag(trap_exit, true), + {ok, Conn} = gen_tcp:accept(Sock), + spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig) end), + %%gen_tcp:controlling_process(Conn, Handler), + accept_loop(Sock, Parent, HTTPServerConfig). + +make_response(CodeStr, Str) -> + B = iolist_to_binary(Str), + iolist_to_binary( + io_lib:fwrite( + "HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s", + [CodeStr, size(B), B] + ) + ). + +handle_fun_200_ok(Conn, Parent, HTTPServerConfig) -> + ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0), + ct:pal("Waiting for request~n"), + case gen_tcp:recv(Conn, 0) of + {ok, ReqStr} -> + ct:pal("The http handler got request: ~p", [ReqStr]), + Req = parse_http_request(ReqStr), + timer:sleep(ResponseDelayMS), + Parent ! {http_server, received, Req}, + gen_tcp:send(Conn, make_response("200 OK", "Request OK")), + handle_fun_200_ok(Conn, Parent, HTTPServerConfig); + {error, closed} -> + ct:pal("http connection closed"); + {error, Reason} -> + ct:pal("the http handler recv error: ~p", [Reason]), + timer:sleep(100), + gen_tcp:close(Conn) + end. + +parse_http_request(ReqStr0) -> + [Method, ReqStr1] = string:split(ReqStr0, " ", leading), + [Path, ReqStr2] = string:split(ReqStr1, " ", leading), + [_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading), + [_HeaderStr, Body] = string:split(ReqStr3, "\r\n\r\n", leading), + #{method => Method, path => Path, body => Body}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Helper functions +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +bridge_async_config(#{port := Port} = Config) -> + Type = maps:get(type, Config, <<"webhook">>), + Name = maps:get(name, Config, atom_to_binary(?MODULE)), + PoolSize = maps:get(pool_size, Config, 1), + QueryMode = maps:get(query_mode, Config, "async"), + ConnectTimeout = maps:get(connect_timeout, Config, 1), + RequestTimeout = maps:get(request_timeout, Config, 10000), + ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"), + ConfigString = io_lib:format( + "bridges.~s.~s {\n" + " url = \"http://localhost:~p\"\n" + " connect_timeout = \"~ps\"\n" + " enable = true\n" + " enable_pipelining = 100\n" + " max_retries = 2\n" + " method = \"post\"\n" + " pool_size = ~p\n" + " pool_type = \"random\"\n" + " request_timeout = \"~ps\"\n" + " body = \"${id}\"" + " resource_opts {\n" + " async_inflight_window = 100\n" + " auto_restart_interval = \"60s\"\n" + " health_check_interval = \"15s\"\n" + " max_queue_bytes = \"1GB\"\n" + " query_mode = \"~s\"\n" + " request_timeout = \"~s\"\n" + " start_after_created = \"true\"\n" + " start_timeout = \"5s\"\n" + " worker_pool_size = \"1\"\n" + " }\n" + " ssl {\n" + " enable = false\n" + " }\n" + "}\n", + [ + Type, + Name, + Port, + ConnectTimeout, + PoolSize, + RequestTimeout, + QueryMode, + ResourceRequestTimeout + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, Type, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +make_bridge(Config) -> + Type = <<"webhook">>, + Name = atom_to_binary(?MODULE), + BridgeConfig = bridge_async_config(Config#{ + name => Name, + type => Type + }), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. +%% When the connection time out all the queued requests where dropped in +t_send_async_connection_timeout(_Config) -> + ResponseDelayMS = 90, + #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), + % Port = 9000, + BridgeID = make_bridge(#{ + port => Port, + pool_size => 1, + query_mode => "async", + connect_timeout => ResponseDelayMS * 2, + request_timeout => 10000, + resouce_request_timeout => "infinity" + }), + NumberOfMessagesToSend = 10, + [ + emqx_bridge:send_message(BridgeID, #{<<"id">> => Id}) + || Id <- lists:seq(1, NumberOfMessagesToSend) + ], + %% Make sure server recive all messages + ct:pal("Sent messages\n"), + MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), + receive_request_notifications(MessageIDs, ResponseDelayMS), + stop_http_server(Server), + ok. + +receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 -> + ok; +receive_request_notifications(MessageIDs, ResponseDelay) -> + receive + {http_server, received, Req} -> + RemainingMessageIDs = remove_message_id(MessageIDs, Req), + receive_request_notifications(RemainingMessageIDs, ResponseDelay) + after (30 * 1000) -> + ct:pal("Waited to long time but did not get any message\n"), + ct:fail("All requests did not reach server at least once") + end. + +remove_message_id(MessageIDs, #{body := IDBin}) -> + try + ID = erlang:binary_to_integer(IDBin), + maps:remove(ID, MessageIDs) + catch + _:_ -> + %% It is acceptable to get the same message more than once + MessageIDs + end. diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 7d91e18b9..09fa988d3 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -564,7 +564,9 @@ bin(Atom) when is_atom(Atom) -> reply_delegator(ReplyFunAndArgs, Result) -> case Result of - {error, Reason} when Reason =:= econnrefused; Reason =:= timeout -> + %% The normal reason happens when the HTTP connection times out before + %% the request has been fully processed + {error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal -> Result1 = {error, {recoverable_error, Reason}}, emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); _ ->