diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 291c656ef..626ad55cb 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -33,7 +33,6 @@ roots() -> fields(pulsar_producer) -> fields(config) ++ emqx_bridge_pulsar_pubsub_schema:fields(action_parameters) ++ - fields(producer_opts) ++ [ {local_topic, mk(binary(), #{required => false, desc => ?DESC("producer_local_topic")})}, @@ -85,6 +84,7 @@ fields(config) -> ] ++ emqx_connector_schema_lib:ssl_fields(); fields(producer_opts) -> [ + {pulsar_topic, mk(string(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {batch_size, mk( pos_integer(), @@ -110,7 +110,6 @@ fields(producer_opts) -> emqx_schema:bytesize(), #{default => <<"900KB">>, desc => ?DESC("producer_max_batch_bytes")} )}, - {pulsar_topic, mk(binary(), #{required => true, desc => ?DESC("producer_pulsar_topic")})}, {strategy, mk( hoconsc:enum([random, roundrobin, key_dispatch]), @@ -202,7 +201,37 @@ conn_bridge_examples(_Method) -> #{ <<"pulsar_producer">> => #{ summary => <<"Pulsar Producer Bridge">>, - value => #{todo => true} + value => #{ + <<"authentication">> => <<"none">>, + <<"batch_size">> => 1, + <<"buffer">> => + #{ + <<"memory_overload_protection">> => true, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"10MB">>, + <<"segment_bytes">> => <<"5MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"enable">> => true, + <<"local_topic">> => <<"mqtt/topic/-576460752303423482">>, + <<"max_batch_bytes">> => <<"900KB">>, + <<"message">> => + #{<<"key">> => <<"${.clientid}">>, <<"value">> => <<"${.}">>}, + <<"name">> => <<"pulsar_example_name">>, + <<"pulsar_topic">> => <<"pulsar_example_topic">>, + <<"retention_period">> => <<"infinity">>, + <<"send_buffer">> => <<"1MB">>, + <<"servers">> => <<"pulsar://127.0.0.1:6650">>, + <<"ssl">> => + #{ + <<"enable">> => false, + <<"server_name_indication">> => <<"auto">>, + <<"verify">> => <<"verify_none">> + }, + <<"strategy">> => <<"key_dispatch">>, + <<"sync_timeout">> => <<"5s">>, + <<"type">> => <<"pulsar_producer">> + } } } ]. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl index f51ed7884..6d15687f6 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_action_info.erl @@ -11,8 +11,7 @@ action_type_name/0, connector_type_name/0, schema_module/0, - is_action/1, - action_convert_from_connector/2 + is_action/1 ]). is_action(_) -> true. @@ -24,31 +23,3 @@ action_type_name() -> pulsar. connector_type_name() -> pulsar. schema_module() -> emqx_bridge_pulsar_pubsub_schema. - -action_convert_from_connector(ConnectorConfig, ActionConfig) -> - Dispatch = emqx_utils_conv:bin(maps:get(<<"strategy">>, ConnectorConfig, <<>>)), - case Dispatch of - <<"key_dispatch">> -> - case emqx_utils_maps:deep_find([<<"parameters">>, <<"message">>], ActionConfig) of - {ok, Message} -> - Validator = - #{ - <<"strategy">> => key_dispatch, - <<"message">> => emqx_utils_maps:binary_key_map(Message) - }, - case emqx_bridge_pulsar:producer_strategy_key_validator(Validator) of - ok -> - ActionConfig; - {error, Reason} -> - throw(#{ - reason => Reason, - kind => validation_error - }) - end; - {not_found, _, _} -> - %% no message field, use the default message template - ActionConfig - end; - _ -> - ActionConfig - end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 7b080d0e6..c88716abc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -25,13 +25,11 @@ -type pulsar_client_id() :: atom(). -type state() :: #{ - pulsar_client_id := pulsar_client_id(), - producers := pulsar_producers:producers(), - channels := map() + client_id := pulsar_client_id(), + channels := map(), + client_opts := map() }. --type buffer_mode() :: memory | disk | hybrid. --type compression_mode() :: no_compression | snappy | zlib. --type partition_strategy() :: random | roundrobin | key_dispatch. + -type message_template_raw() :: #{ key := binary(), value := binary() @@ -42,25 +40,9 @@ }. -type config() :: #{ authentication := _, - batch_size := pos_integer(), bridge_name := atom(), - buffer := #{ - mode := buffer_mode(), - per_partition_limit := emqx_schema:bytesize(), - segment_bytes := emqx_schema:bytesize(), - memory_overload_protection := boolean() - }, - compression := compression_mode(), - connect_timeout := emqx_schema:duration_ms(), - max_batch_bytes := emqx_schema:bytesize(), - message := message_template_raw(), - pulsar_topic := binary(), - retention_period := infinity | emqx_schema:duration_ms(), - send_buffer := emqx_schema:bytesize(), servers := binary(), - ssl := _, - strategy := partition_strategy(), - sync_timeout := emqx_schema:duration_ms() + ssl := _ }. %% Allocatable resources @@ -116,54 +98,57 @@ on_start(InstanceId, Config) -> end, throw(Message) end, - start_producer(Config, InstanceId, ClientId, ClientOpts). + {ok, #{channels => #{}, client_id => ClientId, client_opts => ClientOpts}}. on_add_channel( - _InstanceId, - #{channels := Channels} = State, + InstanceId, + #{channels := Channels, client_id := ClientId, client_opts := ClientOpts} = State, ChannelId, - #{parameters := #{message := Message, sync_timeout := SyncTimeout}} + #{parameters := #{message := Message, sync_timeout := SyncTimeout} = Params} ) -> case maps:is_key(ChannelId, Channels) of true -> - {error, already_exists}; + {error, channel_already_exists}; false -> + {ok, Producers} = start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params), Parameters = #{ message => compile_message_template(Message), - sync_timeout => SyncTimeout + sync_timeout => SyncTimeout, + producers => Producers }, NewChannels = maps:put(ChannelId, Parameters, Channels), {ok, State#{channels => NewChannels}} end. -on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> - {ok, State#{channels => maps:remove(ChannelId, Channels)}}. +on_remove_channel(InstanceId, State, ChannelId) -> + #{channels := Channels, client_id := ClientId} = State, + case maps:find(ChannelId, Channels) of + {ok, #{producers := Producers}} -> + stop_producers(ClientId, Producers), + emqx_resource:deallocate_resource(InstanceId, {?pulsar_producers, ChannelId}), + {ok, State#{channels => maps:remove(ChannelId, Channels)}}; + error -> + {ok, State} + end. on_get_channels(InstanceId) -> emqx_bridge_v2:get_channels_for_connector(InstanceId). -spec on_stop(resource_id(), state()) -> ok. on_stop(InstanceId, _State) -> - case emqx_resource:get_allocated_resources(InstanceId) of - #{?pulsar_client_id := ClientId, ?pulsar_producers := Producers} -> - stop_producers(ClientId, Producers), + Resources0 = emqx_resource:get_allocated_resources(InstanceId), + case maps:take(?pulsar_client_id, Resources0) of + {ClientId, Resources} -> + maps:foreach( + fun({?pulsar_producers, _BridgeV2Id}, Producers) -> + stop_producers(ClientId, Producers) + end, + Resources + ), stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{ - instance_id => InstanceId, - pulsar_client_id => ClientId, - pulsar_producers => Producers - }), - ok; - #{?pulsar_client_id := ClientId} -> - stop_client(ClientId), - ?tp(pulsar_bridge_stopped, #{ - instance_id => InstanceId, - pulsar_client_id => ClientId, - pulsar_producers => undefined - }), - ok; - _ -> ?tp(pulsar_bridge_stopped, #{instance_id => InstanceId}), + ok; + error -> ok end. @@ -172,35 +157,32 @@ on_stop(InstanceId, _State) -> %% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost. -spec on_get_status(resource_id(), state()) -> connected | connecting. on_get_status(_InstanceId, State = #{}) -> - #{ - pulsar_client_id := ClientId, - producers := Producers - } = State, + #{client_id := ClientId} = State, case pulsar_client_sup:find_client(ClientId) of {ok, Pid} -> try pulsar_client:get_status(Pid) of - true -> - get_producer_status(Producers); - false -> - connecting + true -> ?status_connected; + false -> ?status_connecting catch error:timeout -> - connecting; + ?status_connecting; exit:{noproc, _} -> - connecting + ?status_connecting end; {error, _} -> - connecting + ?status_connecting end; on_get_status(_InstanceId, _State) -> %% If a health check happens just after a concurrent request to %% create the bridge is not quite finished, `State = undefined'. - connecting. + ?status_connecting. on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> - case maps:is_key(ChannelId, Channels) of - true -> connected; - false -> {error, channel_not_exists} + case maps:find(ChannelId, Channels) of + {ok, #{producers := Producers}} -> + get_producer_status(Producers); + error -> + {error, channel_not_exists} end. -spec on_query(resource_id(), tuple(), state()) -> @@ -208,11 +190,11 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> | {error, timeout} | {error, term()}. on_query(_InstanceId, {ChannelId, Message}, State) -> - #{producers := Producers, channels := Channels} = State, + #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> {error, channel_not_exists}; - {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout}} -> + {ok, #{message := MessageTmpl, sync_timeout := SyncTimeout, producers := Producers}} -> PulsarMessage = render_message(Message, MessageTmpl), try pulsar:send_sync(Producers, [PulsarMessage], SyncTimeout) @@ -227,11 +209,11 @@ on_query(_InstanceId, {ChannelId, Message}, State) -> ) -> {ok, pid()}. on_query_async(_InstanceId, {ChannelId, Message}, AsyncReplyFn, State) -> - #{producers := Producers, channels := Channels} = State, + #{channels := Channels} = State, case maps:find(ChannelId, Channels) of error -> {error, channel_not_exists}; - {ok, #{message := MessageTmpl}} -> + {ok, #{message := MessageTmpl, producers := Producers}} -> ?tp_span( pulsar_producer_on_query_async, #{instance_id => _InstanceId, message => Message}, @@ -299,18 +281,22 @@ conn_opts(#{authentication := #{jwt := JWT}}) -> replayq_dir(ClientId) -> filename:join([emqx:data_dir(), "pulsar", emqx_utils_conv:bin(ClientId)]). --spec producer_name(pulsar_client_id()) -> atom(). -producer_name(ClientId) -> - ClientIdBin = emqx_utils_conv:bin(ClientId), - binary_to_atom( - iolist_to_binary([ - <<"producer-">>, - ClientIdBin - ]) - ). +producer_name(InstanceId, ChannelId) -> + case is_dry_run(InstanceId) of + %% do not create more atom + true -> + pulsar_producer_probe_worker; + false -> + ChannelIdBin = emqx_utils_conv:bin(ChannelId), + binary_to_atom( + iolist_to_binary([ + <<"producer-">>, + ChannelIdBin + ]) + ) + end. --spec start_producer(config(), resource_id(), pulsar_client_id(), map()) -> {ok, state()}. -start_producer(Config, InstanceId, ClientId, ClientOpts) -> +start_producer(InstanceId, ChannelId, ClientId, ClientOpts, Params) -> #{ conn_opts := ConnOpts, ssl_opts := SSLOpts @@ -325,16 +311,16 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> }, compression := Compression, max_batch_bytes := MaxBatchBytes, - pulsar_topic := PulsarTopic0, + pulsar_topic := PulsarTopic, retention_period := RetentionPeriod, send_buffer := SendBuffer, strategy := Strategy - } = Config, + } = Params, {OffloadMode, ReplayQDir} = case BufferMode of memory -> {false, false}; - disk -> {false, replayq_dir(ClientId)}; - hybrid -> {true, replayq_dir(ClientId)} + disk -> {false, replayq_dir(ChannelId)}; + hybrid -> {true, replayq_dir(ChannelId)} end, MemOLP = case os:type() of @@ -348,7 +334,7 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> replayq_seg_bytes => SegmentBytes, drop_if_highmem => MemOLP }, - ProducerName = producer_name(ClientId), + ProducerName = producer_name(InstanceId, ChannelId), ?tp(pulsar_producer_capture_name, #{producer_name => ProducerName}), ProducerOpts0 = #{ @@ -363,19 +349,17 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> tcp_opts => [{sndbuf, SendBuffer}] }, ProducerOpts = maps:merge(ReplayQOpts, ProducerOpts0), - PulsarTopic = binary_to_list(PulsarTopic0), ?tp(pulsar_producer_about_to_start_producers, #{producer_name => ProducerName}), try pulsar:ensure_supervised_producers(ClientId, PulsarTopic, ProducerOpts) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstanceId, ?pulsar_producers, Producers), + ok = emqx_resource:allocate_resource( + InstanceId, + {?pulsar_producers, ChannelId}, + Producers + ), ?tp(pulsar_producer_producers_allocated, #{}), - State = #{ - pulsar_client_id => ClientId, - producers => Producers, - channels => #{} - }, ?tp(pulsar_producer_bridge_started, #{}), - {ok, State} + {ok, Producers} catch Kind:Error:Stacktrace -> ?tp( @@ -388,7 +372,10 @@ start_producer(Config, InstanceId, ClientId, ClientOpts) -> stacktrace => Stacktrace } ), - stop_client(ClientId), + ?tp(pulsar_bridge_producer_stopped, #{ + pulsar_client_id => ClientId, + producers => undefined + }), throw(failed_to_start_pulsar_producer) end. @@ -412,7 +399,10 @@ stop_producers(ClientId, Producers) -> _ = log_when_error( fun() -> ok = pulsar:stop_and_delete_supervised_producers(Producers), - ?tp(pulsar_bridge_producer_stopped, #{pulsar_client_id => ClientId}), + ?tp(pulsar_bridge_producer_stopped, #{ + pulsar_client_id => ClientId, + producers => Producers + }), ok end, #{ @@ -467,15 +457,19 @@ get_producer_status(Producers) -> do_get_producer_status(Producers, 0). do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT -> - connecting; + ?status_connecting; do_get_producer_status(Producers, TimeSpent) -> - case pulsar_producers:all_connected(Producers) of + try pulsar_producers:all_connected(Producers) of true -> - connected; + ?status_connected; false -> Sleep = 200, timer:sleep(Sleep), do_get_producer_status(Producers, TimeSpent + Sleep) + %% producer crashed with badarg. will recover later + catch + error:badarg -> + ?status_connecting end. partition_strategy(key_dispatch) -> first_key_dispatch; @@ -485,17 +479,17 @@ is_sensitive_key(auth_data) -> true; is_sensitive_key(_) -> false. get_error_message({BrokerErrorMap, _}) when is_map(BrokerErrorMap) -> - Iter = maps:iterator(BrokerErrorMap), - do_get_error_message(Iter); + Iterator = maps:iterator(BrokerErrorMap), + do_get_error_message(Iterator); get_error_message(_Error) -> error. -do_get_error_message(Iter) -> - case maps:next(Iter) of - {{_Broker, _Port}, #{message := Message}, _NIter} -> +do_get_error_message(Iterator) -> + case maps:next(Iterator) of + {{_Broker, _Port}, #{message := Message}, _NIterator} -> {ok, Message}; - {_K, _V, NIter} -> - do_get_error_message(NIter); + {_K, _V, NIterator} -> + do_get_error_message(NIterator); none -> error end. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl index 953318e0a..f8b7e3909 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector_schema.erl @@ -18,9 +18,8 @@ namespace() -> ?TYPE. roots() -> []. fields("config_connector") -> - lists:keydelete(enable, 1, emqx_bridge_schema:common_bridge_fields()) ++ - emqx_bridge_pulsar:fields(config) ++ - emqx_bridge_pulsar:fields(producer_opts) ++ + emqx_bridge_schema:common_bridge_fields() ++ + lists:keydelete(enable, 1, emqx_bridge_pulsar:fields(config)) ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> emqx_connector_schema:resource_opts_fields(); @@ -59,13 +58,5 @@ connector_example_values() -> servers => <<"pulsar://127.0.0.1:6650">>, authentication => none, connect_timeout => <<"5s">>, - batch_size => 10, - compression => no_compression, - send_buffer => <<"1MB">>, - retention_period => <<"100s">>, - max_batch_bytes => <<"32MB">>, - pulsar_topic => <<"test_topic">>, - strategy => random, - buffer => #{mode => memory}, ssl => #{enable => false} }. diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl index a705ed560..9565aa5bc 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_pubsub_schema.erl @@ -32,22 +32,23 @@ fields(publisher_action) -> ?R_REF(action_parameters), #{ required => true, - desc => ?DESC(action_parameters) + desc => ?DESC(action_parameters), + validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1 } ), #{resource_opts_ref => ?R_REF(action_resource_opts)} ); fields(action_parameters) -> [ - {sync_timeout, - ?HOCON(emqx_schema:timeout_duration_ms(), #{ - default => <<"3s">>, desc => ?DESC("producer_sync_timeout") - })}, {message, ?HOCON(?R_REF(producer_pulsar_message), #{ required => false, desc => ?DESC("producer_message_opts") + })}, + {sync_timeout, + ?HOCON(emqx_schema:timeout_duration_ms(), #{ + default => <<"3s">>, desc => ?DESC("producer_sync_timeout") })} - ]; + ] ++ emqx_bridge_pulsar:fields(producer_opts); fields(producer_pulsar_message) -> [ {key, @@ -114,7 +115,8 @@ bridge_v2_examples(Method) -> message => #{ key => <<"${.clientid}">>, value => <<"${.}">> - } + }, + pulsar_topic => <<"test_topic">> } } ) diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl index c9b25cc71..b3c351da0 100644 --- a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_connector_SUITE.erl @@ -599,13 +599,7 @@ t_start_and_produce_ok(Config) -> _Sleep = 100, _Attempts0 = 20, begin - BridgeId = emqx_bridge_resource:bridge_id( - <<"pulsar">>, ?config(pulsar_name, Config) - ), - ConnectorId = emqx_bridge_resource:resource_id( - <<"pulsar">>, ?config(pulsar_name, Config) - ), - Id = <<"action:", BridgeId/binary, ":", ConnectorId/binary>>, + Id = get_channel_id(Config), ?assertMatch( #{ counters := #{ @@ -634,6 +628,15 @@ t_start_and_produce_ok(Config) -> ), ok. +get_channel_id(Config) -> + BridgeId = emqx_bridge_resource:bridge_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + ConnectorId = emqx_bridge_resource:resource_id( + <<"pulsar">>, ?config(pulsar_name, Config) + ), + <<"action:", BridgeId/binary, ":", ConnectorId/binary>>. + %% Under normal operations, the bridge will be called async via %% `simple_async_query'. t_sync_query(Config) -> @@ -900,7 +903,7 @@ t_failure_to_start_producer(Config) -> {{ok, _}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_client_stopped}, + #{?snk_kind := pulsar_bridge_producer_stopped}, 20_000 ), ok @@ -928,6 +931,8 @@ t_producer_process_crash(Config) -> #{?snk_kind := pulsar_producer_bridge_started}, 10_000 ), + ResourceId = resource_id(Config), + ChannelId = get_channel_id(Config), [ProducerPid | _] = [ Pid || {_Name, PS, _Type, _Mods} <- supervisor:which_children(pulsar_producers_sup), @@ -944,17 +949,23 @@ t_producer_process_crash(Config) -> ok after 1_000 -> ct:fail("pid didn't die") end, - ResourceId = resource_id(Config), ?retry( _Sleep0 = 50, _Attempts0 = 50, - ?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual( + #{error => <<"Not connected for unknown reason">>, status => connecting}, + emqx_resource_manager:channel_health_check(ResourceId, ChannelId) + ) ), + ?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId)), %% Should recover given enough time. ?retry( _Sleep = 1_000, _Attempts = 20, - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ?assertEqual( + #{error => undefined, status => connected}, + emqx_resource_manager:channel_health_check(ResourceId, ChannelId) + ) ), {_, {ok, _}} = ?wait_async_action( @@ -1002,8 +1013,8 @@ t_resource_manager_crash_after_producers_started(Config) -> {{error, {config_update_crashed, {killed, _}}}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := Producers} when - Producers =/= undefined, + #{?snk_kind := pulsar_bridge_stopped, instance_id := InstanceId} when + InstanceId =/= undefined, 10_000 ), ?assertEqual([], get_pulsar_producers()), @@ -1036,7 +1047,7 @@ t_resource_manager_crash_before_producers_started(Config) -> {{error, {config_update_crashed, _}}, {ok, _}} = ?wait_async_action( create_bridge(Config), - #{?snk_kind := pulsar_bridge_stopped, pulsar_producers := undefined}, + #{?snk_kind := pulsar_bridge_stopped}, 10_000 ), ?assertEqual([], get_pulsar_producers()), @@ -1236,3 +1247,19 @@ t_resilience(Config) -> [] ), ok. + +get_producers_config(ConnectorId, ChannelId) -> + [ + #{ + state := + #{ + channels := + #{ChannelId := #{producers := Producers}} + } + } + ] = + lists:filter( + fun(#{id := Id}) -> Id =:= ConnectorId end, + emqx_resource_manager:list_all() + ), + Producers.