diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 92e295589..cc4c6eb01 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -124,7 +124,7 @@ resource_type(sqlserver) -> emqx_bridge_sqlserver_connector; resource_type(opents) -> emqx_bridge_opents_connector; resource_type(pulsar_producer) -> emqx_bridge_pulsar_impl_producer; resource_type(oracle) -> emqx_oracle; -resource_type(iotdb) -> emqx_bridge_iotdb_impl; +resource_type(iotdb) -> emqx_bridge_iotdb_connector; resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 1f5373b70..eb8a9a5f8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -478,11 +478,13 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), BridgeId = bridge_id(Config), + BridgeType = ?config(bridge_type, Config), + BridgeName = ?config(bridge_name, Config), Message = {BridgeId, MakeMessageFun()}, ?assertMatch( {ok, {ok, _}}, ?wait_async_action( - emqx_resource:query(ResourceId, Message, #{ + emqx_bridge_v2:query(BridgeType, BridgeName, Message, #{ async_reply_fun => {ReplyFun, [self()]} }), #{?snk_kind := TracePoint, instance_id := ResourceId}, diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src index 42b3c165f..b071d7b3d 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.app.src @@ -4,7 +4,7 @@ {vsn, "0.1.4"}, {modules, [ emqx_bridge_iotdb, - emqx_bridge_iotdb_impl + emqx_bridge_iotdb_connector ]}, {registered, []}, {applications, [ diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl index a313a9613..4b509e908 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb.erl @@ -316,7 +316,7 @@ action_values() -> } ], is_aligned => false, - device_id => <<"${clientid}">>, + device_id => <<"my_device">>, iotdb_version => ?VSN_1_1_X } }. diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index c493da90c..1b3cd4c38 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -495,10 +495,11 @@ convert_float(undefined) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), - maps:merge(Rows, #{ - iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, - iotdb_field_key(device_id, IotDBVsn) => DeviceId - }). + {ok, + maps:merge(Rows, #{ + iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, + iotdb_field_key(device_id, IotDBVsn) => DeviceId + })}. replace_dtypes(Rows0, IotDBVsn) -> {Types, Rows} = maps:take(dtypes, Rows0), @@ -582,8 +583,8 @@ to_list(Data) -> [Data]. %% If device_id is missing from the channel data, try to find it from the payload device_id(Message, Payloads, Channel) -> - case maps:get(device_id, Channel, <<>>) of - <<>> -> + case maps:get(device_id, Channel, []) of + [] -> maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceIdTkn -> emqx_placeholder:proc_tmpl(DeviceIdTkn, Message) @@ -629,17 +630,26 @@ preproc_data_template(DataList) -> try_render_message({ChannelId, Msg}, Channels) -> case maps:find(ChannelId, Channels) of {ok, Channel} -> - {ok, render_channel_message(Channel, Msg)}; + render_channel_message(Channel, Msg); _ -> {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} end. render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> Payloads = to_list(parse_payload(get_payload(Message))), - DataTemplate = get_data_template(Channel, Payloads), - DataList = proc_data(DataTemplate, Message), - DeviceId = device_id(Message, Payloads, Channel), - make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn). + case device_id(Message, Payloads, Channel) of + undefined -> + {error, device_id_missing}; + DeviceId -> + case get_data_template(Channel, Payloads) of + [] -> + {error, invalid_data}; + DataTemplate -> + DataList = proc_data(DataTemplate, Message), + + make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) + end + end. %% Get the message template. %% In order to be compatible with 4.4, the template version has higher priority diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 6951ff81c..25349121a 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -32,10 +32,10 @@ groups() -> ]. init_per_suite(Config) -> - emqx_bridge_testlib:init_per_suite(Config, ?APPS). + emqx_bridge_v2_testlib:init_per_suite(Config, ?APPS). end_per_suite(Config) -> - emqx_bridge_testlib:end_per_suite(Config). + emqx_bridge_v2_testlib:end_per_suite(Config). init_per_group(plain = Type, Config0) -> Host = os:getenv("IOTDB_PLAIN_HOST", "toxiproxy.emqx.net"), @@ -43,7 +43,7 @@ init_per_group(plain = Type, Config0) -> ProxyName = "iotdb", case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), [ {bridge_host, Host}, {bridge_port, Port}, @@ -66,7 +66,7 @@ init_per_group(legacy = Type, Config0) -> ProxyName = "iotdb013", case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> - Config = emqx_bridge_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), + Config = emqx_bridge_v2_testlib:init_per_group(Type, ?BRIDGE_TYPE_BIN, Config0), [ {bridge_host, Host}, {bridge_port, Port}, @@ -90,18 +90,34 @@ end_per_group(Group, Config) when Group =:= plain; Group =:= legacy -> - emqx_bridge_testlib:end_per_group(Config), + emqx_bridge_v2_testlib:end_per_group(Config), ok; end_per_group(_Group, _Config) -> ok. init_per_testcase(TestCase, Config0) -> - Config = emqx_bridge_testlib:init_per_testcase(TestCase, Config0, fun bridge_config/3), + Type = ?config(bridge_type, Config0), + UniqueNum = integer_to_binary(erlang:unique_integer()), + Name = << + (atom_to_binary(TestCase))/binary, UniqueNum/binary + >>, + {_ConfigString, ConnectorConfig} = connector_config(Name, Config0), + {_, ActionConfig} = action_config(Name, Config0), + Config = [ + {connector_type, Type}, + {connector_name, Name}, + {connector_config, ConnectorConfig}, + {bridge_type, Type}, + {bridge_name, Name}, + {bridge_config, ActionConfig} + | Config0 + ], iotdb_reset(Config), + ok = snabbkaffe:start_trace(), Config. end_per_testcase(TestCase, Config) -> - emqx_bridge_testlib:end_per_testcase(TestCase, Config). + emqx_bridge_v2_testlib:end_per_testcase(TestCase, Config). %%------------------------------------------------------------------------------ %% Helper fns @@ -114,7 +130,7 @@ iotdb_server_url(Host, Port) -> integer_to_binary(Port) ]). -bridge_config(TestCase, _TestGroup, Config) -> +bridge_config(TestCase, Config) -> UniqueNum = integer_to_binary(erlang:unique_integer()), Host = ?config(bridge_host, Config), Port = ?config(bridge_port, Config), @@ -149,7 +165,7 @@ bridge_config(TestCase, _TestGroup, Config) -> Version ] ), - {Name, ConfigString, emqx_bridge_testlib:parse_and_check(Type, Name, ConfigString)}. + {Name, ConfigString, emqx_bridge_v2_testlib:parse_and_check(Type, Name, ConfigString)}. make_iotdb_payload(DeviceId, Measurement, Type, Value) -> #{ @@ -201,7 +217,7 @@ iotdb_request(Config, Path, Body, Opts) -> <<"password">> := Password } } = - ?config(bridge_config, Config), + ?config(connector_config, Config), ct:pal("bridge config: ~p", [_BridgeConfig]), URL = <>, BasicToken = base64:encode(<>), @@ -238,6 +254,76 @@ is_error_check(Reason) -> ?assertEqual({error, Reason}, Result) end. +action_config(Name, Config) -> + Version = ?config(iotdb_version, Config), + Type = ?config(bridge_type, Config), + ConfigString = + io_lib:format( + "actions.~s.~s {\n" + " enable = true\n" + " connector = \"~s\"\n" + " parameters = {\n" + " iotdb_version = \"~s\"\n" + " data = []\n" + " }\n" + "}\n", + [ + Type, + Name, + Name, + Version + ] + ), + ct:pal("ActionConfig:~ts~n", [ConfigString]), + {ConfigString, parse_action_and_check(ConfigString, Type, Name)}. + +connector_config(Name, Config) -> + Host = ?config(bridge_host, Config), + Port = ?config(bridge_port, Config), + Type = ?config(bridge_type, Config), + ServerURL = iotdb_server_url(Host, Port), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " base_url = \"~s\"\n" + " authentication = {\n" + " username = \"root\"\n" + " password = \"root\"\n" + " }\n" + "}\n", + [ + Type, + Name, + ServerURL + ] + ), + ct:pal("ConnectorConfig:~ts~n", [ConfigString]), + {ConfigString, parse_connector_and_check(ConfigString, Type, Name)}. + +parse_action_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). + +parse_connector_and_check(ConfigString, ConnectorType, Name) -> + parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ). +%% emqx_utils_maps:safe_atom_key_map(Config). + +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, + Config. + +to_bin(List) when is_list(List) -> + unicode:characters_to_binary(List, utf8); +to_bin(Atom) when is_atom(Atom) -> + erlang:atom_to_binary(Atom); +to_bin(Bin) when is_binary(Bin) -> + Bin. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -246,7 +332,7 @@ t_sync_query_simple(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_sync_query( + ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), Query = <<"select temp from ", DeviceId/binary>>, @@ -260,7 +346,7 @@ t_async_query(Config) -> DeviceId = iotdb_device(Config), Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "36"), MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_async_query( + ok = emqx_bridge_v2_testlib:t_async_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query_async ), Query = <<"select temp from ", DeviceId/binary>>, @@ -310,7 +396,7 @@ t_sync_query_aggregated(Config) -> make_iotdb_payload(DeviceId, "foo", "TEXT", "bar", 1685112026300) ], MakeMessageFun = make_message_fun(iotdb_topic(Config), Payload), - ok = emqx_bridge_testlib:t_sync_query( + ok = emqx_bridge_v2_testlib:t_sync_query( Config, MakeMessageFun, fun is_success_check/1, iotdb_bridge_on_query ), @@ -369,10 +455,12 @@ t_sync_query_fail(Config) -> fun(Result) -> ?assertEqual(error, element(1, Result)) end, - emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). + emqx_bridge_v2_testlib:t_sync_query( + Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query + ). t_sync_device_id_missing(Config) -> - emqx_bridge_testlib:t_sync_query( + emqx_bridge_v2_testlib:t_sync_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar}), is_error_check(device_id_missing), @@ -387,7 +475,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), ?check_trace( begin - {ok, _} = emqx_bridge_testlib:create_bridge(Config), + {ok, _} = emqx_bridge_v2_testlib:create_bridge(Config), SQL = << "SELECT\n" " payload.measurement, payload.data_type, payload.value, payload.device_id\n" @@ -397,7 +485,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> "\"" >>, Opts = #{sql => SQL}, - {ok, _} = emqx_bridge_testlib:create_rule_and_action_http( + {ok, _} = emqx_bridge_v2_testlib:create_rule_and_action_http( BridgeType, RuleTopic, Config, Opts ), emqx:publish(Message), @@ -415,7 +503,7 @@ t_extract_device_id_from_rule_engine_message(Config) -> ok. t_sync_invalid_data(Config) -> - emqx_bridge_testlib:t_sync_query( + emqx_bridge_v2_testlib:t_sync_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), is_error_check(invalid_data), @@ -423,7 +511,7 @@ t_sync_invalid_data(Config) -> ). t_async_device_id_missing(Config) -> - emqx_bridge_testlib:t_async_query( + emqx_bridge_v2_testlib:t_async_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar}), is_error_check(device_id_missing), @@ -431,7 +519,7 @@ t_async_device_id_missing(Config) -> ). t_async_invalid_data(Config) -> - emqx_bridge_testlib:t_async_query( + emqx_bridge_v2_testlib:t_async_query( Config, make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), is_error_check(invalid_data), @@ -439,18 +527,19 @@ t_async_invalid_data(Config) -> ). t_create_via_http(Config) -> - emqx_bridge_testlib:t_create_via_http(Config). + emqx_bridge_v2_testlib:t_create_via_http(Config). t_start_stop(Config) -> - emqx_bridge_testlib:t_start_stop(Config, iotdb_bridge_stopped). + emqx_bridge_v2_testlib:t_start_stop(Config, iotdb_bridge_stopped). t_on_get_status(Config) -> - emqx_bridge_testlib:t_on_get_status(Config). + emqx_bridge_v2_testlib:t_on_get_status(Config). t_device_id(Config) -> - ResourceId = emqx_bridge_testlib:resource_id(Config), %% Create without device_id configured - ?assertMatch({ok, _}, emqx_bridge_testlib:create_bridge(Config)), + ?assertMatch({ok, _}, emqx_bridge_v2_testlib:create_bridge(Config)), + ResourceId = emqx_bridge_v2_testlib:resource_id(Config), + BridgeId = emqx_bridge_v2_testlib:bridge_id(Config), ?retry( _Sleep = 1_000, _Attempts = 20, @@ -463,9 +552,11 @@ t_device_id(Config) -> iotdb_reset(Config, ConfiguredDevice), Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), MessageF1 = make_message_fun(Topic, Payload1), + is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) ), + {ok, {{_, 200, _}, _, Res1_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), @@ -476,10 +567,12 @@ t_device_id(Config) -> %% reconfigure bridge with device_id {ok, _} = - emqx_bridge_testlib:update_bridge_api(Config, #{<<"device_id">> => ConfiguredDevice}), + emqx_bridge_v2_testlib:update_bridge_api(Config, #{ + <<"parameters">> => #{<<"device_id">> => ConfiguredDevice} + }), is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) + emqx_resource:simple_sync_query(ResourceId, {BridgeId, MessageF1()}) ), %% even though we had a device_id in the message it's not being used