From d16458ccd0ab0ba405febab6c73c4ec25ea26dd8 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 2 Jan 2024 20:02:28 +0800 Subject: [PATCH] feat(iotdb): improve the IoTDB bridge to v2 style --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/emqx_bridge_azure_event_hub.erl | 4 +- .../src/emqx_bridge_confluent_producer.erl | 4 +- .../src/emqx_bridge_http_schema.erl | 13 +- .../src/emqx_bridge_iotdb.erl | 242 ++++++++++--- .../src/emqx_bridge_iotdb_action_info.erl | 71 ++++ ...pl.erl => emqx_bridge_iotdb_connector.erl} | 327 +++++++++++++++--- .../src/emqx_connector_resource.erl | 54 +-- .../src/schema/emqx_connector_ee_schema.erl | 18 +- .../src/schema/emqx_connector_schema.erl | 4 +- .../src/schema/emqx_resource_schema.erl | 2 +- rel/i18n/emqx_bridge_iotdb.hocon | 50 +++ rel/i18n/emqx_bridge_iotdb_connector.hocon | 44 +++ 13 files changed, 666 insertions(+), 170 deletions(-) create mode 100644 apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl rename apps/emqx_bridge_iotdb/src/{emqx_bridge_iotdb_impl.erl => emqx_bridge_iotdb_connector.erl} (57%) create mode 100644 rel/i18n/emqx_bridge_iotdb_connector.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e9c51edfa..9dce9ed8d 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -84,7 +84,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, - emqx_bridge_redis_action_info + emqx_bridge_redis_action_info, + emqx_bridge_iotdb_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index cf733ddfd..a63249fa2 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -25,7 +25,7 @@ ]). %% emqx_connector_resource behaviour callbacks --export([connector_config/1]). +-export([connector_config/2]). -export([producer_converter/2, host_opts/0]). @@ -326,7 +326,7 @@ values(producer) -> %% `emqx_connector_resource' API %%------------------------------------------------------------------------------------------------- -connector_config(Config) -> +connector_config(Config, _) -> %% Default port for AEH is 9093 BootstrapHosts0 = maps:get(bootstrap_hosts, Config), BootstrapHosts = emqx_schema:parse_servers( diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl index a43a8a285..dcae031eb 100644 --- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl +++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl @@ -24,7 +24,7 @@ ]). %% emqx_connector_resource behaviour callbacks --export([connector_config/1]). +-export([connector_config/2]). -export([host_opts/0]). @@ -288,7 +288,7 @@ values(action) -> %% `emqx_connector_resource' API %%------------------------------------------------------------------------------------------------- -connector_config(Config) -> +connector_config(Config, _) -> %% Default port for Confluent is 9092 BootstrapHosts0 = maps:get(bootstrap_hosts, Config), BootstrapHosts = emqx_schema:parse_servers( diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 91ad25d31..7e8c4a2cd 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -133,17 +133,8 @@ fields("get_" ++ Type) -> fields("config_bridge_v2") -> fields("http_action"); fields("config_connector") -> - [ - {enable, - mk( - boolean(), - #{ - desc => <<"Enable or disable this connector">>, - default => true - } - )}, - {description, emqx_schema:description_schema()} - ] ++ connector_url_headers() ++ + emqx_connector_schema:common_fields() ++ + connector_url_headers() ++ connector_opts() ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 25bafbd00..a313a9613 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_iotdb). @@ -8,7 +8,12 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/2, array/1]). + +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). %% hocon_schema API -export([ @@ -18,8 +23,8 @@ desc/1 ]). -%% emqx_bridge_enterprise "unofficial" API --export([conn_bridge_examples/1]). +-define(CONNECTOR_TYPE, iotdb). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API @@ -29,24 +34,140 @@ namespace() -> "bridge_iotdb". roots() -> []. -fields("config") -> - basic_config() ++ request_config(); -fields("post") -> - [ - type_field(), - name_field() - ] ++ fields("config"); -fields("put") -> - fields("config"); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"); -fields("creation_opts") -> +%%------------------------------------------------------------------------------------------------- +%% v2 schema +%%------------------------------------------------------------------------------------------------- + +fields(action) -> + {iotdb, + mk( + hoconsc:map(name, ref(?MODULE, action_config)), + #{ + desc => <<"IoTDB Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_resource_schema:override( + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ), + [ + {resource_opts, + mk(ref(?MODULE, action_resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ] + ); +fields(action_resource_opts) -> lists:filter( fun({K, _V}) -> not lists:member(K, unsupported_opts()) end, - emqx_resource_schema:fields("creation_opts") + emqx_bridge_v2_schema:resource_opts_fields() ); +fields(action_parameters) -> + [ + {is_aligned, + mk( + boolean(), + #{ + desc => ?DESC("config_is_aligned"), + default => false + } + )}, + {device_id, + mk( + binary(), + #{ + desc => ?DESC("config_device_id") + } + )}, + {iotdb_version, + mk( + hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + #{ + desc => ?DESC("config_iotdb_version"), + default => ?VSN_1_1_X + } + )}, + {data, + mk( + array(ref(?MODULE, action_parameters_data)), + #{ + desc => ?DESC("action_parameters_data") + } + )} + ] ++ + proplists_without( + [path, method, body, headers, request_timeout], + emqx_bridge_http_schema:fields("parameters_opts") + ); +fields(action_parameters_data) -> + [ + {timestamp, + mk( + binary(), + #{ + desc => ?DESC("config_parameters_timestamp"), + default => <<"now">> + } + )}, + {measurement, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_measurement") + } + )}, + {data_type, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_data_type"), + validator => fun(Type) -> + lists:member(Type, [ + <<"TEXT">>, + <<"BOOLEAN">>, + <<"INT32">>, + <<"INT64">>, + <<"FLOAT">>, + <<"DOUBLE">> + ]) + end + } + )}, + {value, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_value") + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +%%------------------------------------------------------------------------------------------------- +%% v1 schema +%%------------------------------------------------------------------------------------------------- + +fields("config") -> + basic_config() ++ request_config(); +fields("creation_opts") -> + proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts")); fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, @@ -55,22 +176,28 @@ fields(auth_basic) -> required => true, desc => ?DESC("config_auth_basic_password") })} - ]. + ]; +fields("post") -> + emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields("config"); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). desc("config") -> ?DESC("desc_config"); -desc("creation_opts") -> - ?DESC(emqx_resource_schema, "creation_opts"); -desc("post") -> - ["Configuration for IoTDB using `POST` method."]; -desc(Name) -> - lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), - ?DESC(Name). - -struct_names() -> - [ - auth_basic - ]. +desc(action_config) -> + ?DESC("desc_config"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(action_parameters_data) -> + ?DESC("action_parameters_data"); +desc(auth_basic) -> + "Basic Authentication"; +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. basic_config() -> [ @@ -160,30 +287,43 @@ unsupported_opts() -> batch_time ]. -%%====================================================================================== +%%------------------------------------------------------------------------------------------------- +%% v2 examples +%%------------------------------------------------------------------------------------------------- -type_field() -> - {type, - mk( - hoconsc:enum([iotdb]), - #{ - required => true, - desc => ?DESC("desc_type") - } - )}. +bridge_v2_examples(Method) -> + [ + #{ + <<"iotdb">> => + #{ + summary => <<"Apache IoTDB Bridge">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. -name_field() -> - {name, - mk( - binary(), - #{ - required => true, - desc => ?DESC("desc_name") - } - )}. - -%%====================================================================================== +action_values() -> + #{ + parameters => #{ + data => [ + #{ + timestamp => now, + measurement => <<"status">>, + data_type => <<"BOOLEAN">>, + value => <<"${st}">> + } + ], + is_aligned => false, + device_id => <<"${clientid}">>, + iotdb_version => ?VSN_1_1_X + } + }. +%%------------------------------------------------------------------------------------------------- +%% v1 examples +%%------------------------------------------------------------------------------------------------- conn_bridge_examples(Method) -> [ #{ diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl new file mode 100644 index 000000000..32afa2581 --- /dev/null +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_iotdb_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(ACTION_TYPE, iotdb). +-define(SCHEMA_MODULE, emqx_bridge_iotdb). + +action_type_name() -> ?ACTION_TYPE. +bridge_v1_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + MergedConfig = + emqx_utils_maps:deep_merge( + maps:without( + [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>], + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) + ), + ConnectorConfig + ), + BridgeV1Keys = schema_keys("config"), + maps:with(BridgeV1Keys, MergedConfig). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(action_config), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + schema_keys(?SCHEMA_MODULE, Name). + +schema_keys(Mod, Name) -> + [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))]. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl similarity index 57% rename from apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl rename to apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index b101cfc68..c493da90c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -1,10 +1,13 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_iotdb_impl). +-module(emqx_bridge_iotdb_connector). + +-behaviour(emqx_resource). -include("emqx_bridge_iotdb.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% `emqx_resource' API @@ -14,9 +17,25 @@ on_stop/2, on_get_status/2, on_query/3, - on_query_async/4 + on_query_async/4, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + connector_examples/1, + connector_example_values/0 +]). + +%% emqx_connector_resource behaviour callbacks +-export([connector_config/2]). + -type config() :: #{ base_url := #{ @@ -29,33 +48,140 @@ pool_type := random | hash, pool_size := pos_integer(), request => undefined | map(), - is_aligned => boolean(), - iotdb_version => binary(), - device_id => binary() | undefined, atom() => _ }. -type state() :: #{ base_path := _, - base_url := #{ - scheme := http | https, - host := iolist(), - port := inet:port_number(), - path := _ - }, connect_timeout := pos_integer(), pool_type := random | hash, - pool_size := pos_integer(), + channels := map(), request => undefined | map(), - is_aligned => boolean(), - iotdb_version => binary(), - device_id => binary() | undefined, atom() => _ }. -type manager_id() :: binary(). +-define(CONNECTOR_TYPE, iotdb). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%%------------------------------------------------------------------------------------- +%% connector examples +%%------------------------------------------------------------------------------------- +connector_examples(Method) -> + [ + #{ + <<"iotdb">> => + #{ + summary => <<"Apache IoTDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"iotdb_connector">>, + type => iotdb, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"*****">> + }, + base_url => <<"http://iotdb.local:18080/">>, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false} + }. + +%%------------------------------------------------------------------------------------- +%% schema +%%------------------------------------------------------------------------------------- +namespace() -> "iotdb". + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++ + fields("connection_fields"); +fields("connection_fields") -> + [ + {base_url, + mk( + emqx_schema:url(), + #{ + required => true, + desc => ?DESC(emqx_bridge_iotdb, "config_base_url") + } + )}, + {authentication, + mk( + hoconsc:union([ref(?MODULE, auth_basic)]), + #{ + default => auth_basic, desc => ?DESC("config_authentication") + } + )} + ]; +fields(auth_basic) -> + [ + {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, + {password, + emqx_schema_secret:mk(#{ + required => true, + desc => ?DESC("config_auth_basic_password") + })} + ]; +fields("post") -> + emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config); +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> + #{ + base_url := BaseUrl, + authentication := + #{ + username := Username, + password := Password0 + } + } = Conf, + + Password = emqx_secret:unwrap(Password0), + BasicToken = base64:encode(<>), + + WebhookConfig = + Conf#{ + url => BaseUrl, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + ParseConfs( + <<"http">>, + Name, + WebhookConfig + ). + +proplists_without(Keys, List) -> + [El || El = {K, _} <- List, not lists:member(K, Keys)]. + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -73,7 +199,7 @@ on_start(InstanceId, Config) -> request => maps:get(request, State, <<>>) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), - {ok, maps:merge(Config, State)}; + {ok, State#{channels => #{}}}; {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", @@ -103,19 +229,20 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {send_message, Message}, State) -> +on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) -> ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_called", instance_id => InstanceId, - send_message => Message, + send_message => Req, state => emqx_utils:redact(State) }), - case make_iotdb_insert_request(Message, State) of + + case try_render_message(Req, Channels) of {ok, IoTDBPayload} -> handle_response( emqx_bridge_http_connector:on_query( - InstanceId, {send_message, IoTDBPayload}, State + InstanceId, {ChannelId, IoTDBPayload}, State ) ); Error -> @@ -124,15 +251,17 @@ on_query(InstanceId, {send_message, Message}, State) -> -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. -on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> +on_query_async( + InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State +) -> ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_async_called", instance_id => InstanceId, - send_message => Message, + send_message => Req, state => emqx_utils:redact(State) }), - case make_iotdb_insert_request(Message, State) of + case try_render_message(Req, Channels) of {ok, IoTDBPayload} -> ReplyFunAndArgs = { @@ -143,12 +272,71 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> [] }, emqx_bridge_http_connector:on_query_async( - InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State + InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State ); Error -> Error end. +on_add_channel( + InstanceId, + #{channels := Channels} = OldState0, + ChannelId, + #{ + parameters := #{iotdb_version := Version, data := Data} = Parameter + } +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + %% update HTTP channel + InsertTabletPathV1 = <<"rest/v1/insertTablet">>, + InsertTabletPathV2 = <<"rest/v2/insertTablet">>, + + Path = + case Version of + ?VSN_1_1_X -> InsertTabletPathV2; + _ -> InsertTabletPathV1 + end, + + HTTPReq = #{ + parameters => Parameter#{ + path => Path, + method => <<"post">> + } + }, + + {ok, OldState} = emqx_bridge_http_connector:on_add_channel( + InstanceId, OldState0, ChannelId, HTTPReq + ), + + %% update IoTDB channel + DeviceId = maps:get(device_id, Parameter, <<>>), + Channel = Parameter#{ + device_id => emqx_placeholder:preproc_tmpl(DeviceId), + data := preproc_data_template(Data) + }, + Channels2 = Channels#{ChannelId => Channel}, + {ok, OldState#{channels := Channels2}} + end. + +on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> + {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId), + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- @@ -238,14 +426,14 @@ iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> iot_timestamp(TimestampTkn, Msg, Nows) -> iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). +iot_timestamp(<<"now_us">>, #{now_us := NowUs}) -> + NowUs; +iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) -> + NowNs; iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> -> NowMs; -iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> -> - NowUs; -iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> -> - NowNs; iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). @@ -304,25 +492,13 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(Message, State) -> - Payloads = to_list(parse_payload(get_payload(Message))), - IsAligned = maps:get(is_aligned, State, false), - IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), - case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of - {undefined, _} -> - {error, device_id_missing}; - {_, []} -> - {error, invalid_data}; - {DeviceId, PreProcessedData} -> - DataList = proc_data(PreProcessedData, Message), - InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - {ok, - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - })} - end. +make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> + InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + }). replace_dtypes(Rows0, IotDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -404,13 +580,12 @@ iotdb_field_key(data_types, ?VSN_0_13_X) -> to_list(List) when is_list(List) -> List; to_list(Data) -> [Data]. -device_id(Message, Payloads, State) -> - case maps:get(device_id, State, undefined) of - undefined -> - %% [FIXME] there could be conflicting device-ids in the Payloads +%% If device_id is missing from the channel data, try to find it from the payload +device_id(Message, Payloads, Channel) -> + case maps:get(device_id, Channel, <<>>) of + <<>> -> maps:get(<<"device_id">>, hd(Payloads), undefined); - DeviceId -> - DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId), + DeviceIdTkn -> emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) end. @@ -430,3 +605,47 @@ eval_response_body(Body, Resp) -> #{<<"code">> := 200} -> Resp; Reason -> {error, Reason} end. + +preproc_data_template(DataList) -> + lists:map( + fun( + #{ + timestamp := Timestamp, + measurement := Measurement, + data_type := DataType, + value := Value + } + ) -> + #{ + timestamp => emqx_placeholder:preproc_tmpl(Timestamp), + measurement => emqx_placeholder:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_placeholder:preproc_tmpl(Value) + } + end, + DataList + ). + +try_render_message({ChannelId, Msg}, Channels) -> + case maps:find(ChannelId, Channels) of + {ok, Channel} -> + {ok, render_channel_message(Channel, Msg)}; + _ -> + {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} + end. + +render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> + Payloads = to_list(parse_payload(get_payload(Message))), + DataTemplate = get_data_template(Channel, Payloads), + DataList = proc_data(DataTemplate, Message), + DeviceId = device_id(Message, Payloads, Channel), + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn). + +%% Get the message template. +%% In order to be compatible with 4.4, the template version has higher priority +%% This is a template, using it +get_data_template(#{data := Data}, _Payloads) when Data =/= [] -> + Data; +%% This is a self-describing message +get_data_template(#{data := []}, Payloads) -> + preproc_data_list(Payloads). diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 72576b22e..08c320490 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -302,54 +302,18 @@ parse_confs( method => undefined } }; -parse_confs(<<"iotdb">>, Name, Conf) -> - %% [FIXME] this has no place here, it's used in parse_confs/3, which should - %% rather delegate to a behavior callback than implementing domain knowledge - %% here (reversed dependency) - InsertTabletPathV1 = <<"rest/v1/insertTablet">>, - InsertTabletPathV2 = <<"rest/v2/insertTablet">>, - #{ - base_url := BaseURL, - authentication := - #{ - username := Username, - password := Password - } - } = Conf, - BasicToken = base64:encode(<>), - %% This version atom correspond to the macro ?VSN_1_1_X in - %% emqx_connector_iotdb.hrl. It would be better to use the macro directly, but - %% this cannot be done without introducing a dependency on the - %% emqx_iotdb_connector app (which is an EE app). - DefaultIOTDBConnector = 'v1.1.x', - Version = maps:get(iotdb_version, Conf, DefaultIOTDBConnector), - InsertTabletPath = - case Version of - DefaultIOTDBConnector -> InsertTabletPathV2; - _ -> InsertTabletPathV1 - end, - WebhookConfig = - Conf#{ - method => <<"post">>, - url => <>, - headers => [ - {<<"Content-type">>, <<"application/json">>}, - {<<"Authorization">>, BasicToken} - ] - }, - parse_confs( - <<"webhook">>, - Name, - WebhookConfig - ); -parse_confs(ConnectorType, _Name, Config) -> - connector_config(ConnectorType, Config). +parse_confs(ConnectorType, Name, Config) -> + connector_config(ConnectorType, Name, Config). -connector_config(ConnectorType, Config) -> +connector_config(ConnectorType, Name, Config) -> Mod = connector_impl_module(ConnectorType), - case erlang:function_exported(Mod, connector_config, 1) of + case erlang:function_exported(Mod, connector_config, 2) of true -> - Mod:connector_config(Config); + Mod:connector_config(Config, #{ + type => ConnectorType, + name => Name, + parse_confs => fun parse_confs/3 + }); false -> Config end. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4e8618915..ef78945b5 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -46,6 +46,8 @@ resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> emqx_bridge_redis_connector; +resource_type(iotdb) -> + emqx_bridge_iotdb_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -56,6 +58,8 @@ connector_impl_module(azure_event_hub_producer) -> emqx_bridge_azure_event_hub; connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; +connector_impl_module(iotdb) -> + emqx_bridge_iotdb_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -159,6 +163,14 @@ connector_structs() -> desc => <<"Timescale Connector Config">>, required => false } + )}, + {iotdb, + mk( + hoconsc:map(name, ref(emqx_bridge_iotdb_connector, config)), + #{ + desc => <<"IoTDB Connector Config">>, + required => false + } )} ]. @@ -175,7 +187,8 @@ schema_modules() -> emqx_bridge_syskeeper_proxy, emqx_bridge_timescale, emqx_postgresql_connector_schema, - emqx_bridge_redis_schema + emqx_bridge_redis_schema, + emqx_bridge_iotdb_connector ]. api_schemas(Method) -> @@ -201,7 +214,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), - api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector") + api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), + api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f67cd6991..a0c28cc22 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -142,7 +142,9 @@ connector_type_to_bridge_types(syskeeper_forwarder) -> connector_type_to_bridge_types(syskeeper_proxy) -> []; connector_type_to_bridge_types(timescale) -> - [timescale]. + [timescale]; +connector_type_to_bridge_types(iotdb) -> + [iotdb]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 6041dc2fb..3d2417497 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -23,7 +23,7 @@ -export([namespace/0, roots/0, fields/1, desc/1]). --export([create_opts/1, resource_opts_meta/0]). +-export([create_opts/1, resource_opts_meta/0, override/2]). %% range interval in ms -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1). diff --git a/rel/i18n/emqx_bridge_iotdb.hocon b/rel/i18n/emqx_bridge_iotdb.hocon index e38c828f5..987006a53 100644 --- a/rel/i18n/emqx_bridge_iotdb.hocon +++ b/rel/i18n/emqx_bridge_iotdb.hocon @@ -70,4 +70,54 @@ desc_name.desc: desc_name.label: """Bridge Name""" + +config_parameters_timestamp.desc: +"""Timestamp. Placeholders in format of ${var} is supported, the finally value can be:
+- now: use the `now_ms` which is contained in the payload as timestamp +- now_ms: same as above +- now_us: use the `now_us` which is contained in the payload as timestamp +- now_ns: use the `now_us` which is contained in the payload as timestamp +- any other: use the value directly as the timestamp +""" + +config_parameters_timestamp.label: +"""Timestamp""" + +config_parameters_measurement.desc: +"""Measurement. Placeholders in format of ${var} is supported""" + +config_parameters_measurement.label: +"""Measurement""" + +config_parameters_data_type.desc: +"""Data Type, can be:
+- TEXT +- BOOLEAN +- INT32 +- INT64 +- FLOAT +- DOUBLE +""" + +config_parameters_data_type.label: +"""Data type""" + +config_parameters_value.desc: +"""Value. Placeholders in format of ${var} is supported""" + +config_parameters_value.label: +"""Value""" + +action_parameters_data.desc: +"""IoTDB action parameter data""" + +action_parameters_data.label: +"""Parameter Data""" + +action_parameters.desc: +"""IoTDB action parameters""" + +action_parameters.label: +"""Parameters""" + } diff --git a/rel/i18n/emqx_bridge_iotdb_connector.hocon b/rel/i18n/emqx_bridge_iotdb_connector.hocon new file mode 100644 index 000000000..7df06f858 --- /dev/null +++ b/rel/i18n/emqx_bridge_iotdb_connector.hocon @@ -0,0 +1,44 @@ +emqx_bridge_iotdb_connector { + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external IoTDB service's REST interface.""" +config_base_url.label: +"""IoTDB REST Service Base URL""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +}