fix(kafka): olp fix was accidentally deleted in 0fd8880d0a

This commit is contained in:
Zaiming (Stone) Shi 2023-01-18 10:04:38 +01:00
parent 3891aeb5fc
commit 3872c4451f
1 changed files with 12 additions and 3 deletions

View File

@ -253,10 +253,14 @@ producers_config(BridgeName, ClientId, Input) ->
mode := BufferMode, mode := BufferMode,
per_partition_limit := PerPartitionLimit, per_partition_limit := PerPartitionLimit,
segment_bytes := SegmentBytes, segment_bytes := SegmentBytes,
memory_overload_protection := MemOLP memory_overload_protection := MemOLP0
} }
} = Input, } = Input,
MemOLP =
case os:type() of
{unix, linux} -> MemOLP0;
_ -> false
end,
{OffloadMode, ReplayqDir} = {OffloadMode, ReplayqDir} =
case BufferMode of case BufferMode of
memory -> {false, false}; memory -> {false, false};
@ -268,7 +272,7 @@ producers_config(BridgeName, ClientId, Input) ->
ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
#{ #{
name => make_producer_name(BridgeName), name => make_producer_name(BridgeName),
partitioner => PartitionStrategy, partitioner => partitioner(PartitionStrategy),
partition_count_refresh_interval_seconds => PCntRefreshInterval, partition_count_refresh_interval_seconds => PCntRefreshInterval,
replayq_dir => ReplayqDir, replayq_dir => ReplayqDir,
replayq_offload_mode => OffloadMode, replayq_offload_mode => OffloadMode,
@ -282,6 +286,11 @@ producers_config(BridgeName, ClientId, Input) ->
telemetry_meta_data => #{bridge_id => ResourceID} telemetry_meta_data => #{bridge_id => ResourceID}
}. }.
%% Wolff API is a batch API.
%% key_dispatch only looks at the first element, so it's named 'first_key_dispatch'
partitioner(random) -> random;
partitioner(key_dispatch) -> first_key_dispatch.
replayq_dir(ClientId) -> replayq_dir(ClientId) ->
filename:join([emqx:data_dir(), "kafka", ClientId]). filename:join([emqx:data_dir(), "kafka", ClientId]).