diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 70550efe4..fe19ed066 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "An OTP application"}, - {vsn, "0.1.0"}, + {vsn, "0.1.1"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bc9b6c5a2..e8eb91e57 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -163,7 +163,7 @@ bridge_info_examples(Method) -> }). conn_bridge_examples(Method) -> - lists:foldl( + Fun = fun(Type, Acc) -> SType = atom_to_list(Type), KeyIngress = bin(SType ++ "_ingress"), @@ -179,9 +179,17 @@ conn_bridge_examples(Method) -> } }) end, - #{}, - ?CONN_TYPES - ). + Broker = lists:foldl(Fun, #{}, ?CONN_TYPES), + EE = ee_conn_bridge_examples(Method), + maps:merge(Broker, EE). + +-if(?EMQX_RELEASE_EDITION == ee). +ee_conn_bridge_examples(Method) -> + emqx_ee_bridge:conn_bridge_examples(Method). +-else. +ee_conn_bridge_examples(_Method) -> + #{}. +-endif. info_example(Type, Direction, Method) -> maps:merge( diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 3fc4d57ba..cac6ab1e6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -29,6 +29,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), + ok = start_ee_apps(), ok = emqx_bridge:load(), ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), @@ -41,6 +42,16 @@ stop(_State) -> ok = emqx_bridge:unload_hook(), ok. +-if(?EMQX_RELEASE_EDITION == ee). +start_ee_apps() -> + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + ok. +-else. +start_ee_apps() -> + ok. +-endif. + %% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the %% underlying resources. pre_config_update(_, {_Oper, _, _}, undefined) -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 678aa1f10..cbd337970 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -42,10 +42,18 @@ reset_metrics/1 ]). +-if(?EMQX_RELEASE_EDITION == ee). +bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; +bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; +bridge_to_resource_type(webhook) -> emqx_connector_http; +bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType). +-else. bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http. +-endif. resource_id(BridgeId) when is_binary(BridgeId) -> <<"bridge:", BridgeId/binary>>. @@ -231,7 +239,7 @@ is_tmp_path(TmpPath, File) -> string:str(str(File), str(TmpPath)) > 0. parse_confs( - webhook, + Type, _Name, #{ url := Url, @@ -240,7 +248,7 @@ parse_confs( headers := Headers, request_timeout := ReqTimeout } = Conf -) -> +) when Type == webhook orelse Type == <<"webhook">> -> {BaseUrl, Path} = parse_url(Url), {ok, BaseUrl2} = emqx_http_lib:uri_parse(BaseUrl), Conf#{ @@ -279,7 +287,9 @@ parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = maps:without([connector, direction], Conf), Type, Name - ). + ); +parse_confs(_Type, _Name, Conf) -> + Conf. make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> BName = bridge_id(Type, Name), diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index d512df323..0b885db8c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -45,10 +45,16 @@ http_schema(Method) -> end, ?CONN_TYPES ), - hoconsc:union([ - ref(emqx_bridge_webhook_schema, Method) - | Schemas - ]). + ExtSchemas = [ref(Module, Method) || Module <- schema_modules()], + hoconsc:union(Schemas ++ ExtSchemas). + +-if(?EMQX_RELEASE_EDITION == ee). +schema_modules() -> + [emqx_bridge_webhook_schema] ++ emqx_ee_bridge:schema_modules(). +-else. +schema_modules() -> + [emqx_bridge_webhook_schema]. +-endif. common_bridge_fields(ConnectorRef) -> [ @@ -127,7 +133,7 @@ fields(bridges) -> #{desc => ?DESC("bridges_name")} )} || T <- ?CONN_TYPES - ]; + ] ++ ee_fields_bridges(); fields("metrics") -> [ {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, @@ -152,6 +158,14 @@ fields("node_status") -> {"status", mk(status(), #{})} ]. +-if(?EMQX_RELEASE_EDITION == ee). +ee_fields_bridges() -> + emqx_ee_bridge:fields(bridges). +-else. +ee_fields_bridges() -> + []. +-endif. + desc(bridges) -> ?DESC("desc_bridges"); desc("metrics") -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3a1afd27c..82be9c58f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -214,7 +214,7 @@ get_metrics(ResId) -> reset_metrics(ResId) -> emqx_metrics_worker:reset_metrics(resource_metrics, ResId). -%% @doc Returns the data for all resorces +%% @doc Returns the data for all resources -spec list_all() -> [resource_data()] | []. list_all() -> try diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf new file mode 100644 index 000000000..a110dbf27 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_hstream.conf @@ -0,0 +1,84 @@ +emqx_ee_bridge_hstream { + local_topic { + desc { + en: """ +The MQTT topic filter to be forwarded to the HStreamDB. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """ +发送到 'local_topic' 的消息都会转发到 HStreamDB。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 HStreamDB。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + payload { + desc { + en: """The payload to be forwarded to the HStreamDB. Placeholders supported.""" + zh: """要转发到 HStreamDB 的数据内容,支持占位符""" + } + label { + en: "Payload" + zh: "消息内容" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用 Bridge""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用 Bridge" + } + } + config_direction { + desc { + en: """The direction of this bridge, MUST be 'egress'""" + zh: """Bridge 的方向, 必须是 egress""" + } + label { + en: "Bridge Direction" + zh: "Bridge 方向" + } + } + + desc_config { + desc { + en: """Configuration for an HStreamDB bridge.""" + zh: """HStreamDB Bridge 配置""" + } + label: { + en: "HStreamDB Bridge Configuration" + zh: "HStreamDB Bridge 配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "Bridge 类型" + } + } + + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """Bridge 名字,Bridge 的可读描述""" + } + label { + en: "Bridge Name" + zh: "Bridge 名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl new file mode 100644 index 000000000..0065db56b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl @@ -0,0 +1,18 @@ +-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ + matched => MATCH, + success => SUCC, + failed => FAILED, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX +}). + +-define(METRICS_EXAMPLE, #{ + metrics => ?METRICS(0, 0, 0, 0, 0, 0), + node_metrics => [ + #{ + node => node(), + metrics => ?METRICS(0, 0, 0, 0, 0, 0) + } + ] +}). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index 58846577e..a578b7d0d 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -1,8 +1,6 @@ {application, emqx_ee_bridge, [ - {description, "An OTP application"}, {vsn, "0.1.0"}, {registered, []}, - {mod, {emqx_ee_bridge_app, []}}, {applications, [ kernel, stdlib diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl new file mode 100644 index 000000000..7f051a158 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -0,0 +1,36 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + schema_modules/0, + conn_bridge_examples/1, + resource_type/1, + fields/1 +]). + +schema_modules() -> + [emqx_ee_bridge_hstream]. + +conn_bridge_examples(Method) -> + Fun = + fun(Module, Examples) -> + Example = erlang:apply(Module, conn_bridge_example, [Method]), + maps:merge(Examples, Example) + end, + lists:foldl(Fun, #{}, schema_modules()). + +resource_type(hstream) -> emqx_ee_connector_hstream; +resource_type(<<"hstream">>) -> emqx_ee_connector_hstream. + +fields(bridges) -> + [ + {hstream, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), + #{desc => <<"EMQX Enterprise Config">>} + )} + ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_app.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_app.erl deleted file mode 100644 index 22268e285..000000000 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_app.erl +++ /dev/null @@ -1,17 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ee_bridge_app). - --behaviour(application). - --export([start/2, stop/1]). - -start(_StartType, _StartArgs) -> - emqx_ee_bridge_sup:start_link(). - -stop(_State) -> - ok. - -%% internal functions diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl new file mode 100644 index 000000000..56bac456e --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -0,0 +1,91 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_hstream). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ee_bridge.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-export([ + conn_bridge_example/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_example(Method) -> + #{ + <<"hstream">> => #{ + summary => <<"HStreamDB Bridge">>, + value => values(Method) + } + }. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + type => hstream, + name => <<"demo">>, + url => <<"http://127.0.0.1:6570">>, + stream => <<"stream1">>, + ordering_key => <<"some_key">>, + pool_size => 8, + enable => true, + direction => egress, + local_topic => <<"local/topic/#">>, + payload => <<"${payload}">> + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge". + +roots() -> []. + +fields("config") -> + ExtConfig = [ + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})} + ], + basic_config() ++ ExtConfig; +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for HStream using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +basic_config() -> + Basic = [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})} + ], + emqx_ee_connector_hstream:fields(config) ++ Basic. + +%% ------------------------------------------------------------------------------------------------- +%% internal +type_field() -> + {type, mk(enum([hstream]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sup.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sup.erl deleted file mode 100644 index 5a2484442..000000000 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ee_bridge_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -init([]) -> - SupFlags = #{ - strategy => one_for_all, - intensity => 0, - period => 1 - }, - ChildSpecs = [], - {ok, {SupFlags, ChildSpecs}}. - -%% internal functions diff --git a/lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl b/lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl new file mode 100644 index 000000000..d03c13cac --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/ee_bridge_hstream_SUITE.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(ee_bridge_hstream_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +%% TODO: diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf new file mode 100644 index 000000000..fce00baed --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf @@ -0,0 +1,43 @@ + +emqx_ee_connector_hstream { + url { + desc { + en: """HStream Server URL""" + zh: """HStream 服务器 URL""" + } + label { + en: """HStream Server URL""" + zh: """HStream 服务器 URL""" + } + } + stream_name { + desc { + en: """HStream Stream Name""" + zh: """HStream 流名称""" + } + label { + en: """HStream Stream Name""" + zh: """HStream 流名称""" + } + } + ordering_key { + desc { + en: """HStream Ordering Key""" + zh: """HStream 分区键""" + } + label { + en: """HStream Ordering Key""" + zh: """HStream 分区键""" + } + } + pool_size { + desc { + en: """HStream Pool Size""" + zh: """HStream 连接池大小""" + } + label { + en: """HStream Pool Size""" + zh: """HStream 连接池大小""" + } + } +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index cba9da867..38194cbf5 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -1,5 +1,7 @@ {erl_opts, [debug_info]}. -{deps, []}. +{deps, [ + {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}} +]}. {shell, [ {apps, [emqx_ee_connector]} diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 87f68afb6..6d9fea3c9 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,11 +1,10 @@ {application, emqx_ee_connector, [ - {description, "An OTP application"}, {vsn, "0.1.0"}, {registered, []}, - {mod, {emqx_ee_connector_app, []}}, {applications, [ kernel, - stdlib + stdlib, + hstreamdb_erl ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_app.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_app.erl deleted file mode 100644 index 13db2ccb0..000000000 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_app.erl +++ /dev/null @@ -1,17 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ee_connector_app). - --behaviour(application). - --export([start/2, stop/1]). - -start(_StartType, _StartArgs) -> - emqx_ee_connector_sup:start_link(). - -stop(_State) -> - ok. - -%% internal functions diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl new file mode 100644 index 000000000..fa8439851 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -0,0 +1,265 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_hstream). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-import(hoconsc, [mk/2, enum/1]). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + on_start/2, + on_stop/2, + on_query/4, + on_get_status/2 +]). + +-export([ + on_flush_result/1 +]). + +-export([ + roots/0, + fields/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% resource callback + +on_start(InstId, Config) -> + start_client(InstId, Config). + +on_stop(InstId, #{client := Client, producer := Producer}) -> + StopClientRes = hstreamdb:stop_client(Client), + StopProducerRes = hstreamdb:stop_producer(Producer), + ?SLOG(info, #{ + msg => "stop hstream connector", + connector => InstId, + client => Client, + producer => Producer, + stop_client => StopClientRes, + stop_producer => StopProducerRes + }). + +on_query( + _InstId, + {send_message, Data}, + AfterQuery, + #{producer := Producer, ordering_key := OrderingKey, payload := Payload} +) -> + Record = to_record(OrderingKey, Payload, Data), + do_append(AfterQuery, Producer, Record). + +on_get_status(_InstId, #{client := Client}) -> + case is_alive(Client) of + true -> + connected; + false -> + disconnected + end. + +%% ------------------------------------------------------------------------------------------------- +%% hstream batch callback +%% TODO: maybe remove it after disk cache is ready + +on_flush_result({{flush, _Stream, _Records}, {ok, _Resp}}) -> + ok; +on_flush_result({{flush, _Stream, _Records}, {error, _Reason}}) -> + ok. + +%% ------------------------------------------------------------------------------------------------- +%% schema + +roots() -> + fields(config). + +fields(config) -> + [ + {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, + {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, + {ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})}, + {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} + ]. + +%% ------------------------------------------------------------------------------------------------- +%% internal functions +start_client(InstId, Config) -> + try + do_start_client(InstId, Config) + catch + E:R:S -> + ?SLOG(error, #{ + msg => "start hstream connector error", + connector => InstId, + error => E, + reason => R, + stack => S + }) + end. + +do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> + ?SLOG(info, #{ + msg => "starting hstream connector: client", + connector => InstId, + config => Config + }), + ClientName = client_name(InstId), + ClientOptions = [ + {url, binary_to_list(Server)}, + {rpc_options, #{pool_size => PoolSize}} + ], + case hstreamdb:start_client(ClientName, ClientOptions) of + {ok, Client} -> + case is_alive(Client) of + true -> + ?SLOG(info, #{ + msg => "hstream connector: client started", + connector => InstId, + client => Client + }), + start_producer(InstId, Client, Config); + _ -> + ?SLOG(error, #{ + msg => "hstream connector: client not alive", + connector => InstId + }), + {error, connect_failed} + end; + {error, {already_started, Pid}} -> + ?SLOG(info, #{ + msg => "starting hstream connector: client, find old client. restart client", + old_client_pid => Pid, + old_client_name => ClientName + }), + _ = hstreamdb:stop_client(ClientName), + start_client(InstId, Config); + {error, Error} -> + ?SLOG(error, #{ + msg => "hstream connector: client failed", + connector => InstId, + reason => Error + }), + {error, Error} + end. + +is_alive(Client) -> + case hstreamdb:echo(Client) of + {ok, _Echo} -> + true; + _ErrorEcho -> + false + end. + +start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSize}) -> + %% TODO: change these batch options after we have better disk cache. + BatchSize = maps:get(batch_size, Options, 100), + Interval = maps:get(batch_interval, Options, 1000), + ProducerOptions = [ + {stream, Stream}, + {callback, {?MODULE, on_flush_result, []}}, + {max_records, BatchSize}, + {interval, Interval}, + {pool_size, PoolSize} + ], + Name = produce_name(InstId), + ?SLOG(info, #{ + msg => "starting hstream connector: producer", + connector => InstId + }), + case hstreamdb:start_producer(Client, Name, ProducerOptions) of + {ok, Producer} -> + ?SLOG(info, #{ + msg => "hstream connector: producer started" + }), + EnableBatch = maps:get(enable_batch, Options, false), + PayloadBin = maps:get(payload, Options, <<"">>), + Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), + OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), + State = #{ + client => Client, + producer => Producer, + enable_batch => EnableBatch, + ordering_key => OrderingKey, + payload => Payload + }, + {ok, State}; + {error, {already_started, Pid}} -> + ?SLOG(info, #{ + msg => "starting hstream connector: producer, find old producer. restart producer", + old_producer_pid => Pid, + old_producer_name => Name + }), + _ = hstreamdb:stop_producer(Name), + start_producer(InstId, Client, Options); + {error, Reason} -> + ?SLOG(error, #{ + msg => "starting hstream connector: producer, failed", + reason => Reason + }), + {error, Reason} + end. + +to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> + OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data), + Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data), + to_record(OrderingKey, Payload). + +to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> + to_record(binary_to_list(OrderingKey), Payload); +to_record(OrderingKey, Payload) -> + hstreamdb:to_record(OrderingKey, raw, Payload). + +do_append(AfterQuery, Producer, Record) -> + do_append(AfterQuery, false, Producer, Record). + +%% TODO: this append is async, remove or change it after we have better disk cache. +% do_append(AfterQuery, true, Producer, Record) -> +% case hstreamdb:append(Producer, Record) of +% ok -> +% ?SLOG(debug, #{ +% msg => "hstream producer async append success", +% record => Record +% }), +% emqx_resource:query_success(AfterQuery); +% {error, Reason} -> +% ?SLOG(error, #{ +% msg => "hstream producer async append failed", +% reason => Reason, +% record => Record +% }), +% emqx_resource:query_failed(AfterQuery) +% end; +do_append(AfterQuery, false, Producer, Record) -> + %% TODO: this append is sync, but it does not support [Record], can only append one Record. + %% Change it after we have better dick cache. + case hstreamdb:append_flush(Producer, Record) of + {ok, _} -> + ?SLOG(debug, #{ + msg => "hstream producer sync append success", + record => Record + }), + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "hstream producer sync append failed", + reason => Reason, + record => Record + }), + emqx_resource:query_failed(AfterQuery) + end. + +client_name(InstId) -> + "client:" ++ to_string(InstId). + +produce_name(ActionId) -> + list_to_atom("producer:" ++ to_string(ActionId)). + +to_string(List) when is_list(List) -> List; +to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin); +to_string(Atom) when is_atom(Atom) -> atom_to_list(Atom). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sup.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sup.erl deleted file mode 100644 index f8a219fdc..000000000 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_sup.erl +++ /dev/null @@ -1,27 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. -%%-------------------------------------------------------------------- - --module(emqx_ee_connector_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -init([]) -> - SupFlags = #{ - strategy => one_for_all, - intensity => 0, - period => 1 - }, - ChildSpecs = [], - {ok, {SupFlags, ChildSpecs}}. - -%% internal functions diff --git a/lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl b/lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl new file mode 100644 index 000000000..cebe77c6a --- /dev/null +++ b/lib-ee/emqx_ee_connector/test/ee_connector_hstream_SUITE.erl @@ -0,0 +1,16 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(ee_connector_hstream_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +%% TODO: diff --git a/mix.exs b/mix.exs index 714279cfd..2408b68f3 100644 --- a/mix.exs +++ b/mix.exs @@ -88,7 +88,8 @@ defmodule EMQXUmbrella.MixProject do {:ranch, github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true}, # in conflict by grpc and eetcd - {:gpb, "4.11.2", override: true, runtime: false} + {:gpb, "4.11.2", override: true, runtime: false}, + {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"} ] ++ umbrella_apps() ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep() end @@ -215,7 +216,9 @@ defmodule EMQXUmbrella.MixProject do if(edition_type == :enterprise, do: [ emqx_license: :permanent, - emqx_enterprise_conf: :load + emqx_enterprise_conf: :load, + emqx_ee_connector: :permanent, + emqx_ee_bridge: :permanent ], else: [] ) diff --git a/scripts/merge-i18n.escript b/scripts/merge-i18n.escript index 9f8ac91ff..493798738 100755 --- a/scripts/merge-i18n.escript +++ b/scripts/merge-i18n.escript @@ -4,7 +4,7 @@ main(_) -> BaseConf = <<"">>, - Cfgs = get_all_cfgs("apps/"), + Cfgs = get_all_cfgs("apps/") ++ get_all_cfgs("lib-ee/"), Conf = [merge(BaseConf, Cfgs), io_lib:nl() ],