diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index d7b4de0c0..bec3c49fa 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -253,10 +253,14 @@ producers_config(BridgeName, ClientId, Input) -> mode := BufferMode, per_partition_limit := PerPartitionLimit, segment_bytes := SegmentBytes, - memory_overload_protection := MemOLP + memory_overload_protection := MemOLP0 } } = Input, - + MemOLP = + case os:type() of + {unix, linux} -> MemOLP0; + _ -> false + end, {OffloadMode, ReplayqDir} = case BufferMode of memory -> {false, false}; @@ -268,7 +272,7 @@ producers_config(BridgeName, ClientId, Input) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeName), - partitioner => PartitionStrategy, + partitioner => partitioner(PartitionStrategy), partition_count_refresh_interval_seconds => PCntRefreshInterval, replayq_dir => ReplayqDir, replayq_offload_mode => OffloadMode, @@ -282,6 +286,11 @@ producers_config(BridgeName, ClientId, Input) -> telemetry_meta_data => #{bridge_id => ResourceID} }. +%% Wolff API is a batch API. +%% key_dispatch only looks at the first element, so it's named 'first_key_dispatch' +partitioner(random) -> random; +partitioner(key_dispatch) -> first_key_dispatch. + replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "kafka", ClientId]).