emqx/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb.erl

279 lines
8.7 KiB
Erlang

%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_hstreamdb).
-behaviour(emqx_connector_examples).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_hstreamdb.hrl").
-import(hoconsc, [mk/2, enum/1]).
-export([
conn_bridge_examples/1,
bridge_v2_examples/1,
connector_examples/1
]).
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
-define(CONNECTOR_TYPE, hstreamdb).
-define(ACTION_TYPE, ?CONNECTOR_TYPE).
%% -------------------------------------------------------------------------------------------------
%% api
conn_bridge_examples(Method) ->
[
#{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Bridge">>,
value => conn_bridge_example_values(Method)
}
}
].
conn_bridge_example_values(get) ->
conn_bridge_example_values(post);
conn_bridge_example_values(put) ->
conn_bridge_example_values(post);
conn_bridge_example_values(post) ->
#{
type => <<"hstreamdb">>,
name => <<"demo">>,
direction => <<"egress">>,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream">>,
%% raw HRecord
record_template =>
<<"{ \"temperature\": ${payload.temperature}, \"humidity\": ${payload.humidity} }">>,
pool_size => 8,
%% grpc_timeout => <<"1m">>
resource_opts => #{
query_mode => sync,
batch_size => 100,
batch_time => <<"20ms">>
},
ssl => #{enable => false}
};
conn_bridge_example_values(_) ->
#{}.
connector_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Connector">>,
value => emqx_connector_schema:connector_values(
Method, ?CONNECTOR_TYPE, connector_values()
)
}
}
].
connector_values() ->
#{
<<"url">> => <<"http://127.0.0.1:6570">>,
<<"grpc_timeout">> => <<"30s">>,
<<"ssl">> =>
#{
<<"enable">> => false,
<<"verify">> => <<"verify_peer">>
},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"15s">>,
<<"start_timeout">> => <<"5s">>
}
}.
bridge_v2_examples(Method) ->
[
#{
<<"hstreamdb">> =>
#{
summary => <<"HStreamDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, ?ACTION_TYPE, ?CONNECTOR_TYPE, action_values()
)
}
}
].
action_values() ->
#{
<<"parameters">> => #{
<<"partition_key">> => <<"hej">>,
<<"record_template">> => <<"${payload}">>,
<<"stream">> => <<"mqtt_message">>,
<<"aggregation_pool_size">> => ?DEFAULT_AGG_POOL_SIZE,
<<"writer_pool_size">> => ?DEFAULT_WRITER_POOL_SIZE
}
}.
%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions
namespace() -> "bridge_hstreamdb".
roots() -> [].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(hstreamdb_action));
fields(action) ->
{?ACTION_TYPE,
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, hstreamdb_action)),
#{
desc => <<"HStreamDB Action Config">>,
required => false
}
)};
fields(hstreamdb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
hoconsc:mk(
hoconsc:ref(?MODULE, action_parameters),
#{
required => true,
desc => ?DESC("action_parameters")
}
)
);
fields(action_parameters) ->
[
{stream,
mk(binary(), #{
required => true, desc => ?DESC(emqx_bridge_hstreamdb_connector, "stream_name")
})},
{partition_key,
mk(emqx_schema:template(), #{
required => false,
desc => ?DESC(emqx_bridge_hstreamdb_connector, "partition_key")
})},
{grpc_flush_timeout, fun grpc_flush_timeout/1},
{record_template, record_template_schema()},
{aggregation_pool_size,
mk(pos_integer(), #{
default => ?DEFAULT_AGG_POOL_SIZE, desc => ?DESC("aggregation_pool_size")
})},
{max_batches,
mk(pos_integer(), #{default => ?DEFAULT_MAX_BATCHES, desc => ?DESC("max_batches")})},
{writer_pool_size,
mk(pos_integer(), #{
default => ?DEFAULT_WRITER_POOL_SIZE, desc => ?DESC("writer_pool_size")
})},
{batch_size, mk(pos_integer(), #{default => 100, desc => ?DESC("batch_size")})},
{batch_interval,
mk(emqx_schema:timeout_duration_ms(), #{
default => ?DEFAULT_BATCH_INTERVAL_RAW, desc => ?DESC("batch_interval")
})}
];
fields(connector_fields) ->
[
{url,
mk(binary(), #{
required => true,
desc => ?DESC(emqx_bridge_hstreamdb_connector, "url"),
default => <<"http://127.0.0.1:6570">>
})},
{grpc_timeout, fun grpc_timeout/1}
] ++ emqx_connector_schema_lib:ssl_fields();
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
fields(connector_fields) ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields("config") ->
hstream_bridge_common_fields() ++
connector_fields();
fields("post") ->
hstream_bridge_common_fields() ++
connector_fields() ++
type_name_fields();
fields("get") ->
hstream_bridge_common_fields() ++
connector_fields() ++
type_name_fields() ++
emqx_bridge_schema:status_fields();
fields("put") ->
hstream_bridge_common_fields() ++
connector_fields().
record_template_schema() ->
mk(emqx_schema:template(), #{
default => <<"${payload}">>,
desc => ?DESC("record_template")
}).
grpc_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_timeout(desc) -> ?DESC(emqx_bridge_hstreamdb_connector, "grpc_timeout");
grpc_timeout(default) -> ?DEFAULT_GRPC_TIMEOUT_RAW;
grpc_timeout(required) -> false;
grpc_timeout(_) -> undefined.
grpc_flush_timeout(type) -> emqx_schema:timeout_duration_ms();
grpc_flush_timeout(desc) -> ?DESC("grpc_flush_timeout");
grpc_flush_timeout(default) -> ?DEFAULT_GRPC_FLUSH_TIMEOUT_RAW;
grpc_flush_timeout(required) -> false;
grpc_flush_timeout(_) -> undefined.
hstream_bridge_common_fields() ->
emqx_bridge_schema:common_bridge_fields() ++
[
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{record_template, record_template_schema()}
] ++
emqx_resource_schema:fields("resource_opts").
connector_fields() ->
emqx_bridge_hstreamdb_connector:fields(config).
desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for HStreamDB bridge using `", string:to_upper(Method), "` method."];
desc("creation_opts") ->
?DESC(emqx_resource_schema, "creation_opts");
desc("config_connector") ->
?DESC("config_connector");
desc(hstreamdb_action) ->
?DESC("hstreamdb_action");
desc(action_parameters) ->
?DESC("action_parameters");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.
%% -------------------------------------------------------------------------------------------------
%% internal
type_name_fields() ->
[
{type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}
].