From 98b36c468167135300e069989b1553db8fc40331 Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Mon, 25 Jul 2022 19:03:53 +0800 Subject: [PATCH] fix: hstream db connector , TODO: start apps --- apps/emqx_bridge/src/emqx_bridge_api.erl | 16 +++++-- apps/emqx_bridge/src/emqx_bridge_resource.erl | 24 ++++++++-- apps/emqx_bridge/src/emqx_bridge_schema.erl | 10 +++- .../src/emqx_resource_manager.erl | 2 +- .../emqx_ee_bridge/include/emqx_ee_bridge.hrl | 18 ++++++++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 31 +++++++++++-- .../src/emqx_ee_bridge_hstream.erl | 46 ++++++++++++++++--- .../i18n/emqx_ee_connector_hstream.conf | 2 +- .../src/emqx_ee_connector.app.src | 3 +- .../src/emqx_ee_connector_hstream.erl | 23 ++++++++-- scripts/merge-i18n.escript | 2 +- 11 files changed, 151 insertions(+), 26 deletions(-) create mode 100644 lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index bc9b6c5a2..0d3facc5c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -163,7 +163,7 @@ bridge_info_examples(Method) -> }). conn_bridge_examples(Method) -> - lists:foldl( + Fun = fun(Type, Acc) -> SType = atom_to_list(Type), KeyIngress = bin(SType ++ "_ingress"), @@ -179,9 +179,17 @@ conn_bridge_examples(Method) -> } }) end, - #{}, - ?CONN_TYPES - ). + Broker = lists:foldl(Fun, #{}, ?CONN_TYPES), + EE = ee_conn_bridge_examples(Method), + maps:merge(Broker, EE). + +-ifdef(EMQX_RELEASE_EDITION). +ee_conn_bridge_examples(Method) -> + emqx_ee_bridge:conn_bridge_examples(Method). +-else. +ee_conn_bridge_examples(_Method) -> + #{}. +-endif. info_example(Type, Direction, Method) -> maps:merge( diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 678aa1f10..c82a290b3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -42,10 +42,18 @@ reset_metrics/1 ]). +-ifdef(EMQX_RELEASE_EDITION). +bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; +bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; +bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; +bridge_to_resource_type(webhook) -> emqx_connector_http; +bridge_to_resource_type(BridgeType) -> emqx_ee_bridge:resource_type(BridgeType). +-else. bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; bridge_to_resource_type(mqtt) -> emqx_connector_mqtt; bridge_to_resource_type(<<"webhook">>) -> emqx_connector_http; bridge_to_resource_type(webhook) -> emqx_connector_http. +-endif. resource_id(BridgeId) when is_binary(BridgeId) -> <<"bridge:", BridgeId/binary>>. @@ -254,7 +262,7 @@ parse_confs( request_timeout => ReqTimeout } }; -parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) when +parse_confs(Type = mqtt, Name, #{connector := ConnId, direction := Direction} = Conf) when is_binary(ConnId) -> case emqx_connector:parse_connector_id(ConnId) of @@ -270,7 +278,7 @@ parse_confs(Type, Name, #{connector := ConnId, direction := Direction} = Conf) w {_ConnType, _ConnName} -> error({cannot_use_connector_with_different_type, ConnId}) end; -parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when +parse_confs(Type = mqtt, Name, #{connector := ConnectorConfs, direction := Direction} = Conf) when is_map(ConnectorConfs) -> make_resource_confs( @@ -279,7 +287,17 @@ parse_confs(Type, Name, #{connector := ConnectorConfs, direction := Direction} = maps:without([connector, direction], Conf), Type, Name - ). + ); +parse_confs(Type, Name, Conf) -> + parse_enterprise_confs(Type, Name, Conf). + +-ifdef(EMQX_RELEASE_EDITION). +parse_enterprise_confs(Type, Name, Conf) -> + emqx_ee_bridge:parse_conf(Type, Name, Conf). +-else. +parse_enterprise_confs(Type, Name, Conf) -> + error({not_supported, Type, Name}). +-endif. make_resource_confs(ingress, ConnectorConfs, BridgeConf, Type, Name) -> BName = bridge_id(Type, Name), diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index d162f0c13..de27cd34c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -133,7 +133,7 @@ fields(bridges) -> #{desc => ?DESC("bridges_name")} )} || T <- ?CONN_TYPES - ]; + ] ++ ee_fields_bridges(); fields("metrics") -> [ {"matched", mk(integer(), #{desc => ?DESC("metric_matched")})}, @@ -158,6 +158,14 @@ fields("node_status") -> {"status", mk(status(), #{})} ]. +-ifdef(EMQX_RELEASE_EDITION). +ee_fields_bridges() -> + emqx_ee_bridge:fields(bridges). +-else. +ee_fields_bridges() -> + []. +-endif. + desc(bridges) -> ?DESC("desc_bridges"); desc("metrics") -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 3a1afd27c..82be9c58f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -214,7 +214,7 @@ get_metrics(ResId) -> reset_metrics(ResId) -> emqx_metrics_worker:reset_metrics(resource_metrics, ResId). -%% @doc Returns the data for all resorces +%% @doc Returns the data for all resources -spec list_all() -> [resource_data()] | []. list_all() -> try diff --git a/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl new file mode 100644 index 000000000..0065db56b --- /dev/null +++ b/lib-ee/emqx_ee_bridge/include/emqx_ee_bridge.hrl @@ -0,0 +1,18 @@ +-define(METRICS(MATCH, SUCC, FAILED, RATE, RATE_5, RATE_MAX), #{ + matched => MATCH, + success => SUCC, + failed => FAILED, + rate => RATE, + rate_last5m => RATE_5, + rate_max => RATE_MAX +}). + +-define(METRICS_EXAMPLE, #{ + metrics => ?METRICS(0, 0, 0, 0, 0, 0), + node_metrics => [ + #{ + node => node(), + metrics => ?METRICS(0, 0, 0, 0, 0, 0) + } + ] +}). diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index d18fa1fde..a94089c21 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -3,13 +3,38 @@ %%-------------------------------------------------------------------- -module(emqx_ee_bridge). +-import(hoconsc, [mk/2, enum/1, ref/2]). + -export([ schema_modules/0, - info_example_basic/2 + conn_bridge_examples/1, + resource_type/1, + parse_conf/3, + fields/1 ]). schema_modules() -> [emqx_ee_bridge_hstream]. -info_example_basic(_Type, _Direction) -> - #{}. +conn_bridge_examples(Method) -> + Fun = + fun(Module, Examples) -> + Example = erlang:apply(Module, conn_bridge_example, [Method]), + maps:merge(Examples, Example) + end, + lists:foldl(Fun, #{}, schema_modules()). + +resource_type(hstream) -> emqx_ee_connector_hstream; +resource_type(<<"hstream">>) -> emqx_ee_connector_hstream. + +parse_conf(_Type, _Name, Conf) -> + Conf. + +fields(bridges) -> + [ + {hstream, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), + #{desc => <<"hstream_webhook">>} + )} + ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl index 6b01d2c0b..42dd86f14 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -5,18 +5,53 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include("emqx_ee_bridge.hrl"). -import(hoconsc, [mk/2, enum/1]). -export([ + conn_bridge_example/1 +]). + +-export([ + namespace/0, roots/0, fields/1, desc/1 ]). -%%====================================================================================== +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_example(Method) -> + #{ + <<"hstream">> => #{ + summary => <<"HStreamDB Bridge">>, + value => values(Method) + } + }. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + type => hstream, + name => <<"hstream_bridge_demo">>, + url => <<"http://127.0.0.1:6570">>, + stream => <<"stream1">>, + ordering_key => <<"${topic}">>, + pool_size => 8, + enable => true, + direction => egress, + local_topic => <<"local/topic/#">>, + payload => <<"${payload}">> + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -% namespace() -> "bridge". +namespace() -> "bridge". roots() -> []. @@ -47,11 +82,10 @@ basic_config() -> ], emqx_ee_connector_hstream:fields(config) ++ Basic. -%%====================================================================================== - +%% ------------------------------------------------------------------------------------------------- +%% internal type_field() -> - % {type, mk(hstream, #{required => true, desc => ?DESC("desc_type")})}. - {type, mk(hstream, #{required => true, desc => <<"desc_type">>})}. + {type, mk(enum([hstream]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf index d99b09c5f..605944997 100644 --- a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_hstream.conf @@ -38,6 +38,6 @@ emqx_ee_connector_hstream { label { en: """Hstream Pool Size""" zh: """HStream 连接池大小""" - } + } } } diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 87f68afb6..214488f80 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -5,7 +5,8 @@ {mod, {emqx_ee_connector_app, []}}, {applications, [ kernel, - stdlib + stdlib, + hstreamdb_erl ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl index ea3a73035..7ba92f809 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -51,7 +51,12 @@ on_query(_InstId, {OrderingKey, Payload, Record}, AfterQuery, #{producer := Prod do_append(AfterQuery, false, Producer, Record). on_get_status(_InstId, #{client := Client}) -> - is_alive(Client). + case is_alive(Client) of + true -> + connected; + false -> + disconnected + end. %% ------------------------------------------------------------------------------------------------- %% hstream batch callback @@ -71,15 +76,23 @@ roots() -> fields(config) -> [ {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, - {stream, mk(binary(), #{required => true, desc => ?DESC("stream")})}, + {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, {ordering_key, mk(binary(), #{required => true, desc => ?DESC("ordering_key")})}, {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})} ]. %% ------------------------------------------------------------------------------------------------- %% internal functions +start_client(InstId, Config) -> + try + do_start_client(InstId, Config) + catch + E:R:S -> + io:format("E:R:S ~p:~p ~n~p~n", [E, R, S]), + error(E) + end. -start_client(InstId, Config = #{server := Server, pool_size := PoolSize}) -> +do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> ?SLOG(info, #{ msg => "starting hstream connector: client", connector => InstId, @@ -87,7 +100,7 @@ start_client(InstId, Config = #{server := Server, pool_size := PoolSize}) -> }), ClientName = client_name(InstId), ClientOptions = [ - {url, Server}, + {url, binary_to_list(Server)}, {rpc_options, #{pool_size => PoolSize}} ], case hstreamdb:start_client(ClientName, ClientOptions) of @@ -154,7 +167,7 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi msg => "hstream connector: producer started" }), EnableBatch = maps:get(enable_batch, Options, false), - #{client => Client, producer => Producer, enable_batch => EnableBatch}; + {ok, #{client => Client, producer => Producer, enable_batch => EnableBatch}}; {error, {already_started, Pid}} -> ?SLOG(info, #{ msg => "starting hstream connector: producer, find old producer. restart producer", diff --git a/scripts/merge-i18n.escript b/scripts/merge-i18n.escript index 9f8ac91ff..493798738 100755 --- a/scripts/merge-i18n.escript +++ b/scripts/merge-i18n.escript @@ -4,7 +4,7 @@ main(_) -> BaseConf = <<"">>, - Cfgs = get_all_cfgs("apps/"), + Cfgs = get_all_cfgs("apps/") ++ get_all_cfgs("lib-ee/"), Conf = [merge(BaseConf, Cfgs), io_lib:nl() ],