fix: hstreamdb connector conf & api

This commit is contained in:
DDDHuang 2022-07-28 16:47:30 +08:00
parent d2cf7dac7e
commit a4992ef1b5
9 changed files with 179 additions and 53 deletions

View File

@ -287,9 +287,7 @@ parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} =
maps:without([connector, direction], Conf), maps:without([connector, direction], Conf),
Type, Type,
Name Name
); ).
parse_confs(_Type, _Name, Conf) ->
Conf.
make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) ->
BName = bridge_id(Type, Name), BName = bridge_id(Type, Name),

View File

@ -80,7 +80,7 @@ connector_info_array_example(Method) ->
[Config || #{value := Config} <- maps:values(connector_info_examples(Method))]. [Config || #{value := Config} <- maps:values(connector_info_examples(Method))].
connector_info_examples(Method) -> connector_info_examples(Method) ->
lists:foldl( Fun =
fun(Type, Acc) -> fun(Type, Acc) ->
SType = atom_to_list(Type), SType = atom_to_list(Type),
maps:merge(Acc, #{ maps:merge(Acc, #{
@ -90,9 +90,17 @@ connector_info_examples(Method) ->
} }
}) })
end, end,
#{}, Broker = lists:foldl(Fun, #{}, ?CONN_TYPES),
?CONN_TYPES EE = ee_example(Method),
). maps:merge(Broker, EE).
-if(?EMQX_RELEASE_EDITION == ee).
ee_example(Method) ->
emqx_ee_connector:connector_examples(Method).
-else.
ee_example(_Method) ->
#{}.
-endif.
info_example(Type, Method) -> info_example(Type, Method) ->
maps:merge( maps:merge(

View File

@ -44,7 +44,9 @@ post_request() ->
http_schema("post"). http_schema("post").
http_schema(Method) -> http_schema(Method) ->
Schemas = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES], Broker = [?R_REF(schema_mod(Type), Method) || Type <- ?CONN_TYPES],
EE = [?R_REF(Module, Method) || Module <- schema_modules()],
Schemas = Broker ++ EE,
?UNION(Schemas). ?UNION(Schemas).
%%====================================================================================== %%======================================================================================
@ -57,13 +59,29 @@ roots() -> ["connectors"].
fields(connectors) -> fields(connectors) ->
fields("connectors"); fields("connectors");
fields("connectors") -> fields("connectors") ->
[ Broker = [
{mqtt, {mqtt,
?HOCON( ?HOCON(
?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")), ?MAP(name, ?R_REF(emqx_connector_mqtt_schema, "connector")),
#{desc => ?DESC("mqtt")} #{desc => ?DESC("mqtt")}
)} )}
]. ],
EE = ee_fields_connectors(),
Broker ++ EE.
-if(?EMQX_RELEASE_EDITION == ee).
schema_modules() ->
emqx_ee_connector:schema_modules().
ee_fields_connectors() ->
emqx_ee_connector:fields(connectors).
-else.
ee_fields_connectors() ->
[].
schema_modules() ->
[].
-endif.
desc(Record) when desc(Record) when
Record =:= connectors; Record =:= connectors;

View File

@ -81,4 +81,14 @@ will be forwarded.
zh: "Bridge 名字" zh: "Bridge 名字"
} }
} }
desc_connector {
desc {
en: """Generic configuration for the connector."""
zh: """连接器的通用配置。"""
}
label: {
en: "Connector Generic Configuration"
zh: "连接器通用配置"
}
}
} }

View File

