feat: refactor HStreamDB bridge to connector and action

This commit also upgrades the hstreamdb_erl driver library and change
the action/bridge to use the new hstreamdb_erl.

Much of the code for the new API is copied from:
be1a1604dd/lib-ee/emqx_rule_actions/src/emqx_backend_hstreamdb_actions.erl

Fixes:
https://emqx.atlassian.net/browse/EMQX-11458
This commit is contained in:
Kjell Winblad 2024-02-12 12:42:16 +01:00
parent fa359246c1
commit a153d758c3
11 changed files with 629 additions and 206 deletions

View File

@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11
MINIO_TAG=RELEASE.2023-03-20T20-16-18Z MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
OPENTS_TAG=9aa7f88 OPENTS_TAG=9aa7f88
KINESIS_TAG=2.1 KINESIS_TAG=2.1
HSTREAMDB_TAG=v0.16.1 HSTREAMDB_TAG=v0.19.3
HSTREAMDB_ZK_TAG=3.8.1 HSTREAMDB_ZK_TAG=3.8.1
MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server

View File

@ -91,6 +91,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info, emqx_bridge_kafka_action_info,
emqx_bridge_kinesis_action_info, emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info, emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info, emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info, emqx_bridge_oracle_action_info,

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {deps, [
{hstreamdb_erl, {hstreamdb_erl,
{git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}}, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}},
{emqx, {path, "../../apps/emqx"}}, {emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}} {emqx_utils, {path, "../../apps/emqx_utils"}}
]}. ]}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [ {application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"}, {description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.1.3"}, {vsn, "0.1.4"},
{registered, []}, {registered, []},
{applications, [ {applications, [
kernel, kernel,
@ -8,7 +8,7 @@
emqx_resource, emqx_resource,
hstreamdb_erl hstreamdb_erl
]}, ]},
{env, []}, {env, [{emqx_action_info_modules, [emqx_bridge_hstreamdb_action_info]}]},
{modules, []}, {modules, []},
{links, []} {links, []}
]}. ]}.

View File

@ -6,10 +6,12 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]). -import(hoconsc, [mk/2, enum/1]).
-export([ -export([
conn_bridge_examples/1 conn_bridge_examples/1,
bridge_v2_examples/1,
connector_examples/1
]). ]).
-export([ -export([
@ -19,6 +21,11 @@
desc/1 desc/1
]). ]).
-define(CONNECTOR_TYPE, hstreamdb).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% api %% api
@ -27,16 +34,16 @@ conn_bridge_examples(Method) ->
#{ #{
<<"hstreamdb">> => #{ <<"hstreamdb">> => #{
summary => <<"HStreamDB Bridge">>, summary => <<"HStreamDB Bridge">>,
value => values(Method) value => conn_bridge_example_values(Method)
} }
} }
]. ].
values(get) -> conn_bridge_example_values(get) ->
values(post); conn_bridge_example_values(post);
values(put) -> conn_bridge_example_values(put) ->
values(post); conn_bridge_example_values(post);
values(post) -> conn_bridge_example_values(post) ->
#{ #{
type => <<"hstreamdb">>, type => <<"hstreamdb">>,
name => <<"demo">>, name => <<"demo">>,
@ -55,15 +62,135 @@ values(post) ->
}, },
ssl => #{enable => false} ssl => #{enable => false}
}; };
values(_) -> conn_bridge_example_values(_) ->
#{}. #{}.
connector_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
<<"url">> => <<"http://127.0.0.1:6570">>,
<<"grpc_timeout">> => <<"30s">>,
<<"ssl">> =>
#{
<<"enable">> => false,
<<"verify">> => <<"verify_peer">>
},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"15s">>,
<<"start_timeout">> => <<"5s">>
}
}.
bridge_v2_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
<<"parameters">> => #{
<<"aggregation_pool_size">> => 8,
<<"partition_key">> => <<"hej">>,
<<"record_template">> => <<"${payload}">>,
<<"stream">> => <<"mqtt_message">>,
<<"writer_pool_size">> => 8
}
}.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions %% Hocon Schema Definitions
namespace() -> "bridge_hstreamdb". namespace() -> "bridge_hstreamdb".
roots() -> []. roots() -> [].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)),
#{
desc => <<"HStreamDB Action Config">>,
required => false
}
)};
fields(hstreamdb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
[
{stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
{partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})},
{grpc_flush_timeout, fun grpc_flush_timeout/1},
{record_template,
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})},
{aggregation_pool_size,
mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})},
{max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})},
{writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})},
{batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})},
{batch_interval,
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"500ms">>, desc => ?DESC("batch_interval")
})}
];
fields(connector_fields) ->
[
{url,
mk(binary(), #{
required => true, desc => ?DESC("url"), default => <<"http://127.0.0.1:6570">>
})},
{grpc_timeout, fun grpc_timeout/1}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") -> fields("config") ->
hstream_bridge_common_fields() ++ hstream_bridge_common_fields() ++
connector_fields(); connector_fields();
@ -80,6 +207,18 @@ fields("put") ->
hstream_bridge_common_fields() ++ hstream_bridge_common_fields() ++
connector_fields(). connector_fields().
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_timeout(desc) -> ?DESC("grpc_timeout");
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
grpc_timeout(required) -> false;
grpc_timeout(_) -> undefined.
grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_flush_timeout(desc) -> ?DESC("grpc_timeout");
grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW;
grpc_flush_timeout(required) -> false;
grpc_flush_timeout(_) -> undefined.
hstream_bridge_common_fields() -> hstream_bridge_common_fields() ->
emqx_bridge_schema:common_bridge_fields() ++ emqx_bridge_schema:common_bridge_fields() ++
[ [

View File

@ -0,0 +1,88 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_hstreamdb_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
bridge_v1_config_to_connector_config/1,
bridge_v1_config_to_action_config/2,
connector_action_config_to_bridge_v1_config/2
]).
bridge_v1_type_name() -> hstreamdb.
action_type_name() -> hstreamdb.
connector_type_name() -> hstreamdb.
schema_module() -> emqx_bridge_hstreamdb.
bridge_v1_config_to_connector_config(BridgeV1Conf) ->
ConnectorSchema = emqx_bridge_hstreamdb:fields(connector_fields),
ConnectorAtomKeys = lists:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ConnectorSchema),
ConnectorBinKeys = [atom_to_binary(K) || K <- ConnectorAtomKeys] ++ [<<"resource_opts">>],
ConnectorConfig0 = maps:with(ConnectorBinKeys, BridgeV1Conf),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnectorConfig0
).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, emqx_bridge_hstreamdb, "config_connector"
),
%% Remove fields no longer relevant for the action
Config1 = lists:foldl(
fun(Field, Acc) ->
emqx_utils_maps:deep_remove(Field, Acc)
end,
Config0,
[
[<<"parameters">>, <<"pool_size">>],
[<<"parameters">>, <<"direction">>]
]
),
%% Move pool_size to aggregation_pool_size and writer_pool_size
PoolSize = maps:get(<<"pool_size">>, BridgeV1Conf, 8),
Config2 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"aggregation_pool_size">>],
Config1,
PoolSize
),
Config3 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"writer_pool_size">>],
Config2,
PoolSize
),
Config3.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
BridgeV1Config3 = maps:remove(<<"parameters">>, BridgeV1Config2),
%% Pick out pool_size from aggregation_pool_size
PoolSize = emqx_utils_maps:deep_get(
[<<"parameters">>, <<"aggregation_pool_size">>], ActionConfig, 8
),
BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3),
%% Move the fields stream, partition_key and record_template from parameters in ActionConfig to the top level in BridgeV1Config
lists:foldl(
fun(Field, Acc) ->
emqx_utils_maps:deep_put(
[Field],
Acc,
emqx_utils_maps:deep_get([<<"parameters">>, Field], ActionConfig, <<>>)
)
end,
BridgeV1Config4,
[<<"stream">>, <<"partition_key">>, <<"record_template">>]
).

View File

