diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index d512df323..d162f0c13 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -45,10 +45,16 @@ http_schema(Method) -> end, ?CONN_TYPES ), - hoconsc:union([ - ref(emqx_bridge_webhook_schema, Method) - | Schemas - ]). + ExtSchemas = [ref(Module, Method) || Module <- schema_modules()], + hoconsc:union(Schemas ++ ExtSchemas). + +-ifdef(EMQX_RELEASE_EDITION). +schema_modules() -> + [emqx_bridge_webhook_schema] ++ emqx_ee_bridge:schema_modules(). +-else. +schema_modules() -> + [emqx_bridge_webhook_schema]. +-endif. common_bridge_fields(ConnectorRef) -> [ diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf new file mode 100644 index 000000000..a110dbf27 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf @@ -0,0 +1,84 @@ +emqx_ee_bridge_hstream { + local_topic { + desc { + en: """ +The MQTT topic filter to be forwarded to the HStreamDB. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """ +发送到 'local_topic' 的消息都会转发到 HStreamDB。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 HStreamDB。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + payload { + desc { + en: """The payload to be forwarded to the HStreamDB. Placeholders supported.""" + zh: """要转发到 HStreamDB 的数据内容,支持占位符""" + } + label { + en: "Payload" + zh: "消息内容" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用 Bridge""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用 Bridge" + } + } + config_direction { + desc { + en: """The direction of this bridge, MUST be 'egress'""" + zh: """Bridge 的方向, 必须是 egress""" + } + label { + en: "Bridge Direction" + zh: "Bridge 方向" + } + } + + desc_config { + desc { + en: """Configuration for an HStreamDB bridge.""" + zh: """HStreamDB Bridge 配置""" + } + label: { + en: "HStreamDB Bridge Configuration" + zh: "HStreamDB Bridge 配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "Bridge 类型" + } + } + + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """Bridge 名字,Bridge 的可读描述""" + } + label { + en: "Bridge Name" + zh: "Bridge 名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl new file mode 100644 index 000000000..d18fa1fde --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -0,0 +1,15 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge). + +-export([ + schema_modules/0, + info_example_basic/2 +]). + +schema_modules() -> + [emqx_ee_bridge_hstream]. + +info_example_basic(_Type, _Direction) -> + #{}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl new file mode 100644 index 000000000..6b01d2c0b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_hstream). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-export([ + roots/0, + fields/1, + desc/1 +]). + +%%====================================================================================== +%% Hocon Schema Definitions +% namespace() -> "bridge". + +roots() -> []. + +fields("config") -> + ExtConfig = [ + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})} + ], + basic_config() ++ ExtConfig; +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for HStream using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +basic_config() -> + Basic = [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})} + ], + emqx_ee_connector_hstream:fields(config) ++ Basic. + +%%====================================================================================== + +type_field() -> + % {type, mk(hstream, #{required => true, desc => ?DESC("desc_type")})}. + {type, mk(hstream, #{required => true, desc => <<"desc_type">>})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf new file mode 100644 index 000000000..d99b09c5f --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf @@ -0,0 +1,43 @@ + +emqx_ee_connector_hstream { + url { + desc { + en: """Hstream Server URL""" + zh: """HStream 服务器 URL""" + } + label { + en: """Hstream Server URL""" + zh: """HStream 服务器 URL""" + } + } + stream_name { + desc { + en: """Hstream Stream Name""" + zh: """HStream 流名称""" + } + label { + en: """Hstream Stream Name""" + zh: """HStream 流名称""" + } + } + ordering_key { + desc { + en: """Hstream Ordering Key""" + zh: """HStream 分区键""" + } + label { + en: """Hstream Ordering Key""" + zh: """HStream 分区键""" + } + } + pool_size { + desc { + en: """Hstream Pool Size""" + zh: """HStream 连接池大小""" + } + label { + en: """Hstream Pool Size""" + zh: """HStream 连接池大小""" + } + } +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index cba9da867..1f16e648e 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,5 +1,7 @@ {erl_opts, [debug_info]}. -{deps, []}. +{deps, [ + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.4"}}} +]}. {shell, [ {apps, [emqx_ee_connector]} diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl new file mode 100644 index 000000000..ea3a73035 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -0,0 +1,218 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_hstream). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). + +-export([ + on_flush_result/1 +]). + +-export([ + roots/0, + fields/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% resource callback + +on_start(InstId, Config) -> + start_client(InstId, Config). + +on_stop(InstId, #{client := Client, producer := Producer}) -> + StopClientRes = hstreamdb:stop_client(Client), + StopProducerRes = hstreamdb:stop_producer(Producer), + ?SLOG(info, #{ + msg => "stop hstream connector", + connector => InstId, + client => Client, + producer => Producer, + stop_client => StopClientRes, + stop_producer => StopProducerRes + }). + +on_query(_InstId, {OrderingKey, Payload, Record}, AfterQuery, #{producer := Producer}) -> + Record = hstreamdb:to_record(OrderingKey, raw, Payload), + do_append(AfterQuery, false, Producer, Record). + +on_get_status(_InstId, #{client := Client}) -> + is_alive(Client). + +%% ------------------------------------------------------------------------------------------------- +%% hstream batch callback +%% TODO: maybe remove it after disk cache is ready + +on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> + ok; +on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> + ok. + +%% ------------------------------------------------------------------------------------------------- +%% schema + +roots() -> + fields(config). + +fields(config) -> + [ + {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, + {stream, mk(binary(), #{required => true, desc => ?DESC("stream")})}, + {ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})}, + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} + ]. + +%% ------------------------------------------------------------------------------------------------- +%% internal functions + +start_client(InstId, Config = #{server := Server, pool_size := PoolSize}) -> + ?SLOG(info, #{ + msg => "starting hstream connector: client", + connector => InstId, + config => Config + }), + ClientName = client_name(InstId), + ClientOptions = [ + {url, Server}, + {rpc_options, #{pool_size => PoolSize}} + ], + case hstreamdb:start_client(ClientName, ClientOptions) of + {ok, Client} -> + case is_alive(Client) of + true -> + ?SLOG(info, #{ + msg => "hstream connector: client started", + connector => InstId, + client => Client + }), + start_producer(InstId, Client, Config); + _ -> + ?SLOG(error, #{ + msg => "hstream connector: client not alive", + connector => InstId + }), + {error, connect_failed} + end; + {error, {already_started, Pid}} -> + ?SLOG(info, #{ + msg => "starting hstream connector: client, find old client. restart client", + old_client_pid => Pid, + old_client_name => ClientName + }), + _ = hstreamdb:stop_client(ClientName), + start_client(InstId, Config); + {error, Error} -> + ?SLOG(error, #{ + msg => "hstream connector: client failed", + connector => InstId, + reason => Error + }), + {error, Error} + end. + +is_alive(Client) -> + case hstreamdb:echo(Client) of + {ok, _Echo} -> + true; + _ErrorEcho -> + false + end. + +start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSize}) -> + %% TODO: change these batch options after we have better disk cache. + BatchSize = maps:get(batch_size, Options, 100), + Interval = maps:get(batch_interval, Options, 1000), + ProducerOptions = [ + {stream, Stream}, + {callback, {?MODULE, on_flush_result, []}}, + {max_records, BatchSize}, + {interval, Interval}, + {pool_size, PoolSize} + ], + Name = produce_name(InstId), + ?SLOG(info, #{ + msg => "starting hstream connector: producer", + connector => InstId + }), + case hstreamdb:start_producer(Client, Name, ProducerOptions) of + {ok, Producer} -> + ?SLOG(info, #{ + msg => "hstream connector: producer started" + }), + EnableBatch = maps:get(enable_batch, Options, false), + #{client => Client, producer => Producer, enable_batch => EnableBatch}; + {error, {already_started, Pid}} -> + ?SLOG(info, #{ + msg => "starting hstream connector: producer, find old producer. restart producer", + old_producer_pid => Pid, + old_producer_name => Name + }), + _ = hstreamdb:stop_producer(Name), + start_producer(InstId, Client, Options); + {error, Reason} -> + ?SLOG(error, #{ + msg => "starting hstream connector: producer, failed", + reason => Reason + }), + {error, Reason} + end. + +%% TODO: this append is async, remove or change it after we have better disk cache. +% do_append(AfterQuery, true, Producer, Record) -> +% case hstreamdb:append(Producer, Record) of +% ok -> +% ?SLOG(debug, #{ +% msg => "hstream producer async append success", +% record => Record +% }), +% emqx_resource:query_success(AfterQuery); +% {error, Reason} -> +% ?SLOG(error, #{ +% msg => "hstream producer async append failed", +% reason => Reason, +% record => Record +% }), +% emqx_resource:query_failed(AfterQuery) +% end; +do_append(AfterQuery, false, Producer, Record) -> + %% TODO: this append is sync, but it does not support [Record], can only append one Record. + %% Change it after we have better dick cache. + case hstreamdb:append_flush(Producer, Record) of + {ok, _} -> + ?SLOG(debug, #{ + msg => "hstream producer sync append success", + record => Record + }), + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "hstream producer sync append failed", + reason => Reason, + record => Record + }), + emqx_resource:query_failed(AfterQuery) + end. + +client_name(InstId) -> + "backend_hstream_client:" ++ to_string(InstId). + +produce_name(ActionId) -> + list_to_atom("backend_hstream_producer:" ++ to_string(ActionId)). + +to_string(List) when is_list(List) -> List; +to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin); +to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom).