From e284a83f733f68d0463dd705a33d44aad64de2e3 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 5 Feb 2024 15:29:19 +0100 Subject: [PATCH 1/3] feat: refactor RocketMQ bridge to connector and action Fixes: https://emqx.atlassian.net/browse/EMQX-11467 --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_rocketmq.app.src | 4 +- .../src/emqx_bridge_rocketmq.erl | 166 ++++++++++++++++-- .../src/emqx_bridge_rocketmq_action_info.erl | 22 +++ .../src/emqx_bridge_rocketmq_connector.erl | 121 ++++++++++--- .../test/emqx_bridge_rocketmq_SUITE.erl | 15 +- .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + rel/i18n/emqx_bridge_rocketmq.hocon | 18 ++ 9 files changed, 317 insertions(+), 44 deletions(-) create mode 100644 apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_action_info.erl diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e74e6aa3e..ea9c76d2b 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -94,6 +94,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, emqx_bridge_oracle_action_info, + emqx_bridge_rocketmq_action_info, emqx_bridge_influxdb_action_info, emqx_bridge_cassandra_action_info, emqx_bridge_mysql_action_info, diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src index 38c00e7ee..564e36a88 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.app.src @@ -1,9 +1,9 @@ {application, emqx_bridge_rocketmq, [ {description, "EMQX Enterprise RocketMQ Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, []}, {applications, [kernel, stdlib, emqx_resource, rocketmq]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_rocketmq_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index b3149fa99..faac69095 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -8,12 +8,7 @@ -include_lib("emqx_bridge/include/emqx_bridge.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). - --export([ - conn_bridge_examples/1, - values/1 -]). +-import(hoconsc, [mk/2, enum/1]). -export([ namespace/0, @@ -22,6 +17,14 @@ desc/1 ]). +-export([ + bridge_v2_examples/1, + connector_examples/1, + conn_bridge_examples/1 +]). + +-define(CONNECTOR_TYPE, rocketmq). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). -define(DEFAULT_TEMPLATE, <<>>). -define(DEFFAULT_REQ_TIMEOUT, <<"15s">>). @@ -33,14 +36,14 @@ conn_bridge_examples(Method) -> #{ <<"rocketmq">> => #{ summary => <<"RocketMQ Bridge">>, - value => values(Method) + value => conn_bridge_example_values(Method) } } ]. -values(get) -> - values(post); -values(post) -> +conn_bridge_example_values(get) -> + conn_bridge_example_values(post); +conn_bridge_example_values(post) -> #{ enable => true, type => rocketmq, @@ -58,8 +61,59 @@ values(post) -> max_buffer_bytes => ?DEFAULT_BUFFER_BYTES } }; -values(put) -> - values(post). +conn_bridge_example_values(put) -> + conn_bridge_example_values(post). + +%% TODO fix these examples + +connector_examples(Method) -> + [ + #{ + <<"rocketmq">> => + #{ + summary => <<"RocketMQ Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_values() + ) + } + } + ]. + +connector_values() -> + #{ + <<"enable">> => true, + <<"servers">> => <<"127.0.0.1:9876">>, + <<"pool_size">> => 8, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }. + +bridge_v2_examples(Method) -> + [ + #{ + <<"rocketmq">> => + #{ + summary => <<"RocketMQ Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. + +action_values() -> + #{ + <<"parameters">> => #{ + <<"topic">> => <<"TopicTest">>, + <<"template">> => ?DEFAULT_TEMPLATE, + <<"refresh_interval">> => <<"3s">>, + <<"send_buffer">> => <<"1024KB">>, + <<"sync_timeout">> => <<"3s">> + } + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -67,6 +121,84 @@ namespace() -> "bridge_rocketmq". roots() -> []. +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + fields("config_connector") -- emqx_connector_schema:common_fields() + ); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(rocketmq_action)); +fields(action) -> + {?ACTION_TYPE, + hoconsc:mk( + hoconsc:map(name, hoconsc:ref(?MODULE, rocketmq_action)), + #{ + desc => <<"RocketMQ Action Config">>, + required => false + } + )}; +fields(rocketmq_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + hoconsc:mk( + hoconsc:ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC("action_parameters") + } + ) + ); +fields(action_parameters) -> + Parameters = + [ + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )} + ] ++ emqx_bridge_rocketmq_connector:fields(config), + lists:foldl( + fun(Key, Acc) -> + proplists:delete(Key, Acc) + end, + Parameters, + [ + servers, + pool_size, + auto_reconnect, + access_key, + secret_key, + security_token + ] + ); +fields("config_connector") -> + Config = + emqx_connector_schema:common_fields() ++ + emqx_bridge_rocketmq_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + lists:foldl( + fun(Key, Acc) -> + proplists:delete(Key, Acc) + end, + Config, + [ + topic, + sync_timeout, + refresh_interval, + send_buffer, + auto_reconnect + ] + ); +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, @@ -94,6 +226,16 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; +desc("creation_opts") -> + ?DESC(emqx_resource_schema, "creation_opts"); +desc("config_connector") -> + ?DESC("config_connector"); +desc(rocketmq_action) -> + ?DESC("rocketmq_action"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_action_info.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_action_info.erl new file mode 100644 index 000000000..f3a7ab1a3 --- /dev/null +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_action_info.erl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_rocketmq_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +bridge_v1_type_name() -> rocketmq. + +action_type_name() -> rocketmq. + +connector_type_name() -> rocketmq. + +schema_module() -> emqx_bridge_rocketmq. diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index 81045ade4..baa895a8a 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -21,10 +21,14 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2]). -define(ROCKETMQ_HOST_OPTIONS, #{ default_port => 9876 @@ -82,7 +86,12 @@ callback_mode() -> always_sync. on_start( InstanceId, - #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config + #{ + servers := BinServers, + access_key := AccessKey, + secret_key := SecretKey, + security_token := SecurityToken + } = Config ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", @@ -94,18 +103,18 @@ on_start( emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS) ), ClientId = client_id(InstanceId), - TopicTks = emqx_placeholder:preproc_tmpl(Topic), - #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), - ClientCfg = #{acl_info => AclInfo}, - Templates = parse_template(Config), + ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), + ClientCfg = #{acl_info => ACLInfo}, State = #{ client_id => ClientId, - topic => Topic, - topic_tokens => TopicTks, - sync_timeout => SyncTimeout, - templates => Templates, - producers_opts => ProducerOpts + acl_info => ACLInfo, + installed_channels => #{} + % topic => Topic, + % topic_tokens => TopicTks, + % sync_timeout => SyncTimeout, + % templates => Templates, + % producers_opts => ProducerOpts }, ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId), @@ -123,6 +132,64 @@ on_start( {error, Reason} end. +on_add_channel( + _InstId, + #{ + installed_channels := InstalledChannels, + acl_info := ACLInfo + } = OldState, + ChannelId, + ChannelConfig +) -> + {ok, ChannelState} = create_channel_state(ChannelConfig, ACLInfo), + NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +create_channel_state( + #{parameters := Conf} = _ChannelConfig, + ACLInfo +) -> + #{ + topic := Topic, + sync_timeout := SyncTimeout + } = Conf, + TopicTks = emqx_placeholder:preproc_tmpl(Topic), + ProducerOpts = make_producer_opts(Conf, ACLInfo), + Templates = parse_template(Conf), + State = #{ + topic => Topic, + topic_tokens => TopicTks, + templates => Templates, + sync_timeout => SyncTimeout, + acl_info => ACLInfo, + producers_opts => ProducerOpts + }, + {ok, State}. + +on_remove_channel( + _InstId, + #{ + installed_channels := InstalledChannels + } = OldState, + ChannelId +) -> + NewInstalledChannels = maps:remove(ChannelId, InstalledChannels), + %% Update state + NewState = OldState#{installed_channels => NewInstalledChannels}, + {ok, NewState}. + +on_get_channel_status( + _ResId, + _ChannelId, + _State +) -> + ?status_connected. + +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + on_stop(InstanceId, _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", @@ -144,7 +211,7 @@ on_query(InstanceId, Query, State) -> do_query(InstanceId, Query, send_sync, State). %% We only support batch inserts and all messages must have the same topic -on_batch_query(InstanceId, [{send_message, _Msg} | _] = Query, State) -> +on_batch_query(InstanceId, [{_ChannelId, _Msg} | _] = Query, State) -> do_query(InstanceId, Query, batch_send_sync, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. @@ -154,11 +221,11 @@ on_get_status(_InstanceId, #{client_id := ClientId}) -> {ok, Pid} -> status_result(rocketmq_client:get_status(Pid)); _ -> - connecting + ?status_connecting end. -status_result(_Status = true) -> connected; -status_result(_Status) -> connecting. +status_result(_Status = true) -> ?status_connected; +status_result(_Status) -> ?status_connecting. %%======================================================================================== %% Helper fns @@ -169,11 +236,8 @@ do_query( Query, QueryFunc, #{ - templates := Templates, client_id := ClientId, - topic_tokens := TopicTks, - producers_opts := ProducerOpts, - sync_timeout := RequestTimeout + installed_channels := Channels } = State ) -> ?TRACE( @@ -181,6 +245,13 @@ do_query( "rocketmq_connector_received", #{connector => InstanceId, query => Query, state => State} ), + ChannelId = get_channel_id(Query), + #{ + topic_tokens := TopicTks, + templates := Templates, + sync_timeout := RequestTimeout, + producers_opts := ProducerOpts + } = maps:get(ChannelId, Channels), TopicKey = get_topic_key(Query, TopicTks), Data = apply_template(Query, Templates), @@ -209,6 +280,9 @@ do_query( Result end. +get_channel_id({ChannelId, _}) -> ChannelId; +get_channel_id([{ChannelId, _} | _]) -> ChannelId. + safe_do_produce(InstanceId, QueryFunc, ClientId, TopicKey, Data, ProducerOpts, RequestTimeout) -> try Producers = get_producers(InstanceId, ClientId, TopicKey, ProducerOpts), @@ -275,14 +349,11 @@ is_sensitive_key(_) -> make_producer_opts( #{ - access_key := AccessKey, - secret_key := SecretKey, - security_token := SecurityToken, send_buffer := SendBuff, refresh_interval := RefreshInterval - } + }, + ACLInfo ) -> - ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, diff --git a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl index 1a5133b84..4a0a5a862 100644 --- a/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl +++ b/apps/emqx_bridge_rocketmq/test/emqx_bridge_rocketmq_SUITE.erl @@ -196,14 +196,15 @@ create_bridge_http(Params) -> send_message(Config, Payload) -> Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). + ActionId = emqx_bridge_v2:id(BridgeType, Name), + emqx_bridge_v2:query(BridgeType, Name, {ActionId, Payload}, #{}). query_resource(Config, Request) -> Name = ?GET_CONFIG(rocketmq_name, Config), BridgeType = ?GET_CONFIG(rocketmq_bridge_type, Config), - ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), - emqx_resource:query(ResourceID, Request, #{timeout => 500}). + ID = emqx_bridge_v2:id(BridgeType, Name), + ResID = emqx_connector_resource:resource_id(BridgeType, Name), + emqx_resource:query(ID, Request, #{timeout => 500, connector_resource_id => ResID}). %%------------------------------------------------------------------------------ %% Testcases @@ -273,6 +274,7 @@ t_get_status(Config) -> ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + ?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(BridgeType, Name)), ok. t_simple_query(Config) -> @@ -280,7 +282,10 @@ t_simple_query(Config) -> {ok, _}, create_bridge(Config) ), - Request = {send_message, #{message => <<"Hello">>}}, + Type = ?GET_CONFIG(rocketmq_bridge_type, Config), + Name = ?GET_CONFIG(rocketmq_name, Config), + ActionId = emqx_bridge_v2:id(Type, Name), + Request = {ActionId, #{message => <<"Hello">>}}, Result = query_resource(Config, Request), ?assertEqual(ok, Result), ok. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 2935233be..019545fd6 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -54,6 +54,8 @@ resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> emqx_bridge_redis_connector; +resource_type(rocketmq) -> + emqx_bridge_rocketmq_connector; resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(elasticsearch) -> @@ -199,6 +201,14 @@ connector_structs() -> required => false } )}, + {rocketmq, + mk( + hoconsc:map(name, ref(emqx_bridge_rocketmq, "config_connector")), + #{ + desc => <<"RocketMQ Connector Config">>, + required => false + } + )}, {syskeeper_forwarder, mk( hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), @@ -291,6 +301,7 @@ schema_modules() -> emqx_bridge_timescale, emqx_postgresql_connector_schema, emqx_bridge_redis_schema, + emqx_bridge_rocketmq, emqx_bridge_iotdb_connector, emqx_bridge_es_connector, emqx_bridge_rabbitmq_connector_schema, @@ -327,6 +338,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), + api_ref(emqx_bridge_rocketmq, <<"rocketmq">>, Method ++ "_connector"), api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method), api_ref(emqx_bridge_es_connector, <<"elasticsearch">>, Method), api_ref(emqx_bridge_opents_connector, <<"opents">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index b803ab9f2..5e0174479 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -150,6 +150,8 @@ connector_type_to_bridge_types(pgsql) -> [pgsql]; connector_type_to_bridge_types(redis) -> [redis, redis_single, redis_sentinel, redis_cluster]; +connector_type_to_bridge_types(rocketmq) -> + [rocketmq]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> diff --git a/rel/i18n/emqx_bridge_rocketmq.hocon b/rel/i18n/emqx_bridge_rocketmq.hocon index a2449c1a9..b6bb3aad6 100644 --- a/rel/i18n/emqx_bridge_rocketmq.hocon +++ b/rel/i18n/emqx_bridge_rocketmq.hocon @@ -41,4 +41,22 @@ template.desc: template.label: """Template""" +action_parameters.desc: +"""Action specific configuration.""" + +action_parameters.label: +"""Action""" + +rocketmq_action.desc: +"""Configuration for RocketMQ Action""" + +rocketmq_action.label: +"""RocketMQ Action Configuration""" + +config_connector.desc: +"""Configuration for an RocketMQ Client.""" + +config_connector.label: +"""RocketMQ Client Configuration""" + } From 511d1f732af21767a4066127eb9fbb246a18223b Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 8 Feb 2024 17:50:10 +0100 Subject: [PATCH 2/3] docs: add change log entry for RocketMQ bridge refactoring --- changes/ee/feat-12488.en.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/ee/feat-12488.en.md diff --git a/changes/ee/feat-12488.en.md b/changes/ee/feat-12488.en.md new file mode 100644 index 000000000..3c2eed26f --- /dev/null +++ b/changes/ee/feat-12488.en.md @@ -0,0 +1 @@ +The RocketMQ bridge has been split into connector and action components. Old RocketMQ bridges will be upgraded automatically. From 976099f5fbb3f1894b77d9017465bef114ff4eef Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Feb 2024 10:16:17 +0100 Subject: [PATCH 3/3] fix: cleanups due to problems found by @thalesmg --- apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl | 4 ---- .../src/emqx_bridge_rocketmq_connector.erl | 5 ----- 2 files changed, 9 deletions(-) diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl index faac69095..22514dc5c 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq.erl @@ -64,8 +64,6 @@ conn_bridge_example_values(post) -> conn_bridge_example_values(put) -> conn_bridge_example_values(post). -%% TODO fix these examples - connector_examples(Method) -> [ #{ @@ -226,8 +224,6 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; -desc("creation_opts") -> - ?DESC(emqx_resource_schema, "creation_opts"); desc("config_connector") -> ?DESC("config_connector"); desc(rocketmq_action) -> diff --git a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl index baa895a8a..c9a7ce177 100644 --- a/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl +++ b/apps/emqx_bridge_rocketmq/src/emqx_bridge_rocketmq_connector.erl @@ -110,11 +110,6 @@ on_start( client_id => ClientId, acl_info => ACLInfo, installed_channels => #{} - % topic => Topic, - % topic_tokens => TopicTks, - % sync_timeout => SyncTimeout, - % templates => Templates, - % producers_opts => ProducerOpts }, ok = emqx_resource:allocate_resource(InstanceId, client_id, ClientId),