diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index cf195fc9f..cf53291b2 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -134,8 +134,11 @@ on_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_record(PartitionKey, HRecordTemplate, Data) of - Record -> append_record(InstId, Producer, Record, false) + try + KeyAndRawRecord = to_key_and_raw_record(PartitionKey, HRecordTemplate, Data), + emqx_trace:rendered_action_template(ChannelID, #{record => KeyAndRawRecord}), + Record = to_record(KeyAndRawRecord), + append_record(InstId, Producer, Record, false) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -148,8 +151,13 @@ on_batch_query( #{ producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate } = maps:get(ChannelID, Channels), - try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of - Records -> append_record(InstId, Producer, Records, true) + try + KeyAndRawRecordList = to_multi_part_key_and_partition_key( + PartitionKey, HRecordTemplate, BatchList + ), + emqx_trace:rendered_action_template(ChannelID, #{records => KeyAndRawRecordList}), + Records = [to_record(Item) || Item <- KeyAndRawRecordList], + append_record(InstId, Producer, Records, true) catch _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE end. @@ -348,20 +356,20 @@ ensure_start_producer(ProducerName, ProducerOptions) -> produce_name(ActionId) -> list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)). -to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> +to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) -> PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), RawRecord = emqx_placeholder:proc_tmpl(HRecordTmpl, Data), - to_record(PartitionKey, RawRecord). + #{partition_key => PartitionKey, raw_record => RawRecord}. -to_record(PartitionKey, RawRecord) when is_binary(PartitionKey) -> - to_record(binary_to_list(PartitionKey), RawRecord); -to_record(PartitionKey, RawRecord) -> +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) when is_binary(PartitionKey) -> + to_record(#{partition_key => binary_to_list(PartitionKey), raw_record => RawRecord}); +to_record(#{partition_key := PartitionKey, raw_record := RawRecord}) -> hstreamdb:to_record(PartitionKey, raw, RawRecord). -to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> +to_multi_part_key_and_partition_key(PartitionKeyTmpl, HRecordTmpl, BatchList) -> lists:map( fun({_, Data}) -> - to_record(PartitionKeyTmpl, HRecordTmpl, Data) + to_key_and_raw_record(PartitionKeyTmpl, HRecordTmpl, Data) end, BatchList ).