fix(dynamo): change `database` to `table` in the schema of the DynamoDB bridge

there is no term like `database` in DynamoDB, the correct concept should be `table`
This commit is contained in:
firest 2023-04-19 15:56:29 +08:00
parent 18ab3b8923
commit bc353b0a06
4 changed files with 36 additions and 26 deletions

View File

@ -43,7 +43,7 @@ values(_Method) ->
type => dynamo, type => dynamo,
name => <<"foo">>, name => <<"foo">>,
url => <<"http://127.0.0.1:8000">>, url => <<"http://127.0.0.1:8000">>,
database => <<"mqtt">>, table => <<"mqtt">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"******">>, password => <<"******">>,

View File

@ -158,7 +158,7 @@ dynamo_config(BridgeType, Config) ->
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
" enable = true\n" " enable = true\n"
" url = ~p\n" " url = ~p\n"
" database = ~p\n" " table = ~p\n"
" username = ~p\n" " username = ~p\n"
" password = ~p\n" " password = ~p\n"
" resource_opts = {\n" " resource_opts = {\n"

View File

@ -51,20 +51,24 @@ roots() ->
fields(config) -> fields(config) ->
[ [
{url, mk(binary(), #{required => true, desc => ?DESC("url")})} {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
| add_default_username( {table, mk(binary(), #{required => true, desc => ?DESC("table")})}
| override_schemas(
emqx_connector_schema_lib:relational_db_fields() emqx_connector_schema_lib:relational_db_fields()
) )
]. ].
add_default_username(Fields) -> override_schemas(Fields) ->
lists:map( lists:foldr(
fun fun
({username, OrigUsernameFn}) -> ({username, OrigUsernameFn}, Acc) ->
{username, add_default_fn(OrigUsernameFn, <<"root">>)}; [{username, add_default_fn(OrigUsernameFn, <<"root">>)} | Acc];
(Field) -> ({database, _}, Acc) ->
Field Acc;
(Field, Acc) ->
[Field | Acc]
end, end,
[],
Fields Fields
). ).
@ -88,7 +92,7 @@ on_start(
url := Url, url := Url,
username := Username, username := Username,
password := Password, password := Password,
database := Database, table := Table,
pool_size := PoolSize pool_size := PoolSize
} = Config } = Config
) -> ) ->
@ -115,7 +119,7 @@ on_start(
Templates = parse_template(Config), Templates = parse_template(Config),
State = #{ State = #{
poolname => InstanceId, poolname => InstanceId,
database => Database, table => Table,
templates => Templates templates => Templates
}, },
case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of
@ -183,7 +187,7 @@ do_query(
InstanceId, InstanceId,
Query, Query,
ApplyMode, ApplyMode,
#{poolname := PoolName, templates := Templates, database := Database} = State #{poolname := PoolName, templates := Templates, table := Table} = State
) -> ) ->
?TRACE( ?TRACE(
"QUERY", "QUERY",
@ -192,7 +196,7 @@ do_query(
), ),
Result = ecpool:pick_and_do( Result = ecpool:pick_and_do(
PoolName, PoolName,
{?MODULE, worker_do_query, [Database, Query, Templates]}, {?MODULE, worker_do_query, [Table, Query, Templates]},
ApplyMode ApplyMode
), ),
@ -217,33 +221,33 @@ do_query(
Result Result
end. end.
worker_do_query(_Client, Database, Query0, Templates) -> worker_do_query(_Client, Table, Query0, Templates) ->
try try
Query = apply_template(Query0, Templates), Query = apply_template(Query0, Templates),
execute(Query, Database) execute(Query, Table)
catch catch
_Type:Reason -> _Type:Reason ->
{error, {unrecoverable_error, {invalid_request, Reason}}} {error, {unrecoverable_error, {invalid_request, Reason}}}
end. end.
%% some simple query commands for authn/authz or test %% some simple query commands for authn/authz or test
execute({insert_item, Msg}, Database) -> execute({insert_item, Msg}, Table) ->
Item = convert_to_item(Msg), Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Database, Item); erlcloud_ddb2:put_item(Table, Item);
execute({delete_item, Key}, Database) -> execute({delete_item, Key}, Table) ->
erlcloud_ddb2:delete_item(Database, Key); erlcloud_ddb2:delete_item(Table, Key);
execute({get_item, Key}, Database) -> execute({get_item, Key}, Table) ->
erlcloud_ddb2:get_item(Database, Key); erlcloud_ddb2:get_item(Table, Key);
%% commands for data bridge query or batch query %% commands for data bridge query or batch query
execute({send_message, Msg}, Database) -> execute({send_message, Msg}, Table) ->
Item = convert_to_item(Msg), Item = convert_to_item(Msg),
erlcloud_ddb2:put_item(Database, Item); erlcloud_ddb2:put_item(Table, Item);
execute([{put, _} | _] = Msgs, Database) -> execute([{put, _} | _] = Msgs, Table) ->
%% type of batch_write_item argument :: batch_write_item_request_items() %% type of batch_write_item argument :: batch_write_item_request_items()
%% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item())
%% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())}
%% batch_write_item_request() :: {put, item()} | {delete, key()} %% batch_write_item_request() :: {put, item()} | {delete, key()}
erlcloud_ddb2:batch_write_item({Database, Msgs}). erlcloud_ddb2:batch_write_item({Table, Msgs}).
connect(Opts) -> connect(Opts) ->
#{ #{

View File

@ -11,4 +11,10 @@ emqx_ee_connector_dynamo {
} }
} }
table.desc:
"""DynamoDB Table."""
table.label:
"""DynamoDB Table"""
} }