Merge pull request #12512 from kjellwinblad/kjell/refactor/hstreamdb/EMQX-11458

feat: refactor HStreamDB bridge to connector and action
Kjell Winblad 2024-02-20 13:30:29 +01:00 committed by GitHub
commit 86c85f3b45
14 changed files with 711 additions and 211 deletions

View File

@ -10,7 +10,7 @@ CASSANDRA_TAG=3.11
MINIO_TAG=RELEASE.2023-03-20T20-16-18Z
OPENTS_TAG=9aa7f88
KINESIS_TAG=2.1
HSTREAMDB_TAG=v0.16.1
HSTREAMDB_TAG=v0.19.3
HSTREAMDB_ZK_TAG=3.8.1
MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server

View File

@ -91,6 +91,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info,

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}.
{deps, [
{hstreamdb_erl,
{git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.4.5+v0.16.1"}}},
{git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.5.18+v0.18.1"}}},
{emqx, {path, "../../apps/emqx"}},
{emqx_utils, {path, "../../apps/emqx_utils"}}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_hstreamdb, [
{description, "EMQX Enterprise HStreamDB Bridge"},
{vsn, "0.1.3"},
{vsn, "0.1.4"},
{registered, []},
{applications, [
kernel,
@ -8,7 +8,7 @@
emqx_resource,
hstreamdb_erl
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_hstreamdb_action_info]}]},
{modules, []},
{links, []}
]}.
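
Alongside the hard-coded registration seen earlier in this diff, the new
`emqx_action_info_modules` environment key lets the action-info registry
discover this module at runtime. A minimal sketch of the lookup (the exact
registry mechanism is assumed, not shown in this commit):

    %% Read the registration key added above and probe one callback.
    {ok, Mods} = application:get_env(emqx_bridge_hstreamdb, emqx_action_info_modules),
    [emqx_bridge_hstreamdb_action_info] = Mods,
    hstreamdb = emqx_bridge_hstreamdb_action_info:action_type_name().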

View File

@ -1,4 +1,4 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_hstreamdb).
@ -6,10 +6,12 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-import(hoconsc, [mk/2, enum/1, ref/2]).
-import(hoconsc, [mk/2, enum/1]).
-export([
conn_bridge_examples/1
conn_bridge_examples/1,
bridge_v2_examples/1,
connector_examples/1
]).
-export([
@ -19,6 +21,11 @@
desc/1
]).
-define(CONNECTOR_TYPE, hstreamdb).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
-define(DEFAULT_GRPC_FLUSH_TIMEOUT_RAW, <<"10s">>).
%% -------------------------------------------------------------------------------------------------
%% api
@ -27,16 +34,16 @@ conn_bridge_examples(Method) ->
#{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Bridge">>,
value => values(Method)
value => conn_bridge_example_values(Method)
}
}
].
values(get) ->
values(post);
values(put) ->
values(post);
values(post) ->
conn_bridge_example_values(get) ->
conn_bridge_example_values(post);
conn_bridge_example_values(put) ->
conn_bridge_example_values(post);
conn_bridge_example_values(post) ->
#{
type => <<"hstreamdb">>,
name => <<"demo">>,
@ -55,15 +62,143 @@ values(post) ->
},
ssl => #{enable => false}
};
values(_) ->
conn_bridge_example_values(_) ->
#{}.
connector_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
<<"url">> => <<"http://127.0.0.1:6570">>,
<<"grpc_timeout">> => <<"30s">>,
<<"ssl">> =>
#{
<<"enable">> => false,
<<"verify">> => <<"verify_peer">>
},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"15s">>,
<<"start_timeout">> => <<"5s">>
}
}.
bridge_v2_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
<<"parameters">> => #{
<<"partition_key">> => <<"hej">>,
<<"record_template">> => <<"${payload}">>,
<<"stream">> => <<"mqtt_message">>,
<<"aggregation_pool_size">> => 8,
<<"writer_pool_size">> => 8
}
}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_hstreamdb".
roots() -> [].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)),
#{
desc => <<"HStreamDB Action Config">>,
required => false
}
)};
fields(hstreamdb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
[
{stream,
mk(binary(), #{
required => true, desc => ?DESC(emqx_bridge_hstreamdb_connector, "stream_name")
})},
{partition_key,
mk(binary(), #{
required => false, desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key")
})},
{grpc_flush_timeout, fun grpc_flush_timeout/1},
{record_template,
mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("record_template")})},
{aggregation_pool_size,
mk(integer(), #{default => 8, desc => ?DESC("aggregation_pool_size")})},
{max_batches, mk(integer(), #{default => 500, desc => ?DESC("max_batches")})},
{writer_pool_size, mk(integer(), #{default => 8, desc => ?DESC("writer_pool_size")})},
{batch_size, mk(integer(), #{default => 100, desc => ?DESC("batch_size")})},
{batch_interval,
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"500ms">>, desc => ?DESC("batch_interval")
})}
];
fields(connector_fields) ->
[
{url,
mk(binary(), #{
required => true,
desc => ?DESC(emqx_bridge_hstreamdb_connector, "url"),
default => <<"http://127.0.0.1:6570">>
})},
{grpc_timeout, fun grpc_timeout/1}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") ->
hstream_bridge_common_fields() ++
connector_fields();
@ -80,6 +215,18 @@ fields("put") ->
hstream_bridge_common_fields() ++
connector_fields().
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout");
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
grpc_timeout(required) -> false;
grpc_timeout(_) -> undefined.
grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_flush_timeout(desc) -> ?DESC("grpc_flush_timeout");
grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW;
grpc_flush_timeout(required) -> false;
grpc_flush_timeout(_) -> undefined.
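%% Editorial note: fun-style specs such as grpc_timeout/1 above are an
%% alternative to mk/2 field maps; the schema layer probes the fun with
%% attribute atoms. An illustrative (not executed) probe, assuming the
%% defaults defined in this module:
%%
%%     Spec = fun grpc_timeout/1,
%%     Spec(default),   %% => <<"30s">>
%%     Spec(required),  %% => false
%%     Spec(other)      %% => undefined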
hstream_bridge_common_fields() ->
emqx_bridge_schema:common_bridge_fields() ++
[
@ -97,6 +244,16 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."];
desc("creation_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc("config_connector") ->
?DESC("config_connector");
desc(hstreamdb_action) ->
?DESC("hstreamdb_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

View File

@ -0,0 +1,89 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_hstreamdb_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
bridge_v1_config_to_connector_config/1,
bridge_v1_config_to_action_config/2,
connector_action_config_to_bridge_v1_config/2
]).
bridge_v1_type_name() -> hstreamdb.
action_type_name() -> hstreamdb.
connector_type_name() -> hstreamdb.
schema_module() -> emqx_bridge_hstreamdb.
bridge_v1_config_to_connector_config(BridgeV1Conf) ->
ConnectorSchema = emqx_bridge_hstreamdb:fields(connector_fields),
ConnectorAtomKeys = lists:foldl(fun({K, _}, Acc) -> [K | Acc] end, [], ConnectorSchema),
ConnectorBinKeys = [atom_to_binary(K) || K <- ConnectorAtomKeys] ++ [<<"resource_opts">>],
ConnectorConfig0 = maps:with(ConnectorBinKeys, BridgeV1Conf),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnectorConfig0
).
bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, emqx_bridge_hstreamdb, "config_connector"
),
%% Remove fields no longer relevant for the action
Config1 = lists:foldl(
fun(Field, Acc) ->
emqx_utils_maps:deep_remove(Field, Acc)
end,
Config0,
[
[<<"parameters">>, <<"pool_size">>],
[<<"parameters">>, <<"direction">>]
]
),
%% Move pool_size to aggregation_pool_size and writer_pool_size
PoolSize = maps:get(<<"pool_size">>, BridgeV1Conf, 8),
Config2 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"aggregation_pool_size">>],
Config1,
PoolSize
),
Config3 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"writer_pool_size">>],
Config2,
PoolSize
),
Config3.
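%% Editorial sketch (hypothetical values): a v1 config with a top-level
%% <<"pool_size">> of 16 yields an action whose parameters carry that
%% value twice, while pool_size and direction themselves are dropped:
%%
%%     Action = bridge_v1_config_to_action_config(V1Conf, <<"my_connector">>),
%%     #{<<"parameters">> := #{<<"aggregation_pool_size">> := 16,
%%                             <<"writer_pool_size">> := 16}} = Action.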
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
BridgeV1Config1 = maps:remove(<<"connector">>, ActionConfig),
BridgeV1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, BridgeV1Config1),
BridgeV1Config3 = maps:remove(<<"parameters">>, BridgeV1Config2),
%% Pick out pool_size from aggregation_pool_size
PoolSize = emqx_utils_maps:deep_get(
[<<"parameters">>, <<"aggregation_pool_size">>], ActionConfig, 8
),
BridgeV1Config4 = maps:put(<<"pool_size">>, PoolSize, BridgeV1Config3),
%% Move the fields stream, partition_key and record_template from
%% parameters in ActionConfig to the top level in BridgeV1Config
lists:foldl(
fun(Field, Acc) ->
emqx_utils_maps:deep_put(
[Field],
Acc,
emqx_utils_maps:deep_get([<<"parameters">>, Field], ActionConfig, <<>>)
)
end,
BridgeV1Config4,
[<<"stream">>, <<"partition_key">>, <<"record_template">>]
).
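%% Editorial sketch (hypothetical values): the reverse mapping flattens
%% connector and action back into one v1 bridge config, recovering
%% pool_size from aggregation_pool_size and lifting the three parameter
%% fields to the top level:
%%
%%     V1 = connector_action_config_to_bridge_v1_config(ConnConf, ActionConf),
%%     #{<<"pool_size">> := 8,
%%       <<"stream">> := <<"mqtt_message">>,
%%       <<"record_template">> := <<"${payload}">>} = V1.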

View File

@ -7,8 +7,9 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(hoconsc, [mk/2, enum/1]).
-import(hoconsc, [mk/2]).
-behaviour(emqx_resource).
@ -19,7 +20,11 @@
on_stop/2,
on_query/3,
on_batch_query/3,
on_get_status/2
on_get_status/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channels/1,
on_get_channel_status/3
]).
-export([
@ -38,67 +43,132 @@
-define(DEFAULT_GRPC_TIMEOUT, timer:seconds(30)).
-define(DEFAULT_GRPC_TIMEOUT_RAW, <<"30s">>).
-define(DEFAULT_GRPC_FLUSH_TIMEOUT, 10000).
-define(DEFAULT_MAX_BATCHES, 500).
-define(DEFAULT_BATCH_INTERVAL, 500).
-define(DEFAULT_AGG_POOL_SIZE, 8).
-define(DEFAULT_WRITER_POOL_SIZE, 8).
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> always_sync.
on_start(InstId, Config) ->
start_client(InstId, Config).
try
do_on_start(InstId, Config)
catch
E:R:S ->
Error = #{
msg => "start_hstreamdb_connector_error",
connector => InstId,
error => E,
reason => R,
stack => S
},
?SLOG(error, Error),
{error, Error}
end.
on_add_channel(
_InstId,
#{
installed_channels := InstalledChannels,
client_options := ClientOptions
} = OldState,
ChannelId,
ChannelConfig
) ->
%{ok, ChannelState} = create_channel_state(ChannelId, PoolName, ChannelConfig),
Parameters0 = maps:get(parameters, ChannelConfig),
Parameters = Parameters0#{client_options => ClientOptions},
PartitionKey = emqx_placeholder:preproc_tmpl(maps:get(partition_key, Parameters, <<"">>)),
try
ChannelState = #{
producer => start_producer(ChannelId, Parameters),
record_template => record_template(Parameters),
partition_key => PartitionKey
},
NewInstalledChannels = maps:put(ChannelId, ChannelState, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}
catch
Error:Reason:Stack ->
{error, {Error, Reason, Stack}}
end.
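%% Editorial sketch of the connector state after one channel has been
%% installed (the channel id format is illustrative, not taken from
%% this commit):
%%
%%     #{client_options := #{url := _, grpc_timeout := _, rpc_options := _},
%%       installed_channels :=
%%           #{<<"action:hstreamdb:my_action:connector:hstreamdb:my_conn">> :=
%%                 #{producer := _ProducerName,
%%                   record_template := _CompiledTemplate,
%%                   partition_key := _CompiledKey}}} = NewState.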
on_remove_channel(
_InstId,
#{
installed_channels := InstalledChannels
} = OldState,
ChannelId
) ->
#{
producer := Producer
} = maps:get(ChannelId, InstalledChannels),
_ = hstreamdb:stop_producer(Producer),
NewInstalledChannels = maps:remove(ChannelId, InstalledChannels),
%% Update state
NewState = OldState#{installed_channels => NewInstalledChannels},
{ok, NewState}.
on_get_channel_status(
_ResId,
_ChannelId,
_State
) ->
?status_connected.
on_get_channels(ResId) ->
emqx_bridge_v2:get_channels_for_connector(ResId).
on_stop(InstId, _State) ->
case emqx_resource:get_allocated_resources(InstId) of
#{?hstreamdb_client := #{client := Client, producer := Producer}} ->
StopClientRes = hstreamdb:stop_client(Client),
StopProducerRes = hstreamdb:stop_producer(Producer),
?SLOG(info, #{
msg => "stop_hstreamdb_connector",
connector => InstId,
client => Client,
producer => Producer,
stop_client => StopClientRes,
stop_producer => StopProducerRes
});
_ ->
ok
end.
?tp(
hstreamdb_connector_on_stop,
#{instance_id => InstId}
).
-define(FAILED_TO_APPLY_HRECORD_TEMPLATE,
{error, {unrecoverable_error, failed_to_apply_hrecord_template}}
).
on_query(
_InstId,
{send_message, Data},
_State = #{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
InstId,
{ChannelID, Data},
#{installed_channels := Channels} = _State
) ->
#{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
} = maps:get(ChannelID, Channels),
try to_record(PartitionKey, HRecordTemplate, Data) of
Record -> append_record(Producer, Record, false)
Record -> append_record(InstId, Producer, Record, false)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
on_batch_query(
_InstId,
BatchList,
_State = #{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
}
InstId,
[{ChannelID, _Data} | _] = BatchList,
#{installed_channels := Channels} = _State
) ->
#{
producer := Producer, partition_key := PartitionKey, record_template := HRecordTemplate
} = maps:get(ChannelID, Channels),
try to_multi_part_records(PartitionKey, HRecordTemplate, BatchList) of
Records -> append_record(Producer, Records, true)
Records -> append_record(InstId, Producer, Records, true)
catch
_:_ -> ?FAILED_TO_APPLY_HRECORD_TEMPLATE
end.
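%% Editorial note: the channel id is taken from the head of the batch;
%% the buffering layer is assumed to only batch requests belonging to
%% the same channel, i.e. BatchList has the shape
%%
%%     [{ChannelID, Data1}, {ChannelID, Data2} | _]
%%
%% (compare gen_batch_req/2 in the test suite below).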
on_get_status(_InstId, #{client := Client}) ->
case is_alive(Client) of
true ->
connected;
false ->
disconnected
on_get_status(_InstId, State) ->
case check_status(State) of
ok ->
?status_connected;
Error ->
%% We set it to ?status_connecting so that the channels are not deleted.
%% The producers in the channels contain buffers, so we don't want to delete them.
{?status_connecting, State, Error}
end.
%% -------------------------------------------------------------------------------------------------
@ -140,142 +210,152 @@ desc(config) ->
%% -------------------------------------------------------------------------------------------------
%% internal functions
start_client(InstId, Config) ->
try
do_start_client(InstId, Config)
catch
E:R:S ->
Error = #{
msg => "start_hstreamdb_connector_error",
connector => InstId,
error => E,
reason => R,
stack => S
},
?SLOG(error, Error),
{error, Error}
end.
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize, ssl := SSL}) ->
do_on_start(InstId, Config) ->
?SLOG(info, #{
msg => "starting_hstreamdb_connector_client",
connector => InstId,
config => Config
}),
ClientName = client_name(InstId),
RpcOpts =
case maps:get(enable, SSL) of
false ->
#{pool_size => PoolSize};
true ->
#{
pool_size => PoolSize,
gun_opts => #{
transport => tls,
transport_opts => emqx_tls_lib:to_client_opts(SSL)
}
}
end,
ClientOptions = [
{url, binary_to_list(Server)},
{rpc_options, RpcOpts}
],
case hstreamdb:start_client(ClientName, ClientOptions) of
{ok, Client} ->
case is_alive(Client) of
true ->
{ok, _} = application:ensure_all_started(hstreamdb_erl),
ClientOptions = client_options(Config),
State = #{
client_options => ClientOptions,
installed_channels => #{}
},
case check_status(State) of
ok ->
?SLOG(info, #{
msg => "hstreamdb_connector_client_started",
connector => InstId,
client => Client
connector => InstId
}),
start_producer(InstId, Client, Config);
_ ->
{ok, State};
Error ->
?tp(
hstreamdb_connector_start_failed,
#{error => client_not_alive}
),
?SLOG(error, #{
msg => "hstreamdb_connector_client_not_alive",
connector => InstId
}),
{error, connect_failed}
end;
{error, {already_started, Pid}} ->
?SLOG(info, #{
msg => "starting_hstreamdb_connector_client_find_old_client_restart_client",
old_client_pid => Pid,
old_client_name => ClientName
}),
_ = hstreamdb:stop_client(ClientName),
start_client(InstId, Config);
{error, Error} ->
?SLOG(error, #{
msg => "hstreamdb_connector_client_failed",
connector => InstId,
reason => Error
error => Error
}),
{error, {connect_failed, Error}}
end.
client_options(Config = #{url := ServerURL, ssl := SSL}) ->
%% NB: the connector config map uses atom keys here, so the timeout is
%% looked up with an atom key (a binary key would always miss and fall
%% back to the default).
GRPCTimeout = maps:get(grpc_timeout, Config, ?DEFAULT_GRPC_TIMEOUT),
EnableSSL = maps:get(enable, SSL),
RpcOpts =
case EnableSSL of
false ->
#{pool_size => 1};
true ->
#{
pool_size => 1,
gun_opts => #{
transport => tls,
transport_opts =>
emqx_tls_lib:to_client_opts(SSL)
}
}
end,
ClientOptions = #{
url => to_string(ServerURL),
grpc_timeout => GRPCTimeout,
rpc_options => RpcOpts
},
ClientOptions.
check_status(ConnectorState) ->
try start_client(ConnectorState) of
{ok, Client} ->
check_status_with_client(Client);
{error, _} = StartClientError ->
StartClientError
catch
ErrorType:Reason:_ST ->
{error, {ErrorType, Reason}}
end.
check_status_with_client(Client) ->
try hstreamdb_client:echo(Client) of
ok -> ok;
{error, _} = ErrorEcho -> ErrorEcho
after
_ = hstreamdb:stop_client(Client)
end.
start_client(Opts) ->
ClientOptions = maps:get(client_options, Opts),
case hstreamdb:start_client(ClientOptions) of
{ok, Client} ->
{ok, Client};
{error, Error} ->
{error, Error}
end.
is_alive(Client) ->
hstreamdb_client:echo(Client) =:= ok.
start_producer(
InstId,
Client,
Options = #{stream := Stream, pool_size := PoolSize}
ActionId,
#{
stream := Stream,
batch_size := BatchSize,
batch_interval := Interval
} = Opts
) ->
%% TODO: change these batch options after we have better disk cache.
BatchSize = maps:get(batch_size, Options, 100),
Interval = maps:get(batch_interval, Options, 1000),
ProducerOptions = [
{stream, Stream},
{callback, {?MODULE, on_flush_result, []}},
{max_records, BatchSize},
{interval, Interval},
{pool_size, PoolSize},
{grpc_timeout, maps:get(grpc_timeout, Options, ?DEFAULT_GRPC_TIMEOUT)}
],
Name = produce_name(InstId),
?SLOG(info, #{
msg => "starting_hstreamdb_connector_producer",
connector => InstId
}),
case hstreamdb:start_producer(Client, Name, ProducerOptions) of
{ok, Producer} ->
?SLOG(info, #{
msg => "hstreamdb_connector_producer_started"
}),
State = #{
client => Client,
producer => Producer,
enable_batch => maps:get(enable_batch, Options, false),
partition_key => emqx_placeholder:preproc_tmpl(
maps:get(partition_key, Options, <<"">>)
),
record_template => record_template(Options)
MaxBatches = maps:get(max_batches, Opts, ?DEFAULT_MAX_BATCHES),
AggPoolSize = maps:get(aggregation_pool_size, Opts, ?DEFAULT_AGG_POOL_SIZE),
WriterPoolSize = maps:get(writer_pool_size, Opts, ?DEFAULT_WRITER_POOL_SIZE),
GRPCTimeout = maps:get(grpc_flush_timeout, Opts, ?DEFAULT_GRPC_FLUSH_TIMEOUT),
ClientOptions = maps:get(client_options, Opts),
ProducerOptions = #{
stream => to_string(Stream),
buffer_options => #{
interval => Interval,
callback => {?MODULE, on_flush_result, [ActionId]},
max_records => BatchSize,
max_batches => MaxBatches
},
ok = emqx_resource:allocate_resource(InstId, ?hstreamdb_client, #{
client => Client, producer => Producer
}),
{ok, State};
{error, {already_started, Pid}} ->
?SLOG(info, #{
msg =>
"starting_hstreamdb_connector_producer_find_old_producer_restart_producer",
old_producer_pid => Pid,
old_producer_name => Name
}),
_ = hstreamdb:stop_producer(Name),
start_producer(InstId, Client, Options);
buffer_pool_size => AggPoolSize,
writer_options => #{
grpc_timeout => GRPCTimeout
},
writer_pool_size => WriterPoolSize,
client_options => ClientOptions
},
Name = produce_name(ActionId),
ensure_start_producer(Name, ProducerOptions).
ensure_start_producer(ProducerName, ProducerOptions) ->
case hstreamdb:start_producer(ProducerName, ProducerOptions) of
ok ->
ok;
{error, {already_started, _Pid}} ->
%% HStreamDB producer already started, restart it
_ = hstreamdb:stop_producer(ProducerName),
%% the pool might have been leaked after relup
_ = ecpool:stop_sup_pool(ProducerName),
ok = hstreamdb:start_producer(ProducerName, ProducerOptions);
{error, {
{shutdown,
{failed_to_start_child, {pool_sup, Pool},
{shutdown,
{failed_to_start_child, worker_sup,
{shutdown, {failed_to_start_child, _, {badarg, _}}}}}}},
_
}} ->
%% The HStreamDB producer was not properly cleaned up; restart it.
%% The badarg error in gproc may be caused by a pool leaked after a relup.
_ = ecpool:stop_sup_pool(Pool),
ok = hstreamdb:start_producer(ProducerName, ProducerOptions);
{error, Reason} ->
?SLOG(error, #{
msg => "starting_hstreamdb_connector_producer_failed",
reason => Reason
}),
{error, Reason}
end.
%% HStreamDB start producer failed
throw({start_producer_failed, Reason})
end,
ProducerName.
produce_name(ActionId) ->
list_to_binary("backend_hstream_producer:" ++ to_string(ActionId)).
to_record(PartitionKeyTmpl, HRecordTmpl, Data) ->
PartitionKey = emqx_placeholder:proc_tmpl(PartitionKeyTmpl, Data),
@ -289,43 +369,46 @@ to_record(PartitionKey, RawRecord) ->
to_multi_part_records(PartitionKeyTmpl, HRecordTmpl, BatchList) ->
lists:map(
fun({send_message, Data}) ->
fun({_, Data}) ->
to_record(PartitionKeyTmpl, HRecordTmpl, Data)
end,
BatchList
).
append_record(Producer, MultiPartsRecords, MaybeBatch) when is_list(MultiPartsRecords) ->
append_record(ResourceId, Producer, MultiPartsRecords, MaybeBatch) when
is_list(MultiPartsRecords)
->
lists:foreach(
fun(Record) -> append_record(Producer, Record, MaybeBatch) end, MultiPartsRecords
fun(Record) -> append_record(ResourceId, Producer, Record, MaybeBatch) end,
MultiPartsRecords
);
append_record(Producer, Record, MaybeBatch) when is_tuple(Record) ->
do_append_records(Producer, Record, MaybeBatch).
append_record(ResourceId, Producer, Record, MaybeBatch) when is_tuple(Record) ->
do_append_records(ResourceId, Producer, Record, MaybeBatch).
%% TODO: only sync requests are supported; implement async requests later.
do_append_records(Producer, Record, true = IsBatch) ->
do_append_records(ResourceId, Producer, Record, true = IsBatch) ->
Result = hstreamdb:append(Producer, Record),
handle_result(Result, Record, IsBatch);
do_append_records(Producer, Record, false = IsBatch) ->
handle_result(ResourceId, Result, Record, IsBatch);
do_append_records(ResourceId, Producer, Record, false = IsBatch) ->
Result = hstreamdb:append_flush(Producer, Record),
handle_result(Result, Record, IsBatch).
handle_result(ResourceId, Result, Record, IsBatch).
handle_result(ok = Result, Record, IsBatch) ->
handle_result({ok, Result}, Record, IsBatch);
handle_result({ok, Result}, Record, IsBatch) ->
handle_result(ResourceId, ok = Result, Record, IsBatch) ->
handle_result(ResourceId, {ok, Result}, Record, IsBatch);
handle_result(ResourceId, {ok, Result}, Record, IsBatch) ->
?tp(
hstreamdb_connector_query_append_return,
#{result => Result, is_batch => IsBatch}
#{result => Result, is_batch => IsBatch, instance_id => ResourceId}
),
?SLOG(debug, #{
msg => "hstreamdb_producer_sync_append_success",
record => Record,
is_batch => IsBatch
});
handle_result({error, Reason} = Err, Record, IsBatch) ->
handle_result(ResourceId, {error, Reason} = Err, Record, IsBatch) ->
?tp(
hstreamdb_connector_query_append_return,
#{error => Reason, is_batch => IsBatch}
#{error => Reason, is_batch => IsBatch, instance_id => ResourceId}
),
?SLOG(error, #{
msg => "hstreamdb_producer_sync_append_failed",
@ -335,12 +418,6 @@ handle_result({error, Reason} = Err, Record, IsBatch) ->
}),
Err.
client_name(InstId) ->
"client:" ++ to_string(InstId).
produce_name(ActionId) ->
list_to_atom("producer:" ++ to_string(ActionId)).
record_template(#{record_template := RawHRecordTemplate}) ->
emqx_placeholder:preproc_tmpl(RawHRecordTemplate);
record_template(_) ->

View File

@ -117,16 +117,21 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(t_to_hrecord_failed, Config) ->
init_per_testcase_common(),
meck:new([hstreamdb], [passthrough, no_history, no_link]),
meck:expect(hstreamdb, to_record, fun(_, _, _) -> error(trans_to_hrecord_failed) end),
Config;
init_per_testcase(_Testcase, Config) ->
init_per_testcase_common(),
%% Drop the stream; a new one will be created in common_init/1.
%% TODO: create a new stream for each test case
delete_bridge(Config),
snabbkaffe:start_trace(),
Config.
init_per_testcase_common() ->
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors().
end_per_testcase(t_to_hrecord_failed, _Config) ->
meck:unload([hstreamdb]);
end_per_testcase(_Testcase, Config) ->
@ -301,7 +306,10 @@ t_simple_query(Config) ->
{ok, _},
create_bridge(Config)
),
Requests = gen_batch_req(BatchSize),
Type = ?config(hstreamdb_bridge_type, Config),
Name = ?config(hstreamdb_name, Config),
ActionId = emqx_bridge_v2:id(Type, Name),
Requests = gen_batch_req(BatchSize, ActionId),
?check_trace(
begin
?wait_async_action(
@ -351,6 +359,24 @@ t_to_hrecord_failed(Config) ->
end,
ok.
%% Connector Action Tests
t_action_on_get_status(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}).
t_action_create_via_http(Config) ->
emqx_bridge_v2_testlib:t_create_via_http(Config).
t_action_sync_query(Config) ->
MakeMessageFun = fun() -> rand_data() end,
IsSuccessCheck = fun(Result) -> ?assertEqual(ok, Result) end,
TracePoint = hstreamdb_connector_query_append_return,
emqx_bridge_v2_testlib:t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint).
t_action_start_stop(Config) ->
StopTracePoint = hstreamdb_connector_on_stop,
emqx_bridge_v2_testlib:t_start_stop(Config, StopTracePoint).
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
@ -362,6 +388,10 @@ common_init(ConfigT) ->
URL = "http://" ++ Host ++ ":" ++ RawPort,
Config0 = [
{bridge_type, <<"hstreamdb">>},
{bridge_name, <<"my_hstreamdb_action">>},
{connector_type, <<"hstreamdb">>},
{connector_name, <<"my_hstreamdb_connector">>},
{hstreamdb_host, Host},
{hstreamdb_port, Port},
{hstreamdb_url, URL},
@ -393,6 +423,8 @@ common_init(ConfigT) ->
{hstreamdb_config, HStreamDBConf},
{hstreamdb_bridge_type, BridgeType},
{hstreamdb_name, Name},
{bridge_config, action_config(Config0)},
{connector_config, connector_config(Config0)},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort}
| Config0
@ -424,7 +456,7 @@ hstreamdb_config(BridgeType, Config) ->
" resource_opts = {\n"
%% always sync
" query_mode = sync\n"
" request_ttl = 500ms\n"
" request_ttl = 10000ms\n"
" batch_size = ~b\n"
" worker_pool_size = ~b\n"
" }\n"
@ -443,6 +475,45 @@ hstreamdb_config(BridgeType, Config) ->
),
{Name, parse_and_check(ConfigString, BridgeType, Name)}.
action_config(Config) ->
ConnectorName = ?config(connector_name, Config),
BatchSize = batch_size(Config),
#{
<<"connector">> => ConnectorName,
<<"enable">> => true,
<<"parameters">> =>
#{
<<"aggregation_pool_size">> => ?POOL_SIZE,
<<"record_template">> => ?RECORD_TEMPLATE,
<<"stream">> => ?STREAM,
<<"writer_pool_size">> => ?POOL_SIZE
},
<<"resource_opts">> =>
#{
<<"batch_size">> => BatchSize,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"worker_pool_size">> => ?POOL_SIZE
}
}.
connector_config(Config) ->
Port = integer_to_list(?config(hstreamdb_port, Config)),
URL = "http://" ++ ?config(hstreamdb_host, Config) ++ ":" ++ Port,
#{
<<"url">> => URL,
<<"ssl">> =>
#{<<"enable">> => false, <<"verify">> => <<"verify_peer">>},
<<"grpc_timeout">> => <<"30s">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_timeout">> => <<"5s">>
}
}.
parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
@ -454,10 +525,10 @@ parse_and_check(ConfigString, BridgeType, Name) ->
-define(CONN_ATTEMPTS, 10).
default_options(Config) ->
[
{url, ?config(hstreamdb_url, Config)},
{rpc_options, ?RPC_OPTIONS}
].
#{
url => ?config(hstreamdb_url, Config),
rpc_options => ?RPC_OPTIONS
}.
connect_direct_hstream(Name, Config) ->
client(Name, Config, ?CONN_ATTEMPTS).
@ -511,8 +582,9 @@ send_message(Config, Data) ->
query_resource(Config, Request) ->
Name = ?config(hstreamdb_name, Config),
BridgeType = ?config(hstreamdb_bridge_type, Config),
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
ID = emqx_bridge_v2:id(BridgeType, Name),
ResID = emqx_connector_resource:resource_id(BridgeType, Name),
emqx_resource:query(ID, Request, #{timeout => 1_000, connector_resource_id => ResID}).
restart_resource(Config) ->
BridgeName = ?config(hstreamdb_name, Config),
@ -526,8 +598,16 @@ resource_id(Config) ->
BridgeType = ?config(hstreamdb_bridge_type, Config),
_ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName).
action_id(Config) ->
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
_ActionID = emqx_bridge_v2:id(ActionType, ActionName).
health_check_resource_ok(Config) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))).
?assertEqual({ok, connected}, emqx_resource_manager:health_check(resource_id(Config))),
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
?assertMatch(#{status := connected}, emqx_bridge_v2:health_check(ActionType, ActionName)).
health_check_resource_down(Config) ->
case emqx_resource_manager:health_check(resource_id(Config)) of
@ -539,6 +619,19 @@ health_check_resource_down(Config) ->
?assert(
false, lists:flatten(io_lib:format("invalid health check result:~p~n", [Other]))
)
end,
ActionName = ?config(hstreamdb_name, Config),
ActionType = ?config(hstreamdb_bridge_type, Config),
#{status := StatusV2} = emqx_bridge_v2:health_check(ActionType, ActionName),
case StatusV2 of
disconnected ->
ok;
connecting ->
ok;
OtherV2 ->
?assert(
false, lists:flatten(io_lib:format("invalid health check result:~p~n", [OtherV2]))
)
end.
% These funs start and then stop the hstreamdb connection
@ -548,22 +641,36 @@ connect_and_create_stream(Config) ->
Client, ?STREAM, ?REPLICATION_FACTOR, ?BACKLOG_RETENTION_SECOND, ?SHARD_COUNT
)
),
%% force write to stream to make it created and ready to be written data for rest cases
ProducerOptions = [
{pool_size, 4},
{stream, ?STREAM},
{callback, fun(_) -> ok end},
{max_records, 10},
{interval, 1000}
],
%% Force a write to the stream so that it is created and ready to accept data for the test cases.
ProducerOptions = #{
stream => ?STREAM,
buffer_options => #{
interval => 1000,
callback => {?MODULE, on_flush_result, [<<"WHAT">>]},
max_records => 1,
max_batches => 1
},
buffer_pool_size => 1,
writer_options => #{
grpc_timeout => 100
},
writer_pool_size => 1,
client_options => default_options(Config)
},
?WITH_CLIENT(
begin
{ok, Producer} = hstreamdb:start_producer(Client, test_producer, ProducerOptions),
_ = hstreamdb:append_flush(Producer, hstreamdb:to_record([], raw, rand_payload())),
_ = hstreamdb:stop_producer(Producer)
ok = hstreamdb:start_producer(test_producer, ProducerOptions),
_ = hstreamdb:append_flush(test_producer, hstreamdb:to_record([], raw, rand_payload())),
_ = hstreamdb:stop_producer(test_producer)
end
).
on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) ->
ok;
on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) ->
ok.
connect_and_delete_stream(Config) ->
?WITH_CLIENT(
_ = hstreamdb_client:delete_stream(Client, ?STREAM)
@ -593,11 +700,11 @@ rand_payload() ->
temperature => rand:uniform(40), humidity => rand:uniform(100)
}).
gen_batch_req(Count) when
gen_batch_req(Count, ActionId) when
is_integer(Count) andalso Count > 0
->
[{send_message, rand_data()} || _Val <- lists:seq(1, Count)];
gen_batch_req(Count) ->
[{ActionId, rand_data()} || _Val <- lists:seq(1, Count)];
gen_batch_req(Count, _ActionId) ->
ct:pal("Gen batch requests failed with unexpected Count: ~p", [Count]).
str(List) when is_list(List) ->

View File

@ -28,6 +28,8 @@ resource_type(confluent_producer) ->
emqx_bridge_kafka_impl_producer;
resource_type(gcp_pubsub_producer) ->
emqx_bridge_gcp_pubsub_impl_producer;
resource_type(hstreamdb) ->
emqx_bridge_hstreamdb_connector;
resource_type(kafka_producer) ->
emqx_bridge_kafka_impl_producer;
resource_type(kinesis) ->
@ -126,6 +128,14 @@ connector_structs() ->
required => false
}
)},
{hstreamdb,
mk(
hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config_connector")),
#{
desc => <<"HStreamDB Connector Config">>,
required => false
}
)},
{kafka_producer,
mk(
hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")),
@ -310,6 +320,7 @@ schema_modules() ->
emqx_bridge_azure_event_hub,
emqx_bridge_confluent_producer,
emqx_bridge_gcp_pubsub_producer_schema,
emqx_bridge_hstreamdb,
emqx_bridge_kafka,
emqx_bridge_kinesis,
emqx_bridge_matrix,
@ -349,6 +360,7 @@ api_schemas(Method) ->
<<"gcp_pubsub_producer">>,
Method ++ "_connector"
),
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method ++ "_connector"),
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_kinesis, <<"kinesis">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),

View File

@ -128,6 +128,8 @@ connector_type_to_bridge_types(confluent_producer) ->
[confluent_producer];
connector_type_to_bridge_types(gcp_pubsub_producer) ->
[gcp_pubsub, gcp_pubsub_producer];
connector_type_to_bridge_types(hstreamdb) ->
[hstreamdb];
connector_type_to_bridge_types(kafka_producer) ->
[kafka, kafka_producer];
connector_type_to_bridge_types(kinesis) ->

View File

@ -0,0 +1 @@
The HStreamDB bridge has been split into connector and action components. Old HStreamDB bridges will be upgraded automatically, but it is recommended to do the upgrade manually, because new fields have been added to the configuration.
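
For illustration, the split maps one v1 bridge configuration onto a connector
plus an action that references it. A sketch using the example values from the
schemas in this commit (the connector name is hypothetical):

    %% v1 bridge (before):
    #{<<"url">> => <<"http://127.0.0.1:6570">>,
      <<"stream">> => <<"mqtt_message">>,
      <<"record_template">> => <<"${payload}">>,
      <<"pool_size">> => 8}

    %% connector (after):
    #{<<"url">> => <<"http://127.0.0.1:6570">>,
      <<"grpc_timeout">> => <<"30s">>}

    %% action (after), referencing the connector by name:
    #{<<"connector">> => <<"my_hstreamdb_connector">>,
      <<"parameters">> => #{<<"stream">> => <<"mqtt_message">>,
                            <<"record_template">> => <<"${payload}">>,
                            <<"aggregation_pool_size">> => 8,
                            <<"writer_pool_size">> => 8}}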

View File

@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},

View File

@ -47,4 +47,58 @@ NOTE: When you use `raw record` template (which means the data is not a valid JS
record_template.label:
"""HStream Record"""
action_parameters.desc:
"""Action specific configuration."""
action_parameters.label:
"""Action"""
grpc_flush_timeout.desc:
"""Timeout for the gRPC calls that flush a batch of records to the HStreamDB server."""
grpc_flush_timeout.label:
"""gRPC Flush Timeout"""
aggregation_pool_size.desc:
"""The size of the record aggregation pool. A larger aggregation pool size can lead to enhanced parallelization but may also result in reduced efficiency due to smaller batch sizes."""
aggregation_pool_size.label:
"""Aggregation Pool Size"""
max_batches.desc:
"""Maximum number of unconfirmed batches in the flush queue."""
max_batches.label:
"""Max Batches"""
writer_pool_size.desc:
"""The size of the writer pool. A larger pool may increase parallelization and concurrent write operations, potentially boosting throughput. Trade-offs include greater memory consumption and possible resource contention."""
writer_pool_size.label:
"""Writer Pool Size"""
batch_size.desc:
"""Maximum number of insert data clauses that can be sent in a single request."""
batch_size.label:
"""Max Batch Append Count"""
batch_interval.desc:
"""Maximum interval that is allowed between two successive (batch) request."""
batch_interval.label:
"""Max Batch Interval"""
hstreamdb_action.desc:
"""Configuration for HStreamDB action."""
hstreamdb_action.label:
"""HStreamDB Action Configuration"""
config_connector.desc:
"""Configuration for an HStreamDB connector."""
config_connector.label:
"""HStreamDB Connector Configuration"""
}

View File

@ -19,7 +19,7 @@ name.label:
"""Connector Name"""
url.desc:
"""HStreamDB Server URL. Using gRPC http server address."""
"""HStreamDB Server URL. This URL will be used as the gRPC HTTP server address."""
url.label:
"""HStreamDB Server URL"""
@ -37,13 +37,13 @@ partition_key.label:
"""HStreamDB Partition Key"""
pool_size.desc:
"""HStreamDB Pool Size."""
"""The size of the aggregation pool and the writer pool (see the description of the HStreamDB action for more information about these pools). Larger pool sizes can enhance parallelization but may also reduce efficiency due to smaller batch sizes."""
pool_size.label:
"""HStreamDB Pool Size"""
grpc_timeout.desc:
"""HStreamDB gRPC Timeout."""
"""The timeout for HStreamDB gRPC requests."""
grpc_timeout.label:
"""HStreamDB gRPC Timeout"""