fix(rocketmq): fix that the update of ACL info not working

This commit is contained in:
firest 2023-04-25 16:15:28 +08:00
parent 996d5eee45
commit 3bb50a5751
2 changed files with 63 additions and 46 deletions

View File

@ -44,6 +44,17 @@ fields(config) ->
binary(),
#{default => <<"TopicTest">>, desc => ?DESC(topic)}
)},
{access_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("access_key")}
)},
{secret_key,
mk(
binary(),
#{default => <<>>, desc => ?DESC("secret_key")}
)},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})},
{sync_timeout,
mk(
emqx_schema:duration(),
@ -59,39 +70,15 @@ fields(config) ->
emqx_schema:bytesize(),
#{default => <<"1024KB">>, desc => ?DESC(send_buffer)}
)},
{security_token, mk(binary(), #{default => <<>>, desc => ?DESC(security_token)})}
| relational_fields()
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
].
add_default_username(Fields) ->
lists:map(
fun
({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"">>)};
(Field) ->
Field
end,
Fields
).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
servers() ->
Meta = #{desc => ?DESC("servers")},
emqx_schema:servers_sc(Meta, ?ROCKETMQ_HOST_OPTIONS).
relational_fields() ->
Fields = [username, password, auto_reconnect],
Values = lists:filter(
fun({E, _}) -> lists:member(E, Fields) end,
emqx_connector_schema_lib:relational_db_fields()
),
add_default_username(Values).
%%========================================================================================
%% `emqx_resource' API
%%========================================================================================
@ -102,21 +89,20 @@ is_buffer_supported() -> false.
on_start(
InstanceId,
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config1
#{servers := BinServers, topic := Topic, sync_timeout := SyncTimeout} = Config
) ->
?SLOG(info, #{
msg => "starting_rocketmq_connector",
connector => InstanceId,
config => redact(Config1)
config => redact(Config)
}),
Config = maps:merge(default_security_info(), Config1),
Servers = emqx_schema:parse_servers(BinServers, ?ROCKETMQ_HOST_OPTIONS),
ClientId = client_id(InstanceId),
ClientCfg = #{acl_info => #{}},
TopicTks = emqx_plugin_libs_rule:preproc_tmpl(Topic),
ProducerOpts = make_producer_opts(Config),
#{acl_info := AclInfo} = ProducerOpts = make_producer_opts(Config),
ClientCfg = #{acl_info => AclInfo},
Templates = parse_template(Config),
ProducersMapPID = create_producers_map(ClientId),
State = #{
@ -140,11 +126,21 @@ on_start(
Error
end.
on_stop(InstanceId, #{client_id := ClientId, producers_map_pid := Pid} = _State) ->
on_stop(InstanceId, #{client_id := ClientId, topic := RawTopic, producers_map_pid := Pid} = _State) ->
?SLOG(info, #{
msg => "stopping_rocketmq_connector",
connector => InstanceId
}),
Producers = ets:match(ClientId, {{RawTopic, '$1'}, '$2'}),
lists:foreach(
fun([Topic, Producer]) ->
ets:delete(ClientId, {RawTopic, Topic}),
_ = rocketmq:stop_and_delete_supervised_producers(Producer)
end,
Producers
),
Pid ! ok,
ok = rocketmq:stop_and_delete_supervised_client(ClientId).
@ -276,6 +272,8 @@ client_id(InstanceId) ->
redact(Msg) ->
emqx_utils:redact(Msg, fun is_sensitive_key/1).
is_sensitive_key(secret_key) ->
true;
is_sensitive_key(security_token) ->
true;
is_sensitive_key(_) ->
@ -283,14 +281,14 @@ is_sensitive_key(_) ->
make_producer_opts(
#{
username := Username,
password := Password,
access_key := AccessKey,
secret_key := SecretKey,
security_token := SecurityToken,
send_buffer := SendBuff,
refresh_interval := RefreshInterval
}
) ->
ACLInfo = acl_info(Username, Password, SecurityToken),
ACLInfo = acl_info(AccessKey, SecretKey, SecurityToken),
#{
tcp_opts => [{sndbuf, SendBuff}],
ref_topic_route_interval => RefreshInterval,
@ -299,17 +297,17 @@ make_producer_opts(
acl_info(<<>>, <<>>, <<>>) ->
#{};
acl_info(Username, Password, <<>>) when is_binary(Username), is_binary(Password) ->
acl_info(AccessKey, SecretKey, <<>>) when is_binary(AccessKey), is_binary(SecretKey) ->
#{
access_key => Username,
secret_key => Password
access_key => AccessKey,
secret_key => SecretKey
};
acl_info(Username, Password, SecurityToken) when
is_binary(Username), is_binary(Password), is_binary(SecurityToken)
acl_info(AccessKey, SecretKey, SecurityToken) when
is_binary(AccessKey), is_binary(SecretKey), is_binary(SecurityToken)
->
#{
access_key => Username,
secret_key => Password,
access_key => AccessKey,
secret_key => SecretKey,
security_token => SecurityToken
};
acl_info(_, _, _) ->
@ -342,6 +340,3 @@ get_producers(ClientId, {_, Topic1} = TopicKey, ProducerOpts) ->
ets:insert(ClientId, {TopicKey, Producers0}),
Producers0
end.
default_security_info() ->
#{username => <<>>, password => <<>>, security_token => <<>>}.

View File

@ -26,6 +26,28 @@ The RocketMQ default port 9876 is used if `[:Port]` is not specified."""
}
}
access_key {
desc {
en: """RocketMQ server `accessKey`."""
zh: """RocketMQ 服务器的 `accessKey`。"""
}
label: {
en: "AccessKey"
zh: "AccessKey"
}
}
secret_key {
desc {
en: """RocketMQ server `secretKey`."""
zh: """RocketMQ 服务器的 `secretKey`。"""
}
label: {
en: "SecretKey"
zh: "SecretKey"
}
}
sync_timeout {
desc {
en: """Timeout of RocketMQ driver synchronous call."""