Merge pull request #10508 from lafirest/fix/rocketmq_log

fix(rocketmq): fix that the update of ACL info not working
This commit is contained in:
lafirest 2023-04-25 18:25:50 +08:00 committed by GitHub
commit 9a3b8f7c1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 63 additions and 46 deletions

View File

@ -44,6 +44,17 @@ fields(config) ->
binary(), binary(),
#{default => <<"TopicTest">>, desc => ?DESC(topic)} #{default => <<"TopicTest">>, desc => ?DESC(topic)}
)}, )},
{access_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("access_key")}
)},
{secret_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("secret_key")}
)},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})},
{sync_timeout, {sync_timeout,
mk( mk(
emqx_schema:duration(), emqx_schema:duration(),
@ -59,39 +70,15 @@ fields(config) ->
emqx_schema:bytesize(), emqx_schema:bytesize(),
#{default => <<"1024KB">>, desc => ?DESC(send_buffer)} #{default => <<"1024KB">>, desc => ?DESC(send_buffer)}
)}, )},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}
| relational_fields() {pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
add_default_username(Fields) ->
lists:map(
fun
({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"">>)};
(Field) ->
Field
end,
Fields
).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
servers() -> servers() ->
Meta = #{desc => ?DESC("servers")}, Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS). emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
relational_fields() ->
Fields = [username, password, auto_reconnect],
Values = lists:filter(
fun({E, _}) -> lists:member(E, Fields) end,
emqx_connector_schema_lib:relational_db_fields()
),
add_default_username(Values).
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
@ -102,21 +89,20 @@ is_buffer_supported() -> false.
on_start( on_start(
InstanceId, InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1 #{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_rocketmq_connector", msg => "starting_rocketmq_connector",
connector => InstanceId, connector => InstanceId,
config => redact(Config1) config => redact(Config)
}), }),
Config = maps:merge(default_security_info(), Config1),
Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS), Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS),
ClientId = client_id(InstanceId), ClientId = client_id(InstanceId),
ClientCfg = #{acl_info => #{}},
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic), TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Config), #{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config), Templates = parse_template(Config),
ProducersMapPID = create_producers_map(ClientId), ProducersMapPID = create_producers_map(ClientId),
State = #{ State = #{
@ -140,11 +126,21 @@ on_start(
Error Error
end. end.
on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) -> on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "stopping_rocketmq_connector", msg => "stopping_rocketmq_connector",
connector => InstanceId connector => InstanceId
}), }),
Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}),
lists:foreach(
fun([Topic, Producer]) ->
ets:delete(ClientId, {RawTopic, Topic}),
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
Producers
),
Pid ! ok, Pid ! ok,
ok = rocketmq:stop_and_delete_supervised_client(ClientId). ok = rocketmq:stop_and_delete_supervised_client(ClientId).
@ -276,6 +272,8 @@ client_id(InstanceId) ->
redact(Msg) -> redact(Msg) ->
emqx_utils:redact(Msg, fun is_sensitive_key/1). emqx_utils:redact(Msg, fun is_sensitive_key/1).
is_sensitive_key(secret_key) ->
true;
is_sensitive_key(security_token) -> is_sensitive_key(security_token) ->
true; true;
is_sensitive_key(_) -> is_sensitive_key(_) ->
@ -283,14 +281,14 @@ is_sensitive_key(_) ->
make_producer_opts( make_producer_opts(
#{ #{
username := Username, access_key := AccessKey,
password := Password, secret_key := SecretKey,
security_token := SecurityToken, security_token := SecurityToken,
send_buffer := SendBuff, send_buffer := SendBuff,
refresh_interval := RefreshInterval refresh_interval := RefreshInterval
} }
) -> ) ->
ACLInfo = acl_info(Username, Password, SecurityToken), ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{ #{
tcp_opts => [{sndbuf, SendBuff}], tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval, ref_topic_route_interval => RefreshInterval,
@ -299,17 +297,17 @@ make_producer_opts(
acl_info(<<>>, <<>>, <<>>) -> acl_info(<<>>, <<>>, <<>>) ->
#{}; #{};
acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) -> acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) ->
#{ #{
access_key => Username, access_key => AccessKey,
secret_key => Password secret_key => SecretKey
}; };
acl_info(Username, Password, SecurityToken) when acl_info(AccessKey, SecretKey, SecurityToken) when
is_binary(Username), is_binary(Password), is_binary(SecurityToken) is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken)
-> ->
#{ #{
access_key => Username, access_key => AccessKey,
secret_key => Password, secret_key => SecretKey,
security_token => SecurityToken security_token => SecurityToken
}; };
acl_info(_, _, _) -> acl_info(_, _, _) ->
@ -342,6 +340,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
ets:insert(ClientId, {TopicKey, Producers0}), ets:insert(ClientId, {TopicKey, Producers0}),
Producers0 Producers0
end. end.
default_security_info() ->
#{username => <<>>, password => <<>>, security_token => <<>>}.

View File

@ -26,6 +26,28 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
} }
} }
access_key {
desc {
en: """RocketMQ server `accessKey`."""
zh: """RocketMQ 服务器的 `accessKey`。"""
}
label: {
en: "AccessKey"
zh: "AccessKey"
}
}
secret_key {
desc {
en: """RocketMQ server `secretKey`."""
zh: """RocketMQ 服务器的 `secretKey`。"""
}
label: {
en: "SecretKey"
zh: "SecretKey"
}
}
sync_timeout { sync_timeout {
desc { desc {
en: """Timeout of RocketMQ driver synchronous call.""" en: """Timeout of RocketMQ driver synchronous call."""