From d16458ccd0ab0ba405febab6c73c4ec25ea26dd8 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 2 Jan 2024 20:02:28 +0800 Subject: [PATCH 1/4] feat(iotdb): improve the IoTDB bridge to v2 style --- apps/emqx_bridge/src/emqx_action_info.erl | 3 +- .../src/emqx_bridge_azure_event_hub.erl | 4 +- .../src/emqx_bridge_confluent_producer.erl | 4 +- .../src/emqx_bridge_http_schema.erl | 13 +- .../src/emqx_bridge_iotdb.erl | 242 ++++++++++--- .../src/emqx_bridge_iotdb_action_info.erl | 71 ++++ ...pl.erl => emqx_bridge_iotdb_connector.erl} | 327 +++++++++++++++--- .../src/emqx_connector_resource.erl | 54 +-- .../src/schema/emqx_connector_ee_schema.erl | 18 +- .../src/schema/emqx_connector_schema.erl | 4 +- .../src/schema/emqx_resource_schema.erl | 2 +- rel/i18n/emqx_bridge_iotdb.hocon | 50 +++ rel/i18n/emqx_bridge_iotdb_connector.hocon | 44 +++ 13 files changed, 666 insertions(+), 170 deletions(-) create mode 100644 apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl rename apps/emqx_bridge_iotdb/src/{emqx_bridge_iotdb_impl.erl => emqx_bridge_iotdb_connector.erl} (57%) create mode 100644 rel/i18n/emqx_bridge_iotdb_connector.hocon diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e9c51edfa..9dce9ed8d 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -84,7 +84,8 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, emqx_bridge_timescale_action_info, - emqx_bridge_redis_action_info + emqx_bridge_redis_action_info, + emqx_bridge_iotdb_action_info ]. -else. hard_coded_action_info_modules_ee() -> diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index cf733ddfd..a63249fa2 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -25,7 +25,7 @@ ]). %% emqx_connector_resource behaviour callbacks --export([connector_config/1]). +-export([connector_config/2]). -export([producer_converter/2, host_opts/0]). @@ -326,7 +326,7 @@ values(producer) -> %% `emqx_connector_resource' API %%------------------------------------------------------------------------------------------------- -connector_config(Config) -> +connector_config(Config, _) -> %% Default port for AEH is 9093 BootstrapHosts0 = maps:get(bootstrap_hosts, Config), BootstrapHosts = emqx_schema:parse_servers( diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl index a43a8a285..dcae031eb 100644 --- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl +++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent_producer.erl @@ -24,7 +24,7 @@ ]). %% emqx_connector_resource behaviour callbacks --export([connector_config/1]). +-export([connector_config/2]). -export([host_opts/0]). @@ -288,7 +288,7 @@ values(action) -> %% `emqx_connector_resource' API %%------------------------------------------------------------------------------------------------- -connector_config(Config) -> +connector_config(Config, _) -> %% Default port for Confluent is 9092 BootstrapHosts0 = maps:get(bootstrap_hosts, Config), BootstrapHosts = emqx_schema:parse_servers( diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl index 91ad25d31..7e8c4a2cd 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_schema.erl @@ -133,17 +133,8 @@ fields("get_" ++ Type) -> fields("config_bridge_v2") -> fields("http_action"); fields("config_connector") -> - [ - {enable, - mk( - boolean(), - #{ - desc => <<"Enable or disable this connector">>, - default => true - } - )}, - {description, emqx_schema:description_schema()} - ] ++ connector_url_headers() ++ + emqx_connector_schema:common_fields() ++ + connector_url_headers() ++ connector_opts() ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields(connector_resource_opts) -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 25bafbd00..a313a9613 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_iotdb). @@ -8,7 +8,12 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). --import(hoconsc, [mk/2, enum/1, ref/2]). +-import(hoconsc, [mk/2, enum/1, ref/2, array/1]). + +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1 +]). %% hocon_schema API -export([ @@ -18,8 +23,8 @@ desc/1 ]). -%% emqx_bridge_enterprise "unofficial" API --export([conn_bridge_examples/1]). +-define(CONNECTOR_TYPE, iotdb). +-define(ACTION_TYPE, ?CONNECTOR_TYPE). %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API @@ -29,24 +34,140 @@ namespace() -> "bridge_iotdb". roots() -> []. -fields("config") -> - basic_config() ++ request_config(); -fields("post") -> - [ - type_field(), - name_field() - ] ++ fields("config"); -fields("put") -> - fields("config"); -fields("get") -> - emqx_bridge_schema:status_fields() ++ fields("post"); -fields("creation_opts") -> +%%------------------------------------------------------------------------------------------------- +%% v2 schema +%%------------------------------------------------------------------------------------------------- + +fields(action) -> + {iotdb, + mk( + hoconsc:map(name, ref(?MODULE, action_config)), + #{ + desc => <<"IoTDB Action Config">>, + required => false + } + )}; +fields(action_config) -> + emqx_resource_schema:override( + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, desc => ?DESC("action_parameters") + } + ) + ), + [ + {resource_opts, + mk(ref(?MODULE, action_resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} + ] + ); +fields(action_resource_opts) -> lists:filter( fun({K, _V}) -> not lists:member(K, unsupported_opts()) end, - emqx_resource_schema:fields("creation_opts") + emqx_bridge_v2_schema:resource_opts_fields() ); +fields(action_parameters) -> + [ + {is_aligned, + mk( + boolean(), + #{ + desc => ?DESC("config_is_aligned"), + default => false + } + )}, + {device_id, + mk( + binary(), + #{ + desc => ?DESC("config_device_id") + } + )}, + {iotdb_version, + mk( + hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]), + #{ + desc => ?DESC("config_iotdb_version"), + default => ?VSN_1_1_X + } + )}, + {data, + mk( + array(ref(?MODULE, action_parameters_data)), + #{ + desc => ?DESC("action_parameters_data") + } + )} + ] ++ + proplists_without( + [path, method, body, headers, request_timeout], + emqx_bridge_http_schema:fields("parameters_opts") + ); +fields(action_parameters_data) -> + [ + {timestamp, + mk( + binary(), + #{ + desc => ?DESC("config_parameters_timestamp"), + default => <<"now">> + } + )}, + {measurement, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_measurement") + } + )}, + {data_type, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_data_type"), + validator => fun(Type) -> + lists:member(Type, [ + <<"TEXT">>, + <<"BOOLEAN">>, + <<"INT32">>, + <<"INT64">>, + <<"FLOAT">>, + <<"DOUBLE">> + ]) + end + } + )}, + {value, + mk( + binary(), + #{ + required => true, + desc => ?DESC("config_parameters_value") + } + )} + ]; +fields("post_bridge_v2") -> + emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields(action_config); +fields("put_bridge_v2") -> + fields(action_config); +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +%%------------------------------------------------------------------------------------------------- +%% v1 schema +%%------------------------------------------------------------------------------------------------- + +fields("config") -> + basic_config() ++ request_config(); +fields("creation_opts") -> + proplists_without(unsupported_opts(), emqx_resource_schema:fields("creation_opts")); fields(auth_basic) -> [ {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, @@ -55,22 +176,28 @@ fields(auth_basic) -> required => true, desc => ?DESC("config_auth_basic_password") })} - ]. + ]; +fields("post") -> + emqx_bridge_schema:type_and_name_fields(enum([iotdb])) ++ fields("config"); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). desc("config") -> ?DESC("desc_config"); -desc("creation_opts") -> - ?DESC(emqx_resource_schema, "creation_opts"); -desc("post") -> - ["Configuration for IoTDB using `POST` method."]; -desc(Name) -> - lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), - ?DESC(Name). - -struct_names() -> - [ - auth_basic - ]. +desc(action_config) -> + ?DESC("desc_config"); +desc(action_parameters) -> + ?DESC("action_parameters"); +desc(action_parameters_data) -> + ?DESC("action_parameters_data"); +desc(auth_basic) -> + "Basic Authentication"; +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. basic_config() -> [ @@ -160,30 +287,43 @@ unsupported_opts() -> batch_time ]. -%%====================================================================================== +%%------------------------------------------------------------------------------------------------- +%% v2 examples +%%------------------------------------------------------------------------------------------------- -type_field() -> - {type, - mk( - hoconsc:enum([iotdb]), - #{ - required => true, - desc => ?DESC("desc_type") - } - )}. +bridge_v2_examples(Method) -> + [ + #{ + <<"iotdb">> => + #{ + summary => <<"Apache IoTDB Bridge">>, + value => emqx_bridge_v2_schema:action_values( + Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values() + ) + } + } + ]. -name_field() -> - {name, - mk( - binary(), - #{ - required => true, - desc => ?DESC("desc_name") - } - )}. - -%%====================================================================================== +action_values() -> + #{ + parameters => #{ + data => [ + #{ + timestamp => now, + measurement => <<"status">>, + data_type => <<"BOOLEAN">>, + value => <<"${st}">> + } + ], + is_aligned => false, + device_id => <<"${clientid}">>, + iotdb_version => ?VSN_1_1_X + } + }. +%%------------------------------------------------------------------------------------------------- +%% v1 examples +%%------------------------------------------------------------------------------------------------- conn_bridge_examples(Method) -> [ #{ diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl new file mode 100644 index 000000000..32afa2581 --- /dev/null +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_action_info.erl @@ -0,0 +1,71 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_iotdb_action_info). + +-behaviour(emqx_action_info). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% behaviour callbacks +-export([ + action_type_name/0, + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + bridge_v1_type_name/0, + connector_action_config_to_bridge_v1_config/2, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(ACTION_TYPE, iotdb). +-define(SCHEMA_MODULE, emqx_bridge_iotdb). + +action_type_name() -> ?ACTION_TYPE. +bridge_v1_type_name() -> ?ACTION_TYPE. +connector_type_name() -> ?ACTION_TYPE. + +schema_module() -> ?SCHEMA_MODULE. + +connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> + MergedConfig = + emqx_utils_maps:deep_merge( + maps:without( + [<<"description">>, <<"local_topic">>, <<"connector">>, <<"data">>], + emqx_utils_maps:unindent(<<"parameters">>, ActionConfig) + ), + ConnectorConfig + ), + BridgeV1Keys = schema_keys("config"), + maps:with(BridgeV1Keys, MergedConfig). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(action_config), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ConnectorKeys = schema_keys(emqx_bridge_iotdb_connector, config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + maps:with(ConnectorKeys, BridgeV1Config) + ). + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config#{<<"data">> => []}), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + schema_keys(?SCHEMA_MODULE, Name). + +schema_keys(Mod, Name) -> + [bin(Key) || Key <- proplists:get_keys(Mod:fields(Name))]. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl similarity index 57% rename from apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl rename to apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index b101cfc68..c493da90c 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -1,10 +1,13 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- --module(emqx_bridge_iotdb_impl). +-module(emqx_bridge_iotdb_connector). + +-behaviour(emqx_resource). -include("emqx_bridge_iotdb.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% `emqx_resource' API @@ -14,9 +17,25 @@ on_stop/2, on_get_status/2, on_query/3, - on_query_async/4 + on_query_async/4, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). +-export([ + namespace/0, + roots/0, + fields/1, + desc/1, + connector_examples/1, + connector_example_values/0 +]). + +%% emqx_connector_resource behaviour callbacks +-export([connector_config/2]). + -type config() :: #{ base_url := #{ @@ -29,33 +48,140 @@ pool_type := random | hash, pool_size := pos_integer(), request => undefined | map(), - is_aligned => boolean(), - iotdb_version => binary(), - device_id => binary() | undefined, atom() => _ }. -type state() :: #{ base_path := _, - base_url := #{ - scheme := http | https, - host := iolist(), - port := inet:port_number(), - path := _ - }, connect_timeout := pos_integer(), pool_type := random | hash, - pool_size := pos_integer(), + channels := map(), request => undefined | map(), - is_aligned => boolean(), - iotdb_version => binary(), - device_id => binary() | undefined, atom() => _ }. -type manager_id() :: binary(). +-define(CONNECTOR_TYPE, iotdb). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +%%------------------------------------------------------------------------------------- +%% connector examples +%%------------------------------------------------------------------------------------- +connector_examples(Method) -> + [ + #{ + <<"iotdb">> => + #{ + summary => <<"Apache IoTDB Connector">>, + value => emqx_connector_schema:connector_values( + Method, ?CONNECTOR_TYPE, connector_example_values() + ) + } + } + ]. + +connector_example_values() -> + #{ + name => <<"iotdb_connector">>, + type => iotdb, + enable => true, + authentication => #{ + <<"username">> => <<"root">>, + <<"password">> => <<"*****">> + }, + base_url => <<"http://iotdb.local:18080/">>, + connect_timeout => <<"15s">>, + pool_type => <<"random">>, + pool_size => 8, + enable_pipelining => 100, + ssl => #{enable => false} + }. + +%%------------------------------------------------------------------------------------- +%% schema +%%------------------------------------------------------------------------------------- +namespace() -> "iotdb". + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + proplists_without([url, headers], emqx_bridge_http_schema:fields("config_connector")) ++ + fields("connection_fields"); +fields("connection_fields") -> + [ + {base_url, + mk( + emqx_schema:url(), + #{ + required => true, + desc => ?DESC(emqx_bridge_iotdb, "config_base_url") + } + )}, + {authentication, + mk( + hoconsc:union([ref(?MODULE, auth_basic)]), + #{ + default => auth_basic, desc => ?DESC("config_authentication") + } + )} + ]; +fields(auth_basic) -> + [ + {username, mk(binary(), #{required => true, desc => ?DESC("config_auth_basic_username")})}, + {password, + emqx_schema_secret:mk(#{ + required => true, + desc => ?DESC("config_auth_basic_password") + })} + ]; +fields("post") -> + emqx_connector_schema:type_and_name_fields(enum([iotdb])) ++ fields(config); +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +connector_config(Conf, #{name := Name, parse_confs := ParseConfs}) -> + #{ + base_url := BaseUrl, + authentication := + #{ + username := Username, + password := Password0 + } + } = Conf, + + Password = emqx_secret:unwrap(Password0), + BasicToken = base64:encode(<>), + + WebhookConfig = + Conf#{ + url => BaseUrl, + headers => [ + {<<"Content-type">>, <<"application/json">>}, + {<<"Authorization">>, BasicToken} + ] + }, + ParseConfs( + <<"http">>, + Name, + WebhookConfig + ). + +proplists_without(Keys, List) -> + [El || El = {K, _} <- List, not lists:member(K, Keys)]. + %%------------------------------------------------------------------------------------- %% `emqx_resource' API %%------------------------------------------------------------------------------------- @@ -73,7 +199,7 @@ on_start(InstanceId, Config) -> request => maps:get(request, State, <<>>) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), - {ok, maps:merge(Config, State)}; + {ok, State#{channels => #{}}}; {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", @@ -103,19 +229,20 @@ on_get_status(InstanceId, State) -> {ok, pos_integer(), [term()], term()} | {ok, pos_integer(), [term()]} | {error, term()}. -on_query(InstanceId, {send_message, Message}, State) -> +on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) -> ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_called", instance_id => InstanceId, - send_message => Message, + send_message => Req, state => emqx_utils:redact(State) }), - case make_iotdb_insert_request(Message, State) of + + case try_render_message(Req, Channels) of {ok, IoTDBPayload} -> handle_response( emqx_bridge_http_connector:on_query( - InstanceId, {send_message, IoTDBPayload}, State + InstanceId, {ChannelId, IoTDBPayload}, State ) ); Error -> @@ -124,15 +251,17 @@ on_query(InstanceId, {send_message, Message}, State) -> -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> {ok, pid()} | {error, empty_request}. -on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> +on_query_async( + InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State +) -> ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?SLOG(debug, #{ msg => "iotdb_bridge_on_query_async_called", instance_id => InstanceId, - send_message => Message, + send_message => Req, state => emqx_utils:redact(State) }), - case make_iotdb_insert_request(Message, State) of + case try_render_message(Req, Channels) of {ok, IoTDBPayload} -> ReplyFunAndArgs = { @@ -143,12 +272,71 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> [] }, emqx_bridge_http_connector:on_query_async( - InstanceId, {send_message, IoTDBPayload}, ReplyFunAndArgs, State + InstanceId, {ChannelId, IoTDBPayload}, ReplyFunAndArgs, State ); Error -> Error end. +on_add_channel( + InstanceId, + #{channels := Channels} = OldState0, + ChannelId, + #{ + parameters := #{iotdb_version := Version, data := Data} = Parameter + } +) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + %% update HTTP channel + InsertTabletPathV1 = <<"rest/v1/insertTablet">>, + InsertTabletPathV2 = <<"rest/v2/insertTablet">>, + + Path = + case Version of + ?VSN_1_1_X -> InsertTabletPathV2; + _ -> InsertTabletPathV1 + end, + + HTTPReq = #{ + parameters => Parameter#{ + path => Path, + method => <<"post">> + } + }, + + {ok, OldState} = emqx_bridge_http_connector:on_add_channel( + InstanceId, OldState0, ChannelId, HTTPReq + ), + + %% update IoTDB channel + DeviceId = maps:get(device_id, Parameter, <<>>), + Channel = Parameter#{ + device_id => emqx_placeholder:preproc_tmpl(DeviceId), + data := preproc_data_template(Data) + }, + Channels2 = Channels#{ChannelId => Channel}, + {ok, OldState#{channels := Channels2}} + end. + +on_remove_channel(InstanceId, #{channels := Channels} = OldState0, ChannelId) -> + {ok, OldState} = emqx_bridge_http_connector:on_remove_channel(InstanceId, OldState0, ChannelId), + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- @@ -238,14 +426,14 @@ iot_timestamp(Timestamp, _, _) when is_integer(Timestamp) -> iot_timestamp(TimestampTkn, Msg, Nows) -> iot_timestamp(emqx_placeholder:proc_tmpl(TimestampTkn, Msg), Nows). +iot_timestamp(<<"now_us">>, #{now_us := NowUs}) -> + NowUs; +iot_timestamp(<<"now_ns">>, #{now_ns := NowNs}) -> + NowNs; iot_timestamp(Timestamp, #{now_ms := NowMs}) when Timestamp =:= <<"now">>; Timestamp =:= <<"now_ms">>; Timestamp =:= <<>> -> NowMs; -iot_timestamp(Timestamp, #{now_us := NowUs}) when Timestamp =:= <<"now_us">> -> - NowUs; -iot_timestamp(Timestamp, #{now_ns := NowNs}) when Timestamp =:= <<"now_ns">> -> - NowNs; iot_timestamp(Timestamp, _) when is_binary(Timestamp) -> binary_to_integer(Timestamp). @@ -304,25 +492,13 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(Message, State) -> - Payloads = to_list(parse_payload(get_payload(Message))), - IsAligned = maps:get(is_aligned, State, false), - IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), - case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of - {undefined, _} -> - {error, device_id_missing}; - {_, []} -> - {error, invalid_data}; - {DeviceId, PreProcessedData} -> - DataList = proc_data(PreProcessedData, Message), - InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, - Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - {ok, - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - })} - end. +make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> + InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, + Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + }). replace_dtypes(Rows0, IotDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -404,13 +580,12 @@ iotdb_field_key(data_types, ?VSN_0_13_X) -> to_list(List) when is_list(List) -> List; to_list(Data) -> [Data]. -device_id(Message, Payloads, State) -> - case maps:get(device_id, State, undefined) of - undefined -> - %% [FIXME] there could be conflicting device-ids in the Payloads +%% If device_id is missing from the channel data, try to find it from the payload +device_id(Message, Payloads, Channel) -> + case maps:get(device_id, Channel, <<>>) of + <<>> -> maps:get(<<"device_id">>, hd(Payloads), undefined); - DeviceId -> - DeviceIdTkn = emqx_placeholder:preproc_tmpl(DeviceId), + DeviceIdTkn -> emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) end. @@ -430,3 +605,47 @@ eval_response_body(Body, Resp) -> #{<<"code">> := 200} -> Resp; Reason -> {error, Reason} end. + +preproc_data_template(DataList) -> + lists:map( + fun( + #{ + timestamp := Timestamp, + measurement := Measurement, + data_type := DataType, + value := Value + } + ) -> + #{ + timestamp => emqx_placeholder:preproc_tmpl(Timestamp), + measurement => emqx_placeholder:preproc_tmpl(Measurement), + data_type => DataType, + value => emqx_placeholder:preproc_tmpl(Value) + } + end, + DataList + ). + +try_render_message({ChannelId, Msg}, Channels) -> + case maps:find(ChannelId, Channels) of + {ok, Channel} -> + {ok, render_channel_message(Channel, Msg)}; + _ -> + {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} + end. + +render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> + Payloads = to_list(parse_payload(get_payload(Message))), + DataTemplate = get_data_template(Channel, Payloads), + DataList = proc_data(DataTemplate, Message), + DeviceId = device_id(Message, Payloads, Channel), + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn). + +%% Get the message template. +%% In order to be compatible with 4.4, the template version has higher priority +%% This is a template, using it +get_data_template(#{data := Data}, _Payloads) when Data =/= [] -> + Data; +%% This is a self-describing message +get_data_template(#{data := []}, Payloads) -> + preproc_data_list(Payloads). diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 72576b22e..08c320490 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -302,54 +302,18 @@ parse_confs( method => undefined } }; -parse_confs(<<"iotdb">>, Name, Conf) -> - %% [FIXME] this has no place here, it's used in parse_confs/3, which should - %% rather delegate to a behavior callback than implementing domain knowledge - %% here (reversed dependency) - InsertTabletPathV1 = <<"rest/v1/insertTablet">>, - InsertTabletPathV2 = <<"rest/v2/insertTablet">>, - #{ - base_url := BaseURL, - authentication := - #{ - username := Username, - password := Password - } - } = Conf, - BasicToken = base64:encode(<>), - %% This version atom correspond to the macro ?VSN_1_1_X in - %% emqx_connector_iotdb.hrl. It would be better to use the macro directly, but - %% this cannot be done without introducing a dependency on the - %% emqx_iotdb_connector app (which is an EE app). - DefaultIOTDBConnector = 'v1.1.x', - Version = maps:get(iotdb_version, Conf, DefaultIOTDBConnector), - InsertTabletPath = - case Version of - DefaultIOTDBConnector -> InsertTabletPathV2; - _ -> InsertTabletPathV1 - end, - WebhookConfig = - Conf#{ - method => <<"post">>, - url => <>, - headers => [ - {<<"Content-type">>, <<"application/json">>}, - {<<"Authorization">>, BasicToken} - ] - }, - parse_confs( - <<"webhook">>, - Name, - WebhookConfig - ); -parse_confs(ConnectorType, _Name, Config) -> - connector_config(ConnectorType, Config). +parse_confs(ConnectorType, Name, Config) -> + connector_config(ConnectorType, Name, Config). -connector_config(ConnectorType, Config) -> +connector_config(ConnectorType, Name, Config) -> Mod = connector_impl_module(ConnectorType), - case erlang:function_exported(Mod, connector_config, 1) of + case erlang:function_exported(Mod, connector_config, 2) of true -> - Mod:connector_config(Config); + Mod:connector_config(Config, #{ + type => ConnectorType, + name => Name, + parse_confs => fun parse_confs/3 + }); false -> Config end. diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4e8618915..ef78945b5 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -46,6 +46,8 @@ resource_type(timescale) -> emqx_postgresql; resource_type(redis) -> emqx_bridge_redis_connector; +resource_type(iotdb) -> + emqx_bridge_iotdb_connector; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -56,6 +58,8 @@ connector_impl_module(azure_event_hub_producer) -> emqx_bridge_azure_event_hub; connector_impl_module(confluent_producer) -> emqx_bridge_confluent_producer; +connector_impl_module(iotdb) -> + emqx_bridge_iotdb_connector; connector_impl_module(_ConnectorType) -> undefined. @@ -159,6 +163,14 @@ connector_structs() -> desc => <<"Timescale Connector Config">>, required => false } + )}, + {iotdb, + mk( + hoconsc:map(name, ref(emqx_bridge_iotdb_connector, config)), + #{ + desc => <<"IoTDB Connector Config">>, + required => false + } )} ]. @@ -175,7 +187,8 @@ schema_modules() -> emqx_bridge_syskeeper_proxy, emqx_bridge_timescale, emqx_postgresql_connector_schema, - emqx_bridge_redis_schema + emqx_bridge_redis_schema, + emqx_bridge_iotdb_connector ]. api_schemas(Method) -> @@ -201,7 +214,8 @@ api_schemas(Method) -> api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), api_ref(emqx_bridge_timescale, <<"timescale">>, Method ++ "_connector"), api_ref(emqx_postgresql_connector_schema, <<"pgsql">>, Method ++ "_connector"), - api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector") + api_ref(emqx_bridge_redis_schema, <<"redis">>, Method ++ "_connector"), + api_ref(emqx_bridge_iotdb_connector, <<"iotdb">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f67cd6991..a0c28cc22 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -142,7 +142,9 @@ connector_type_to_bridge_types(syskeeper_forwarder) -> connector_type_to_bridge_types(syskeeper_proxy) -> []; connector_type_to_bridge_types(timescale) -> - [timescale]. + [timescale]; +connector_type_to_bridge_types(iotdb) -> + [iotdb]. actions_config_name() -> <<"actions">>. diff --git a/apps/emqx_resource/src/schema/emqx_resource_schema.erl b/apps/emqx_resource/src/schema/emqx_resource_schema.erl index 6041dc2fb..3d2417497 100644 --- a/apps/emqx_resource/src/schema/emqx_resource_schema.erl +++ b/apps/emqx_resource/src/schema/emqx_resource_schema.erl @@ -23,7 +23,7 @@ -export([namespace/0, roots/0, fields/1, desc/1]). --export([create_opts/1, resource_opts_meta/0]). +-export([create_opts/1, resource_opts_meta/0, override/2]). %% range interval in ms -define(HEALTH_CHECK_INTERVAL_RANGE_MIN, 1). diff --git a/rel/i18n/emqx_bridge_iotdb.hocon b/rel/i18n/emqx_bridge_iotdb.hocon index e38c828f5..987006a53 100644 --- a/rel/i18n/emqx_bridge_iotdb.hocon +++ b/rel/i18n/emqx_bridge_iotdb.hocon @@ -70,4 +70,54 @@ desc_name.desc: desc_name.label: """Bridge Name""" + +config_parameters_timestamp.desc: +"""Timestamp. Placeholders in format of ${var} is supported, the finally value can be:
+- now: use the `now_ms` which is contained in the payload as timestamp +- now_ms: same as above +- now_us: use the `now_us` which is contained in the payload as timestamp +- now_ns: use the `now_us` which is contained in the payload as timestamp +- any other: use the value directly as the timestamp +""" + +config_parameters_timestamp.label: +"""Timestamp""" + +config_parameters_measurement.desc: +"""Measurement. Placeholders in format of ${var} is supported""" + +config_parameters_measurement.label: +"""Measurement""" + +config_parameters_data_type.desc: +"""Data Type, can be:
+- TEXT +- BOOLEAN +- INT32 +- INT64 +- FLOAT +- DOUBLE +""" + +config_parameters_data_type.label: +"""Data type""" + +config_parameters_value.desc: +"""Value. Placeholders in format of ${var} is supported""" + +config_parameters_value.label: +"""Value""" + +action_parameters_data.desc: +"""IoTDB action parameter data""" + +action_parameters_data.label: +"""Parameter Data""" + +action_parameters.desc: +"""IoTDB action parameters""" + +action_parameters.label: +"""Parameters""" + } diff --git a/rel/i18n/emqx_bridge_iotdb_connector.hocon b/rel/i18n/emqx_bridge_iotdb_connector.hocon new file mode 100644 index 000000000..7df06f858 --- /dev/null +++ b/rel/i18n/emqx_bridge_iotdb_connector.hocon @@ -0,0 +1,44 @@ +emqx_bridge_iotdb_connector { + +config_authentication.desc: +"""Authentication configuration""" + +config_authentication.label: +"""Authentication""" + +auth_basic.desc: +"""Parameters for basic authentication.""" + +auth_basic.label: +"""Basic auth params""" + +config_auth_basic_username.desc: +"""The username as configured at the IoTDB REST interface""" + +config_auth_basic_username.label: + """HTTP Basic Auth Username""" + +config_auth_basic_password.desc: +"""The password as configured at the IoTDB REST interface""" + +config_auth_basic_password.label: +"""HTTP Basic Auth Password""" + +config_base_url.desc: +"""The base URL of the external IoTDB service's REST interface.""" +config_base_url.label: +"""IoTDB REST Service Base URL""" + +config_max_retries.desc: +"""HTTP request max retry times if failed.""" + +config_max_retries.label: +"""HTTP Request Max Retries""" + +desc_config.desc: +"""Configuration for Apache IoTDB bridge.""" + +desc_config.label: +"""IoTDB Bridge Configuration""" + +} From e20b024b6e891b684168ed9ab1fddbbd623af313 Mon Sep 17 00:00:00 2001 From: firest Date: Sat, 6 Jan 2024 13:01:43 +0800 Subject: [PATCH 2/4] fix(iotdb): fix iotdb testcases --- .../src/schema/emqx_bridge_enterprise.erl | 2 +- .../test/emqx_bridge_v2_testlib.erl | 4 +- .../src/emqx_bridge_iotdb.app.src | 2 +- .../src/emqx_bridge_iotdb.erl | 2 +- .../src/emqx_bridge_iotdb_connector.erl | 32 ++-- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 149 ++++++++++++++---- 6 files changed, 148 insertions(+), 43 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 92e295589..cc4c6eb01 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -124,7 +124,7 @@ resource_type(sqlserver) -> emqx_bridge_sqlserver_connector; resource_type(opents) -> emqx_bridge_opents_connector; resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; resource_type(oracle) -> emqx_oracle; -resource_type(iotdb) -> emqx_bridge_iotdb_impl; +resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 1f5373b70..eb8a9a5f8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -478,11 +478,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), BridgeId = bridge_id(Config), + BridgeType = ?config(bridge_type, Config), + BridgeName = ?config(bridge_name, Config), Message = {BridgeId, MakeMessageFun()}, ?assertMatch( {ok, {ok, _}}, ?wait_async_action( - emqx_resource:query(ResourceId, Message, #{ + emqx_bridge_v2:query(BridgeType, BridgeName, Message, #{ async_reply_fun => {ReplyFun, [self()]} }), #{?snk_kind := TracePoint, instance_id := ResourceId}, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 42b3c165f..b071d7b3d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -4,7 +4,7 @@ {vsn, "0.1.4"}, {modules, [ emqx_bridge_iotdb, - emqx_bridge_iotdb_impl + emqx_bridge_iotdb_connector ]}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index a313a9613..4b509e908 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -316,7 +316,7 @@ action_values() -> } ], is_aligned => false, - device_id => <<"${clientid}">>, + device_id => <<"my_device">>, iotdb_version => ?VSN_1_1_X } }. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index c493da90c..1b3cd4c38 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -495,10 +495,11 @@ convert_float(undefined) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - }). + {ok, + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + })}. replace_dtypes(Rows0, IotDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -582,8 +583,8 @@ to_list(Data) -> [Data]. %% If device_id is missing from the channel data, try to find it from the payload device_id(Message, Payloads, Channel) -> - case maps:get(device_id, Channel, <<>>) of - <<>> -> + case maps:get(device_id, Channel, []) of + [] -> maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceIdTkn -> emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) @@ -629,17 +630,26 @@ preproc_data_template(DataList) -> try_render_message({ChannelId, Msg}, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - {ok, render_channel_message(Channel, Msg)}; + render_channel_message(Channel, Msg); _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), - DataTemplate = get_data_template(Channel, Payloads), - DataList = proc_data(DataTemplate, Message), - DeviceId = device_id(Message, Payloads, Channel), - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn). + case device_id(Message, Payloads, Channel) of + undefined -> + {error, device_id_missing}; + DeviceId -> + case get_data_template(Channel, Payloads) of + [] -> + {error, invalid_data}; + DataTemplate -> + DataList = proc_data(DataTemplate, Message), + + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + end + end. %% Get the message template. %% In order to be compatible with 4.4, the template version has higher priority diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 6951ff81c..25349121a 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -32,10 +32,10 @@ groups() -> ]. init_per_suite(Config) -> - emqx_bridge_testlib:init_per_suite(Config, ?APPS). + emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS). end_per_suite(Config) -> - emqx_bridge_testlib:end_per_suite(Config). + emqx_bridge_v2_testlib:end_per_suite(Config). init_per_group(plain = Type, Config0) -> Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"), @@ -43,7 +43,7 @@ init_per_group(plain = Type, Config0) -> ProxyName = "iotdb", case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), [ {bridge_host, Host}, {bridge_port, Port}, @@ -66,7 +66,7 @@ init_per_group(legacy = Type, Config0) -> ProxyName = "iotdb013", case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), [ {bridge_host, Host}, {bridge_port, Port}, @@ -90,18 +90,34 @@ end_per_group(Group, Config) when Group =:= plain; Group =:= legacy -> - emqx_bridge_testlib:end_per_group(Config), + emqx_bridge_v2_testlib:end_per_group(Config), ok; end_per_group(_Group, _Config) -> ok. init_per_testcase(TestCase, Config0) -> - Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), + Type = ?config(bridge_type, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + {_ConfigString, ConnectorConfig} = connector_config(Name, Config0), + {_, ActionConfig} = action_config(Name, Config0), + Config = [ + {connector_type, Type}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, Type}, + {bridge_name, Name}, + {bridge_config, ActionConfig} + | Config0 + ], iotdb_reset(Config), + ok = snabbkaffe:start_trace(), Config. end_per_testcase(TestCase, Config) -> - emqx_bridge_testlib:end_per_testcase(TestCase, Config). + emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config). %%------------------------------------------------------------------------------ %% Helper fns @@ -114,7 +130,7 @@ iotdb_server_url(Host, Port) -> integer_to_binary(Port) ]). -bridge_config(TestCase, _TestGroup, Config) -> +bridge_config(TestCase, Config) -> UniqueNum = integer_to_binary(erlang:unique_integer()), Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), @@ -149,7 +165,7 @@ bridge_config(TestCase, _TestGroup, Config) -> Version ] ), - {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}. + {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, ConfigString)}. make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ @@ -201,7 +217,7 @@ iotdb_request(Config, Path, Body, Opts) -> <<"password">> := Password } } = - ?config(bridge_config, Config), + ?config(connector_config, Config), ct:pal("bridge config: ~p", [_BridgeConfig]), URL = <>, BasicToken = base64:encode(<>), @@ -238,6 +254,76 @@ is_error_check(Reason) -> ?assertEqual({error, Reason}, Result) end. +action_config(Name, Config) -> + Version = ?config(iotdb_version, Config), + Type = ?config(bridge_type, Config), + ConfigString = + io_lib:format( + "actions.~s.~s {\n" + " enable = true\n" + " connector = \"~s\"\n" + " parameters = {\n" + " iotdb_version = \"~s\"\n" + " data = []\n" + " }\n" + "}\n", + [ + Type, + Name, + Name, + Version + ] + ), + ct:pal("ActionConfig:~ts~n", [ConfigString]), + {ConfigString, parse_action_and_check(ConfigString, Type, Name)}. + +connector_config(Name, Config) -> + Host = ?config(bridge_host, Config), + Port = ?config(bridge_port, Config), + Type = ?config(bridge_type, Config), + ServerURL = iotdb_server_url(Host, Port), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " base_url = \"~s\"\n" + " authentication = {\n" + " username = \"root\"\n" + " password = \"root\"\n" + " }\n" + "}\n", + [ + Type, + Name, + ServerURL + ] + ), + ct:pal("ConnectorConfig:~ts~n", [ConfigString]), + {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}. + +parse_action_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). + +parse_connector_and_check(ConfigString, ConnectorType, Name) -> + parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ). +%% emqx_utils_maps:safe_atom_key_map(Config). + +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, + Config. + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +to_bin(Bin) when is_binary(Bin) -> + Bin. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -246,7 +332,7 @@ t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_sync_query( + ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), Query = <<"select temp from ", DeviceId/binary>>, @@ -260,7 +346,7 @@ t_async_query(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_async_query( + ok = emqx_bridge_v2_testlib:t_async_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async ), Query = <<"select temp from ", DeviceId/binary>>, @@ -310,7 +396,7 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_sync_query( + ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), @@ -369,10 +455,12 @@ t_sync_query_fail(Config) -> fun(Result) -> ?assertEqual(error, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ). t_sync_device_id_missing(Config) -> - emqx_bridge_testlib:t_sync_query( + emqx_bridge_v2_testlib:t_sync_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar}), is_error_check(device_id_missing), @@ -387,7 +475,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), ?check_trace( begin - {ok, _} = emqx_bridge_testlib:create_bridge(Config), + {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config), SQL = << "SELECT\n" " payload.measurement, payload.data_type, payload.value, payload.device_id\n" @@ -397,7 +485,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> "\"" >>, Opts = #{sql => SQL}, - {ok, _} = emqx_bridge_testlib:create_rule_and_action_http( + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( BridgeType, RuleTopic, Config, Opts ), emqx:publish(Message), @@ -415,7 +503,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> ok. t_sync_invalid_data(Config) -> - emqx_bridge_testlib:t_sync_query( + emqx_bridge_v2_testlib:t_sync_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), is_error_check(invalid_data), @@ -423,7 +511,7 @@ t_sync_invalid_data(Config) -> ). t_async_device_id_missing(Config) -> - emqx_bridge_testlib:t_async_query( + emqx_bridge_v2_testlib:t_async_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar}), is_error_check(device_id_missing), @@ -431,7 +519,7 @@ t_async_device_id_missing(Config) -> ). t_async_invalid_data(Config) -> - emqx_bridge_testlib:t_async_query( + emqx_bridge_v2_testlib:t_async_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), is_error_check(invalid_data), @@ -439,18 +527,19 @@ t_async_invalid_data(Config) -> ). t_create_via_http(Config) -> - emqx_bridge_testlib:t_create_via_http(Config). + emqx_bridge_v2_testlib:t_create_via_http(Config). t_start_stop(Config) -> - emqx_bridge_testlib:t_start_stop(Config, iotdb_bridge_stopped). + emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped). t_on_get_status(Config) -> - emqx_bridge_testlib:t_on_get_status(Config). + emqx_bridge_v2_testlib:t_on_get_status(Config). t_device_id(Config) -> - ResourceId = emqx_bridge_testlib:resource_id(Config), %% Create without device_id configured - ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)), + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, @@ -463,9 +552,11 @@ t_device_id(Config) -> iotdb_reset(Config, ConfiguredDevice), Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), MessageF1 = make_message_fun(Topic, Payload1), + is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) ), + {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), @@ -476,10 +567,12 @@ t_device_id(Config) -> %% reconfigure bridge with device_id {ok, _} = - emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}), + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{<<"device_id">> => ConfiguredDevice} + }), is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) ), %% even though we had a device_id in the message it's not being used From d221f34ce8b46ad90f9fabf2099768450bc33bfc Mon Sep 17 00:00:00 2001 From: firest Date: Sat, 6 Jan 2024 13:34:07 +0800 Subject: [PATCH 3/4] chore: bump version && update changes --- .../src/emqx_bridge_azure_event_hub.app.src | 2 +- apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src | 2 +- apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src | 2 +- apps/emqx_resource/src/emqx_resource.app.src | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src index 12d0890c3..b588d1e18 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_azure_event_hub, [ {description, "EMQX Enterprise Azure Event Hub Bridge"}, - {vsn, "0.1.5"}, + {vsn, "0.1.6"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src index 64d1dec09..e63b8df6a 100644 --- a/apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src +++ b/apps/emqx_bridge_confluent/src/emqx_bridge_confluent.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_confluent, [ {description, "EMQX Enterprise Confluent Connector and Action"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {applications, [ kernel, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index b071d7b3d..d7044b063 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge_iotdb, [ {description, "EMQX Enterprise Apache IoTDB Bridge"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {modules, [ emqx_bridge_iotdb, emqx_bridge_iotdb_connector diff --git a/apps/emqx_resource/src/emqx_resource.app.src b/apps/emqx_resource/src/emqx_resource.app.src index 6649d0ef2..8ee5d7a04 100644 --- a/apps/emqx_resource/src/emqx_resource.app.src +++ b/apps/emqx_resource/src/emqx_resource.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_resource, [ {description, "Manager for all external resources"}, - {vsn, "0.1.26"}, + {vsn, "0.1.27"}, {registered, []}, {mod, {emqx_resource_app, []}}, {applications, [ From 6f4b22e376ac4620ea3c65f0495514f07008c25d Mon Sep 17 00:00:00 2001 From: firest Date: Sat, 6 Jan 2024 16:36:14 +0800 Subject: [PATCH 4/4] test(iotdb): add template test case for IoTDB --- .../src/emqx_bridge_iotdb.erl | 4 ++ .../src/emqx_bridge_iotdb_connector.erl | 2 + .../test/emqx_bridge_iotdb_impl_SUITE.erl | 57 +++++++++++++++++++ changes/feat-12261.en.md | 1 + rel/i18n/emqx_bridge_iotdb.hocon | 10 ++-- scripts/spellcheck/dicts/emqx.txt | 2 + 6 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 changes/feat-12261.en.md diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index 4b509e908..34591332b 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -192,6 +192,10 @@ desc(action_parameters) -> ?DESC("action_parameters"); desc(action_parameters_data) -> ?DESC("action_parameters_data"); +desc(action_resource_opts) -> + "Action Resource Options"; +desc("creation_opts") -> + "Creation Options"; desc(auth_basic) -> "Basic Authentication"; desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index 1b3cd4c38..5f6e31aaf 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -147,6 +147,8 @@ fields("get") -> desc(config) -> ?DESC("desc_config"); +desc(auth_basic) -> + "Basic Authentication"; desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for IoTDB using `", string:to_upper(Method), "` method."]; desc(_) -> diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 25349121a..6b9af7b9a 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -589,6 +589,63 @@ t_device_id(Config) -> iotdb_reset(Config, ConfiguredDevice), ok. +t_template(Config) -> + %% Create without data configured + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + TemplateDeviceId = <<"root.deviceWithTemplate">>, + DeviceId = <<"root.deviceWithoutTemplate">>, + Topic = <<"some/random/topic">>, + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TemplateDeviceId), + Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), + MessageF1 = make_message_fun(Topic, Payload1), + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) + ), + + {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + ?assertMatch(#{<<"values">> := [[true]]}, emqx_utils_json:decode(Res1_1)), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TemplateDeviceId), + + %% reconfigure with data template + {ok, _} = + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{ + <<"device_id">> => TemplateDeviceId, + <<"data">> => [ + #{ + <<"measurement">> => <<"${payload.measurement}">>, + <<"data_type">> => "TEXT", + <<"value">> => <<"${payload.device_id}">> + } + ] + } + }), + + is_success_check( + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) + ), + + {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query( + Config, <<"select * from ", TemplateDeviceId/binary>> + ), + + ?assertMatch(#{<<"values">> := [[<>]]}, emqx_utils_json:decode(Res2_2)), + + iotdb_reset(Config, DeviceId), + iotdb_reset(Config, TemplateDeviceId), + ok. + is_empty(null) -> true; is_empty([]) -> true; is_empty([[]]) -> true; diff --git a/changes/feat-12261.en.md b/changes/feat-12261.en.md new file mode 100644 index 000000000..52949b5d2 --- /dev/null +++ b/changes/feat-12261.en.md @@ -0,0 +1 @@ +The bridges for IoTDB have been split so it is available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/rel/i18n/emqx_bridge_iotdb.hocon b/rel/i18n/emqx_bridge_iotdb.hocon index 987006a53..c1841428a 100644 --- a/rel/i18n/emqx_bridge_iotdb.hocon +++ b/rel/i18n/emqx_bridge_iotdb.hocon @@ -72,13 +72,12 @@ desc_name.label: """Bridge Name""" config_parameters_timestamp.desc: -"""Timestamp. Placeholders in format of ${var} is supported, the finally value can be:
+"""Timestamp. Placeholders in format of ${var} is supported, the final value can be:
- now: use the `now_ms` which is contained in the payload as timestamp - now_ms: same as above - now_us: use the `now_us` which is contained in the payload as timestamp -- now_ns: use the `now_us` which is contained in the payload as timestamp -- any other: use the value directly as the timestamp -""" +- now_ns: use the `now_ns` which is contained in the payload as timestamp +- any other: use the value directly as the timestamp""" config_parameters_timestamp.label: """Timestamp""" @@ -96,8 +95,7 @@ config_parameters_data_type.desc: - INT32 - INT64 - FLOAT -- DOUBLE -""" +- DOUBLE""" config_parameters_data_type.label: """Data type""" diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 9c1799a36..401df33a6 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -295,3 +295,5 @@ upstream priv Syskeeper msacc +now_us +ns