fix(emqx_bridge_iotdb): handle rule engine passed payload

also remove topic logic as it's duplicated functionality via rule engine
This commit is contained in:
Stefan Strigler 2023-05-31 15:57:22 +02:00
parent 6c0fb0e2ea
commit 1e4cee05df
2 changed files with 58 additions and 78 deletions

View File

@ -153,10 +153,17 @@ on_query_async(InstanceId, {send_message, Message}, ReplyFunAndArgs0, State) ->
%% Internal Functions
%%--------------------------------------------------------------------
make_parsed_payload(PayloadUnparsed) when is_binary(PayloadUnparsed) ->
emqx_utils_json:decode(PayloadUnparsed);
make_parsed_payload(PayloadUnparsed) when is_list(PayloadUnparsed) ->
lists:map(fun make_parsed_payload/1, PayloadUnparsed).
get_payload(#{payload := Payload}) ->
Payload;
get_payload(#{<<"payload">> := Payload}) ->
Payload.
parse_payload(ParsedPayload) when is_map(ParsedPayload) ->
ParsedPayload;
parse_payload(UnparsedPayload) when is_binary(UnparsedPayload) ->
emqx_utils_json:decode(UnparsedPayload);
parse_payload(UnparsedPayloads) when is_list(UnparsedPayloads) ->
lists:map(fun parse_payload/1, UnparsedPayloads).
preproc_data_list(DataList) ->
lists:foldl(
@ -297,16 +304,16 @@ convert_float(Str) when is_binary(Str) ->
convert_float(undefined) ->
null.
make_iotdb_insert_request(MessageUnparsedPayload, State) ->
Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
make_iotdb_insert_request(Message, State) ->
Payloads = to_list(parse_payload(get_payload(Message))),
IsAligned = maps:get(is_aligned, State, false),
DeviceId = device_id(Message, State),
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
Payload = make_list(maps:get(payload, Message)),
case preproc_data_list(Payload) of
[] ->
case {device_id(Message, Payloads, State), preproc_data_list(Payloads)} of
{undefined, _} ->
{error, device_id_missing};
{_, []} ->
{error, invalid_data};
PreProcessedData ->
{DeviceId, PreProcessedData} ->
DataList = proc_data(PreProcessedData, Message),
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn),
@ -394,22 +401,14 @@ iotdb_field_key(data_types, ?VSN_1_0_X) ->
iotdb_field_key(data_types, ?VSN_0_13_X) ->
<<"dataTypes">>.
make_list(List) when is_list(List) -> List;
make_list(Data) -> [Data].
to_list(List) when is_list(List) -> List;
to_list(Data) -> [Data].
device_id(Message, State) ->
device_id(Message, Payloads, State) ->
case maps:get(device_id, State, undefined) of
undefined ->
case maps:get(payload, Message) of
#{<<"device_id">> := DeviceId} ->
DeviceId;
_NotFound ->
Topic = maps:get(topic, Message),
case re:replace(Topic, "/", ".", [global, {return, binary}]) of
<<"root.", _/binary>> = Device -> Device;
Device -> <<"root.", Device/binary>>
end
end;
%% [FIXME] there could be conflicting device-ids in the Payloads
maps:get(<<"device_id">>, hd(Payloads), undefined);
DeviceId ->
DeviceIdTkn = emqx_plugin_libs_rule:preproc_tmpl(DeviceId),
emqx_plugin_libs_rule:proc_tmpl(DeviceIdTkn, Message)

View File

@ -231,6 +231,11 @@ is_success_check({ok, 200, _, Body}) ->
is_code(Code, #{<<"code">> := Code}) -> true;
is_code(_, _) -> false.
is_error_check(Reason) ->
fun(Result) ->
?assertEqual({error, Reason}, Result)
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -364,33 +369,37 @@ t_sync_query_fail(Config) ->
end,
emqx_bridge_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, iotdb_bridge_on_query).
t_sync_query_badpayload(Config) ->
BadPayload = #{foo => bar},
IsSuccessCheck =
fun(Result) ->
?assertEqual({error, invalid_data}, Result)
end,
t_sync_device_id_missing(Config) ->
emqx_bridge_testlib:t_sync_query(
Config,
make_message_fun(iotdb_topic(Config), BadPayload),
IsSuccessCheck,
make_message_fun(iotdb_topic(Config), #{foo => bar}),
is_error_check(device_id_missing),
iotdb_bridge_on_query
),
ok.
).
t_async_query_badpayload(Config) ->
BadPayload = #{foo => bar},
IsSuccessCheck =
fun(Result) ->
?assertEqual({error, invalid_data}, Result)
end,
t_sync_invalid_data(Config) ->
emqx_bridge_testlib:t_sync_query(
Config,
make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
is_error_check(invalid_data),
iotdb_bridge_on_query
).
t_async_device_id_missing(Config) ->
emqx_bridge_testlib:t_async_query(
Config,
make_message_fun(iotdb_topic(Config), BadPayload),
IsSuccessCheck,
make_message_fun(iotdb_topic(Config), #{foo => bar}),
is_error_check(device_id_missing),
iotdb_bridge_on_query_async
),
ok.
).
t_async_invalid_data(Config) ->
emqx_bridge_testlib:t_async_query(
Config,
make_message_fun(iotdb_topic(Config), #{foo => bar, device_id => <<"root.sg27">>}),
is_error_check(invalid_data),
iotdb_bridge_on_query_async
).
t_create_via_http(Config) ->
emqx_bridge_testlib:t_create_via_http(Config).
@ -413,13 +422,10 @@ t_device_id(Config) ->
ConfiguredDevice = <<"root.someOtherDevice234">>,
DeviceId = <<"root.deviceFooBar123">>,
Topic = <<"some/random/topic">>,
TopicDevice = topic_to_iotdb_device(Topic),
iotdb_reset(Config, DeviceId),
iotdb_reset(Config, TopicDevice),
iotdb_reset(Config, ConfiguredDevice),
Payload1 = make_iotdb_payload(DeviceId, "test", "BOOLEAN", true),
MessageF1 = make_message_fun(Topic, Payload1),
?assertNotEqual(DeviceId, TopicDevice),
is_success_check(
emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF1()})
),
@ -427,29 +433,8 @@ t_device_id(Config) ->
ct:pal("device_id result: ~p", [emqx_utils_json:decode(Res1_1)]),
#{<<"values">> := Values1_1} = emqx_utils_json:decode(Res1_1),
?assertNot(is_empty(Values1_1)),
{ok, {{_, 200, _}, _, Res1_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
ct:pal("topic device result: ~p", [emqx_utils_json:decode(Res1_2)]),
#{<<"values">> := Values1_2} = emqx_utils_json:decode(Res1_2),
?assert(is_empty(Values1_2)),
%% test without device_id in message, taking it from topic
iotdb_reset(Config, DeviceId),
iotdb_reset(Config, TopicDevice),
iotdb_reset(Config, ConfiguredDevice),
Payload2 = maps:remove(device_id, make_iotdb_payload(DeviceId, "root", "BOOLEAN", true)),
MessageF2 = make_message_fun(Topic, Payload2),
is_success_check(
emqx_resource:simple_sync_query(ResourceId, {send_message, MessageF2()})
),
{ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
#{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
?assert(is_empty(Values2_1)),
{ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
#{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
?assertNot(is_empty(Values2_2)),
iotdb_reset(Config, DeviceId),
iotdb_reset(Config, TopicDevice),
iotdb_reset(Config, ConfiguredDevice),
%% reconfigure bridge with device_id
@ -461,20 +446,16 @@ t_device_id(Config) ->
),
%% even though we had a device_id in the message it's not being used
{ok, {{_, 200, _}, _, Res3_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
#{<<"values">> := Values3_1} = emqx_utils_json:decode(Res3_1),
?assert(is_empty(Values3_1)),
{ok, {{_, 200, _}, _, Res3_2}} = iotdb_query(Config, <<"select * from ", TopicDevice/binary>>),
#{<<"values">> := Values3_2} = emqx_utils_json:decode(Res3_2),
?assert(is_empty(Values3_2)),
{ok, {{_, 200, _}, _, Res3_3}} = iotdb_query(
{ok, {{_, 200, _}, _, Res2_1}} = iotdb_query(Config, <<"select * from ", DeviceId/binary>>),
#{<<"values">> := Values2_1} = emqx_utils_json:decode(Res2_1),
?assert(is_empty(Values2_1)),
{ok, {{_, 200, _}, _, Res2_2}} = iotdb_query(
Config, <<"select * from ", ConfiguredDevice/binary>>
),
#{<<"values">> := Values3_3} = emqx_utils_json:decode(Res3_3),
?assertNot(is_empty(Values3_3)),
#{<<"values">> := Values2_2} = emqx_utils_json:decode(Res2_2),
?assertNot(is_empty(Values2_2)),
iotdb_reset(Config, DeviceId),
iotdb_reset(Config, TopicDevice),
iotdb_reset(Config, ConfiguredDevice),
ok.