diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 196338336..ddf24d380 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -58,7 +58,12 @@ T == redis_single; T == redis_sentinel; T == redis_cluster; - T == clickhouse + T == clickhouse; + T == pgsql; + T == timescale; + T == matrix; + T == tdengine; + T == dynamo ). load() -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf new file mode 100644 index 000000000..664b13174 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_dynamo.conf @@ -0,0 +1,72 @@ +emqx_ee_bridge_dynamo { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to DynamoDB. All MQTT `PUBLISH` messages with the topic +matching the `local_topic` will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also `local_topic` is +configured, then both the data got from the rule and the MQTT messages that match `local_topic` +will be forwarded.""" + zh: """发送到 'local_topic' 的消息都会转发到 DynamoDB。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + template { + desc { + en: """Template, the default value is empty. When this value is empty the whole message will be stored in the database""" + zh: """模板, 默认为空,为空时将会将整个消息存入数据库""" + } + label { + en: "Template" + zh: "模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for an DynamoDB bridge.""" + zh: """DynamoDB 桥接配置""" + } + label: { + en: "DynamoDB Bridge Configuration" + zh: "DynamoDB 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index b7f35537e..b5c656291 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -30,7 +30,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_timescale, Method), ref(emqx_ee_bridge_matrix, Method), ref(emqx_ee_bridge_tdengine, Method), - ref(emqx_ee_bridge_clickhouse, Method) + ref(emqx_ee_bridge_clickhouse, Method), + ref(emqx_ee_bridge_dynamo, Method) ]. schema_modules() -> @@ -46,7 +47,8 @@ schema_modules() -> emqx_ee_bridge_timescale, emqx_ee_bridge_matrix, emqx_ee_bridge_tdengine, - emqx_ee_bridge_clickhouse + emqx_ee_bridge_clickhouse, + emqx_ee_bridge_dynamo ]. examples(Method) -> @@ -78,7 +80,8 @@ resource_type(pgsql) -> emqx_connector_pgsql; resource_type(timescale) -> emqx_connector_pgsql; resource_type(matrix) -> emqx_connector_pgsql; resource_type(tdengine) -> emqx_ee_connector_tdengine; -resource_type(clickhouse) -> emqx_ee_connector_clickhouse. +resource_type(clickhouse) -> emqx_ee_connector_clickhouse; +resource_type(dynamo) -> emqx_ee_connector_dynamo. fields(bridges) -> [ @@ -121,6 +124,14 @@ fields(bridges) -> desc => <<"TDengine Bridge Config">>, required => false } + )}, + {dynamo, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_dynamo, "config")), + #{ + desc => <<"Dynamo Bridge Config">>, + required => false + } )} ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs(). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl new file mode 100644 index 000000000..066b873ce --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_dynamo.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_dynamo). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_TEMPLATE, <<>>). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"dynamo">> => #{ + summary => <<"DynamoDB Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + enable => true, + type => dynamo, + name => <<"foo">>, + url => <<"http://127.0.0.1:8000">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"root">>, + password => <<"public">>, + template => ?DEFAULT_TEMPLATE, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_dynamo". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => ?DEFAULT_TEMPLATE} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + (emqx_ee_connector_dynamo:fields(config) -- + emqx_connector_schema_lib:prepare_statement_fields()); +fields("creation_opts") -> + emqx_resource_schema:fields("creation_opts"); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([dynamo]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf new file mode 100644 index 000000000..e1fc11e03 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_dynamo.conf @@ -0,0 +1,14 @@ +emqx_ee_connector_dynamo { + + url { + desc { + en: """The url of DynamoDB endpoint.
""" + zh: """DynamoDB 的地址。
""" + } + label: { + en: "DynamoDB Endpoint" + zh: "DynamoDB 地址" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index bcf9508bf..76f6ccfba 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -4,6 +4,7 @@ {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}}, {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}}, {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}}, + {erlcloud, {git, "https://github.com/emqx/erlcloud.git", {tag,"3.5.16-emqx-1"}}}, {emqx, {path, "../../apps/emqx"}} ]}. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6c9d83bc7..5fcb83baa 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -10,7 +10,8 @@ tdengine, wolff, brod, - clickhouse + clickhouse, + erlcloud ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl new file mode 100644 index 000000000..957706f6a --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -0,0 +1,345 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_dynamo). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + is_buffer_supported/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_query_async/4, + on_batch_query_async/4, + on_get_status/2 +]). + +-export([ + connect/1, + do_get_status/1, + do_async_reply/2, + worker_do_query/4, + worker_do_get_status/1 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(DYNAMO_HOST_OPTIONS, #{ + default_port => 8000 +}). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%===================================================================== +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {url, mk(binary(), #{required => true, desc => ?DESC("url")})} + | add_default_username( + emqx_connector_schema_lib:relational_db_fields() + ) + ]. + +add_default_username(Fields) -> + lists:map( + fun + ({username, OrigUsernameFn}) -> + {username, add_default_fn(OrigUsernameFn, <<"root">>)}; + (Field) -> + Field + end, + Fields + ). + +add_default_fn(OrigFn, Default) -> + fun + (default) -> Default; + (Field) -> OrigFn(Field) + end. + +%%======================================================================================== +%% `emqx_resource' API +%%======================================================================================== + +callback_mode() -> async_if_possible. + +is_buffer_supported() -> false. + +on_start( + InstanceId, + #{ + url := Url, + username := Username, + password := Password, + database := Database, + pool_size := PoolSize + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_dynamo_connector", + connector => InstanceId, + config => emqx_misc:redact(Config) + }), + + {Schema, Server} = get_host_schema(to_str(Url)), + {Host, Port} = emqx_schema:parse_server(Server, ?DYNAMO_HOST_OPTIONS), + + Options = [ + {config, #{ + host => Host, + port => Port, + username => to_str(Username), + password => to_str(Password), + schema => Schema + }}, + {pool_size, PoolSize} + ], + + Templates = parse_template(Config), + State = #{ + poolname => InstanceId, + database => Database, + templates => Templates + }, + case emqx_plugin_libs_pool:start_pool(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end. + +on_stop(InstanceId, #{poolname := PoolName} = _State) -> + ?SLOG(info, #{ + msg => "stopping_dynamo_connector", + connector => InstanceId + }), + emqx_plugin_libs_pool:stop_pool(PoolName). + +on_query(InstanceId, Query, State) -> + do_query(InstanceId, Query, handover, State). + +on_query_async(InstanceId, Query, Reply, State) -> + do_query( + InstanceId, + Query, + {handover_async, {?MODULE, do_async_reply, [Reply]}}, + State + ). + +%% we only support batch insert +on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> + do_query(InstanceId, Query, handover, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +%% we only support batch insert +on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> + do_query( + InstanceId, + Query, + {handover_async, {?MODULE, do_async_reply, [Reply]}}, + State + ); +on_batch_query_async(_InstanceId, Query, _Reply, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{poolname := Pool}) -> + Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), + status_result(Health). + +do_get_status(Conn) -> + %% because the dynamodb driver connection process is the ecpool worker self + %% so we must call the checker function inside the worker + ListTables = ecpool_worker:exec(Conn, {?MODULE, worker_do_get_status, []}, infinity), + case ListTables of + {ok, _} -> true; + _ -> false + end. + +worker_do_get_status(_) -> + erlcloud_ddb2:list_tables(). + +status_result(_Status = true) -> connected; +status_result(_Status = false) -> connecting. + +%%======================================================================================== +%% Helper fns +%%======================================================================================== + +do_query( + InstanceId, + Query, + ApplyMode, + #{poolname := PoolName, templates := Templates, database := Database} = State +) -> + ?TRACE( + "QUERY", + "dynamo_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + Result = ecpool:pick_and_do( + PoolName, + {?MODULE, worker_do_query, [Database, Query, Templates]}, + ApplyMode + ), + + case Result of + {error, Reason} -> + ?tp( + dynamo_connector_query_return, + #{error => Reason} + ), + ?SLOG(error, #{ + msg => "dynamo_connector_do_query_failed", + connector => InstanceId, + query => Query, + reason => Reason + }), + Result; + _ -> + ?tp( + dynamo_connector_query_return, + #{result => Result} + ), + Result + end. + +worker_do_query(_Client, Database, Query0, Templates) -> + try + Query = apply_template(Query0, Templates), + execute(Query, Database) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +%% some simple query commands for authn/authz or test +execute({insert_item, Msg}, Database) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Database, Item); +execute({delete_item, Key}, Database) -> + erlcloud_ddb2:delete_item(Database, Key); +execute({get_item, Key}, Database) -> + erlcloud_ddb2:get_item(Database, Key); +%% commands for data bridge query or batch query +execute({send_message, Msg}, Database) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Database, Item); +execute([{put, _} | _] = Msgs, Database) -> + %% type of batch_write_item argument :: batch_write_item_request_items() + %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) + %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} + %% batch_write_item_request() :: {put, item()} | {delete, key()} + erlcloud_ddb2:batch_write_item({Database, Msgs}). + +connect(Opts) -> + #{ + username := Username, + password := Password, + host := Host, + port := Port, + schema := Schema + } = proplists:get_value(config, Opts), + erlcloud_ddb2:configure(Username, Password, Host, Port, Schema), + + %% The dynamodb driver uses caller process as its connection process + %% so at here, the connection process is the ecpool worker self + {ok, self()}. + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_plugin_libs_rule:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +to_str(List) when is_list(List) -> + List; +to_str(Bin) when is_binary(Bin) -> + erlang:binary_to_list(Bin). + +get_host_schema("http://" ++ Server) -> + {"http://", Server}; +get_host_schema("https://" ++ Server) -> + {"https://", Server}; +get_host_schema(Server) -> + {"http://", Server}. + +apply_template({Key, Msg} = Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + Req; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% now there is no batch delete, so +%% 1. we can simply replace the `send_message` to `put` +%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, +%% so we can reduce some list map cost +apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> + lists:map( + fun(Req) -> + {_, Msg} = apply_template(Req, Templates), + {put, convert_to_item(Msg)} + end, + Msgs + ). + +convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> + maps:fold( + fun + (_K, <<>>, AccIn) -> + AccIn; + (K, V, AccIn) -> + [{convert2binary(K), convert2binary(V)} | AccIn] + end, + [], + Msg + ); +convert_to_item(MsgBin) when is_binary(MsgBin) -> + Msg = emqx_json:decode(MsgBin), + convert_to_item(Msg); +convert_to_item(Item) -> + erlang:throw({invalid_item, Item}). + +convert2binary(Value) when is_atom(Value) -> + erlang:atom_to_binary(Value, utf8); +convert2binary(Value) when is_binary(Value); is_number(Value) -> + Value; +convert2binary(Value) when is_list(Value) -> + unicode:characters_to_binary(Value); +convert2binary(Value) when is_map(Value) -> + emqx_json:encode(Value). + +do_async_reply(Result, {ReplyFun, [Context]}) -> + ReplyFun(Context, Result).