diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index a3e50054e..18faf0e3b 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -83,7 +83,7 @@ fields(producer_opts) -> {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {strategy, mk( - hoconsc:enum([random, roundrobin, first_key_dispatch]), + hoconsc:enum([random, roundrobin, key_dispatch]), #{default => random, desc => ?DESC("producer_strategy")} )}, {buffer, mk(ref(producer_buffer), #{required => false, desc => ?DESC("producer_buffer")})}, diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl index 72363389e..b86124417 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_impl_producer.erl @@ -28,7 +28,7 @@ }. -type buffer_mode() :: memory | disk | hybrid. -type compression_mode() :: no_compression | snappy | zlib. --type partition_strategy() :: random | roundrobin | first_key_dispatch. +-type partition_strategy() :: random | roundrobin | key_dispatch. -type message_template_raw() :: #{ key := binary(), value := binary() @@ -290,7 +290,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> name => ProducerName, retention_period => RetentionPeriod, ssl_opts => SSLOpts, - strategy => Strategy, + strategy => partition_strategy(Strategy), tcp_opts => [{sndbuf, SendBuffer}] }, ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), @@ -394,3 +394,6 @@ get_producer_status(Producers) -> true -> connected; false -> connecting end. + +partition_strategy(key_dispatch) -> first_key_dispatch; +partition_strategy(Strategy) -> Strategy.