fix(iotdb): move the `iot_version` into IoTDB connector

This commit is contained in:
firest 2024-01-12 17:52:31 +08:00
parent 9e85b53c39
commit 0f1aaa65bc
3 changed files with 41 additions and 32 deletions

View File

@ -89,14 +89,6 @@ fields(action_parameters) ->
desc => ?DESC("config_device_id") desc => ?DESC("config_device_id")
} }
)}, )},
{iotdb_version,
mk(
hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
#{
desc => ?DESC("config_iotdb_version"),
default => ?VSN_1_1_X
}
)},
{data, {data,
mk( mk(
array(ref(?MODULE, action_parameters_data)), array(ref(?MODULE, action_parameters_data)),
@ -310,8 +302,7 @@ action_values() ->
} }
], ],
is_aligned => false, is_aligned => false,
device_id => <<"my_device">>, device_id => <<"my_device">>
iotdb_version => ?VSN_1_1_X
} }
}. }.

View File

@ -47,6 +47,7 @@
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
pool_size := pos_integer(), pool_size := pos_integer(),
iotdb_version := atom(),
request => undefined | map(), request => undefined | map(),
atom() => _ atom() => _
}. }.
@ -57,6 +58,7 @@
connect_timeout := pos_integer(), connect_timeout := pos_integer(),
pool_type := random | hash, pool_type := random | hash,
channels := map(), channels := map(),
iotdb_version := atom(),
request => undefined | map(), request => undefined | map(),
atom() => _ atom() => _
}. }.
@ -88,6 +90,7 @@ connector_example_values() ->
name => <<"iotdb_connector">>, name => <<"iotdb_connector">>,
type => iotdb, type => iotdb,
enable => true, enable => true,
iotdb_version => ?VSN_1_1_X,
authentication => #{ authentication => #{
<<"username">> => <<"root">>, <<"username">> => <<"root">>,
<<"password">> => <<"*****">> <<"password">> => <<"*****">>
@ -121,6 +124,14 @@ fields("connection_fields") ->
desc => ?DESC(emqx_bridge_iotdb, "config_base_url") desc => ?DESC(emqx_bridge_iotdb, "config_base_url")
} }
)}, )},
{iotdb_version,
mk(
hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
#{
desc => ?DESC(emqx_bridge_iotdb, "iotdb_version"),
default => ?VSN_1_1_X
}
)},
{authentication, {authentication,
mk( mk(
hoconsc:union([ref(?MODULE, auth_basic)]), hoconsc:union([ref(?MODULE, auth_basic)]),
@ -190,7 +201,7 @@ proplists_without(Keys, List) ->
callback_mode() -> async_if_possible. callback_mode() -> async_if_possible.
-spec on_start(manager_id(), config()) -> {ok, state()} | no_return(). -spec on_start(manager_id(), config()) -> {ok, state()} | no_return().
on_start(InstanceId, Config) -> on_start(InstanceId, #{iotdb_version := Version} = Config) ->
%% [FIXME] The configuration passed in here is pre-processed and transformed %% [FIXME] The configuration passed in here is pre-processed and transformed
%% in emqx_bridge_resource:parse_confs/2. %% in emqx_bridge_resource:parse_confs/2.
case emqx_bridge_http_connector:on_start(InstanceId, Config) of case emqx_bridge_http_connector:on_start(InstanceId, Config) of
@ -201,7 +212,7 @@ on_start(InstanceId, Config) ->
request => maps:get(request, State, <<>>) request => maps:get(request, State, <<>>)
}), }),
?tp(iotdb_bridge_started, #{instance_id => InstanceId}), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
{ok, State#{channels => #{}}}; {ok, State#{iotdb_version => Version, channels => #{}}};
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "failed_to_start_iotdb_bridge", msg => "failed_to_start_iotdb_bridge",
@ -231,7 +242,11 @@ on_get_status(InstanceId, State) ->
{ok, pos_integer(), [term()], term()} {ok, pos_integer(), [term()], term()}
| {ok, pos_integer(), [term()]} | {ok, pos_integer(), [term()]}
| {error, term()}. | {error, term()}.
on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = State) -> on_query(
InstanceId,
{ChannelId, _Message} = Req,
#{iotdb_version := IoTDBVsn, channels := Channels} = State
) ->
?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}), ?tp(iotdb_bridge_on_query, #{instance_id => InstanceId}),
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "iotdb_bridge_on_query_called", msg => "iotdb_bridge_on_query_called",
@ -240,7 +255,7 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
state => emqx_utils:redact(State) state => emqx_utils:redact(State)
}), }),
case try_render_message(Req, Channels) of case try_render_message(Req, IoTDBVsn, Channels) of
{ok, IoTDBPayload} -> {ok, IoTDBPayload} ->
handle_response( handle_response(
emqx_bridge_http_connector:on_query( emqx_bridge_http_connector:on_query(
@ -254,7 +269,10 @@ on_query(InstanceId, {ChannelId, _Message} = Req, #{channels := Channels} = Stat
-spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) -> -spec on_query_async(manager_id(), {send_message, map()}, {function(), [term()]}, state()) ->
{ok, pid()} | {error, empty_request}. {ok, pid()} | {error, empty_request}.
on_query_async( on_query_async(
InstanceId, {ChannelId, _Message} = Req, ReplyFunAndArgs0, #{channels := Channels} = State InstanceId,
{ChannelId, _Message} = Req,
ReplyFunAndArgs0,
#{iotdb_version := IoTDBVsn, channels := Channels} = State
) -> ) ->
?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}), ?tp(iotdb_bridge_on_query_async, #{instance_id => InstanceId}),
?SLOG(debug, #{ ?SLOG(debug, #{
@ -263,7 +281,7 @@ on_query_async(
send_message => Req, send_message => Req,
state => emqx_utils:redact(State) state => emqx_utils:redact(State)
}), }),
case try_render_message(Req, Channels) of case try_render_message(Req, IoTDBVsn, Channels) of
{ok, IoTDBPayload} -> {ok, IoTDBPayload} ->
ReplyFunAndArgs = ReplyFunAndArgs =
{ {
@ -282,10 +300,10 @@ on_query_async(
on_add_channel( on_add_channel(
InstanceId, InstanceId,
#{channels := Channels} = OldState0, #{iotdb_version := Version, channels := Channels} = OldState0,
ChannelId, ChannelId,
#{ #{
parameters := #{iotdb_version := Version, data := Data} = Parameter parameters := #{data := Data} = Parameter
} }
) -> ) ->
case maps:is_key(ChannelId, Channels) of case maps:is_key(ChannelId, Channels) of
@ -495,18 +513,18 @@ convert_float(Str) when is_binary(Str) ->
convert_float(undefined) -> convert_float(undefined) ->
null. null.
make_iotdb_insert_request(DataList, IsAligned, DeviceId, IotDBVsn) -> make_iotdb_insert_request(DataList, IsAligned, DeviceId, IoTDBVsn) ->
InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []}, InitAcc = #{timestamps => [], measurements => [], dtypes => [], values => []},
Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IotDBVsn), Rows = replace_dtypes(aggregate_rows(DataList, InitAcc), IoTDBVsn),
{ok, {ok,
maps:merge(Rows, #{ maps:merge(Rows, #{
iotdb_field_key(is_aligned, IotDBVsn) => IsAligned, iotdb_field_key(is_aligned, IoTDBVsn) => IsAligned,
iotdb_field_key(device_id, IotDBVsn) => DeviceId iotdb_field_key(device_id, IoTDBVsn) => DeviceId
})}. })}.
replace_dtypes(Rows0, IotDBVsn) -> replace_dtypes(Rows0, IoTDBVsn) ->
{Types, Rows} = maps:take(dtypes, Rows0), {Types, Rows} = maps:take(dtypes, Rows0),
Rows#{iotdb_field_key(data_types, IotDBVsn) => Types}. Rows#{iotdb_field_key(data_types, IoTDBVsn) => Types}.
aggregate_rows(DataList, InitAcc) -> aggregate_rows(DataList, InitAcc) ->
lists:foldr( lists:foldr(
@ -645,15 +663,15 @@ preproc_data_template(DataList) ->
DataList DataList
). ).
try_render_message({ChannelId, Msg}, Channels) -> try_render_message({ChannelId, Msg}, IoTDBVsn, Channels) ->
case maps:find(ChannelId, Channels) of case maps:find(ChannelId, Channels) of
{ok, Channel} -> {ok, Channel} ->
render_channel_message(Channel, Msg); render_channel_message(Channel, IoTDBVsn, Msg);
_ -> _ ->
{error, {unrecoverable_error, {invalid_channel_id, ChannelId}}} {error, {unrecoverable_error, {invalid_channel_id, ChannelId}}}
end. end.
render_channel_message(#{is_aligned := IsAligned, iotdb_version := IoTDBVsn} = Channel, Message) -> render_channel_message(#{is_aligned := IsAligned} = Channel, IoTDBVsn, Message) ->
Payloads = to_list(parse_payload(get_payload(Message))), Payloads = to_list(parse_payload(get_payload(Message))),
case device_id(Message, Payloads, Channel) of case device_id(Message, Payloads, Channel) of
undefined -> undefined ->

View File

@ -255,7 +255,6 @@ is_error_check(Reason) ->
end. end.
action_config(Name, Config) -> action_config(Name, Config) ->
Version = ?config(iotdb_version, Config),
Type = ?config(bridge_type, Config), Type = ?config(bridge_type, Config),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
@ -263,15 +262,13 @@ action_config(Name, Config) ->
" enable = true\n" " enable = true\n"
" connector = \"~s\"\n" " connector = \"~s\"\n"
" parameters = {\n" " parameters = {\n"
" iotdb_version = \"~s\"\n"
" data = []\n" " data = []\n"
" }\n" " }\n"
"}\n", "}\n",
[ [
Type, Type,
Name, Name,
Name, Name
Version
] ]
), ),
ct:pal("ActionConfig:~ts~n", [ConfigString]), ct:pal("ActionConfig:~ts~n", [ConfigString]),
@ -281,12 +278,14 @@ connector_config(Name, Config) ->
Host = ?config(bridge_host, Config), Host = ?config(bridge_host, Config),
Port = ?config(bridge_port, Config), Port = ?config(bridge_port, Config),
Type = ?config(bridge_type, Config), Type = ?config(bridge_type, Config),
Version = ?config(iotdb_version, Config),
ServerURL = iotdb_server_url(Host, Port), ServerURL = iotdb_server_url(Host, Port),
ConfigString = ConfigString =
io_lib:format( io_lib:format(
"connectors.~s.~s {\n" "connectors.~s.~s {\n"
" enable = true\n" " enable = true\n"
" base_url = \"~s\"\n" " base_url = \"~s\"\n"
" iotdb_version = \"~s\"\n"
" authentication = {\n" " authentication = {\n"
" username = \"root\"\n" " username = \"root\"\n"
" password = \"root\"\n" " password = \"root\"\n"
@ -295,7 +294,8 @@ connector_config(Name, Config) ->
[ [
Type, Type,
Name, Name,
ServerURL ServerURL,
Version
] ]
), ),
ct:pal("ConnectorConfig:~ts~n", [ConfigString]), ct:pal("ConnectorConfig:~ts~n", [ConfigString]),