Merge pull request #10460 from zmstone/0420-sync-release-50-to-masteer

0420 sync release 50 to masteer
This commit is contained in:
Zaiming (Stone) Shi 2023-04-20 16:41:50 +02:00 committed by GitHub
commit 82e6ce53be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 107 additions and 62 deletions

View File

@ -1633,7 +1633,9 @@ fields("sysmon") ->
{"top", {"top",
sc( sc(
ref("sysmon_top"), ref("sysmon_top"),
#{} %% Userful monitoring solution when benchmarking,
%% but hardly common enough for regular users.
#{importance => ?IMPORTANCE_HIDDEN}
)} )}
]; ];
fields("sysmon_vm") -> fields("sysmon_vm") ->

View File

@ -0,0 +1,5 @@
Fix some configuration item terminology errors in the DynamoDB data bridge:
- Changed `database` to `table`
- Changed `username` to `aws_access_key_id`
- Changed `password` to `aws_secret_access_key`

View File

@ -1,6 +1,6 @@
{application, emqx_ee_bridge, [ {application, emqx_ee_bridge, [
{description, "EMQX Enterprise data bridges"}, {description, "EMQX Enterprise data bridges"},
{vsn, "0.1.10"}, {vsn, "0.1.11"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,

View File

@ -43,10 +43,10 @@ values(_Method) ->
type => dynamo, type => dynamo,
name => <<"foo">>, name => <<"foo">>,
url => <<"http://127.0.0.1:8000">>, url => <<"http://127.0.0.1:8000">>,
database => <<"mqtt">>, table => <<"mqtt">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, aws_access_key_id => <<"root">>,
password => <<"******">>, aws_secret_access_key => <<"******">>,
template => ?DEFAULT_TEMPLATE, template => ?DEFAULT_TEMPLATE,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{

View File

@ -14,8 +14,8 @@
% DB defaults % DB defaults
-define(TABLE, "mqtt"). -define(TABLE, "mqtt").
-define(TABLE_BIN, to_bin(?TABLE)). -define(TABLE_BIN, to_bin(?TABLE)).
-define(USERNAME, "root"). -define(ACCESS_KEY_ID, "root").
-define(PASSWORD, "public"). -define(SECRET_ACCESS_KEY, "public").
-define(HOST, "dynamo"). -define(HOST, "dynamo").
-define(PORT, 8000). -define(PORT, 8000).
-define(SCHEMA, "http://"). -define(SCHEMA, "http://").
@ -158,9 +158,9 @@ dynamo_config(BridgeType, Config) ->
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
" enable = true\n" " enable = true\n"
" url = ~p\n" " url = ~p\n"
" database = ~p\n" " table = ~p\n"
" username = ~p\n" " aws_access_key_id = ~p\n"
" password = ~p\n" " aws_secret_access_key = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"
" request_timeout = 500ms\n" " request_timeout = 500ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
@ -172,8 +172,8 @@ dynamo_config(BridgeType, Config) ->
Name, Name,
Url, Url,
?TABLE, ?TABLE,
?USERNAME, ?ACCESS_KEY_ID,
?PASSWORD, ?SECRET_ACCESS_KEY,
BatchSize, BatchSize,
QueryMode QueryMode
] ]
@ -244,10 +244,10 @@ delete_table(_Config) ->
setup_dynamo(Config) -> setup_dynamo(Config) ->
Host = ?GET_CONFIG(host, Config), Host = ?GET_CONFIG(host, Config),
Port = ?GET_CONFIG(port, Config), Port = ?GET_CONFIG(port, Config),
erlcloud_ddb2:configure(?USERNAME, ?PASSWORD, Host, Port, ?SCHEMA). erlcloud_ddb2:configure(?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, Host, Port, ?SCHEMA).
directly_setup_dynamo() -> directly_setup_dynamo() ->
erlcloud_ddb2:configure(?USERNAME, ?PASSWORD, ?HOST, ?PORT, ?SCHEMA). erlcloud_ddb2:configure(?ACCESS_KEY_ID, ?SECRET_ACCESS_KEY, ?HOST, ?PORT, ?SCHEMA).
directly_query(Query) -> directly_query(Query) ->
directly_setup_dynamo(), directly_setup_dynamo(),

View File

@ -51,29 +51,22 @@ roots() ->
fields(config) -> fields(config) ->
[ [
{url, mk(binary(), #{required => true, desc => ?DESC("url")})} {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
| add_default_username( {table, mk(binary(), #{required => true, desc => ?DESC("table")})},
emqx_connector_schema_lib:relational_db_fields() {aws_access_key_id,
) mk(
binary(),
#{required => true, desc => ?DESC("aws_access_key_id")}
)},
{aws_secret_access_key,
mk(
binary(),
#{required => true, desc => ?DESC("aws_secret_access_key")}
)},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{auto_reconnect, fun emqx_connector_schema_lib:auto_reconnect/1}
]. ].
add_default_username(Fields) ->
lists:map(
fun
({username, OrigUsernameFn}) ->
{username, add_default_fn(OrigUsernameFn, <<"root">>)};
(Field) ->
Field
end,
Fields
).
add_default_fn(OrigFn, Default) ->
fun
(default) -> Default;
(Field) -> OrigFn(Field)
end.
%%======================================================================================== %%========================================================================================
%% `emqx_resource' API %% `emqx_resource' API
%%======================================================================================== %%========================================================================================
@ -86,16 +79,16 @@ on_start(
InstanceId, InstanceId,
#{ #{
url := Url, url := Url,
username := Username, aws_access_key_id := AccessKeyID,
password := Password, aws_secret_access_key := SecretAccessKey,
database := Database, table := Table,
pool_size := PoolSize pool_size := PoolSize
} = Config } = Config
) -> ) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_dynamo_connector", msg => "starting_dynamo_connector",
connector => InstanceId, connector => InstanceId,
config => emqx_utils:redact(Config) config => redact(Config)
}), }),
{Schema, Server} = get_host_schema(to_str(Url)), {Schema, Server} = get_host_schema(to_str(Url)),
@ -105,8 +98,8 @@ on_start(
{config, #{ {config, #{
host => Host, host => Host,
port => Port, port => Port,
username => to_str(Username), aws_access_key_id => to_str(AccessKeyID),
password => to_str(Password), aws_secret_access_key => to_str(SecretAccessKey),
schema => Schema schema => Schema
}}, }},
{pool_size, PoolSize} {pool_size, PoolSize}
@ -115,7 +108,7 @@ on_start(
Templates = parse_template(Config), Templates = parse_template(Config),
State = #{ State = #{
pool_name => InstanceId, pool_name => InstanceId,
database => Database, table => Table,
templates => Templates templates => Templates
}, },
case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
@ -183,7 +176,7 @@ do_query(
InstanceId, InstanceId,
Query, Query,
ApplyMode, ApplyMode,
#{pool_name := PoolName, templates := Templates, database := Database} = State #{pool_name := PoolName, templates := Templates, table := Table} = State
) -> ) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -192,7 +185,7 @@ do_query(
), ),
Result = ecpool:pick_and_do( Result = ecpool:pick_and_do(
PoolName, PoolName,
{?MODULE, worker_do_query, [Database, Query, Templates]}, {?MODULE, worker_do_query, [Table, Query, Templates]},
ApplyMode ApplyMode
), ),
@ -217,43 +210,43 @@ do_query(
Result Result
end. end.
worker_do_query(_Client, Database, Query0, Templates) -> worker_do_query(_Client, Table, Query0, Templates) ->
try try
Query = apply_template(Query0, Templates), Query = apply_template(Query0, Templates),
execute(Query, Database) execute(Query, Table)
catch catch
_Type:Reason -> _Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}} {error, {unrecoverable_error, {invalid_request, Reason}}}
end. end.
%% some simple query commands for authn/authz or test %% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Database) -> execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg), Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Database, Item); erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Database) -> execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Database, Key); erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Database) -> execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Database, Key); erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query %% commands for data bridge query or batch query
execute({send_message, Msg}, Database) -> execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg), Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Database, Item); erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Database) -> execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items() %% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()} %% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Database, Msgs}). erlcloud_ddb2:batch_write_item({Table, Msgs}).
connect(Opts) -> connect(Opts) ->
#{ #{
username := Username, aws_access_key_id := AccessKeyID,
password := Password, aws_secret_access_key := SecretAccessKey,
host := Host, host := Host,
port := Port, port := Port,
schema := Schema schema := Schema
} = proplists:get_value(config, Opts), } = proplists:get_value(config, Opts),
erlcloud_ddb2:configure(Username, Password, Host, Port, Schema), erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema),
%% The dynamodb driver uses caller process as its connection process %% The dynamodb driver uses caller process as its connection process
%% so at here, the connection process is the ecpool worker self %% so at here, the connection process is the ecpool worker self
@ -338,3 +331,6 @@ convert2binary(Value) when is_map(Value) ->
do_async_reply(Result, {ReplyFun, [Context]}) -> do_async_reply(Result, {ReplyFun, [Context]}) ->
ReplyFun(Context, Result). ReplyFun(Context, Result).
redact(Data) ->
emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).

