diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index e3beb1bc7..397c44854 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -5,5 +5,6 @@ PGSQL_TAG=13 LDAP_TAG=2.4.50 INFLUXDB_TAG=2.5.0 TDENGINE_TAG=3.0.2.4 +DYNAMO_TAG=1.21.0 TARGET=emqx/emqx diff --git a/.ci/docker-compose-file/docker-compose-dynamo.yaml b/.ci/docker-compose-file/docker-compose-dynamo.yaml new file mode 100644 index 000000000..926d6287c --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-dynamo.yaml @@ -0,0 +1,15 @@ +version: '3.9' + +services: + dynamodb-local: + container_name: dynamo + image: amazon/dynamodb-local:${DYNAMO_TAG} + restart: always + ports: + - "8000:8000" + environment: + AWS_ACCESS_KEY_ID: root + AWS_SECRET_ACCESS_KEY: public + AWS_DEFAULT_REGION: us-west-2 + networks: + - emqx_bridge diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml index 3f526978e..3dd30af52 100644 --- a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -18,6 +18,7 @@ services: - 15432:5432 - 15433:5433 - 16041:6041 + - 18000:8000 command: - "-host=0.0.0.0" - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index e26134ec8..6188eab17 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -47,5 +47,11 @@ "listen": "0.0.0.0:6041", "upstream": "tdengine:6041", "enabled": true + }, + { + "name": "dynamo", + "listen": "0.0.0.0:8000", + "upstream": "dynamo:8000", + "enabled": true } ] diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 196338336..ddf24d380 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -58,7 +58,12 @@ T == redis_single; T == redis_sentinel; T == redis_cluster; - T == clickhouse + T == clickhouse; + T == pgsql; + T == timescale; + T == matrix; + T == tdengine; + T == dynamo ). load() -> diff --git a/changes/ee/feat-10083.en.md b/changes/ee/feat-10083.en.md new file mode 100644 index 000000000..635549d5e --- /dev/null +++ b/changes/ee/feat-10083.en.md @@ -0,0 +1 @@ +Integrate `DynamoDB` into `bridges` as a new backend. diff --git a/changes/ee/feat-10083.zh.md b/changes/ee/feat-10083.zh.md new file mode 100644 index 000000000..061e2e416 --- /dev/null +++ b/changes/ee/feat-10083.zh.md @@ -0,0 +1 @@ +在 `桥接` 中集成 `DynamoDB`。 diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index 91a937b5c..ac1728ad2 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -9,3 +9,4 @@ redis_cluster pgsql tdengine clickhouse +dynamo diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf new file mode 100644 index 000000000..664b13174 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf @@ -0,0 +1,72 @@ +emqx_ee_bridge_dynamo { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to DynamoDB. All MQTT `PUBLISH` messages with the topic +matching the `local_topic` will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also `local_topic` is +configured, then both the data got from the rule and the MQTT messages that match `local_topic` +will be forwarded.""" + zh: """发送到 'local_topic' 的消息都会转发到 DynamoDB。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + template { + desc { + en: """Template, the default value is empty. When this value is empty the whole message will be stored in the database""" + zh: """模板, 默认为空,为空时将会将整个消息存入数据库""" + } + label { + en: "Template" + zh: "模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for an DynamoDB bridge.""" + zh: """DynamoDB 桥接配置""" + } + label: { + en: "DynamoDB Bridge Configuration" + zh: "DynamoDB 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_acked.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_acked.json new file mode 100644 index 000000000..6ede088a4 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_acked.json @@ -0,0 +1,15 @@ +{ + "TableName": "mqtt_acked", + "KeySchema": [ + { "AttributeName": "topic", "KeyType": "HASH" }, + { "AttributeName": "clientid", "KeyType": "RANGE" } + ], + "AttributeDefinitions": [ + { "AttributeName": "topic", "AttributeType": "S" }, + { "AttributeName": "clientid", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_client.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_client.json new file mode 100644 index 000000000..ce1b7d267 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_client.json @@ -0,0 +1,13 @@ +{ + "TableName": "mqtt_client", + "KeySchema": [ + { "AttributeName": "clientid", "KeyType": "HASH" } + ], + "AttributeDefinitions": [ + { "AttributeName": "clientid", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_clientid_msg_map.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_clientid_msg_map.json new file mode 100644 index 000000000..fd703c664 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_clientid_msg_map.json @@ -0,0 +1,13 @@ +{ + "TableName": "mqtt_clientid_msg_map", + "KeySchema": [ + { "AttributeName": "clientid", "KeyType": "HASH" } + ], + "AttributeDefinitions": [ + { "AttributeName": "clientid", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_msg.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_msg.json new file mode 100644 index 000000000..ad94b8f72 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_msg.json @@ -0,0 +1,13 @@ +{ + "TableName": "mqtt_msg", + "KeySchema": [ + { "AttributeName": "id", "KeyType": "HASH" } + ], + "AttributeDefinitions": [ + { "AttributeName": "id", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_retain.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_retain.json new file mode 100644 index 000000000..2a0af2e86 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_retain.json @@ -0,0 +1,13 @@ +{ + "TableName": "mqtt_retain", + "KeySchema": [ + { "AttributeName": "topic", "KeyType": "HASH" } + ], + "AttributeDefinitions": [ + { "AttributeName": "topic", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_sub.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_sub.json new file mode 100644 index 000000000..9a559f048 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_sub.json @@ -0,0 +1,16 @@ +{ + "TableName": "mqtt_sub", + "KeySchema": [ + { "AttributeName": "clientid", "KeyType": "HASH" }, + { "AttributeName": "topic", "KeyType": "RANGE" } + ], + "AttributeDefinitions": [ + { "AttributeName": "clientid", "AttributeType": "S" }, + { "AttributeName": "topic", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} + diff --git a/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_topic_msg_map.json b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_topic_msg_map.json new file mode 100644 index 000000000..effd4b4b9 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_topic_msg_map.json @@ -0,0 +1,13 @@ +{ + "TableName": "mqtt_topic_msg_map", + "KeySchema": [ + { "AttributeName": "topic", "KeyType": "HASH" } + ], + "AttributeDefinitions": [ + { "AttributeName": "topic", "AttributeType": "S" } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index b7f35537e..b5c656291 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -30,7 +30,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_timescale, Method), ref(emqx_ee_bridge_matrix, Method), ref(emqx_ee_bridge_tdengine, Method), - ref(emqx_ee_bridge_clickhouse, Method) + ref(emqx_ee_bridge_clickhouse, Method), + ref(emqx_ee_bridge_dynamo, Method) ]. schema_modules() -> @@ -46,7 +47,8 @@ schema_modules() -> emqx_ee_bridge_timescale, emqx_ee_bridge_matrix, emqx_ee_bridge_tdengine, - emqx_ee_bridge_clickhouse + emqx_ee_bridge_clickhouse, + emqx_ee_bridge_dynamo ]. examples(Method) -> @@ -78,7 +80,8 @@ resource_type(pgsql) -> emqx_connector_pgsql; resource_type(timescale) -> emqx_connector_pgsql; resource_type(matrix) -> emqx_connector_pgsql; resource_type(tdengine) -> emqx_ee_connector_tdengine; -resource_type(clickhouse) -> emqx_ee_connector_clickhouse. +resource_type(clickhouse) -> emqx_ee_connector_clickhouse; +resource_type(dynamo) -> emqx_ee_connector_dynamo. fields(bridges) -> [ @@ -121,6 +124,14 @@ fields(bridges) -> desc => <<"TDengine Bridge Config">>, required => false } + )}, + {dynamo, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_dynamo, "config")), + #{ + desc => <<"Dynamo Bridge Config">>, + required => false + } )} ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl new file mode 100644 index 000000000..066b873ce --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_dynamo). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_TEMPLATE, <<>>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"dynamo">> => #{ + summary => <<"DynamoDB Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => dynamo, + name => <<"foo">>, + url => <<"http://127.0.0.1:8000">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"public">>, + template => ?DEFAULT_TEMPLATE, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_dynamo". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_ee_connector_dynamo:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts"); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([dynamo]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl new file mode 100644 index 000000000..26666c6d8 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -0,0 +1,422 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_dynamo_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +% DB defaults +-define(TABLE, "mqtt"). +-define(TABLE_BIN, to_bin(?TABLE)). +-define(USERNAME, "root"). +-define(PASSWORD, "public"). +-define(HOST, "dynamo"). +-define(PORT, 8000). +-define(SCHEMA, "http://"). +-define(BATCH_SIZE, 10). +-define(PAYLOAD, <<"HELLO">>). + +-define(GET_CONFIG(KEY__, CFG__), proplists:get_value(KEY__, CFG__)). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch}, + {group, flaky} + ]. + +groups() -> + TCs0 = emqx_common_test_helpers:all(?MODULE), + + %% due to the poorly implemented driver or other reasons + %% if we mix these cases with others, this suite will become flaky. + Flaky = [t_get_status, t_write_failure, t_write_timeout], + TCs = TCs0 -- Flaky, + + [ + {with_batch, TCs}, + {without_batch, TCs}, + {flaky, Flaky} + ]. + +init_per_group(with_batch, Config0) -> + Config = [{batch_size, ?BATCH_SIZE} | Config0], + common_init(Config); +init_per_group(without_batch, Config0) -> + Config = [{batch_size, 1} | Config0], + common_init(Config); +init_per_group(flaky, Config0) -> + Config = [{batch_size, 1} | Config0], + common_init(Config); +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when Group =:= with_batch; Group =:= without_batch -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok; +end_per_group(Group, Config) when Group =:= flaky -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + timer:sleep(1000), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + emqx_mgmt_api_test_util:end_suite(), + ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]), + ok. + +init_per_testcase(_Testcase, Config) -> + create_table(Config), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = snabbkaffe:stop(), + delete_table(Config), + delete_bridge(Config), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +common_init(ConfigT) -> + Host = os:getenv("DYNAMO_HOST", "toxiproxy"), + Port = list_to_integer(os:getenv("DYNAMO_PORT", "8000")), + + Config0 = [ + {host, Host}, + {port, Port}, + {query_mode, sync}, + {proxy_name, "dynamo"} + | ConfigT + ], + + BridgeType = proplists:get_value(bridge_type, Config0, <<"dynamo">>), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of + true -> + % Setup toxiproxy + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + % Ensure EE bridge module is loaded + _ = application:load(emqx_ee_bridge), + _ = emqx_ee_bridge:module_info(), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + emqx_mgmt_api_test_util:init_suite(), + % setup dynamo + setup_dynamo(Config0), + {Name, TDConf} = dynamo_config(BridgeType, Config0), + Config = + [ + {dynamo_config, TDConf}, + {dynamo_bridge_type, BridgeType}, + {dynamo_name, Name}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config0 + ], + Config; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_dynamo); + _ -> + {skip, no_dynamo} + end + end. + +dynamo_config(BridgeType, Config) -> + Port = integer_to_list(?GET_CONFIG(port, Config)), + Url = "http://" ++ ?GET_CONFIG(host, Config) ++ ":" ++ Port, + Name = atom_to_binary(?MODULE), + BatchSize = ?GET_CONFIG(batch_size, Config), + QueryMode = ?GET_CONFIG(query_mode, Config), + ConfigString = + io_lib:format( + "bridges.~s.~s {\n" + " enable = true\n" + " url = ~p\n" + " database = ~p\n" + " username = ~p\n" + " password = ~p\n" + " resource_opts = {\n" + " request_timeout = 500ms\n" + " batch_size = ~b\n" + " query_mode = ~s\n" + " }\n" + "}", + [ + BridgeType, + Name, + Url, + ?TABLE, + ?USERNAME, + ?PASSWORD, + BatchSize, + QueryMode + ] + ), + {Name, parse_and_check(ConfigString, BridgeType, Name)}. + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + Config. + +create_bridge(Config) -> + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + TDConfig = ?config(dynamo_config, Config), + emqx_bridge:create(BridgeType, Name, TDConfig). + +delete_bridge(Config) -> + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + emqx_bridge:remove(BridgeType, Name). + +create_bridge_http(Params) -> + Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of + {ok, Res} -> {ok, emqx_json:decode(Res, [return_maps])}; + Error -> Error + end. + +send_message(Config, Payload) -> + Name = ?config(dynamo_name, Config), + BridgeType = ?config(dynamo_bridge_type, Config), + BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), + emqx_bridge:send_message(BridgeID, Payload). + +query_resource(Config, Request) -> + Name = ?config(dynamo_name, Config), + BridgeType = ?config(dynamo_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). + +%% create a table, use the lib-ee/emqx_ee_bridge/priv/dynamo/mqtt_msg.json as template +create_table(Config) -> + directly_setup_dynamo(), + delete_table(Config), + ?assertMatch( + {ok, _}, + erlcloud_ddb2:create_table( + ?TABLE_BIN, + [{<<"id">>, s}], + <<"id">>, + [{provisioned_throughput, {5, 5}}] + ) + ). + +delete_table(_Config) -> + erlcloud_ddb2:delete_table(?TABLE_BIN). + +setup_dynamo(Config) -> + Host = ?GET_CONFIG(host, Config), + Port = ?GET_CONFIG(port, Config), + erlcloud_ddb2:configure(?USERNAME, ?PASSWORD, Host, Port, ?SCHEMA). + +directly_setup_dynamo() -> + erlcloud_ddb2:configure(?USERNAME, ?PASSWORD, ?HOST, ?PORT, ?SCHEMA). + +directly_query(Query) -> + directly_setup_dynamo(), + emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN). + +directly_get_payload(Key) -> + case directly_query({get_item, {<<"id">>, Key}}) of + {ok, Values} -> + proplists:get_value(<<"payload">>, Values, {error, {invalid_item, Values}}); + Error -> + Error + end. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_setup_via_config_and_publish(Config) -> + ?assertNotEqual(undefined, get(aws_config)), + create_table(Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + MsgId = emqx_misc:gen_id(), + SentData = #{id => MsgId, payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertMatch( + {ok, _}, send_message(Config, SentData) + ), + #{?snk_kind := dynamo_connector_query_return}, + 10_000 + ), + ?assertMatch( + ?PAYLOAD, + directly_get_payload(MsgId) + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(dynamo_connector_query_return, Trace0), + ?assertMatch([#{result := {ok, _}}], Trace), + ok + end + ), + ok. + +t_setup_via_http_api_and_publish(Config) -> + BridgeType = ?config(dynamo_bridge_type, Config), + Name = ?config(dynamo_name, Config), + PgsqlConfig0 = ?config(dynamo_config, Config), + PgsqlConfig = PgsqlConfig0#{ + <<"name">> => Name, + <<"type">> => BridgeType + }, + ?assertMatch( + {ok, _}, + create_bridge_http(PgsqlConfig) + ), + MsgId = emqx_misc:gen_id(), + SentData = #{id => MsgId, payload => ?PAYLOAD}, + ?check_trace( + begin + ?wait_async_action( + ?assertMatch( + {ok, _}, send_message(Config, SentData) + ), + #{?snk_kind := dynamo_connector_query_return}, + 10_000 + ), + ?assertMatch( + ?PAYLOAD, + directly_get_payload(MsgId) + ), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(dynamo_connector_query_return, Trace0), + ?assertMatch([#{result := {ok, _}}], Trace), + ok + end + ), + ok. + +t_get_status(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + + Name = ?config(dynamo_name, Config), + BridgeType = ?config(dynamo_bridge_type, Config), + ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), + + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID)), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + case emqx_resource_manager:health_check(ResourceID) of + {ok, Status} when Status =:= disconnected orelse Status =:= connecting -> + ok; + {error, timeout} -> + ok; + Other -> + ?assert( + false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) + ) + end + end), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD}, + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) + ) + end), + ok. + +t_write_timeout(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + {ok, _} = create_bridge(Config), + SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD}, + emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch( + {error, {resource_error, #{reason := timeout}}}, + query_resource(Config, {send_message, SentData}) + ) + end), + ok. + +t_simple_query(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {get_item, {<<"id">>, <<"not_exists">>}}, + Result = query_resource(Config, Request), + case ?GET_CONFIG(batch_size, Config) of + ?BATCH_SIZE -> + ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result); + 1 -> + ?assertMatch({ok, []}, Result) + end, + ok. + +t_missing_data(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Result = send_message(Config, #{}), + ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result), + ok. + +t_bad_parameter(Config) -> + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + Request = {insert_item, bad_parameter}, + Result = query_resource(Config, Request), + ?assertMatch({error, {unrecoverable_error, {invalid_request, _}}}, Result), + ok. + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Bin) when is_binary(Bin) -> + Bin. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf new file mode 100644 index 000000000..e1fc11e03 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf @@ -0,0 +1,14 @@ +emqx_ee_connector_dynamo { + + url { + desc { + en: """The url of DynamoDB endpoint.
""" + zh: """DynamoDB 的地址。
""" + } + label: { + en: "DynamoDB Endpoint" + zh: "DynamoDB 地址" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index bcf9508bf..76f6ccfba 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -4,6 +4,7 @@ {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}}, {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}}, {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}}, + {erlcloud, {git, "https://github.com/emqx/erlcloud.git", {tag,"3.5.16-emqx-1"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6c9d83bc7..5fcb83baa 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -10,7 +10,8 @@ tdengine, wolff, brod, - clickhouse + clickhouse, + erlcloud ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl new file mode 100644 index 000000000..957706f6a --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -0,0 +1,345 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_dynamo). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_query_async/4, + on_batch_query_async/4, + on_get_status/2 +]). + +-export([ + connect/1, + do_get_status/1, + do_async_reply/2, + worker_do_query/4, + worker_do_get_status/1 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(DYNAMO_HOST_OPTIONS, #{ + default_port => 8000 +}). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {url, mk(binary(), #{required => true, desc => ?DESC("url")})} + | add_default_username( + emqx_connector_schema_lib:relational_db_fields() + ) + ]. + +add_default_username(Fields) -> + lists:map( + fun + ({username, OrigUsernameFn}) -> + {username, add_default_fn(OrigUsernameFn, <<"root">>)}; + (Field) -> + Field + end, + Fields + ). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> async_if_possible. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{ + url := Url, + username := Username, + password := Password, + database := Database, + pool_size := PoolSize + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_dynamo_connector", + connector => InstanceId, + config => emqx_misc:redact(Config) + }), + + {Schema, Server} = get_host_schema(to_str(Url)), + {Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + + Options = [ + {config, #{ + host => Host, + port => Port, + username => to_str(Username), + password => to_str(Password), + schema => Schema + }}, + {pool_size, PoolSize} + ], + + Templates = parse_template(Config), + State = #{ + poolname => InstanceId, + database => Database, + templates => Templates + }, + case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end. + +on_stop(InstanceId, #{poolname := PoolName} = _State) -> + ?SLOG(info, #{ + msg => "stopping_dynamo_connector", + connector => InstanceId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstanceId, Query, State) -> + do_query(InstanceId, Query, handover, State). + +on_query_async(InstanceId, Query, Reply, State) -> + do_query( + InstanceId, + Query, + {handover_async, {?MODULE, do_async_reply, [Reply]}}, + State + ). + +%% we only support batch insert +on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> + do_query(InstanceId, Query, handover, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +%% we only support batch insert +on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> + do_query( + InstanceId, + Query, + {handover_async, {?MODULE, do_async_reply, [Reply]}}, + State + ); +on_batch_query_async(_InstanceId, Query, _Reply, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{poolname := Pool}) -> + Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), + status_result(Health). + +do_get_status(Conn) -> + %% because the dynamodb driver connection process is the ecpool worker self + %% so we must call the checker function inside the worker + ListTables = ecpool_worker:exec(Conn, {?MODULE, worker_do_get_status, []}, infinity), + case ListTables of + {ok, _} -> true; + _ -> false + end. + +worker_do_get_status(_) -> + erlcloud_ddb2:list_tables(). + +status_result(_Status = true) -> connected; +status_result(_Status = false) -> connecting. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query( + InstanceId, + Query, + ApplyMode, + #{poolname := PoolName, templates := Templates, database := Database} = State +) -> + ?TRACE( + "QUERY", + "dynamo_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + Result = ecpool:pick_and_do( + PoolName, + {?MODULE, worker_do_query, [Database, Query, Templates]}, + ApplyMode + ), + + case Result of + {error, Reason} -> + ?tp( + dynamo_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "dynamo_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + dynamo_connector_query_return, + #{result => Result} + ), + Result + end. + +worker_do_query(_Client, Database, Query0, Templates) -> + try + Query = apply_template(Query0, Templates), + execute(Query, Database) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +%% some simple query commands for authn/authz or test +execute({insert_item, Msg}, Database) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Database, Item); +execute({delete_item, Key}, Database) -> + erlcloud_ddb2:delete_item(Database, Key); +execute({get_item, Key}, Database) -> + erlcloud_ddb2:get_item(Database, Key); +%% commands for data bridge query or batch query +execute({send_message, Msg}, Database) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Database, Item); +execute([{put, _} | _] = Msgs, Database) -> + %% type of batch_write_item argument :: batch_write_item_request_items() + %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) + %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} + %% batch_write_item_request() :: {put, item()} | {delete, key()} + erlcloud_ddb2:batch_write_item({Database, Msgs}). + +connect(Opts) -> + #{ + username := Username, + password := Password, + host := Host, + port := Port, + schema := Schema + } = proplists:get_value(config, Opts), + erlcloud_ddb2:configure(Username, Password, Host, Port, Schema), + + %% The dynamodb driver uses caller process as its connection process + %% so at here, the connection process is the ecpool worker self + {ok, self()}. + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). + +get_host_schema("http://" ++ Server) -> + {"http://", Server}; +get_host_schema("https://" ++ Server) -> + {"https://", Server}; +get_host_schema(Server) -> + {"http://", Server}. + +apply_template({Key, Msg} = Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + Req; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% now there is no batch delete, so +%% 1. we can simply replace the `send_message` to `put` +%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, +%% so we can reduce some list map cost +apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> + lists:map( + fun(Req) -> + {_, Msg} = apply_template(Req, Templates), + {put, convert_to_item(Msg)} + end, + Msgs + ). + +convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> + maps:fold( + fun + (_K, <<>>, AccIn) -> + AccIn; + (K, V, AccIn) -> + [{convert2binary(K), convert2binary(V)} | AccIn] + end, + [], + Msg + ); +convert_to_item(MsgBin) when is_binary(MsgBin) -> + Msg = emqx_json:decode(MsgBin), + convert_to_item(Msg); +convert_to_item(Item) -> + erlang:throw({invalid_item, Item}). + +convert2binary(Value) when is_atom(Value) -> + erlang:atom_to_binary(Value, utf8); +convert2binary(Value) when is_binary(Value); is_number(Value) -> + Value; +convert2binary(Value) when is_list(Value) -> + unicode:characters_to_binary(Value); +convert2binary(Value) when is_map(Value) -> + emqx_json:encode(Value). + +do_async_reply(Result, {ReplyFun, [Context]}) -> + ReplyFun(Context, Result). diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 7e4c06c3e..b3c424ea1 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -165,6 +165,9 @@ for dep in ${CT_DEPS}; do clickhouse) FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' ) ;; + dynamo) + FILES+=( '.ci/docker-compose-file/docker-compose-dynamo.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1