@ -7,8 +7,9 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1]). -import(hoconsc, [mk/2]).
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -19,7 +20,11 @@
on_stop/2, on_stop/2,
on_query/3, on_query/3,
on_batch_query/3, on_batch_query/3,
on_get_status/2 on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]). ]).
-export([ -export([
@ -38,67 +43,132 @@
-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)). -define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>). -define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000).
-define(DEFAULT_MAX_BATCHES, 500).
-define(DEFAULT_BATCH_INTERVAL, 500).
-define(DEFAULT_AGG_POOL_SIZE, 8).
-define(DEFAULT_WRITER_POOL_SIZE, 8).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% resource callback %% resource callback
callback_mode() -> always_sync. callback_mode() -> always_sync.
on_start(InstId, Config) -> on_start(InstId, Config) ->
start_client(InstId, Config). try
do_on_start(InstId, Config)
catch
E:R:S ->
Error = #{
msg => "start_hstreamdb_connector_error",
connector => InstId,
error => E,
reason => R,
stack => S
},
?SLOG(error, Error),
{error, Error}
end.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
client_options := ClientOptions
} = OldState,
ChannelId,
ChannelConfig
) ->
%{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig),
Parameters0 = maps:get(parameters, ChannelConfig),
Parameters = Parameters0#{client_options => ClientOptions},
PartitionKey = emqx_placeholder:preproc_tmpl(maps:get(partition_key, Parameters, <<"">>)),
try
ChannelState = #{
producer => start_producer(ChannelId, Parameters),
record_template => record_template(Parameters),
partition_key => PartitionKey
},
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}
catch
Error:Reason:Stack ->
{error, {Error, Reason, Stack}}
end.
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
#{
producer := Producer
} = maps:get(ChannelId, InstalledChannels),
_ = hstreamdb:stop_producer(Producer),
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
_ResId,
_ChannelId,
_State
) ->
?status_connected.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
case emqx_resource:get_allocated_resources(InstId) of ?tp(
#{?hstreamdb_client := #{client := Client, producer := Producer}} -> hstreamdb_connector_on_stop,
StopClientRes = hstreamdb:stop_client(Client), #{instance_id => InstId}
StopProducerRes = hstreamdb:stop_producer(Producer), ).
?SLOG(info, #{
msg => "stop_hstreamdb_connector",
connector => InstId,
client => Client,
producer => Producer,
stop_client => StopClientRes,
stop_producer => StopProducerRes
});
_ ->
ok
end.
-define(FAILED_TO_APPLY_HRECORD_TEMPLATE, -define(FAILED_TO_APPLY_HRECORD_TEMPLATE,
{error, {unrecoverable_error, failed_to_apply_hrecord_template}} {error, {unrecoverable_error, failed_to_apply_hrecord_template}}
). ).
on_query( on_query(
_InstId, InstId,
{send_message, Data}, {ChannelID, Data},
_State = #{ #{installed_channels := Channels} = _State
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
) -> ) ->
#{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
} = maps:get(ChannelID, Channels),
try to_record(PartitionKey, HRecordTemplate, Data) of try to_record(PartitionKey, HRecordTemplate, Data) of
Record -> append_record(Producer, Record, false) Record -> append_record(InstId, Producer, Record, false)
catch catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end. end.
on_batch_query( on_batch_query(
_InstId, InstId,
BatchList, [{ChannelID, _Data} | _] = BatchList,
_State = #{ #{installed_channels := Channels} = _State
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
) -> ) ->
#{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
} = maps:get(ChannelID, Channels),
try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
Records -> append_record(Producer, Records, true) Records -> append_record(InstId, Producer, Records, true)
catch catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE _:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end. end.
on_get_status(_InstId, #{client := Client}) -> on_get_status(_InstId, State) ->
case is_alive(Client) of case check_status(State) of
true -> ok ->
connected; ?status_connected;
false -> Error ->
disconnected %% We set it to ?status_connecting so that the channels are not deleted.
%% The producers in the channels contains buffers so we don't want to delete them.
{?status_connecting, State, Error}
end. end.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -140,142 +210,149 @@ desc(config) ->
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal functions %% internal functions
start_client(InstId, Config) ->
try
do_start_client(InstId, Config)
catch
E:R:S ->
Error = #{
msg => "start_hstreamdb_connector_error",
connector => InstId,
error => E,
reason => R,
stack => S
},
?SLOG(error, Error),
{error, Error}
end.
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) -> do_on_start(InstId, Config) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting_hstreamdb_connector_client", msg => "starting_hstreamdb_connector_client",
connector => InstId, connector => InstId,
config => Config config => Config
}), }),
ClientName = client_name(InstId), {ok, _} = application:ensure_all_started(hstreamdb_erl),
RpcOpts = ClientOptions = client_options(Config),
case maps:get(enable, SSL) of State = #{
false -> client_options => ClientOptions,
#{pool_size => PoolSize}; installed_channels => #{}
true -> },
#{ case check_status(State) of
pool_size => PoolSize, ok ->
gun_opts => #{
transport => tls,
transport_opts => emqx_tls_lib:to_client_opts(SSL)
}
}
end,
ClientOptions = [
{url, binary_to_list(Server)},
{rpc_options, RpcOpts}
],
case hstreamdb:start_client(ClientName, ClientOptions) of
{ok, Client} ->
case is_alive(Client) of
true ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "hstreamdb_connector_client_started", msg => "hstreamdb_connector_client_started",
connector => InstId, connector => InstId
client => Client
}), }),
start_producer(InstId, Client, Config); {ok, State};
_ -> Error ->
?tp( ?tp(
hstreamdb_connector_start_failed, hstreamdb_connector_start_failed,
#{error => client_not_alive} #{error => client_not_alive}
), ),
?SLOG(error, #{ ?SLOG(error, #{
msg => "hstreamdb_connector_client_not_alive", msg => "hstreamdb_connector_client_not_alive",
connector => InstId
}),
{error, connect_failed}
end;
{error, {already_started, Pid}} ->
?SLOG(info, #{
msg => "starting_hstreamdb_connector_client_find_old_client_restart_client",
old_client_pid => Pid,
old_client_name => ClientName
}),
_ = hstreamdb:stop_client(ClientName),
start_client(InstId, Config);
{error, Error} ->
?SLOG(error, #{
msg => "hstreamdb_connector_client_failed",
connector => InstId, connector => InstId,
reason => Error error => Error
}), }),
{error, {connect_failed, Error}}
end.
client_options(Config = #{url := ServerURL, ssl := SSL}) ->
GRPCTimeout = maps:get(<<"grpc_timeout">>, Config, ?DEFAULT_GRPC_TIMEOUT),
EnableSSL = maps:get(enable, SSL),
RpcOpts =
case EnableSSL of
false ->
#{pool_size => 1};
true ->
#{
pool_size => 1,
gun_opts => #{
transport => tls,
transport_opts =>
emqx_tls_lib:to_client_opts(SSL)
}
}
end,
ClientOptions = #{
url => to_string(ServerURL),
grpc_timeout => GRPCTimeout,
rpc_options => RpcOpts
},
ClientOptions.
check_status(ConnectorState) ->
try start_client(ConnectorState) of
{ok, Client} ->
try hstreamdb_client:echo(Client) of
ok -> ok;
{error, _} = ErrorEcho -> ErrorEcho
after
_ = hstreamdb:stop_client(Client)
end;
{error, _} = StartClientError ->
StartClientError
catch
ErrorType:Reason:_ST ->
{error, {ErrorType, Reason}}
end.
start_client(Opts) ->
ClientOptions = maps:get(client_options, Opts),
case hstreamdb:start_client(ClientOptions) of
{ok, Client} ->
{ok, Client};
{error, Error} ->
{error, Error} {error, Error}
end. end.
is_alive(Client) ->
hstreamdb_client:echo(Client) =:= ok.
start_producer( start_producer(
InstId, ActionId,
Client, #{
Options = #{stream := Stream, pool_size := PoolSize} stream := Stream,
batch_size := BatchSize,
batch_interval := Interval
} = Opts
) -> ) ->
%% TODO: change these batch options after we have better disk cache. MaxBatches = maps:get(max_batches, Opts, ?DEFAULT_MAX_BATCHES),
BatchSize = maps:get(batch_size, Options, 100), AggPoolSize = maps:get(aggregation_pool_size, Opts, ?DEFAULT_AGG_POOL_SIZE),
Interval = maps:get(batch_interval, Options, 1000), WriterPoolSize = maps:get(writer_pool_size, Opts, ?DEFAULT_WRITER_POOL_SIZE),
ProducerOptions = [ GRPCTimeout = maps:get(grpc_flush_timeout, Opts, ?DEFAULT_GRPC_FLUSH_TIMEOUT),
{stream, Stream}, ClientOptions = maps:get(client_options, Opts),
{callback, {?MODULE, on_flush_result, []}}, ProducerOptions = #{
{max_records, BatchSize}, stream => to_string(Stream),
{interval, Interval}, buffer_options => #{
{pool_size, PoolSize}, interval => Interval,
{grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)} callback => {?MODULE, on_flush_result, [ActionId]},
], max_records => BatchSize,
Name = produce_name(InstId), max_batches => MaxBatches
?SLOG(info, #{
msg => "starting_hstreamdb_connector_producer",
connector => InstId
}),
case hstreamdb:start_producer(Client, Name, ProducerOptions) of
{ok, Producer} ->
?SLOG(info, #{
msg => "hstreamdb_connector_producer_started"
}),
State = #{
client => Client,
producer => Producer,
enable_batch => maps:get(enable_batch, Options, false),
partition_key => emqx_placeholder:preproc_tmpl(
maps:get(partition_key, Options, <<"">>)
),
record_template => record_template(Options)
}, },
ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{ buffer_pool_size => AggPoolSize,
client => Client, producer => Producer writer_options => #{
}), grpc_timeout => GRPCTimeout
{ok, State}; },
{error, {already_started, Pid}} -> writer_pool_size => WriterPoolSize,
?SLOG(info, #{ client_options => ClientOptions
msg => },
"starting_hstreamdb_connector_producer_find_old_producer_restart_producer", Name = produce_name(ActionId),
old_producer_pid => Pid, ensure_start_producer(Name, ProducerOptions).
old_producer_name => Name
}), ensure_start_producer(ProducerName, ProducerOptions) ->
_ = hstreamdb:stop_producer(Name), case hstreamdb:start_producer(ProducerName, ProducerOptions) of
start_producer(InstId, Client, Options); ok ->
ok;
{error, {already_started, _Pid}} ->
%% HStreamDB producer already started, restart it
_ = hstreamdb:stop_producer(ProducerName),
%% the pool might have been leaked after relup
_ = ecpool:stop_sup_pool(ProducerName),
ok = hstreamdb:start_producer(ProducerName, ProducerOptions);
{error, {
{shutdown,
{failed_to_start_child, {pool_sup, Pool},
{shutdown,
{failed_to_start_child, worker_sup,
{shutdown, {failed_to_start_child, _, {badarg, _}}}}}}},
_
}} ->
%% HStreamDB producer was not properly cleared, restart it
%% the badarg error in gproc maybe caused by the pool is leaked after relup
_ = ecpool:stop_sup_pool(Pool),
ok = hstreamdb:start_producer(ProducerName, ProducerOptions);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ %% HStreamDB start producer failed
msg => "starting_hstreamdb_connector_producer_failed", throw({start_producer_failed, Reason})
reason => Reason end,
}), ProducerName.
{error, Reason}
end. produce_name(ActionId) ->
list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)).
to_record(PartitionKeyTmpl, HRecordTmpl, Data) -> to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data), PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
@ -289,43 +366,46 @@ to_record(PartitionKey, RawRecord) ->
to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) -> to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
lists:map( lists:map(
fun({send_message, Data}) -> fun({_, Data}) ->
to_record(PartitionKeyTmpl, HRecordTmpl, Data) to_record(PartitionKeyTmpl, HRecordTmpl, Data)
end, end,
BatchList BatchList
). ).
append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) -> append_record(ResourceId, Producer, MultiPartsRecords, MaybeBatch) when
is_list(MultiPartsRecords)
->
lists:foreach( lists:foreach(
fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords fun(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) end,
MultiPartsRecords
); );
append_record(Producer, Record, MaybeBatch) when is_tuple(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) when is_tuple(Record) ->
do_append_records(Producer, Record, MaybeBatch). do_append_records(ResourceId, Producer, Record, MaybeBatch).
%% TODO: only sync request supported. implement async request later. %% TODO: only sync request supported. implement async request later.
do_append_records(Producer, Record, true = IsBatch) -> do_append_records(ResourceId, Producer, Record, true = IsBatch) ->
Result = hstreamdb:append(Producer, Record), Result = hstreamdb:append(Producer, Record),
handle_result(Result, Record, IsBatch); handle_result(ResourceId, Result, Record, IsBatch);
do_append_records(Producer, Record, false = IsBatch) -> do_append_records(ResourceId, Producer, Record, false = IsBatch) ->
Result = hstreamdb:append_flush(Producer, Record), Result = hstreamdb:append_flush(Producer, Record),
handle_result(Result, Record, IsBatch). handle_result(ResourceId, Result, Record, IsBatch).
handle_result(ok = Result, Record, IsBatch) -> handle_result(ResourceId, ok = Result, Record, IsBatch) ->
handle_result({ok, Result}, Record, IsBatch); handle_result(ResourceId, {ok, Result}, Record, IsBatch);
handle_result({ok, Result}, Record, IsBatch) -> handle_result(ResourceId, {ok, Result}, Record, IsBatch) ->
?tp( ?tp(
hstreamdb_connector_query_append_return, hstreamdb_connector_query_append_return,
#{result => Result, is_batch => IsBatch} #{result => Result, is_batch => IsBatch, instance_id => ResourceId}
), ),
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "hstreamdb_producer_sync_append_success", msg => "hstreamdb_producer_sync_append_success",
record => Record, record => Record,
is_batch => IsBatch is_batch => IsBatch
}); });
handle_result({error, Reason} = Err, Record, IsBatch) -> handle_result(ResourceId, {error, Reason} = Err, Record, IsBatch) ->
?tp( ?tp(
hstreamdb_connector_query_append_return, hstreamdb_connector_query_append_return,
#{error => Reason, is_batch => IsBatch} #{error => Reason, is_batch => IsBatch, instance_id => ResourceId}
), ),
?SLOG(error, #{ ?SLOG(error, #{
msg => "hstreamdb_producer_sync_append_failed", msg => "hstreamdb_producer_sync_append_failed",
@ -335,12 +415,6 @@ handle_result({error, Reason} = Err, Record, IsBatch) ->
}), }),
Err. Err.
client_name(InstId) ->
"client:" ++ to_string(InstId).
produce_name(ActionId) ->
list_to_atom("producer:" ++ to_string(ActionId)).
record_template(#{record_template := RawHRecordTemplate}) -> record_template(#{record_template := RawHRecordTemplate}) ->
emqx_placeholder:preproc_tmpl(RawHRecordTemplate); emqx_placeholder:preproc_tmpl(RawHRecordTemplate);
record_template(_) -> record_template(_) ->

View File

@ -117,16 +117,21 @@ end_per_suite(_Config) ->
ok. ok.
init_per_testcase(t_to_hrecord_failed, Config) -> init_per_testcase(t_to_hrecord_failed, Config) ->
init_per_testcase_common(),
meck:new([hstreamdb], [passthrough, no_history, no_link]), meck:new([hstreamdb], [passthrough, no_history, no_link]),
meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end), meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end),
Config; Config;
init_per_testcase(_Testcase, Config) -> init_per_testcase(_Testcase, Config) ->
init_per_testcase_common(),
%% drop stream and will create a new one in common_init/1 %% drop stream and will create a new one in common_init/1
%% TODO: create a new stream for each test case %% TODO: create a new stream for each test case
delete_bridge(Config), delete_bridge(Config),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
Config. Config.
init_per_testcase_common() ->
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors().
end_per_testcase(t_to_hrecord_failed, _Config) -> end_per_testcase(t_to_hrecord_failed, _Config) ->
meck:unload([hstreamdb]); meck:unload([hstreamdb]);
end_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, Config) ->
@ -301,7 +306,10 @@ t_simple_query(Config) ->
{ok, _}, {ok, _},
create_bridge(Config) create_bridge(Config)
), ),
Requests = gen_batch_req(BatchSize), Type = ?config(hstreamdb_bridge_type, Config),
Name = ?config(hstreamdb_name, Config),
ActionId = emqx_bridge_v2:id(Type, Name),
Requests = gen_batch_req(BatchSize, ActionId),
?check_trace( ?check_trace(
begin begin
?wait_async_action( ?wait_async_action(
@ -351,6 +359,24 @@ t_to_hrecord_failed(Config) ->
end, end,
ok. ok.
%% Connector Action Tests
t_action_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
t_action_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_action_sync_query(Config) ->
MakeMessageFun = fun() -> rand_data() end,
IsSuccessCheck = fun(Result) -> ?assertEqual(ok, Result) end,
TracePoint = hstreamdb_connector_query_append_return,
emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint).
t_action_start_stop(Config) ->
StopTracePoint = hstreamdb_connector_on_stop,
emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper fns %% Helper fns
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -362,6 +388,10 @@ common_init(ConfigT) ->
URL = "http://" ++ Host ++ ":" ++ RawPort, URL = "http://" ++ Host ++ ":" ++ RawPort,
Config0 = [ Config0 = [
{bridge_type, <<"hstreamdb">>},
{bridge_name, <<"my_hstreamdb_action">>},
{connector_type, <<"hstreamdb">>},
{connector_name, <<"my_hstreamdb_connector">>},
{hstreamdb_host, Host}, {hstreamdb_host, Host},
{hstreamdb_port, Port}, {hstreamdb_port, Port},
{hstreamdb_url, URL}, {hstreamdb_url, URL},
@ -393,6 +423,8 @@ common_init(ConfigT) ->
{hstreamdb_config, HStreamDBConf}, {hstreamdb_config, HStreamDBConf},
{hstreamdb_bridge_type, BridgeType}, {hstreamdb_bridge_type, BridgeType},
{hstreamdb_name, Name}, {hstreamdb_name, Name},
{bridge_config, action_config(Config0)},
{connector_config, connector_config(Config0)},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
{proxy_port, ProxyPort} {proxy_port, ProxyPort}
| Config0 | Config0
@ -424,7 +456,7 @@ hstreamdb_config(BridgeType, Config) ->
" resource_opts = {\n" " resource_opts = {\n"
%% always sync %% always sync
" query_mode = sync\n" " query_mode = sync\n"
" request_ttl = 500ms\n" " request_ttl = 10000ms\n"
" batch_size = ~b\n" " batch_size = ~b\n"
" worker_pool_size = ~b\n" " worker_pool_size = ~b\n"
" }\n" " }\n"
@ -443,6 +475,45 @@ hstreamdb_config(BridgeType, Config) ->
), ),
{Name, parse_and_check(ConfigString, BridgeType, Name)}. {Name, parse_and_check(ConfigString, BridgeType, Name)}.
action_config(Config) ->
ConnectorName = ?config(connector_name, Config),
BatchSize = batch_size(Config),
#{
<<"connector">> => ConnectorName,
<<"enable">> => true,
<<"parameters">> =>
#{
<<"aggregation_pool_size">> => ?POOL_SIZE,
<<"record_template">> => ?RECORD_TEMPLATE,
<<"stream">> => ?STREAM,
<<"writer_pool_size">> => ?POOL_SIZE
},
<<"resource_opts">> =>
#{
<<"batch_size">> => BatchSize,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"worker_pool_size">> => ?POOL_SIZE
}
}.
connector_config(Config) ->
Port = integer_to_list(?config(hstreamdb_port, Config)),
URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port,
#{
<<"url">> => URL,
<<"ssl">> =>
#{<<"enable">> => false, <<"verify">> => <<"verify_peer">>},
<<"grpc_timeout">> => <<"30s">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_timeout">> => <<"5s">>
}
}.
parse_and_check(ConfigString, BridgeType, Name) -> parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
@ -454,10 +525,10 @@ parse_and_check(ConfigString, BridgeType, Name) ->
-define(CONN_ATTEMPTS, 10). -define(CONN_ATTEMPTS, 10).
default_options(Config) -> default_options(Config) ->
[ #{
{url, ?config(hstreamdb_url, Config)}, url => ?config(hstreamdb_url, Config),
{rpc_options, ?RPC_OPTIONS} rpc_options => ?RPC_OPTIONS
]. }.
connect_direct_hstream(Name, Config) -> connect_direct_hstream(Name, Config) ->
client(Name, Config, ?CONN_ATTEMPTS). client(Name, Config, ?CONN_ATTEMPTS).
@ -511,8 +582,9 @@ send_message(Config, Data) ->
query_resource(Config, Request) -> query_resource(Config, Request) ->
Name = ?config(hstreamdb_name, Config), Name = ?config(hstreamdb_name, Config),
BridgeType = ?config(hstreamdb_bridge_type, Config), BridgeType = ?config(hstreamdb_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name), ID = emqx_bridge_v2:id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}). ResID = emqx_connector_resource:resource_id(BridgeType, Name),
emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}).
restart_resource(Config) -> restart_resource(Config) ->
BridgeName = ?config(hstreamdb_name, Config), BridgeName = ?config(hstreamdb_name, Config),
@ -526,8 +598,16 @@ resource_id(Config) ->
BridgeType = ?config(hstreamdb_bridge_type, Config), BridgeType = ?config(hstreamdb_bridge_type, Config),
_ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName). _ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName).
action_id(Config) ->
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
_ActionID = emqx_bridge_v2:id(ActionType, ActionName).
health_check_resource_ok(Config) -> health_check_resource_ok(Config) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))). ?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))),
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(ActionType, ActionName)).
health_check_resource_down(Config) -> health_check_resource_down(Config) ->
case emqx_resource_manager:health_check(resource_id(Config)) of case emqx_resource_manager:health_check(resource_id(Config)) of
@ -539,6 +619,19 @@ health_check_resource_down(Config) ->
?assert( ?assert(
false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other])) false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other]))
) )
end,
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
#{status := StatusV2} = emqx_bridge_v2:health_check(ActionType, ActionName),
case StatusV2 of
disconnected ->
ok;
connecting ->
ok;
OtherV2 ->
?assert(
false, lists:flatten(io_lib:format("invalid health check result:~p~n", [OtherV2]))
)
end. end.
% These funs start and then stop the hstreamdb connection % These funs start and then stop the hstreamdb connection
@ -548,22 +641,36 @@ connect_and_create_stream(Config) ->
Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT
) )
), ),
%% force write to stream to make it created and ready to be written data for rest cases %% force write to stream to make it created and ready to be written data for test cases
ProducerOptions = [ ProducerOptions = #{
{pool_size, 4}, stream => ?STREAM,
{stream, ?STREAM}, buffer_options => #{
{callback, fun(_) -> ok end}, interval => 1000,
{max_records, 10}, callback => {?MODULE, on_flush_result, [<<"WHAT">>]},
{interval, 1000} max_records => 1,
], max_batches => 1
},
buffer_pool_size => 1,
writer_options => #{
grpc_timeout => 100
},
writer_pool_size => 1,
client_options => default_options(Config)
},
?WITH_CLIENT( ?WITH_CLIENT(
begin begin
{ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions), ok = hstreamdb:start_producer(test_producer, ProducerOptions),
_ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())), _ = hstreamdb:append_flush(test_producer, hstreamdb:to_record([], raw, rand_payload())),
_ = hstreamdb:stop_producer(Producer) _ = hstreamdb:stop_producer(test_producer)
end end
). ).
on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) ->
ok;
on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) ->
ok.
connect_and_delete_stream(Config) -> connect_and_delete_stream(Config) ->
?WITH_CLIENT( ?WITH_CLIENT(
_ = hstreamdb_client:delete_stream(Client, ?STREAM) _ = hstreamdb_client:delete_stream(Client, ?STREAM)
@ -593,11 +700,11 @@ rand_payload() ->
temperature => rand:uniform(40), humidity => rand:uniform(100) temperature => rand:uniform(40), humidity => rand:uniform(100)
}). }).
gen_batch_req(Count) when gen_batch_req(Count, ActionId) when
is_integer(Count) andalso Count > 0 is_integer(Count) andalso Count > 0
-> ->
[{send_message, rand_data()} || _Val <- lists:seq(1, Count)]; [{ActionId, rand_data()} || _Val <- lists:seq(1, Count)];
gen_batch_req(Count) -> gen_batch_req(Count, _ActionId) ->
ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]). ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
str(List) when is_list(List) -> str(List) when is_list(List) ->