@ -23,12 +23,12 @@ conn_bridge_examples(Method) ->
end, end,
lists:foldl(Fun, #{}, schema_modules()). lists:foldl(Fun, #{}, schema_modules()).
resource_type(hstream) -> emqx_ee_connector_hstream; resource_type(hstreamdb) -> emqx_ee_connector_hstream;
resource_type(<<"hstream">>) -> emqx_ee_connector_hstream. resource_type(<<"hstreamdb">>) -> emqx_ee_connector_hstream.
fields(bridges) -> fields(bridges) ->
[ [
{hstream, {hstreamdb,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")),
#{desc => <<"EMQX Enterprise Config">>} #{desc => <<"EMQX Enterprise Config">>}

View File

@ -7,7 +7,7 @@
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ee_bridge.hrl"). -include("emqx_ee_bridge.hrl").
-import(hoconsc, [mk/2, enum/1]). -import(hoconsc, [mk/2, enum/1, ref/2]).
-export([ -export([
conn_bridge_example/1 conn_bridge_example/1
@ -25,7 +25,7 @@
conn_bridge_example(Method) -> conn_bridge_example(Method) ->
#{ #{
<<"hstream">> => #{ <<"hstreamdb">> => #{
summary => <<"HStreamDB Bridge">>, summary => <<"HStreamDB Bridge">>,
value => values(Method) value => values(Method)
} }
@ -35,12 +35,9 @@ values(get) ->
maps:merge(values(post), ?METRICS_EXAMPLE); maps:merge(values(post), ?METRICS_EXAMPLE);
values(post) -> values(post) ->
#{ #{
type => hstream, type => hstreamdb,
name => <<"demo">>, name => <<"demo">>,
url => <<"http://127.0.0.1:6570">>, connector => <<"hstreamdb:connector">>,
stream => <<"stream1">>,
ordering_key => <<"some_key">>,
pool_size => 8,
enable => true, enable => true,
direction => egress, direction => egress,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
@ -56,11 +53,13 @@ namespace() -> "bridge".
roots() -> []. roots() -> [].
fields("config") -> fields("config") ->
ExtConfig = [ [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})},
{local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})},
{payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})} {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})},
], {connector, field(connector)}
basic_config() ++ ExtConfig; ];
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
@ -68,6 +67,16 @@ fields("put") ->
fields("get") -> fields("get") ->
emqx_bridge_schema:metrics_status_fields() ++ fields("post"). emqx_bridge_schema:metrics_status_fields() ++ fields("post").
field(connector) ->
mk(
hoconsc:union([binary(), ref(emqx_ee_connector_hstream, config)]),
#{
required => true,
example => <<"hstreamdb:demo">>,
desc => ?DESC("desc_connector")
}
).
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
@ -75,17 +84,10 @@ desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
desc(_) -> desc(_) ->
undefined. undefined.
basic_config() ->
Basic = [
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
{direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}
],
emqx_ee_connector_hstream:fields(config) ++ Basic.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal %% internal
type_field() -> type_field() ->
{type, mk(enum([hstream]), #{required => true, desc => ?DESC("desc_type")})}. {type, mk(enum([hstreamdb]), #{required => true, desc => ?DESC("desc_type")})}.
name_field() -> name_field() ->
{name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

View File

@ -1,5 +1,26 @@
emqx_ee_connector_hstream { emqx_ee_connector_hstream {
type {
desc {
en: "The Connector Type."
zh: "连接器类型。"
}
label: {
en: "Connector Type"
zh: "连接器类型"
}
}
name {
desc {
en: "Connector name, used as a human-readable description of the connector."
zh: "连接器名称,人类可读的连接器描述。"
}
label: {
en: "Connector Name"
zh: "连接器名称"
}
}
url { url {
desc { desc {
en: """HStream Server URL""" en: """HStream Server URL"""

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
%% @doc Aggregation module for EMQX Enterprise connectors: exposes the
%% schema modules, connector config fields and API doc examples of all
%% enterprise connector implementations (currently only HStreamDB).
-module(emqx_ee_connector).

-export([
    schema_modules/0,
    fields/1,
    connector_examples/1
]).

%% @doc HOCON schema modules of every enterprise connector.
schema_modules() ->
    [emqx_ee_connector_hstream].

%% @doc Config fields injected into the broker "connectors" root:
%% a map of named HStreamDB connector configs.
fields(connectors) ->
    [
        {hstreamdb,
            hoconsc:mk(
                hoconsc:map(name, hoconsc:ref(emqx_ee_connector_hstream, config)),
                #{desc => <<"EMQX Enterprise Config">>}
            )}
    ].

%% @doc Merge the API examples of every enterprise connector into one map.
%% `Method' is the HTTP method context ("get" | "put" | "post") the
%% example is rendered for.
connector_examples(Method) ->
    Fun =
        fun(Module, Examples) ->
            %% Static remote call instead of erlang:apply/3 so that
            %% xref/dialyzer can check connector_example/1 is exported.
            Example = Module:connector_example(Method),
            maps:merge(Examples, Example)
        end,
    lists:foldl(Fun, #{}, schema_modules()).

View File

@ -25,7 +25,8 @@
-export([ -export([
roots/0, roots/0,
fields/1 fields/1,
connector_example/1
]). ]).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
@ -38,7 +39,7 @@ on_stop(InstId, #{client := Client, producer := Producer}) ->
StopClientRes = hstreamdb:stop_client(Client), StopClientRes = hstreamdb:stop_client(Client),
StopProducerRes = hstreamdb:stop_producer(Producer), StopProducerRes = hstreamdb:stop_producer(Producer),
?SLOG(info, #{ ?SLOG(info, #{
msg => "stop hstream connector", msg => "stop hstreamdb connector",
connector => InstId, connector => InstId,
client => Client, client => Client,
producer => Producer, producer => Producer,
@ -64,7 +65,7 @@ on_get_status(_InstId, #{client := Client}) ->
end. end.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% hstream batch callback %% hstreamdb batch callback
%% TODO: maybe remove it after disk cache is ready %% TODO: maybe remove it after disk cache is ready
on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) ->
@ -82,9 +83,41 @@ fields(config) ->
[ [
{url, mk(binary(), #{required => true, desc => ?DESC("url")})}, {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
{stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
{ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})}, {ordering_key, mk(binary(), #{required => false, desc => ?DESC("ordering_key")})},
{pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}
]. ];
fields("get") ->
fields("post");
fields("put") ->
fields(config);
fields("post") ->
[
{type, mk(hstreamdb, #{required => true, desc => ?DESC("type")})},
{name, mk(binary(), #{required => true, desc => ?DESC("name")})}
] ++ fields("put").
connector_example(Method) ->
#{
<<"hstreamdb">> => #{
summary => <<"HStreamDB Connector">>,
value => values(Method)
}
}.
values(post) ->
maps:merge(values(put), #{name => <<"connector">>});
values(get) ->
values(post);
values(put) ->
#{
type => hstreamdb,
url => <<"http://127.0.0.1:6570">>,
stream => <<"stream1">>,
ordering_key => <<"some_key">>,
pool_size => 8
};
values(_) ->
#{}.
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% internal functions %% internal functions
@ -94,7 +127,7 @@ start_client(InstId, Config) ->
catch catch
E:R:S -> E:R:S ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "start hstream connector error", msg => "start hstreamdb connector error",
connector => InstId, connector => InstId,
error => E, error => E,
reason => R, reason => R,
@ -104,7 +137,7 @@ start_client(InstId, Config) ->
do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting hstream connector: client", msg => "starting hstreamdb connector: client",
connector => InstId, connector => InstId,
config => Config config => Config
}), }),
@ -118,21 +151,21 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
case is_alive(Client) of case is_alive(Client) of
true -> true ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "hstream connector: client started", msg => "hstreamdb connector: client started",
connector => InstId, connector => InstId,
client => Client client => Client
}), }),
start_producer(InstId, Client, Config); start_producer(InstId, Client, Config);
_ -> _ ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "hstream connector: client not alive", msg => "hstreamdb connector: client not alive",
connector => InstId connector => InstId
}), }),
{error, connect_failed} {error, connect_failed}
end; end;
{error, {already_started, Pid}} -> {error, {already_started, Pid}} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting hstream connector: client, find old client. restart client", msg => "starting hstreamdb connector: client, find old client. restart client",
old_client_pid => Pid, old_client_pid => Pid,
old_client_name => ClientName old_client_name => ClientName
}), }),
@ -140,7 +173,7 @@ do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) ->
start_client(InstId, Config); start_client(InstId, Config);
{error, Error} -> {error, Error} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "hstream connector: client failed", msg => "hstreamdb connector: client failed",
connector => InstId, connector => InstId,
reason => Error reason => Error
}), }),
@ -155,7 +188,11 @@ is_alive(Client) ->
false false
end. end.
start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSize}) -> start_producer(
InstId,
Client,
Options = #{stream := Stream, pool_size := PoolSize, egress := #{payload := PayloadBin}}
) ->
%% TODO: change these batch options after we have better disk cache. %% TODO: change these batch options after we have better disk cache.
BatchSize = maps:get(batch_size, Options, 100), BatchSize = maps:get(batch_size, Options, 100),
Interval = maps:get(batch_interval, Options, 1000), Interval = maps:get(batch_interval, Options, 1000),
@ -168,16 +205,15 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
], ],
Name = produce_name(InstId), Name = produce_name(InstId),
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting hstream connector: producer", msg => "starting hstreamdb connector: producer",
connector => InstId connector => InstId
}), }),
case hstreamdb:start_producer(Client, Name, ProducerOptions) of case hstreamdb:start_producer(Client, Name, ProducerOptions) of
{ok, Producer} -> {ok, Producer} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "hstream connector: producer started" msg => "hstreamdb connector: producer started"
}), }),
EnableBatch = maps:get(enable_batch, Options, false), EnableBatch = maps:get(enable_batch, Options, false),
PayloadBin = maps:get(payload, Options, <<"">>),
Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin),
OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), OrderingKeyBin = maps:get(ordering_key, Options, <<"">>),
OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin),
@ -191,7 +227,8 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
{ok, State}; {ok, State};
{error, {already_started, Pid}} -> {error, {already_started, Pid}} ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "starting hstream connector: producer, find old producer. restart producer", msg =>
"starting hstreamdb connector: producer, find old producer. restart producer",
old_producer_pid => Pid, old_producer_pid => Pid,
old_producer_name => Name old_producer_name => Name
}), }),
@ -199,7 +236,7 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi
start_producer(InstId, Client, Options); start_producer(InstId, Client, Options);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "starting hstream connector: producer, failed", msg => "starting hstreamdb connector: producer, failed",
reason => Reason reason => Reason
}), }),
{error, Reason} {error, Reason}
@ -223,13 +260,13 @@ do_append(AfterQuery, Producer, Record) ->
% case hstreamdb:append(Producer, Record) of % case hstreamdb:append(Producer, Record) of
% ok -> % ok ->
% ?SLOG(debug, #{ % ?SLOG(debug, #{
% msg => "hstream producer async append success", % msg => "hstreamdb producer async append success",
% record => Record % record => Record
% }), % }),
% emqx_resource:query_success(AfterQuery); % emqx_resource:query_success(AfterQuery);
% {error, Reason} -> % {error, Reason} ->
% ?SLOG(error, #{ % ?SLOG(error, #{
% msg => "hstream producer async append failed", % msg => "hstreamdb producer async append failed",
% reason => Reason, % reason => Reason,
% record => Record % record => Record
% }), % }),
@ -241,13 +278,13 @@ do_append(AfterQuery, false, Producer, Record) ->
case hstreamdb:append_flush(Producer, Record) of case hstreamdb:append_flush(Producer, Record) of
{ok, _} -> {ok, _} ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "hstream producer sync append success", msg => "hstreamdb producer sync append success",
record => Record record => Record
}), }),
emqx_resource:query_success(AfterQuery); emqx_resource:query_success(AfterQuery);
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "hstream producer sync append failed", msg => "hstreamdb producer sync append failed",
reason => Reason, reason => Reason,
record => Record record => Record
}), }),