diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index d7b4de0c0..bec3c49fa 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -253,10 +253,14 @@ producers_config(BridgeName, ClientId, Input) -> mode := BufferMode, per_partition_limit := PerPartitionLimit, segment_bytes := SegmentBytes, - memory_overload_protection := MemOLP + memory_overload_protection := MemOLP0 } } = Input, - + MemOLP = + case os:type() of + {unix, linux} -> MemOLP0; + _ -> false + end, {OffloadMode, ReplayqDir} = case BufferMode of memory -> {false, false}; @@ -268,7 +272,7 @@ producers_config(BridgeName, ClientId, Input) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName), - partitioner => PartitionStrategy, + partitioner => partitioner(PartitionStrategy), partition_count_refresh_interval_seconds => PCntRefreshInterval, replayq_dir => ReplayqDir, replayq_offload_mode => OffloadMode, @@ -282,6 +286,11 @@ producers_config(BridgeName, ClientId, Input) -> telemetry_meta_data => #{bridge_id => ResourceID} }. +%% Wolff API is a batch API. +%% key_dispatch only looks at the first element, so it's named 'first_key_dispatch' +partitioner(random) -> random; +partitioner(key_dispatch) -> first_key_dispatch. + replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "kafka", ClientId]). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 2d67a9941..14567dd39 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -109,6 +109,9 @@ set_special_configs(_) -> t_publish_no_auth(_CtConfig) -> publish_with_and_without_ssl("none"). +t_publish_no_auth_key_dispatch(_CtConfig) -> + publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}). + t_publish_sasl_plain(_CtConfig) -> publish_with_and_without_ssl(valid_sasl_plain_settings()). @@ -404,20 +407,35 @@ t_failed_creation_then_fix(_Config) -> %%------------------------------------------------------------------------------ publish_with_and_without_ssl(AuthSettings) -> - publish_helper(#{ - auth_settings => AuthSettings, - ssl_settings => #{} - }), - publish_helper(#{ - auth_settings => AuthSettings, - ssl_settings => valid_ssl_settings() - }), + publish_with_and_without_ssl(AuthSettings, #{}). + +publish_with_and_without_ssl(AuthSettings, Config) -> + publish_helper( + #{ + auth_settings => AuthSettings, + ssl_settings => #{} + }, + Config + ), + publish_helper( + #{ + auth_settings => AuthSettings, + ssl_settings => valid_ssl_settings() + }, + Config + ), ok. -publish_helper(#{ - auth_settings := AuthSettings, - ssl_settings := SSLSettings -}) -> +publish_helper(AuthSettings) -> + publish_helper(AuthSettings, #{}). + +publish_helper( + #{ + auth_settings := AuthSettings, + ssl_settings := SSLSettings + }, + Conf0 +) -> HostsString = case {AuthSettings, SSLSettings} of {"none", Map} when map_size(Map) =:= 0 -> @@ -434,13 +452,17 @@ publish_helper(#{ InstId = emqx_bridge_resource:resource_id("kafka", Name), BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => AuthSettings, - "kafka_hosts_string" => HostsString, - "kafka_topic" => KafkaTopic, - "instance_id" => InstId, - "ssl" => SSLSettings - }), + Conf = config( + #{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => InstId, + "ssl" => SSLSettings + }, + Conf0 + ), + emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), %% To make sure we get unique value timer:sleep(1), @@ -463,7 +485,15 @@ publish_helper(#{ ok = emqx_bridge_resource:remove(BridgeId), ok. +default_config() -> + #{"partition_strategy" => "random"}. + config(Args) -> + config(Args, #{}). + +config(Args0, More) -> + Args1 = maps:merge(default_config(), Args0), + Args = maps:merge(Args1, More), ConfText = hocon_config(Args), ct:pal("Running tests with conf:\n~s", [ConfText]), {ok, Conf} = hocon:binary(ConfText), @@ -506,6 +536,7 @@ producer = { kafka = { topic = \"{{ kafka_topic }}\" message = {key = \"${clientid}\", value = \"${.payload}\"} + partition_strategy = {{ partition_strategy }} } } """. diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 3a7b40317..07d45efe1 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -169,11 +169,6 @@ else export UID_GID="$ORIG_UID_GID" fi -if [ "$STOP" = 'no' ]; then - # shellcheck disable=2086 # no quotes for F_OPTIONS - docker-compose $F_OPTIONS up -d --build --remove-orphans -fi - # /emqx is where the source dir is mounted to the Erlang container # in .ci/docker-compose-file/docker-compose.yaml TTY='' @@ -181,18 +176,29 @@ if [[ -t 1 ]]; then TTY='-t' fi +function restore_ownership { + if ! sudo chown -R "$ORIG_UID_GID" . >/dev/null 2>&1; then + docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx" >/dev/null 2>&1 || true + fi +} + +restore_ownership +trap restore_ownership EXIT + + +if [ "$STOP" = 'no' ]; then + # some left-over log file has to be deleted before a new docker-compose up + rm -f '.ci/docker-compose-file/redis/*.log' + # shellcheck disable=2086 # no quotes for F_OPTIONS + docker-compose $F_OPTIONS up -d --build --remove-orphans +fi + echo "Fixing file owners and permissions for $UID_GID" # rebar and hex cache directory need to be writable by $UID docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "mkdir -p /.cache && chown $UID_GID /.cache && chown -R $UID_GID /emqx" # need to initialize .erlang.cookie manually here because / is not writable by $UID docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "openssl rand -base64 16 > /.erlang.cookie && chown $UID_GID /.erlang.cookie && chmod 0400 /.erlang.cookie" -restore_ownership() { - if [[ "$ORIG_UID_GID" != "$UID_GID" ]]; then - docker exec -i $TTY -u root:root "$ERLANG_CONTAINER" bash -c "chown -R $ORIG_UID_GID /emqx" - fi -} - if [ "$ONLY_UP" = 'yes' ]; then exit 0 fi @@ -204,10 +210,8 @@ if [ "$STOP" = 'yes' ]; then docker-compose $F_OPTIONS down --remove-orphans elif [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash - restore_ownership elif [ "$CONSOLE" = 'yes' ]; then docker exec -e PROFILE="$PROFILE" -i $TTY "$ERLANG_CONTAINER" bash -c "make run" - restore_ownership else if [ -z "${REBAR3CT:-}" ]; then docker exec -e IS_CI="$IS_CI" -e PROFILE="$PROFILE" -i $TTY "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct"