From 20414d737323aa6a35906f49697fd72fbe990ef5 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Mar 2023 13:33:52 -0300 Subject: [PATCH 01/15] fix(buffer_worker): check request timeout and health check interval Fixes https://emqx.atlassian.net/browse/EMQX-9099 The default value for `request_timeout` is 15 seconds, and the default resume interval is also 15 seconds (the health check timeout, if `resume_interval` is not explicitly given). This means that, in practice, if a buffer worker ever gets into the blocked state, then almost all requests will timeout. Proposed improvement: - `request_timeout` should by default be twice as much as health_check_interval. - Emit a alarm if `request_timeout` is not greater than `health_check_interval`. --- .../i18n/emqx_resource_schema_i18n.conf | 4 +- apps/emqx_resource/include/emqx_resource.hrl | 5 +- .../src/emqx_resource_buffer_worker.erl | 38 ++++++ .../src/emqx_resource_manager.erl | 1 + .../test/emqx_resource_SUITE.erl | 126 ++++++++++++++++++ changes/ce/fix-10154.en.md | 7 + 6 files changed, 178 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10154.en.md diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index fb6b2eb06..2e5cf96e8 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -102,8 +102,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" - zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired. We recommend setting this timeout to be at least twice the health check interval, so that the buffer has the chance to recover if too many requests get enqueued.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。我们建议将这个超时设置为健康检查间隔的至少两倍,这样,如果有太多的请求被排队,缓冲区就有机会恢复。""" } label { en: """Request Expiry""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index be570e694..8033ed660 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -91,7 +91,10 @@ -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). +%% Note: this should be greater than the health check timeout; +%% otherwise, if the buffer worker is ever blocked, than all queued +%% requests will basically fail without being attempted. +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(30)). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 8bfd77e61..05622bdd7 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -56,6 +56,8 @@ -export([clear_disk_queue_dir/2]). +-export([deactivate_bad_request_timeout_alarm/1]). + -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(COLLECT_REQ_LIMIT, 1000). @@ -88,6 +90,8 @@ -type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()). -type request() :: term(). -type request_from() :: undefined | gen_statem:from(). +-type request_timeout() :: infinity | timer:time(). +-type health_check_interval() :: timer:time(). -type state() :: blocked | running. -type inflight_key() :: integer(). -type data() :: #{ @@ -199,6 +203,7 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), + maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval), Data = #{ id => Id, index => Index, @@ -1679,6 +1684,39 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. +%% The request timeout should be greater than the health check +%% timeout, health timeout defines how often the buffer worker tries +%% to unblock. If request timeout is <= health check timeout and the +%% buffer worker is ever blocked, than all queued requests will +%% basically fail without being attempted. +-spec maybe_toggle_bad_request_timeout_alarm( + resource_id(), request_timeout(), health_check_interval() +) -> ok. +maybe_toggle_bad_request_timeout_alarm(Id, _RequestTimeout = infinity, _HealthCheckInterval) -> + deactivate_bad_request_timeout_alarm(Id), + ok; +maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval) -> + case RequestTimeout > HealthCheckInterval of + true -> + deactivate_bad_request_timeout_alarm(Id), + ok; + false -> + _ = emqx_alarm:activate( + bad_request_timeout_alarm_id(Id), + #{resource_id => Id, reason => bad_request_timeout}, + <<"Request timeout should be greater than health check timeout: ", Id/binary>> + ), + ok + end. + +-spec deactivate_bad_request_timeout_alarm(resource_id()) -> ok. +deactivate_bad_request_timeout_alarm(Id) -> + _ = emqx_alarm:ensure_deactivated(bad_request_timeout_alarm_id(Id)), + ok. + +bad_request_timeout_alarm_id(Id) -> + <<"bad_request_timeout:", Id/binary>>. + -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). adjust_batch_time_test_() -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 40f9fe1ab..2bdc67a4d 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -506,6 +506,7 @@ handle_remove_event(From, ClearMetrics, Data) -> true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, + emqx_resource_buffer_worker:deactivate_bad_request_timeout_alarm(Data#data.id), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index ff7e1d347..25f4a6d77 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2554,6 +2554,132 @@ do_t_recursive_flush() -> ), ok. +%% Check that we raise an alarm if a bad request timeout config is +%% issued. Request timeout should be greater than health check +%% timeout. +t_bad_request_timeout_alarm(_Config) -> + emqx_connector_demo:set_callback_mode(async_if_possible), + + %% 1) Same values. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 1_000, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ExpectedMessage = + <<"Request timeout should be greater than health check timeout: ", ?ID/binary>>, + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + %% The unexpected termination of one of the buffer workers should + %% not turn the alarm off. + [Pid, _ | _] = emqx_resource_buffer_worker_sup:worker_pids(?ID), + MRef = monitor(process, Pid), + exit(Pid, kill), + receive + {'DOWN', MRef, process, Pid, _} -> + ok + after 300 -> + ct:fail("buffer worker didn't die") + end, + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 2) Request timeout < health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 999, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 2) Request timeout < health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 999, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertMatch( + [ + #{ + message := ExpectedMessage, + details := #{reason := bad_request_timeout, resource_id := ?ID}, + deactivate_at := infinity + } + ], + emqx_alarm:get_alarms(activated) + ), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + %% 3) Request timeout > health check interval. + {ok, _} = emqx_resource:create( + ?ID, + ?DEFAULT_RESOURCE_GROUP, + ?TEST_RESOURCE, + #{name => test_resource}, + #{ + query_mode => async, + request_timeout => 1_001, + health_check_interval => 1_000, + worker_pool_size => 2 + } + ), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + ok = emqx_resource:remove_local(?ID), + ?assertEqual([], emqx_alarm:get_alarms(activated)), + + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md new file mode 100644 index 000000000..83a729360 --- /dev/null +++ b/changes/ce/fix-10154.en.md @@ -0,0 +1,7 @@ +Change the default `request_timeout` for bridges and connectors to be +twice the default `health_check_interval`. + +Before this change, the default values for those two options meant +that, if a buffer ever got blocked due to resource errors or high +message volumes, then, by the time the buffer would try to resume its +normal operations, almost all requests would have timed out. From 2d75c7d6d998ed0b10bc84934959463a604b2ff4 Mon Sep 17 00:00:00 2001 From: Erik Timan Date: Fri, 24 Feb 2023 14:21:20 +0100 Subject: [PATCH 02/15] fix(emqx_bridge): remove metrics from non-dedicated bridge API endpoints Metrics should only be exposed via the /bridges/:id/metrics endpoint, and not in other operations such as getting the list of all bridges, or in the response when a bridge has been created. This commit removes all traces of metrics for the non-dedicated API endpoints. --- apps/emqx_bridge/include/emqx_bridge.hrl | 10 --- apps/emqx_bridge/src/emqx_bridge_api.erl | 63 +++++++------------ .../test/emqx_bridge_api_SUITE.erl | 30 ++++----- changes/ce/fix-10026.en.md | 1 + changes/ce/fix-10026.zh.md | 1 + .../src/emqx_ee_bridge_clickhouse.erl | 8 +-- .../src/emqx_ee_bridge_dynamo.erl | 8 +-- .../src/emqx_ee_bridge_gcp_pubsub.erl | 9 +-- .../src/emqx_ee_bridge_hstreamdb.erl | 9 +-- .../src/emqx_ee_bridge_influxdb.erl | 3 +- .../src/emqx_ee_bridge_kafka.erl | 3 +- .../src/emqx_ee_bridge_mongodb.erl | 4 -- .../src/emqx_ee_bridge_mysql.erl | 9 +-- .../src/emqx_ee_bridge_pgsql.erl | 9 +-- .../src/emqx_ee_bridge_redis.erl | 3 +- .../src/emqx_ee_bridge_tdengine.erl | 9 +-- .../test/emqx_ee_bridge_redis_SUITE.erl | 3 +- 17 files changed, 56 insertions(+), 126 deletions(-) create mode 100644 changes/ce/fix-10026.en.md create mode 100644 changes/ce/fix-10026.zh.md diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index d062b4b7f..81feef893 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -99,13 +99,3 @@ received := Rcvd } ). - --define(METRICS_EXAMPLE, #{ - metrics => ?EMPTY_METRICS, - node_metrics => [ - #{ - node => node(), - metrics => ?EMPTY_METRICS - } - ] -}). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 8ac3e476a..93b6cbf5a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -161,22 +161,19 @@ param_path_enable() -> } )}. -bridge_info_array_example(Method, WithMetrics) -> - [Config || #{value := Config} <- maps:values(bridge_info_examples(Method, WithMetrics))]. +bridge_info_array_example(Method) -> + lists:map(fun(#{value := Config}) -> Config end, maps:values(bridge_info_examples(Method))). bridge_info_examples(Method) -> - bridge_info_examples(Method, false). - -bridge_info_examples(Method, WithMetrics) -> maps:merge( #{ <<"webhook_example">> => #{ summary => <<"WebHook">>, - value => info_example(webhook, Method, WithMetrics) + value => info_example(webhook, Method) }, <<"mqtt_example">> => #{ summary => <<"MQTT Bridge">>, - value => info_example(mqtt, Method, WithMetrics) + value => info_example(mqtt, Method) } }, ee_bridge_examples(Method) @@ -189,35 +186,21 @@ ee_bridge_examples(Method) -> ee_bridge_examples(_Method) -> #{}. -endif. -info_example(Type, Method, WithMetrics) -> +info_example(Type, Method) -> maps:merge( info_example_basic(Type), - method_example(Type, Method, WithMetrics) + method_example(Type, Method) ). -method_example(Type, Method, WithMetrics) when Method == get; Method == post -> +method_example(Type, Method) when Method == get; Method == post -> SType = atom_to_list(Type), SName = SType ++ "_example", - TypeNameExam = #{ + #{ type => bin(SType), name => bin(SName) - }, - maybe_with_metrics_example(TypeNameExam, Method, WithMetrics); -method_example(_Type, put, _WithMetrics) -> - #{}. - -maybe_with_metrics_example(TypeNameExam, get, true) -> - TypeNameExam#{ - metrics => ?EMPTY_METRICS, - node_metrics => [ - #{ - node => node(), - metrics => ?EMPTY_METRICS - } - ] }; -maybe_with_metrics_example(TypeNameExam, _, _) -> - TypeNameExam. +method_example(_Type, put) -> + #{}. info_example_basic(webhook) -> #{ @@ -306,7 +289,7 @@ schema("/bridges") -> responses => #{ 200 => emqx_dashboard_swagger:schema_with_example( array(emqx_bridge_schema:get_response()), - bridge_info_array_example(get, true) + bridge_info_array_example(get) ) } }, @@ -587,7 +570,7 @@ maybe_deobfuscate_bridge_probe(Params) -> Params. lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> - FormatFun = fun format_bridge_info_without_metrics/1, + FormatFun = fun format_bridge_info/1, do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun). lookup_from_all_nodes_metrics(BridgeType, BridgeName, SuccCode) -> @@ -712,7 +695,7 @@ zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> Bridges = pick_bridges_by_id(Type, Name, BridgesAllNodes), - [format_bridge_info_with_metrics(Bridges) | Acc] + [format_bridge_info(Bridges) | Acc] end, [], BridgesFirstNode @@ -746,24 +729,20 @@ pick_bridges_by_id(Type, Name, BridgesAllNodes) -> BridgesAllNodes ). -format_bridge_info_with_metrics([FirstBridge | _] = Bridges) -> - Res = maps:remove(node, FirstBridge), +format_bridge_info([FirstBridge | _] = Bridges) -> + Res = maps:without([node, metrics], FirstBridge), NodeStatus = collect_status(Bridges), - NodeMetrics = collect_metrics(Bridges), redact(Res#{ status => aggregate_status(NodeStatus), - node_status => NodeStatus, - metrics => aggregate_metrics(NodeMetrics), - node_metrics => NodeMetrics + node_status => NodeStatus }). -format_bridge_info_without_metrics(Bridges) -> - Res = format_bridge_info_with_metrics(Bridges), - maps:without([metrics, node_metrics], Res). - format_bridge_metrics(Bridges) -> - Res = format_bridge_info_with_metrics(Bridges), - maps:with([metrics, node_metrics], Res). + NodeMetrics = collect_metrics(Bridges), + #{ + metrics => aggregate_metrics(NodeMetrics), + node_metrics => NodeMetrics + }. collect_status(Bridges) -> [maps:with([node, status], B) || B <- Bridges]. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 8b388a771..ab24ccc8f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -267,8 +267,6 @@ t_http_crud_apis(Config) -> <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _], - <<"metrics">> := _, - <<"node_metrics">> := [_ | _], <<"url">> := URL2 } ], @@ -886,6 +884,7 @@ t_metrics(Config) -> ), %ct:pal("---bridge: ~p", [Bridge]), + Decoded = emqx_json:decode(Bridge, [return_maps]), #{ <<"type">> := ?BRIDGE_TYPE, <<"name">> := Name, @@ -893,7 +892,11 @@ t_metrics(Config) -> <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = emqx_json:decode(Bridge, [return_maps]), + } = Decoded, + + %% assert that the bridge return doesn't contain metrics anymore + ?assertNot(maps:is_key(<<"metrics">>, Decoded)), + ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -909,9 +912,9 @@ t_metrics(Config) -> %% check that the bridge doesn't contain metrics anymore {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), - Decoded = emqx_json:decode(Bridge2Str, [return_maps]), - ?assertNot(maps:is_key(<<"metrics">>, Decoded)), - ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), + Decoded2 = emqx_json:decode(Bridge2Str, [return_maps]), + ?assertNot(maps:is_key(<<"metrics">>, Decoded2)), + ?assertNot(maps:is_key(<<"node_metrics">>, Decoded2)), %% send an message to emqx and the message should be forwarded to the HTTP server Body = <<"my msg">>, @@ -942,16 +945,13 @@ t_metrics(Config) -> emqx_json:decode(Bridge3Str, [return_maps]) ), - %% check for non-empty metrics when listing all bridges + %% check that metrics isn't returned when listing all bridges {ok, 200, BridgesStr} = request(get, uri(["bridges"]), []), - ?assertMatch( - [ - #{ - <<"metrics">> := #{<<"success">> := _}, - <<"node_metrics">> := [_ | _] - } - ], - emqx_json:decode(BridgesStr, [return_maps]) + ?assert( + lists:all( + fun(E) -> not maps:is_key(<<"metrics">>, E) end, + emqx_json:decode(BridgesStr, [return_maps]) + ) ), ok. diff --git a/changes/ce/fix-10026.en.md b/changes/ce/fix-10026.en.md new file mode 100644 index 000000000..f1ff11460 --- /dev/null +++ b/changes/ce/fix-10026.en.md @@ -0,0 +1 @@ +Metrics are now only exposed via the /bridges/:id/metrics endpoint. Metrics are no longer returned in other API operations such as getting the list of all bridges, or in the response when a bridge has been created. diff --git a/changes/ce/fix-10026.zh.md b/changes/ce/fix-10026.zh.md new file mode 100644 index 000000000..b132bcb07 --- /dev/null +++ b/changes/ce/fix-10026.zh.md @@ -0,0 +1 @@ +现在只有显式调用 `/bridges/:id/metrics` 接口时才可以获得指标数据,而其他 API 接口将不再返回相关数据。 diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl index 9e03aca4a..1d6ecce7d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl @@ -41,9 +41,7 @@ conn_bridge_examples(Method) -> } ]. -values(get, Type) -> - maps:merge(values(post, Type), ?METRICS_EXAMPLE); -values(post, Type) -> +values(_Method, Type) -> #{ enable => true, type => Type, @@ -65,9 +63,7 @@ values(post, Type) -> query_mode => async, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } - }; -values(put, Type) -> - values(post, Type). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl index 066b873ce..e55be61e5 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl @@ -37,9 +37,7 @@ conn_bridge_examples(Method) -> } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(_Method) -> #{ enable => true, type => dynamo, @@ -60,9 +58,7 @@ values(post) -> query_mode => sync, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } - }; -values(put) -> - values(post). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl index 352a7163a..180640d65 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_gcp_pubsub.erl @@ -4,7 +4,6 @@ -module(emqx_ee_bridge_gcp_pubsub). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -146,9 +145,7 @@ conn_bridge_examples(Method) -> } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(_Method) -> #{ pubsub_topic => <<"mytopic">>, service_account_json => @@ -176,9 +173,7 @@ values(post) -> <<"https://oauth2.googleapis.com/token">>, type => <<"service_account">> } - }; -values(put) -> - values(post). + }. %%------------------------------------------------------------------------------------------------- %% Helper fns diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl index 6e0c711b2..13a70e7c7 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstreamdb.erl @@ -5,7 +5,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -33,9 +32,7 @@ conn_bridge_examples(Method) -> } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(_Method) -> #{ type => hstreamdb, name => <<"demo">>, @@ -44,9 +41,7 @@ values(post) -> direction => egress, local_topic => <<"local/topic/#">>, payload => <<"${payload}">> - }; -values(put) -> - values(post). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index 14f53b5e7..7cf7ea55e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -3,7 +3,6 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_influxdb). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -46,7 +45,7 @@ conn_bridge_examples(Method) -> ]. values(Protocol, get) -> - maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); + values(Protocol, post); values("influxdb_api_v2", post) -> SupportUint = <<"uint_value=${payload.uint_key}u,">>, TypeOpts = #{ diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 8e9ff9628..30a58e4e0 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -3,7 +3,6 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_kafka). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_connector/include/emqx_connector.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -55,7 +54,7 @@ conn_bridge_examples(Method) -> ]. values({get, KafkaType}) -> - maps:merge(values({post, KafkaType}), ?METRICS_EXAMPLE); + values({post, KafkaType}); values({post, KafkaType}) -> maps:merge(values(common_config), values(KafkaType)); values({put, KafkaType}) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl index 8312c081c..bc450f39b 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mongodb.erl @@ -5,7 +5,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -156,9 +155,6 @@ values(common, MongoType, Method, TypeOpts) -> Vals0 = maps:merge(MethodVals, Common), maps:merge(Vals0, TypeOpts). -method_values(MongoType, get) -> - Vals = method_values(MongoType, post), - maps:merge(?METRICS_EXAMPLE, Vals); method_values(MongoType, _) -> ConnectorType = case MongoType of diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl index fd4d9bdd9..eed4172ab 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_mysql.erl @@ -5,7 +5,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -39,9 +38,7 @@ conn_bridge_examples(Method) -> } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(_Method) -> #{ enable => true, type => mysql, @@ -62,9 +59,7 @@ values(post) -> query_mode => async, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } - }; -values(put) -> - values(post). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl index b592197f9..46132bd99 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_pgsql.erl @@ -5,7 +5,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -41,9 +40,7 @@ conn_bridge_examples(Method) -> } ]. -values(get, Type) -> - maps:merge(values(post, Type), ?METRICS_EXAMPLE); -values(post, Type) -> +values(_Method, Type) -> #{ enable => true, type => Type, @@ -64,9 +61,7 @@ values(post, Type) -> query_mode => async, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } - }; -values(put, Type) -> - values(post, Type). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl index 18822ba11..fa6958b6d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -3,7 +3,6 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge_redis). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). @@ -46,7 +45,7 @@ conn_bridge_examples(Method) -> ]. values(Protocol, get) -> - maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); + values(Protocol, post); values("single", post) -> SpecificOpts = #{ server => <<"127.0.0.1:6379">>, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl index 35e81efa3..b72d79955 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_tdengine.erl @@ -5,7 +5,6 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -40,9 +39,7 @@ conn_bridge_examples(Method) -> } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(_Method) -> #{ enable => true, type => tdengine, @@ -63,9 +60,7 @@ values(post) -> query_mode => sync, max_queue_bytes => ?DEFAULT_QUEUE_SIZE } - }; -values(put) -> - values(post). + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl index 4eeebfaf8..5431cbb03 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -210,8 +210,7 @@ t_check_values(_Config) -> lists:foreach( fun(Method) -> lists:foreach( - fun({RedisType, #{value := Value0}}) -> - Value = maps:without(maps:keys(?METRICS_EXAMPLE), Value0), + fun({RedisType, #{value := Value}}) -> MethodBin = atom_to_binary(Method), Type = string:slice(RedisType, length("redis_")), RefName = binary_to_list(<>), From 6f71898546977403f4354a1ac025b0d2b1cd73a7 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 20 Mar 2023 12:48:41 +0100 Subject: [PATCH 03/15] fix: upgrade esockd from 5.9.4 to 5.9.6 changes in esockd are * 5.9.5: added API to retrieve SNI * 5.9.6: avoid error log if socket is closed before sending proxy protocol headers --- apps/emqx/rebar.config | 2 +- changes/ce/fix-10174.en.md | 2 ++ changes/ce/fix-10174.zh.md | 2 ++ mix.exs | 2 +- rebar.config | 2 +- 5 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 changes/ce/fix-10174.en.md create mode 100644 changes/ce/fix-10174.zh.md diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 229979f6c..bb66280f0 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -26,7 +26,7 @@ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.5"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.37.0"}}}, diff --git a/changes/ce/fix-10174.en.md b/changes/ce/fix-10174.en.md new file mode 100644 index 000000000..213af19da --- /dev/null +++ b/changes/ce/fix-10174.en.md @@ -0,0 +1,2 @@ +Upgrade library `esockd` from 5.9.4 to 5.9.6. +Fix an unnecessary error level logging when a connection is closed before proxy protocol header is sent by the proxy. diff --git a/changes/ce/fix-10174.zh.md b/changes/ce/fix-10174.zh.md new file mode 100644 index 000000000..435056280 --- /dev/null +++ b/changes/ce/fix-10174.zh.md @@ -0,0 +1,2 @@ +依赖库 `esockd` 从 5.9.4 升级到 5.9.6。 +修复了一个不必要的错误日志。如果连接在 proxy protocol 包头还没有发送前就关闭了, 则不打印错误日志。 diff --git a/mix.exs b/mix.exs index 42354f8dc..c4677dd1d 100644 --- a/mix.exs +++ b/mix.exs @@ -52,7 +52,7 @@ defmodule EMQXUmbrella.MixProject do {:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true}, {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true}, - {:esockd, github: "emqx/esockd", tag: "5.9.4", override: true}, + {:esockd, github: "emqx/esockd", tag: "5.9.6", override: true}, {:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true}, {:ekka, github: "emqx/ekka", tag: "0.14.5", override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true}, diff --git a/rebar.config b/rebar.config index 5ce9138ce..62fadbe51 100644 --- a/rebar.config +++ b/rebar.config @@ -54,7 +54,7 @@ , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}} - , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}} , {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.5"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}} From d2c5cbbcaaa3063358ca46bfd8d6eaf4f01e2ab3 Mon Sep 17 00:00:00 2001 From: Ingmar Delsink Date: Tue, 21 Mar 2023 15:21:08 +0100 Subject: [PATCH 04/15] fix(helm): helm chart extraVolumeMounts and extraVolumes indentation --- .../emqx-enterprise/templates/StatefulSet.yaml | 12 ++++++------ deploy/charts/emqx/templates/StatefulSet.yaml | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/deploy/charts/emqx-enterprise/templates/StatefulSet.yaml b/deploy/charts/emqx-enterprise/templates/StatefulSet.yaml index 00751aceb..3e9e39f2c 100644 --- a/deploy/charts/emqx-enterprise/templates/StatefulSet.yaml +++ b/deploy/charts/emqx-enterprise/templates/StatefulSet.yaml @@ -74,9 +74,9 @@ spec: secret: secretName: {{ .Values.emqxLicenseSecretName }} {{- end }} - {{- if .Values.extraVolumes }} - {{- toYaml .Values.extraVolumes | nindent 8 }} - {{- end }} + {{- if .Values.extraVolumes }} + {{- toYaml .Values.extraVolumes | nindent 6 }} + {{- end }} {{- if .Values.podSecurityContext.enabled }} securityContext: {{- omit .Values.podSecurityContext "enabled" | toYaml | nindent 8 }} {{- end }} @@ -141,9 +141,9 @@ spec: subPath: "emqx.lic" readOnly: true {{- end }} - {{- if .Values.extraVolumeMounts }} - {{- toYaml .Values.extraVolumeMounts | nindent 12 }} - {{- end }} + {{- if .Values.extraVolumeMounts }} + {{- toYaml .Values.extraVolumeMounts | nindent 10 }} + {{- end }} readinessProbe: httpGet: path: /status diff --git a/deploy/charts/emqx/templates/StatefulSet.yaml b/deploy/charts/emqx/templates/StatefulSet.yaml index 00751aceb..3e9e39f2c 100644 --- a/deploy/charts/emqx/templates/StatefulSet.yaml +++ b/deploy/charts/emqx/templates/StatefulSet.yaml @@ -74,9 +74,9 @@ spec: secret: secretName: {{ .Values.emqxLicenseSecretName }} {{- end }} - {{- if .Values.extraVolumes }} - {{- toYaml .Values.extraVolumes | nindent 8 }} - {{- end }} + {{- if .Values.extraVolumes }} + {{- toYaml .Values.extraVolumes | nindent 6 }} + {{- end }} {{- if .Values.podSecurityContext.enabled }} securityContext: {{- omit .Values.podSecurityContext "enabled" | toYaml | nindent 8 }} {{- end }} @@ -141,9 +141,9 @@ spec: subPath: "emqx.lic" readOnly: true {{- end }} - {{- if .Values.extraVolumeMounts }} - {{- toYaml .Values.extraVolumeMounts | nindent 12 }} - {{- end }} + {{- if .Values.extraVolumeMounts }} + {{- toYaml .Values.extraVolumeMounts | nindent 10 }} + {{- end }} readinessProbe: httpGet: path: /status From 7e6f52e8fea00d3f2f5553e49ff24dbb7b1a1ff4 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Mar 2023 17:45:58 -0300 Subject: [PATCH 05/15] test: attempt to fix flaky kafka consumer test It might need some time for the metrics to be set. --- .../test/emqx_bridge_impl_kafka_consumer_SUITE.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl index 15b4fbe40..be6494cb2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_consumer_SUITE.erl @@ -1623,7 +1623,11 @@ t_bridge_rule_action_source(Config) -> }, emqx_json:decode(RawPayload, [return_maps]) ), - ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId)), + ?retry( + _Interval = 200, + _NAttempts = 20, + ?assertEqual(1, emqx_resource_metrics:received_get(ResourceId)) + ), ok end ), From 9853d00cadb9ba446c769353d565ceb2c5d54230 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 13 Mar 2023 11:17:53 +0800 Subject: [PATCH 06/15] feat(bridges): integrate RocketMQ into data bridges --- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- .../i18n/emqx_ee_bridge_rocketmq.conf | 70 ++++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 17 +- .../src/emqx_ee_bridge_rocketmq.erl | 120 ++++++ .../i18n/emqx_ee_connector_rocketmq.conf | 66 ++++ lib-ee/emqx_ee_connector/rebar.config | 1 + .../src/emqx_ee_connector.app.src | 3 +- .../src/emqx_ee_connector_rocketmq.erl | 342 ++++++++++++++++++ 8 files changed, 617 insertions(+), 5 deletions(-) create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl create mode 100644 lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 98ce6a8b0..087c72dc3 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -67,7 +67,8 @@ T == timescale; T == matrix; T == tdengine; - T == dynamo + T == dynamo; + T == rocketmq ). load() -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf new file mode 100644 index 000000000..2e33e6c07 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_rocketmq.conf @@ -0,0 +1,70 @@ +emqx_ee_bridge_rocketmq { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to RocketMQ. All MQTT `PUBLISH` messages with the topic +matching the `local_topic` will be forwarded.
+NOTE: if the bridge is used as a rule action, `local_topic` should be left empty otherwise the messages will be duplicated.""" + zh: """发送到 'local_topic' 的消息都会转发到 RocketMQ。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + template { + desc { + en: """Template, the default value is empty. When this value is empty the whole message will be stored in the RocketMQ""" + zh: """模板, 默认为空,为空时将会将整个消息转发给 RocketMQ""" + } + label { + en: "Template" + zh: "模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for a RocketMQ bridge.""" + zh: """RocketMQ 桥接配置""" + } + label: { + en: "RocketMQ Bridge Configuration" + zh: "RocketMQ 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ec81b7935..3989c3ab2 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -32,7 +32,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_matrix, Method), ref(emqx_ee_bridge_tdengine, Method), ref(emqx_ee_bridge_clickhouse, Method), - ref(emqx_ee_bridge_dynamo, Method) + ref(emqx_ee_bridge_dynamo, Method), + ref(emqx_ee_bridge_rocketmq, Method) ]. schema_modules() -> @@ -49,7 +50,8 @@ schema_modules() -> emqx_ee_bridge_matrix, emqx_ee_bridge_tdengine, emqx_ee_bridge_clickhouse, - emqx_ee_bridge_dynamo + emqx_ee_bridge_dynamo, + emqx_ee_bridge_rocketmq ]. examples(Method) -> @@ -85,7 +87,8 @@ resource_type(timescale) -> emqx_connector_pgsql; resource_type(matrix) -> emqx_connector_pgsql; resource_type(tdengine) -> emqx_ee_connector_tdengine; resource_type(clickhouse) -> emqx_ee_connector_clickhouse; -resource_type(dynamo) -> emqx_ee_connector_dynamo. +resource_type(dynamo) -> emqx_ee_connector_dynamo; +resource_type(rocketmq) -> emqx_ee_connector_rocketmq. fields(bridges) -> [ @@ -128,6 +131,14 @@ fields(bridges) -> desc => <<"Dynamo Bridge Config">>, required => false } + )}, + {rocketmq, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_rocketmq, "config")), + #{ + desc => <<"RocketMQ Bridge Config">>, + required => false + } )} ] ++ kafka_structs() ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl new file mode 100644 index 000000000..d81ffc54c --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -0,0 +1,120 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_rocketmq). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_TEMPLATE, <<>>). +-define(DEFFAULT_REQ_TIMEOUT, <<"15s">>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"rocketmq">> => #{ + summary => <<"RocketMQ Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => rocketmq, + name => <<"foo">>, + server => <<"127.0.0.1:9876">>, + topic => <<"TopicTest">>, + template => ?DEFAULT_TEMPLATE, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 1, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_rocketmq". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), required => false} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{<<"request_timeout">> => ?DEFFAULT_REQ_TIMEOUT}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_ee_connector_rocketmq:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts_sync_only"); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([rocketmq]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf new file mode 100644 index 000000000..d4a610212 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_rocketmq.conf @@ -0,0 +1,66 @@ +emqx_ee_connector_rocketmq { + + server { + desc { + en: """ +The IPv4 or IPv6 address or the hostname to connect to.
+A host entry has the following form: `Host[:Port]`.
+The RocketMQ default port 9876 is used if `[:Port]` is not specified. +""" + zh: """ +将要连接的 IPv4 或 IPv6 地址,或者主机名。
+主机名具有以下形式:`Host[:Port]`。
+如果未指定 `[:Port]`,则使用 RocketMQ 默认端口 9876。 +""" + } + label: { + en: "Server Host" + zh: "服务器地址" + } + } + + topic { + desc { + en: """RocketMQ Topic""" + zh: """RocketMQ 主题""" + } + label: { + en: "RocketMQ Topic" + zh: "RocketMQ 主题" + } + } + + refresh_interval { + desc { + en: """RocketMQ Topic Route Refresh Interval.""" + zh: """RocketMQ 主题路由更新间隔。""" + } + label: { + en: "Topic Route Refresh Interval" + zh: "主题路由更新间隔" + } + } + + send_buffer { + desc { + en: """The socket send buffer size of the RocketMQ driver client.""" + zh: """RocketMQ 驱动的套字节发送消息的缓冲区大小""" + } + label: { + en: "Send Buffer Size" + zh: "发送消息的缓冲区大小" + } + } + + security_token { + desc { + en: """RocketMQ Server Security Token""" + zh: """RocketMQ 服务器安全令牌""" + } + label: { + en: "Security Token" + zh: "安全令牌" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index 76f6ccfba..96b3df6a3 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -5,6 +5,7 @@ {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}}, {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}}, {erlcloud, {git, "https://github.com/emqx/erlcloud.git", {tag,"3.5.16-emqx-1"}}}, + {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.1"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6f40f7158..d8921198c 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -11,7 +11,8 @@ wolff, brod, clickhouse, - erlcloud + erlcloud, + rocketmq ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl new file mode 100644 index 000000000..d41f83e1d --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -0,0 +1,342 @@ +%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_rocketmq). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(ROCKETMQ_HOST_OPTIONS, #{ + default_port => 9876 +}). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, server()}, + {topic, + mk( + binary(), + #{default => <<"TopicTest">>, desc => ?DESC(topic)} + )}, + {refresh_interval, + mk( + emqx_schema:duration(), + #{default => <<"3s">>, desc => ?DESC(refresh_interval)} + )}, + {send_buffer, + mk( + emqx_schema:bytesize(), + #{default => <<"1024KB">>, desc => ?DESC(send_buffer)} + )}, + {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})} + | relational_fields() + ]. + +add_default_username(Fields) -> + lists:map( + fun + ({username, OrigUsernameFn}) -> + {username, add_default_fn(OrigUsernameFn, <<"">>)}; + (Field) -> + Field + end, + Fields + ). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). + +relational_fields() -> + Fields = [username, password, auto_reconnect], + Values = lists:filter( + fun({E, _}) -> lists:member(E, Fields) end, + emqx_connector_schema_lib:relational_db_fields() + ), + add_default_username(Values). + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> always_sync. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{server := Server, topic := Topic} = Config1 +) -> + ?SLOG(info, #{ + msg => "starting_rocketmq_connector", + connector => InstanceId, + config => redact(Config1) + }), + Config = maps:merge(default_security_info(), Config1), + {Host, Port} = emqx_schema:parse_server(Server, ?ROCKETMQ_HOST_OPTIONS), + + Server1 = [{Host, Port}], + ClientId = client_id(InstanceId), + ClientCfg = #{acl_info => #{}}, + + TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), + ProducerOpts = make_producer_opts(Config), + Templates = parse_template(Config), + ProducersMapPID = create_producers_map(ClientId), + State = #{ + client_id => ClientId, + topic_tokens => TopicTks, + config => Config, + templates => Templates, + producers_map_pid => ProducersMapPID, + producers_opts => ProducerOpts + }, + + case rocketmq:ensure_supervised_client(ClientId, Server1, ClientCfg) of + {ok, _Pid} -> + {ok, State}; + {error, _Reason} = Error -> + ?tp( + rocketmq_connector_start_failed, + #{error => _Reason} + ), + Error + end. + +on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> + ?SLOG(info, #{ + msg => "stopping_rocketmq_connector", + connector => InstanceId + }), + Pid ! ok, + ok = rocketmq:stop_and_delete_supervised_client(ClientId). + +on_query(InstanceId, Query, State) -> + do_query(InstanceId, Query, send_sync, State). + +%% We only support batch inserts and all messages must have the same topic +on_batch_query(InstanceId, [{send_message, _Msg} | _] = Query, State) -> + do_query(InstanceId, Query, batch_send_sync, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{client_id := ClientId}) -> + case rocketmq_client_sup:find_client(ClientId) of + {ok, _Pid} -> + connected; + _ -> + connecting + end. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query( + InstanceId, + Query, + QueryFunc, + #{ + templates := Templates, + client_id := ClientId, + topic_tokens := TopicTks, + producers_opts := ProducerOpts, + config := #{topic := RawTopic, resource_opts := #{request_timeout := RequestTimeout}} + } = State +) -> + ?TRACE( + "QUERY", + "rocketmq_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + + TopicKey = get_topic_key(Query, RawTopic, TopicTks), + Data = apply_template(Query, Templates), + + Result = safe_do_produce( + InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout + ), + case Result of + {error, Reason} -> + ?tp( + rocketmq_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "rocketmq_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + rocketmq_connector_query_return, + #{result => Result} + ), + Result + end. + +safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> + try + Producers = get_producers(ClientId, TopicKey, ProducerOpts), + produce(InstanceId, QueryFunc, Producers, Data, RequestTimeout) + catch + _Type:Reason -> + {error, {unrecoverable_error, Reason}} + end. + +produce(_InstanceId, QueryFunc, Producers, Data, RequestTimeout) -> + rocketmq:QueryFunc(Producers, Data, RequestTimeout). + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +get_topic_key({_, Msg}, RawTopic, TopicTks) -> + {RawTopic, emqx_plugin_libs_rule:proc_tmpl(TopicTks, Msg)}; +get_topic_key([Query | _], RawTopic, TopicTks) -> + get_topic_key(Query, RawTopic, TopicTks). + +apply_template({Key, Msg} = _Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + emqx_json:encode(Msg); + Template -> + emqx_plugin_libs_rule:proc_tmpl(Template, Msg) + end; +apply_template([{Key, _} | _] = Reqs, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + [emqx_json:encode(Msg) || {_, Msg} <- Reqs]; + Template -> + [emqx_plugin_libs_rule:proc_tmpl(Template, Msg) || {_, Msg} <- Reqs] + end. + +client_id(InstanceId) -> + Name = emqx_resource_manager:manager_id_to_resource_id(InstanceId), + erlang:binary_to_atom(Name, utf8). + +redact(Msg) -> + emqx_misc:redact(Msg, fun is_sensitive_key/1). + +is_sensitive_key(security_token) -> + true; +is_sensitive_key(_) -> + false. + +make_producer_opts( + #{ + username := Username, + password := Password, + security_token := SecurityToken, + send_buffer := SendBuff, + refresh_interval := RefreshInterval + } +) -> + ACLInfo = acl_info(Username, Password, SecurityToken), + #{ + tcp_opts => [{sndbuf, SendBuff}], + ref_topic_route_interval => RefreshInterval, + acl_info => ACLInfo + }. + +acl_info(<<>>, <<>>, <<>>) -> + #{}; +acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> + #{ + access_key => Username, + secret_key => Password + }; +acl_info(Username, Password, SecurityToken) when + is_binary(Username), is_binary(Password), is_binary(SecurityToken) +-> + #{ + access_key => Username, + secret_key => Password, + security_token => SecurityToken + }; +acl_info(_, _, _) -> + #{}. + +create_producers_map(ClientId) -> + erlang:spawn(fun() -> + case ets:whereis(ClientId) of + undefined -> + _ = ets:new(ClientId, [public, named_table]), + ok; + _ -> + ok + end, + receive + _Msg -> + ok + end + end). + +get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> + case ets:lookup(ClientId, TopicKey) of + [{_, Producers0}] -> + Producers0; + _ -> + ProducerGroup = iolist_to_binary([atom_to_list(ClientId), "_", Topic1]), + {ok, Producers0} = rocketmq:ensure_supervised_producers( + ClientId, ProducerGroup, Topic1, ProducerOpts + ), + ets:insert(ClientId, {TopicKey, Producers0}), + Producers0 + end. + +default_security_info() -> + #{username => <<>>, password => <<>>, security_token => <<>>}. From 4ad3579966945a40750e9ee2b3c87fe4148a6d89 Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 20 Mar 2023 11:14:25 +0800 Subject: [PATCH 07/15] test(bridges): add test suite for RocketMQ --- .../docker-compose-rocketmq.yaml | 34 +++ .../docker-compose-toxiproxy.yaml | 1 + .../rocketmq/conf/broker.conf | 22 ++ .../rocketmq/logs/.gitkeep | 0 .../rocketmq/store/.gitkeep | 0 .ci/docker-compose-file/toxiproxy.json | 6 + lib-ee/emqx_ee_bridge/docker-ct | 1 + .../test/emqx_ee_bridge_rocketmq_SUITE.erl | 267 ++++++++++++++++++ .../src/emqx_ee_connector_rocketmq.erl | 4 - scripts/ct/run.sh | 3 + 10 files changed, 334 insertions(+), 4 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-rocketmq.yaml create mode 100644 .ci/docker-compose-file/rocketmq/conf/broker.conf create mode 100644 .ci/docker-compose-file/rocketmq/logs/.gitkeep create mode 100644 .ci/docker-compose-file/rocketmq/store/.gitkeep create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl diff --git a/.ci/docker-compose-file/docker-compose-rocketmq.yaml b/.ci/docker-compose-file/docker-compose-rocketmq.yaml new file mode 100644 index 000000000..3c872a7c2 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-rocketmq.yaml @@ -0,0 +1,34 @@ +version: '3.9' + +services: + mqnamesrv: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_namesrv +# ports: +# - 9876:9876 + volumes: + - ./rocketmq/logs:/opt/logs + - ./rocketmq/store:/opt/store + command: ./mqnamesrv + networks: + - emqx_bridge + + mqbroker: + image: apache/rocketmq:4.9.4 + container_name: rocketmq_broker +# ports: +# - 10909:10909 +# - 10911:10911 + volumes: + - ./rocketmq/logs:/opt/logs + - ./rocketmq/store:/opt/store + - ./rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf + environment: + NAMESRV_ADDR: "rocketmq_namesrv:9876" + JAVA_OPTS: " -Duser.home=/opt" + JAVA_OPT_EXT: "-server -Xms1024m -Xmx1024m -Xmn1024m" + command: ./mqbroker -c /etc/rocketmq/broker.conf + depends_on: + - mqnamesrv + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 16f18b6c2..24f1d90b2 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -22,6 +22,7 @@ services: - 15433:5433 - 16041:6041 - 18000:8000 + - 19876:9876 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/rocketmq/conf/broker.conf b/.ci/docker-compose-file/rocketmq/conf/broker.conf new file mode 100644 index 000000000..c343090e4 --- /dev/null +++ b/.ci/docker-compose-file/rocketmq/conf/broker.conf @@ -0,0 +1,22 @@ +brokerClusterName=DefaultCluster +brokerName=broker-a +brokerId=0 + +brokerIP1=rocketmq_broker + +defaultTopicQueueNums=4 +autoCreateTopicEnable=true +autoCreateSubscriptionGroup=true + +listenPort=10911 +deleteWhen=04 + +fileReservedTime=120 +mapedFileSizeCommitLog=1073741824 +mapedFileSizeConsumeQueue=300000 +diskMaxUsedSpaceRatio=100 +maxMessageSize=65536 + +brokerRole=ASYNC_MASTER + +flushDiskType=ASYNC_FLUSH diff --git a/.ci/docker-compose-file/rocketmq/logs/.gitkeep b/.ci/docker-compose-file/rocketmq/logs/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.ci/docker-compose-file/rocketmq/store/.gitkeep b/.ci/docker-compose-file/rocketmq/store/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 2f8c4341b..e22735091 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -77,5 +77,11 @@ "listen": "0.0.0.0:9295", "upstream": "kafka-1.emqx.net:9295", "enabled": true + }, + { + "name": "rocketmq", + "listen": "0.0.0.0:9876", + "upstream": "rocketmq_namesrv:9876", + "enabled": true } ] diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index ac1728ad2..116bc44ad 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -10,3 +10,4 @@ pgsql tdengine clickhouse dynamo +rocketmq diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl new file mode 100644 index 000000000..cd02b65d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_rocketmq_SUITE.erl @@ -0,0 +1,267 @@ +%%-------------------------------------------------------------------- +% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_rocketmq_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% Bridge defaults +-define(TOPIC, "TopicTest"). +-define(BATCH_SIZE, 10). +-define(PAYLOAD, <<"HELLO">>). + +-define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, TCs}, + {without_batch, TCs} + ]. + +init_per_group(with_batch, Config0) -> + Config = [{batch_size, ?BATCH_SIZE} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{batch_size, 1} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + delete_bridge(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = snabbkaffe:stop(), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(ConfigT) -> + BridgeType = <<"rocketmq">>, + Host = os:getenv("ROCKETMQ_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("ROCKETMQ_PORT", "9876")), + + Config0 = [ + {host, Host}, + {port, Port}, + {query_mode, sync}, + {proxy_name, "rocketmq"} + | ConfigT + ], + + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + {Name, RocketMQConf} = rocketmq_config(BridgeType, Config0), + Config = + [ + {rocketmq_config, RocketMQConf}, + {rocketmq_bridge_type, BridgeType}, + {rocketmq_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + false -> + {skip, no_rocketmq}; + _ -> + throw(no_rocketmq) + end + end. + +rocketmq_config(BridgeType, Config) -> + Port = integer_to_list(?GET_CONFIG(port, Config)), + Server = ?GET_CONFIG(host, Config) ++ ":" ++ Port, + Name = atom_to_binary(?MODULE), + BatchSize = ?config(batch_size, Config), + QueryMode = ?config(query_mode, Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " server = ~p\n" + " topic = ~p\n" + " resource_opts = {\n" + " request_timeout = 1500ms\n" + " batch_size = ~b\n" + " query_mode = ~s\n" + " }\n" + "}", + [ + BridgeType, + Name, + Server, + ?TOPIC, + BatchSize, + QueryMode + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + emqx_bridge:create(BridgeType, Name, RocketMQConf). + +delete_bridge(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +query_resource(Config, Request) -> + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request, #{timeout => 500}). + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + RocketMQConf = ?GET_CONFIG(rocketmq_config, Config), + RocketMQConf2 = RocketMQConf#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(RocketMQConf2) + ), + SentData = #{payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertEqual(ok, send_message(Config, SentData)), + #{?snk_kind := rocketmq_connector_query_return}, + 10_000 + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(rocketmq_connector_query_return, Trace0), + ?assertMatch([#{result := ok}], Trace), + ok + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + Name = ?GET_CONFIG(rocketmq_name, Config), + BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ok. + +t_simple_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {send_message, #{message => <<"Hello">>}}, + Result = query_resource(Config, Request), + ?assertEqual(ok, Result), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index d41f83e1d..84f2e2a89 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -31,10 +31,6 @@ default_port => 9876 }). --ifdef(TEST). --export([execute/2]). --endif. - %%===================================================================== %% Hocon schema roots() -> diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index bf7b2073d..a4eeb366d 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -170,6 +170,9 @@ for dep in ${CT_DEPS}; do dynamo) FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' ) ;; + rocketmq) + FILES+=( '.ci/docker-compose-file/docker-compose-rocketmq.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 From 17e207cb71dd5247a2718ff0db6c043b1d038b2c Mon Sep 17 00:00:00 2001 From: firest Date: Thu, 16 Mar 2023 22:58:47 +0800 Subject: [PATCH 08/15] chore: fix spellcheck && update changes --- changes/ee/feat-10143.en.md | 1 + changes/ee/feat-10143.zh.md | 1 + scripts/spellcheck/dicts/emqx.txt | 1 + 3 files changed, 3 insertions(+) create mode 100644 changes/ee/feat-10143.en.md create mode 100644 changes/ee/feat-10143.zh.md diff --git a/changes/ee/feat-10143.en.md b/changes/ee/feat-10143.en.md new file mode 100644 index 000000000..67fc13dc2 --- /dev/null +++ b/changes/ee/feat-10143.en.md @@ -0,0 +1 @@ +Add `RocketMQ` data integration bridge. diff --git a/changes/ee/feat-10143.zh.md b/changes/ee/feat-10143.zh.md new file mode 100644 index 000000000..85a13ffa7 --- /dev/null +++ b/changes/ee/feat-10143.zh.md @@ -0,0 +1 @@ +为数据桥接增加 `RocketMQ` 支持。 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index b027f92ec..b0d663ead 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -271,3 +271,4 @@ nif TDengine clickhouse FormatType +RocketMQ From 934e46307242a952db1b108b296ef496c0ef14cc Mon Sep 17 00:00:00 2001 From: Kinplemelon Date: Wed, 22 Mar 2023 12:57:39 +0800 Subject: [PATCH 09/15] chore(dashboard): change dashboard repo --- scripts/get-dashboard.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/get-dashboard.sh b/scripts/get-dashboard.sh index c3559865f..ace795aa5 100755 --- a/scripts/get-dashboard.sh +++ b/scripts/get-dashboard.sh @@ -20,7 +20,7 @@ case "$VERSION" in esac DASHBOARD_PATH='apps/emqx_dashboard/priv' -DASHBOARD_REPO='emqx-dashboard-web-new' +DASHBOARD_REPO='emqx-dashboard5' DIRECT_DOWNLOAD_URL="https://github.com/emqx/${DASHBOARD_REPO}/releases/download/${VERSION}/${RELEASE_ASSET_FILE}" case $(uname) in From 5d31f85aec0efcfa95f40a0f333fc412ff70c530 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 22 Mar 2023 11:43:25 +0300 Subject: [PATCH 10/15] chore: remove irrelevant changelog entry --- changes/ce/fix-10190.en.md | 1 - 1 file changed, 1 deletion(-) delete mode 100644 changes/ce/fix-10190.en.md diff --git a/changes/ce/fix-10190.en.md b/changes/ce/fix-10190.en.md deleted file mode 100644 index bffd9ca00..000000000 --- a/changes/ce/fix-10190.en.md +++ /dev/null @@ -1 +0,0 @@ -Fix the issue where nodes responses to the list bridges RPC were incorrectly flattened, which caused List Bridges API HTTP handler to crash when there was more than 1 node in the cluster. From 208813330a6f5ea7a997cf104ba9edb0bcadeeea Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 10:59:20 -0300 Subject: [PATCH 11/15] fix: remove metrics from rocketmq example --- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl index d81ffc54c..124e18069 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_rocketmq.erl @@ -39,7 +39,7 @@ conn_bridge_examples(Method) -> ]. values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); + values(post); values(post) -> #{ enable => true, From 61cb03b45a427fe361a79edc807d5e1e791de132 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 21 Mar 2023 14:32:50 -0300 Subject: [PATCH 12/15] fix(buffer_worker): change the default `resume_interval` value and expose it as hidden config Also removes the previously added alarm for request timeout. There are situations where having a short request timeout and a long health check interval make sense, so we don't want to alarm the user for those situations. Instead, we automatically attempt to set a reasonable `resume_interval` value. --- .../i18n/emqx_resource_schema_i18n.conf | 15 ++- apps/emqx_resource/include/emqx_resource.hrl | 5 +- .../src/emqx_resource_buffer_worker.erl | 49 ++----- .../src/emqx_resource_manager.erl | 1 - .../src/schema/emqx_resource_schema.erl | 7 + .../test/emqx_resource_SUITE.erl | 126 ------------------ changes/ce/fix-10154.en.md | 7 +- .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 14 +- 8 files changed, 50 insertions(+), 174 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index 2e5cf96e8..eb4f00ac7 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -45,6 +45,17 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise } } + resume_interval { + desc { + en: """The interval at which a resource will retry inflight requests.""" + zh: """资源重试机内请求的间隔时间。""" + } + label { + en: """Resume Interval""" + zh: """复职时间间隔""" + } + } + start_after_created { desc { en: """Whether start the resource right after created.""" @@ -102,8 +113,8 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise request_timeout { desc { - en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired. We recommend setting this timeout to be at least twice the health check interval, so that the buffer has the chance to recover if too many requests get enqueued.""" - zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。我们建议将这个超时设置为健康检查间隔的至少两倍,这样,如果有太多的请求被排队,缓冲区就有机会恢复。""" + en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired.""" + zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。""" } label { en: """Request Expiry""" diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index 8033ed660..be570e694 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -91,10 +91,7 @@ -define(DEFAULT_QUEUE_SIZE, 100 * 1024 * 1024). -define(DEFAULT_QUEUE_SIZE_RAW, <<"100MB">>). -%% Note: this should be greater than the health check timeout; -%% otherwise, if the buffer worker is ever blocked, than all queued -%% requests will basically fail without being attempted. --define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(30)). +-define(DEFAULT_REQUEST_TIMEOUT, timer:seconds(15)). %% count -define(DEFAULT_BATCH_SIZE, 1). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 05622bdd7..648587c25 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -56,8 +56,6 @@ -export([clear_disk_queue_dir/2]). --export([deactivate_bad_request_timeout_alarm/1]). - -elvis([{elvis_style, dont_repeat_yourself, disable}]). -define(COLLECT_REQ_LIMIT, 1000). @@ -203,7 +201,8 @@ init({Id, Index, Opts}) -> RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT), BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME), BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0), - maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval), + DefaultResumeInterval = default_resume_interval(RequestTimeout, HealthCheckInterval), + ResumeInterval = maps:get(resume_interval, Opts, DefaultResumeInterval), Data = #{ id => Id, index => Index, @@ -212,7 +211,7 @@ init({Id, Index, Opts}) -> batch_size => BatchSize, batch_time => BatchTime, queue => Queue, - resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval), + resume_interval => ResumeInterval, tref => undefined }, ?tp(buffer_worker_init, #{id => Id, index => Index}), @@ -1684,38 +1683,16 @@ adjust_batch_time(Id, RequestTimeout, BatchTime0) -> end, BatchTime. -%% The request timeout should be greater than the health check -%% timeout, health timeout defines how often the buffer worker tries -%% to unblock. If request timeout is <= health check timeout and the -%% buffer worker is ever blocked, than all queued requests will -%% basically fail without being attempted. --spec maybe_toggle_bad_request_timeout_alarm( - resource_id(), request_timeout(), health_check_interval() -) -> ok. -maybe_toggle_bad_request_timeout_alarm(Id, _RequestTimeout = infinity, _HealthCheckInterval) -> - deactivate_bad_request_timeout_alarm(Id), - ok; -maybe_toggle_bad_request_timeout_alarm(Id, RequestTimeout, HealthCheckInterval) -> - case RequestTimeout > HealthCheckInterval of - true -> - deactivate_bad_request_timeout_alarm(Id), - ok; - false -> - _ = emqx_alarm:activate( - bad_request_timeout_alarm_id(Id), - #{resource_id => Id, reason => bad_request_timeout}, - <<"Request timeout should be greater than health check timeout: ", Id/binary>> - ), - ok - end. - --spec deactivate_bad_request_timeout_alarm(resource_id()) -> ok. -deactivate_bad_request_timeout_alarm(Id) -> - _ = emqx_alarm:ensure_deactivated(bad_request_timeout_alarm_id(Id)), - ok. - -bad_request_timeout_alarm_id(Id) -> - <<"bad_request_timeout:", Id/binary>>. +%% The request timeout should be greater than the resume interval, as +%% it defines how often the buffer worker tries to unblock. If request +%% timeout is <= resume interval and the buffer worker is ever +%% blocked, than all queued requests will basically fail without being +%% attempted. +-spec default_resume_interval(request_timeout(), health_check_interval()) -> timer:time(). +default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) -> + max(1, HealthCheckInterval); +default_resume_interval(RequestTimeout, HealthCheckInterval) -> + max(1, min(HealthCheckInterval, RequestTimeout div 3)). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 2bdc67a4d..40f9fe1ab 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -506,7 +506,6 @@ handle_remove_event(From, ClearMetrics, Data) -> true -> ok = emqx_metrics_worker:clear_metrics(?RES_METRICS, Data#data.id); false -> ok end, - emqx_resource_buffer_worker:deactivate_bad_request_timeout_alarm(Data#data.id), {stop_and_reply, {shutdown, removed}, [{reply, From, ok}]}. start_resource(Data, From) -> diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index fdd65bc3c..b9ed176fe 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -55,6 +55,7 @@ fields("creation_opts") -> [ {worker_pool_size, fun worker_pool_size/1}, {health_check_interval, fun health_check_interval/1}, + {resume_interval, fun resume_interval/1}, {start_after_created, fun start_after_created/1}, {start_timeout, fun start_timeout/1}, {auto_restart_interval, fun auto_restart_interval/1}, @@ -81,6 +82,12 @@ worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(required) -> false; worker_pool_size(_) -> undefined. +resume_interval(type) -> emqx_schema:duration_ms(); +resume_interval(hidden) -> true; +resume_interval(desc) -> ?DESC("resume_interval"); +resume_interval(required) -> false; +resume_interval(_) -> undefined. + health_check_interval(type) -> emqx_schema:duration_ms(); health_check_interval(desc) -> ?DESC("health_check_interval"); health_check_interval(default) -> ?HEALTHCHECK_INTERVAL_RAW; diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 25f4a6d77..ff7e1d347 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -2554,132 +2554,6 @@ do_t_recursive_flush() -> ), ok. -%% Check that we raise an alarm if a bad request timeout config is -%% issued. Request timeout should be greater than health check -%% timeout. -t_bad_request_timeout_alarm(_Config) -> - emqx_connector_demo:set_callback_mode(async_if_possible), - - %% 1) Same values. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 1_000, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ExpectedMessage = - <<"Request timeout should be greater than health check timeout: ", ?ID/binary>>, - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - %% The unexpected termination of one of the buffer workers should - %% not turn the alarm off. - [Pid, _ | _] = emqx_resource_buffer_worker_sup:worker_pids(?ID), - MRef = monitor(process, Pid), - exit(Pid, kill), - receive - {'DOWN', MRef, process, Pid, _} -> - ok - after 300 -> - ct:fail("buffer worker didn't die") - end, - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 2) Request timeout < health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 999, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 2) Request timeout < health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 999, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertMatch( - [ - #{ - message := ExpectedMessage, - details := #{reason := bad_request_timeout, resource_id := ?ID}, - deactivate_at := infinity - } - ], - emqx_alarm:get_alarms(activated) - ), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - %% 3) Request timeout > health check interval. - {ok, _} = emqx_resource:create( - ?ID, - ?DEFAULT_RESOURCE_GROUP, - ?TEST_RESOURCE, - #{name => test_resource}, - #{ - query_mode => async, - request_timeout => 1_001, - health_check_interval => 1_000, - worker_pool_size => 2 - } - ), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - ok = emqx_resource:remove_local(?ID), - ?assertEqual([], emqx_alarm:get_alarms(activated)), - - ok. - %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ diff --git a/changes/ce/fix-10154.en.md b/changes/ce/fix-10154.en.md index 83a729360..24bc4bae1 100644 --- a/changes/ce/fix-10154.en.md +++ b/changes/ce/fix-10154.en.md @@ -1,7 +1,8 @@ -Change the default `request_timeout` for bridges and connectors to be -twice the default `health_check_interval`. +Change the default `resume_interval` for bridges and connectors to be +the minimum of `health_check_interval` and `request_timeout / 3`. +Also exposes it as a hidden configuration to allow fine tuning. -Before this change, the default values for those two options meant +Before this change, the default values for `resume_interval` meant that, if a buffer ever got blocked due to resource errors or high message volumes, then, by the time the buffer would try to resume its normal operations, almost all requests would have timed out. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 452b7a4d2..55dfa5555 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -520,6 +520,7 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> #{measurements := #{gauge_set := ExpectedValue}} -> ok; #{measurements := #{gauge_set := Value}} -> + ct:pal("events: ~p", [Events]), ct:fail( "gauge ~p didn't reach expected value ~p; last value: ~p", [GaugeName, ExpectedValue, Value] @@ -972,7 +973,13 @@ t_publish_econnrefused(Config) -> ResourceId = ?config(resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. - {ok, _} = create_bridge(Config, #{<<"pipelining">> => 1}), + {ok, _} = create_bridge( + Config, + #{ + <<"pipelining">> => 1, + <<"resource_opts">> => #{<<"resume_interval">> => <<"15s">>} + } + ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), assert_empty_metrics(ResourceId), @@ -986,7 +993,10 @@ t_publish_timeout(Config) -> %% requests are done separately. {ok, _} = create_bridge(Config, #{ <<"pipelining">> => 1, - <<"resource_opts">> => #{<<"batch_size">> => 1} + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"resume_interval">> => <<"15s">> + } }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), From 127a075b66c469b0e873934c3b389ed2ebb73aca Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 14:34:37 -0300 Subject: [PATCH 13/15] test(dynamo): attempt to fix dynamo tests Those tests in the `flaky` test are really flaky and require lots of CI retries. Apparently, the flakiness comes from race conditions from restarting bridges with the same name too fast between test cases. Previously, all test cases were sharing the same bridge name (the module name). --- .../src/emqx_resource_manager.erl | 1 + .../test/emqx_ee_bridge_dynamo_SUITE.erl | 50 +++++++++++++------ 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index b21ffcae3..6a4919b41 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -388,6 +388,7 @@ handle_event(state_timeout, health_check, connecting, Data) -> handle_event(enter, _OldState, connected = State, Data) -> ok = log_state_consistency(State, Data), _ = emqx_alarm:deactivate(Data#data.id), + ?tp(resource_connected_enter, #{}), {keep_state_and_data, health_check_actions(Data)}; handle_event(state_timeout, health_check, connected, Data) -> handle_connected_health_check(Data); diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 26666c6d8..183002e61 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -83,9 +83,10 @@ end_per_suite(_Config) -> ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), ok. -init_per_testcase(_Testcase, Config) -> +init_per_testcase(TestCase, Config) -> create_table(Config), - Config. + ok = snabbkaffe:start_trace(), + [{dynamo_name, atom_to_binary(TestCase)} | Config]. end_per_testcase(_Testcase, Config) -> ProxyHost = ?config(proxy_host, Config), @@ -93,7 +94,7 @@ end_per_testcase(_Testcase, Config) -> emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), ok = snabbkaffe:stop(), delete_table(Config), - delete_bridge(Config), + delete_all_bridges(), ok. %%------------------------------------------------------------------------------ @@ -186,15 +187,22 @@ parse_and_check(ConfigString, BridgeType, Name) -> Config. create_bridge(Config) -> - BridgeType = ?config(dynamo_bridge_type, Config), - Name = ?config(dynamo_name, Config), - TDConfig = ?config(dynamo_config, Config), - emqx_bridge:create(BridgeType, Name, TDConfig). + create_bridge(Config, _Overrides = #{}). -delete_bridge(Config) -> +create_bridge(Config, Overrides) -> BridgeType = ?config(dynamo_bridge_type, Config), Name = ?config(dynamo_name, Config), - emqx_bridge:remove(BridgeType, Name). + DynamoConfig0 = ?config(dynamo_config, Config), + DynamoConfig = emqx_map_lib:deep_merge(DynamoConfig0, Overrides), + emqx_bridge:create(BridgeType, Name, DynamoConfig). + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). create_bridge_http(Params) -> Path = emqx_mgmt_api_test_util:api_path(["bridges"]), @@ -327,10 +335,12 @@ t_setup_via_http_api_and_publish(Config) -> ok. t_get_status(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := resource_connected_enter}, + 20_000 + ), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), @@ -359,7 +369,12 @@ t_write_failure(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := resource_connected_enter}, + 20_000 + ), SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( @@ -372,7 +387,12 @@ t_write_timeout(Config) -> ProxyName = ?config(proxy_name, Config), ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), - {ok, _} = create_bridge(Config), + {{ok, _}, {ok, _}} = + ?wait_async_action( + create_bridge(Config), + #{?snk_kind := resource_connected_enter}, + 20_000 + ), SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD}, emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( From 8844b22c809ae7495243ed8f9164d84cf6303513 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 15:32:09 -0300 Subject: [PATCH 14/15] docs: improve descriptions Co-authored-by: Zaiming (Stone) Shi --- apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf index eb4f00ac7..aedcabc70 100644 --- a/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf +++ b/apps/emqx_resource/i18n/emqx_resource_schema_i18n.conf @@ -47,12 +47,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise resume_interval { desc { - en: """The interval at which a resource will retry inflight requests.""" - zh: """资源重试机内请求的间隔时间。""" + en: """The interval at which the buffer worker attempts to resend failed requests in the inflight window.""" + zh: """在发送失败后尝试重传飞行窗口中的请求的时间间隔。""" } label { en: """Resume Interval""" - zh: """复职时间间隔""" + zh: """重试时间间隔""" } } From cb65cded8825eba47d5fde6f4542e66af7807b04 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Wed, 22 Mar 2023 15:22:13 -0300 Subject: [PATCH 15/15] fix(last_will_testament): don't publish LWT if client is banned when kicked Fixes https://emqx.atlassian.net/browse/EMQX-9288 Related issue: https://github.com/emqx/emqx/issues/10192#issuecomment-1478809900 --- apps/emqx/src/emqx_channel.erl | 20 ++++--- apps/emqx_authz/test/emqx_authz_SUITE.erl | 65 +++++++++++++++++++++++ changes/ce/fix-10209.en.md | 2 + 3 files changed, 80 insertions(+), 7 deletions(-) create mode 100644 changes/ce/fix-10209.en.md diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 9acad4d57..e01a16f83 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -2128,17 +2128,23 @@ publish_will_msg( ClientInfo = #{mountpoint := MountPoint}, Msg = #message{topic = Topic} ) -> - case emqx_access_control:authorize(ClientInfo, publish, Topic) of - allow -> - NMsg = emqx_mountpoint:mount(MountPoint, Msg), - _ = emqx_broker:publish(NMsg), - ok; - deny -> + PublishingDisallowed = emqx_access_control:authorize(ClientInfo, publish, Topic) =/= allow, + ClientBanned = emqx_banned:check(ClientInfo), + case PublishingDisallowed orelse ClientBanned of + true -> ?tp( warning, last_will_testament_publish_denied, - #{topic => Topic} + #{ + topic => Topic, + client_banned => ClientBanned, + publishing_disallowed => PublishingDisallowed + } ), + ok; + false -> + NMsg = emqx_mountpoint:mount(MountPoint, Msg), + _ = emqx_broker:publish(NMsg), ok end. diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index b3ce04f43..84b1d903e 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -26,6 +26,8 @@ -include_lib("emqx/include/emqx_placeholder.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -65,6 +67,7 @@ end_per_suite(_Config) -> init_per_testcase(TestCase, Config) when TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; + TestCase =:= t_publish_last_will_testament_banned_client_connecting; TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament -> {ok, _} = emqx_authz:update(?CMD_REPLACE, []), @@ -76,11 +79,15 @@ init_per_testcase(_, Config) -> end_per_testcase(TestCase, _Config) when TestCase =:= t_subscribe_deny_disconnect_publishes_last_will_testament; + TestCase =:= t_publish_last_will_testament_banned_client_connecting; TestCase =:= t_publish_deny_disconnect_publishes_last_will_testament -> {ok, _} = emqx:update_config([authorization, deny_action], ignore), + {ok, _} = emqx_authz:update(?CMD_REPLACE, []), + emqx_common_test_helpers:call_janitor(), ok; end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(), ok. set_special_configs(emqx_authz) -> @@ -396,5 +403,63 @@ t_publish_last_will_testament_denied_topic(_Config) -> ok. +%% client is allowed by ACL to publish to its LWT topic, is connected, +%% and then gets banned and kicked out while connected. Should not +%% publish LWT. +t_publish_last_will_testament_banned_client_connecting(_Config) -> + {ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE7]), + Username = <<"some_client">>, + ClientId = <<"some_clientid">>, + LWTPayload = <<"should not be published">>, + LWTTopic = <<"some_client/lwt">>, + ok = emqx:subscribe(<<"some_client/lwt">>), + {ok, C} = emqtt:start_link([ + {clientid, ClientId}, + {username, Username}, + {will_topic, LWTTopic}, + {will_payload, LWTPayload} + ]), + ?assertMatch({ok, _}, emqtt:connect(C)), + + %% Now we ban the client while it is connected. + Now = erlang:system_time(second), + Who = {username, Username}, + emqx_banned:create(#{ + who => Who, + by => <<"test">>, + reason => <<"test">>, + at => Now, + until => Now + 120 + }), + on_exit(fun() -> emqx_banned:delete(Who) end), + %% Now kick it as we do in the ban API. + process_flag(trap_exit, true), + ?check_trace( + begin + ok = emqx_cm:kick_session(ClientId), + receive + {deliver, LWTTopic, #message{payload = LWTPayload}} -> + error(lwt_should_not_be_published_to_forbidden_topic) + after 2_000 -> ok + end, + ok + end, + fun(Trace) -> + ?assertMatch( + [ + #{ + client_banned := true, + publishing_disallowed := false + } + ], + ?of_kind(last_will_testament_publish_denied, Trace) + ), + ok + end + ), + ok = snabbkaffe:stop(), + + ok. + stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps). diff --git a/changes/ce/fix-10209.en.md b/changes/ce/fix-10209.en.md new file mode 100644 index 000000000..21ce98e44 --- /dev/null +++ b/changes/ce/fix-10209.en.md @@ -0,0 +1,2 @@ +Fix bug where a last will testament (LWT) message could be published +when kicking out a banned client.