View File

@ -28,6 +28,8 @@ resource_type(confluent_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(gcp_pubsub_producer) -> resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer; emqx_bridge_gcp_pubsub_impl_producer;
resource_type(hstreamdb) ->
emqx_bridge_hstreamdb_connector;
resource_type(kafka_producer) -> resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer; emqx_bridge_kafka_impl_producer;
resource_type(kinesis) -> resource_type(kinesis) ->
@ -122,6 +124,14 @@ connector_structs() ->
required => false required => false
} }
)}, )},
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config_connector")),
#{
desc => <<"HStreamDB Connector Config">>,
required => false
}
)},
{kafka_producer, {kafka_producer,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
@ -298,6 +308,7 @@ schema_modules() ->
emqx_bridge_azure_event_hub, emqx_bridge_azure_event_hub,
emqx_bridge_confluent_producer, emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_producer_schema, emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_hstreamdb,
emqx_bridge_kafka, emqx_bridge_kafka,
emqx_bridge_kinesis, emqx_bridge_kinesis,
emqx_bridge_matrix, emqx_bridge_matrix,
@ -336,6 +347,7 @@ api_schemas(Method) ->
<<"gcp_pubsub_producer">>, <<"gcp_pubsub_producer">>,
Method ++ "_connector" Method ++ "_connector"
), ),
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"), api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),

View File

@ -128,6 +128,8 @@ connector_type_to_bridge_types(confluent_producer) ->
[confluent_producer]; [confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) -> connector_type_to_bridge_types(gcp_pubsub_producer) ->
[gcp_pubsub, gcp_pubsub_producer]; [gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(hstreamdb) ->
[hstreamdb];
connector_type_to_bridge_types(kafka_producer) -> connector_type_to_bridge_types(kafka_producer) ->
[kafka, kafka_producer]; [kafka, kafka_producer];
connector_type_to_bridge_types(kinesis) -> connector_type_to_bridge_types(kinesis) ->

View File

@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[ [
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},