diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index 29f8ef84d..70a27ef6e 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -44,6 +44,17 @@ fields(config) -> binary(), #{default => <<"TopicTest">>, desc => ?DESC(topic)} )}, + {access_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("access_key")} + )}, + {secret_key, + mk( + binary(), + #{default => <<>>, desc => ?DESC("secret_key")} + )}, + {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}, {sync_timeout, mk( emqx_schema:duration(), @@ -59,39 +70,15 @@ fields(config) -> emqx_schema:bytesize(), #{default => <<"1024KB">>, desc => ?DESC(send_buffer)} )}, - {security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})} - | relational_fields() + + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, + {auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1} ]. -add_default_username(Fields) -> - lists:map( - fun - ({username, OrigUsernameFn}) -> - {username, add_default_fn(OrigUsernameFn, <<"">>)}; - (Field) -> - Field - end, - Fields - ). - -add_default_fn(OrigFn, Default) -> - fun - (default) -> Default; - (Field) -> OrigFn(Field) - end. - servers() -> Meta = #{desc => ?DESC("servers")}, emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). -relational_fields() -> - Fields = [username, password, auto_reconnect], - Values = lists:filter( - fun({E, _}) -> lists:member(E, Fields) end, - emqx_connector_schema_lib:relational_db_fields() - ), - add_default_username(Values). - %%======================================================================================== %% `emqx_resource' API %%======================================================================================== @@ -102,21 +89,20 @@ is_buffer_supported() -> false. on_start( InstanceId, - #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1 + #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config ) -> ?SLOG(info, #{ msg => "starting_rocketmq_connector", connector => InstanceId, - config => redact(Config1) + config => redact(Config) }), - Config = maps:merge(default_security_info(), Config1), Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), ClientId = client_id(InstanceId), - ClientCfg = #{acl_info => #{}}, TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), - ProducerOpts = make_producer_opts(Config), + #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config), + ClientCfg = #{acl_info => AclInfo}, Templates = parse_template(Config), ProducersMapPID = create_producers_map(ClientId), State = #{ @@ -140,11 +126,21 @@ on_start( Error end. -on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> +on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) -> ?SLOG(info, #{ msg => "stopping_rocketmq_connector", connector => InstanceId }), + + Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}), + lists:foreach( + fun([Topic, Producer]) -> + ets:delete(ClientId, {RawTopic, Topic}), + _ = rocketmq:stop_and_delete_supervised_producers(Producer) + end, + Producers + ), + Pid ! ok, ok = rocketmq:stop_and_delete_supervised_client(ClientId). @@ -276,6 +272,8 @@ client_id(InstanceId) -> redact(Msg) -> emqx_utils:redact(Msg, fun is_sensitive_key/1). +is_sensitive_key(secret_key) -> + true; is_sensitive_key(security_token) -> true; is_sensitive_key(_) -> @@ -283,14 +281,14 @@ is_sensitive_key(_) -> make_producer_opts( #{ - username := Username, - password := Password, + access_key := AccessKey, + secret_key := SecretKey, security_token := SecurityToken, send_buffer := SendBuff, refresh_interval := RefreshInterval } ) -> - ACLInfo = acl_info(Username, Password, SecurityToken), + ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken), #{ tcp_opts => [{sndbuf, SendBuff}], ref_topic_route_interval => RefreshInterval, @@ -299,17 +297,17 @@ make_producer_opts( acl_info(<<>>, <<>>, <<>>) -> #{}; -acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> +acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) -> #{ - access_key => Username, - secret_key => Password + access_key => AccessKey, + secret_key => SecretKey }; -acl_info(Username, Password, SecurityToken) when - is_binary(Username), is_binary(Password), is_binary(SecurityToken) +acl_info(AccessKey, SecretKey, SecurityToken) when + is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken) -> #{ - access_key => Username, - secret_key => Password, + access_key => AccessKey, + secret_key => SecretKey, security_token => SecurityToken }; acl_info(_, _, _) -> @@ -342,6 +340,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) -> ets:insert(ClientId, {TopicKey, Producers0}), Producers0 end. - -default_security_info() -> - #{username => <<>>, password => <<>>, security_token => <<>>}. diff --git a/rel/i18n/emqx_ee_connector_rocketmq.hocon b/rel/i18n/emqx_ee_connector_rocketmq.hocon index 7f786898e..ddbe3a77b 100644 --- a/rel/i18n/emqx_ee_connector_rocketmq.hocon +++ b/rel/i18n/emqx_ee_connector_rocketmq.hocon @@ -26,6 +26,28 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified.""" } } + access_key { + desc { + en: """RocketMQ server `accessKey`.""" + zh: """RocketMQ 服务器的 `accessKey`。""" + } + label: { + en: "AccessKey" + zh: "AccessKey" + } + } + + secret_key { + desc { + en: """RocketMQ server `secretKey`.""" + zh: """RocketMQ 服务器的 `secretKey`。""" + } + label: { + en: "SecretKey" + zh: "SecretKey" + } + } + sync_timeout { desc { en: """Timeout of RocketMQ driver synchronous call."""