View File

@ -1,5 +1,23 @@
emqx_ee_connector_dynamo { emqx_ee_connector_dynamo {
aws_access_key_id.desc:
"""Access Key ID for connecting to DynamoDB."""
aws_access_key_id.label:
"""AWS Access Key ID"""
aws_secret_access_key.desc:
"""AWS Secret Access Key for connecting to DynamoDB."""
aws_secret_access_key.label:
"""AWS Secret Access Key"""
table.desc:
"""DynamoDB Table."""
table.label:
"""Table """
url.desc: url.desc:
"""The url of DynamoDB endpoint.""" """The url of DynamoDB endpoint."""

View File

@ -1,5 +1,23 @@
emqx_ee_connector_dynamo { emqx_ee_connector_dynamo {
aws_access_key_id.desc:
"""DynamoDB 的访问 ID。"""
aws_access_key_id.label:
"""连接访问 ID"""
aws_secret_access_key.desc:
"""DynamoDB 的访问密钥。"""
aws_secret_access_key.label:
"""连接访问密钥"""
table.desc:
"""DynamoDB 的表。"""
table.label:
"""表"""
url.desc: url.desc:
"""DynamoDB 的地址。""" """DynamoDB 的地址。"""

View File

@ -27,9 +27,13 @@ add_ebin(Dir) ->
split_file(Path) -> split_file(Path) ->
{ok, DescMap} = hocon:load(Path), {ok, DescMap} = hocon:load(Path),
[{Module, Descs}] = maps:to_list(DescMap), [{Module, Descs}] = maps:to_list(DescMap),
try
ok = split(Path, Module, <<"en">>, Descs), ok = split(Path, Module, <<"en">>, Descs),
ok = split(Path, Module, <<"zh">>, Descs), ok = split(Path, Module, <<"zh">>, Descs)
ok. catch
throw : already_done ->
ok
end.
split(Path, Module, Lang, Fields) when is_map(Fields) -> split(Path, Module, Lang, Fields) when is_map(Fields) ->
split(Path, Module, Lang, maps:to_list(Fields)); split(Path, Module, Lang, maps:to_list(Fields));
@ -54,6 +58,8 @@ rename(FilePath, Lang) ->
BaseName = filename:basename(FilePath), BaseName = filename:basename(FilePath),
filename:join([Dir, Lang, BaseName]). filename:join([Dir, Lang, BaseName]).
do_split(_Path, _Name, _Lang, #{<<"desc">> := Desc}) when is_binary(Desc) ->
throw(already_done);
do_split(Path, Name, Lang, #{<<"desc">> := Desc} = D) -> do_split(Path, Name, Lang, #{<<"desc">> := Desc} = D) ->
try try
Label = maps:get(<<"label">>, D, #{}), Label = maps:get(<<"label">>, D, #{}),