From 1e4cee05df2fe14cf005ad602d602e0d6d55bf74 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 31 May 2023 15:57:22 +0200 Subject: [PATCH] fix(emqx_bridge_iotdb): handle rule engine passed payload also remove topic logic as it's duplicated functionality via rule engine --- .../src/emqx_bridge_iotdb_impl.erl | 47 +++++----- .../test/emqx_bridge_iotdb_impl_SUITE.erl | 89 ++++++++----------- 2 files changed, 58 insertions(+), 78 deletions(-) diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl index ec7250a8d..4e64f07b5 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_impl.erl @@ -153,10 +153,17 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) -> %% Internal Functions %%-------------------------------------------------------------------- -make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) -> - emqx_utils_json:decode(PayloadUnparsed); -make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) -> - lists:map(fun make_parsed_payload/1, PayloadUnparsed). +get_payload(#{payload := Payload}) -> + Payload; +get_payload(#{<<"payload">> := Payload}) -> + Payload. + +parse_payload(ParsedPayload) when is_map(ParsedPayload) -> + ParsedPayload; +parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) -> + emqx_utils_json:decode(UnparsedPayload); +parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) -> + lists:map(fun parse_payload/1, UnparsedPayloads). preproc_data_list(DataList) -> lists:foldl( @@ -297,16 +304,16 @@ convert_float(Str) when is_binary(Str) -> convert_float(undefined) -> null. -make_iotdb_insert_request(MessageUnparsedPayload, State) -> - Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload), +make_iotdb_insert_request(Message, State) -> + Payloads = to_list(parse_payload(get_payload(Message))), IsAligned = maps:get(is_aligned, State, false), - DeviceId = device_id(Message, State), IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X), - Payload = make_list(maps:get(payload, Message)), - case preproc_data_list(Payload) of - [] -> + case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of + {undefined, _} -> + {error, device_id_missing}; + {_, []} -> {error, invalid_data}; - PreProcessedData -> + {DeviceId, PreProcessedData} -> DataList = proc_data(PreProcessedData, Message), InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), @@ -394,22 +401,14 @@ iotdb_field_key(data_types, ?VSN_1_0_X) -> iotdb_field_key(data_types, ?VSN_0_13_X) -> <<"dataTypes">>. -make_list(List) when is_list(List) -> List; -make_list(Data) -> [Data]. +to_list(List) when is_list(List) -> List; +to_list(Data) -> [Data]. -device_id(Message, State) -> +device_id(Message, Payloads, State) -> case maps:get(device_id, State, undefined) of undefined -> - case maps:get(payload, Message) of - #{<<"device_id">> := DeviceId} -> - DeviceId; - _NotFound -> - Topic = maps:get(topic, Message), - case re:replace(Topic, "/", ".", [global, {return, binary}]) of - <<"root.", _/binary>> = Device -> Device; - Device -> <<"root.", Device/binary>> - end - end; + %% [FIXME] there could be conflicting device-ids in the Payloads + maps:get(<<"device_id">>, hd(Payloads), undefined); DeviceId -> DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId), emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message) diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index 55d3743d4..d89b870d0 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -231,6 +231,11 @@ is_success_check({ok, 200, _, Body}) -> is_code(Code, #{<<"code">> := Code}) -> true; is_code(_, _) -> false. +is_error_check(Reason) -> + fun(Result) -> + ?assertEqual({error, Reason}, Result) + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -364,33 +369,37 @@ t_sync_query_fail(Config) -> end, emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query). -t_sync_query_badpayload(Config) -> - BadPayload = #{foo => bar}, - IsSuccessCheck = - fun(Result) -> - ?assertEqual({error, invalid_data}, Result) - end, +t_sync_device_id_missing(Config) -> emqx_bridge_testlib:t_sync_query( Config, - make_message_fun(iotdb_topic(Config), BadPayload), - IsSuccessCheck, + make_message_fun(iotdb_topic(Config), #{foo => bar}), + is_error_check(device_id_missing), iotdb_bridge_on_query - ), - ok. + ). -t_async_query_badpayload(Config) -> - BadPayload = #{foo => bar}, - IsSuccessCheck = - fun(Result) -> - ?assertEqual({error, invalid_data}, Result) - end, +t_sync_invalid_data(Config) -> + emqx_bridge_testlib:t_sync_query( + Config, + make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), + is_error_check(invalid_data), + iotdb_bridge_on_query + ). + +t_async_device_id_missing(Config) -> emqx_bridge_testlib:t_async_query( Config, - make_message_fun(iotdb_topic(Config), BadPayload), - IsSuccessCheck, + make_message_fun(iotdb_topic(Config), #{foo => bar}), + is_error_check(device_id_missing), iotdb_bridge_on_query_async - ), - ok. + ). + +t_async_invalid_data(Config) -> + emqx_bridge_testlib:t_async_query( + Config, + make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}), + is_error_check(invalid_data), + iotdb_bridge_on_query_async + ). t_create_via_http(Config) -> emqx_bridge_testlib:t_create_via_http(Config). @@ -413,13 +422,10 @@ t_device_id(Config) -> ConfiguredDevice = <<"root.someOtherDevice234">>, DeviceId = <<"root.deviceFooBar123">>, Topic = <<"some/random/topic">>, - TopicDevice = topic_to_iotdb_device(Topic), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true), MessageF1 = make_message_fun(Topic, Payload1), - ?assertNotEqual(DeviceId, TopicDevice), is_success_check( emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()}) ), @@ -427,29 +433,8 @@ t_device_id(Config) -> ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]), #{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1), ?assertNot(is_empty(Values1_1)), - {ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - ct:pal("topic device result: ~p", [emqx_utils_json:decode(Res1_2)]), - #{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2), - ?assert(is_empty(Values1_2)), - - %% test without device_id in message, taking it from topic - iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), - iotdb_reset(Config, ConfiguredDevice), - Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)), - MessageF2 = make_message_fun(Topic, Payload2), - is_success_check( - emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()}) - ), - {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), - #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), - ?assert(is_empty(Values2_1)), - {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), - ?assertNot(is_empty(Values2_2)), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), %% reconfigure bridge with device_id @@ -461,20 +446,16 @@ t_device_id(Config) -> ), %% even though we had a device_id in the message it's not being used - {ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), - #{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1), - ?assert(is_empty(Values3_1)), - {ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>), - #{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2), - ?assert(is_empty(Values3_2)), - {ok, {{_, 200, _}, _, Res3_3}} = iotdb_query( + {ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>), + #{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1), + ?assert(is_empty(Values2_1)), + {ok, {{_, 200, _}, _, Res2_2}} = iotdb_query( Config, <<"select * from ", ConfiguredDevice/binary>> ), - #{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3), - ?assertNot(is_empty(Values3_3)), + #{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2), + ?assertNot(is_empty(Values2_2)), iotdb_reset(Config, DeviceId), - iotdb_reset(Config, TopicDevice), iotdb_reset(Config, ConfiguredDevice), ok.