diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index 0b885db8c..46039390d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -36,24 +36,31 @@ post_request() -> http_schema("post"). http_schema(Method) -> - Schemas = lists:flatmap( - fun(Type) -> - [ - ref(schema_mod(Type), Method ++ "_ingress"), - ref(schema_mod(Type), Method ++ "_egress") - ] - end, - ?CONN_TYPES - ), - ExtSchemas = [ref(Module, Method) || Module <- schema_modules()], - hoconsc:union(Schemas ++ ExtSchemas). + Broker = + lists:flatmap( + fun(Type) -> + [ + ref(schema_mod(Type), Method ++ "_ingress"), + ref(schema_mod(Type), Method ++ "_egress") + ] + end, + ?CONN_TYPES + ) ++ [ref(Module, Method) || Module <- [emqx_bridge_webhook_schema]], + EE = ee_schemas(Method), + hoconsc:union(Broker ++ EE). -if(?EMQX_RELEASE_EDITION == ee). -schema_modules() -> - [emqx_bridge_webhook_schema] ++ emqx_ee_bridge:schema_modules(). +ee_schemas(Method) -> + emqx_ee_bridge:api_schemas(Method). + +ee_fields_bridges() -> + emqx_ee_bridge:fields(bridges). -else. -schema_modules() -> - [emqx_bridge_webhook_schema]. +ee_schemas(_) -> + []. + +ee_fields_bridges() -> + []. -endif. common_bridge_fields(ConnectorRef) -> @@ -158,14 +165,6 @@ fields("node_status") -> {"status", mk(status(), #{})} ]. --if(?EMQX_RELEASE_EDITION == ee). -ee_fields_bridges() -> - emqx_ee_bridge:fields(bridges). --else. -ee_fields_bridges() -> - []. --endif. - desc(bridges) -> ?DESC("desc_bridges"); desc("metrics") -> diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf index f421fb912..3930825e5 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_influxdb.conf @@ -18,14 +18,44 @@ will be forwarded. zh: "本地 Topic" } } - payload { + measurement { desc { - en: """The payload to be forwarded to the InfluxDB. Placeholders supported.""" - zh: """要转发到 InfluxDB 的数据内容,支持占位符""" + en: """The measurement name to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 Measurement 名称,支持占位符""" } label { - en: "Payload" - zh: "消息内容" + en: "Measurement" + zh: "Measurement" + } + } + timestamp { + desc { + en: """The timestamp to be forwarded to the InfluxDB. Placeholders supported. Default is message timestamp""" + zh: """要转发到 InfluxDB 的时间戳,支持占位符。默认使用消息的时间戳""" + } + label { + en: "Timestamp" + zh: "Timestamp" + } + } + tags { + desc { + en: """The tags to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 Tags 数据内容,支持占位符""" + } + label { + en: "Tags" + zh: "Tags" + } + } + fields { + desc { + en: """The fields to be forwarded to the InfluxDB. Placeholders supported.""" + zh: """要转发到 InfluxDB 的 fields 数据内容,支持占位符""" + } + label { + en: "Fields" + zh: "Fields" } } config_enable { diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index ff9661cc8..881609bfc 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -6,26 +6,43 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - schema_modules/0, + api_schemas/1, conn_bridge_examples/1, resource_type/1, fields/1 ]). +api_schemas(Method) -> + [ + ref(emqx_ee_bridge_hstream, Method), + ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), + ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), + ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") + ]. + schema_modules() -> - [emqx_ee_bridge_hstream, emqx_ee_bridge_influxdb]. + [ + emqx_ee_bridge_hstream, + emqx_ee_bridge_influxdb + ]. conn_bridge_examples(Method) -> + MergeFun = + fun(Example, Examples) -> + maps:merge(Examples, Example) + end, Fun = fun(Module, Examples) -> - Example = erlang:apply(Module, conn_bridge_example, [Method]), - maps:merge(Examples, Example) + ConnectorExamples = erlang:apply(Module, conn_bridge_examples, [Method]), + lists:foldl(MergeFun, Examples, ConnectorExamples) end, lists:foldl(Fun, #{}, schema_modules()). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); resource_type(hstreamdb) -> emqx_ee_connector_hstream; -resource_type(influxdb) -> emqx_ee_connector_influxdb. +resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; +resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; +resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. fields(bridges) -> [ @@ -33,10 +50,14 @@ fields(bridges) -> mk( hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), #{desc => <<"EMQX Enterprise Config">>} - )}, - {influxdb, + )} + ] ++ fields(influxdb); +fields(influxdb) -> + [ + {Protocol, mk( - hoconsc:map(name, ref(emqx_ee_bridge_influxdb, "config")), + hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)), #{desc => <<"EMQX Enterprise Config">>} )} + || Protocol <- [influxdb_udp, influxdb_api_v1, influxdb_api_v2] ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl index 73f7a20eb..200e695da 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -10,7 +10,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_example/1 + conn_bridge_examples/1 ]). -export([ @@ -23,13 +23,15 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_example(Method) -> - #{ - <<"hstreamdb">> => #{ - summary => <<"HStreamDB Bridge">>, - value => values(Method) +conn_bridge_examples(Method) -> + [ + #{ + <<"hstreamdb">> => #{ + summary => <<"HStreamDB Bridge">>, + value => values(Method) + } } - }. + ]. values(get) -> maps:merge(values(post), ?METRICS_EXAMPLE); diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl index dca55ba23..a19459208 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_influxdb.erl @@ -10,7 +10,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_example/1 + conn_bridge_examples/1 ]). -export([ @@ -23,28 +23,48 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_example(Method) -> - #{ - <<"influxdb">> => #{ - summary => <<"InfluxDB Bridge">>, - value => values(Method) +conn_bridge_examples(Method) -> + [ + #{ + <<"influxdb_udp">> => #{ + summary => <<"InfluxDB UDP Bridge">>, + value => values("influxdb_udp", Method) + } + }, + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Bridge">>, + value => values("influxdb_api_v1", Method) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Bridge">>, + value => values("influxdb_api_v2", Method) + } } - }. + ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values(Protocol, get) -> + maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); +values(Protocol, post) -> #{ - type => influxdb, + type => list_to_atom(Protocol), name => <<"demo">>, - connector => <<"influxdb:api_v2_connector">>, + connector => list_to_binary(Protocol ++ ":connector"), enable => true, direction => egress, local_topic => <<"local/topic/#">>, - payload => <<"${payload}">> + measurement => <<"${topic}">>, + tags => #{<<"clientid">> => <<"${clientid}">>}, + fields => #{ + <<"payload">> => <<"${payload}">>, + <<"int_value">> => [int, <<"${payload.int_key}">>], + <<"uint_value">> => [uint, <<"${payload.uint_key}">>] + } }; -values(put) -> - values(post). +values(Protocol, put) -> + values(Protocol, post). %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions @@ -52,40 +72,69 @@ namespace() -> "bridge". roots() -> []. -fields("config") -> +fields(basic) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, {direction, mk(egress, #{desc => ?DESC("config_direction"), default => egress})}, {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, - {payload, mk(binary(), #{default => <<"${payload}">>, desc => ?DESC("payload")})}, - {connector, field(connector)} + {measurement, mk(binary(), #{desc => ?DESC("measurement"), required => true})}, + {timestamp, + mk(binary(), #{ + desc => ?DESC("timestamp"), default => <<"${timestamp}">>, required => false + })}, + {tags, mk(map(), #{desc => ?DESC("tags"), required => false})}, + {fields, mk(map(), #{desc => ?DESC("fields"), required => true})} ]; -fields("post") -> - [ - {type, mk(enum([influxdb]), #{required => true, desc => ?DESC("desc_type")})}, - {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} - | fields("config") - ]; -fields("put") -> - fields("config"); -fields("get") -> - emqx_bridge_schema:metrics_status_fields() ++ fields("post"). +fields("post_udp") -> + method_fileds(post, influxdb_udp); +fields("post_api_v1") -> + method_fileds(post, influxdb_api_v1); +fields("post_api_v2") -> + method_fileds(post, influxdb_api_v2); +fields("put_udp") -> + method_fileds(put, influxdb_udp); +fields("put_api_v1") -> + method_fileds(put, influxdb_api_v1); +fields("put_api_v2") -> + method_fileds(put, influxdb_api_v2); +fields("get_udp") -> + method_fileds(get, influxdb_udp); +fields("get_api_v1") -> + method_fileds(get, influxdb_api_v1); +fields("get_api_v2") -> + method_fileds(get, influxdb_api_v2); +fields(Name) when + Name == influxdb_udp orelse Name == influxdb_api_v1 orelse Name == influxdb_api_v2 +-> + fields(basic) ++ connector_field(Name). -field(connector) -> - ConnectorConfigRef = - [ - ref(emqx_ee_connector_influxdb, influxdb_udp), - ref(emqx_ee_connector_influxdb, influxdb_api_v1), - ref(emqx_ee_connector_influxdb, influxdb_api_v2) - ], - mk( - hoconsc:union([binary() | ConnectorConfigRef]), - #{ - required => true, - example => <<"influxdb:demo">>, - desc => ?DESC("desc_connector") - } - ). +method_fileds(post, ConnectorType) -> + fields(basic) ++ connector_field(ConnectorType) ++ type_name_field(ConnectorType); +method_fileds(get, ConnectorType) -> + fields(basic) ++ + emqx_bridge_schema:metrics_status_fields() ++ + connector_field(ConnectorType) ++ type_name_field(ConnectorType); +method_fileds(put, ConnectorType) -> + fields(basic) ++ connector_field(ConnectorType). + +connector_field(Type) -> + [ + {connector, + mk( + hoconsc:union([binary(), ref(emqx_ee_connector_influxdb, Type)]), + #{ + required => true, + example => list_to_binary(atom_to_list(Type) ++ ":connector"), + desc => ?DESC(<<"desc_connector">>) + } + )} + ]. + +type_name_field(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. desc("config") -> ?DESC("desc_config"); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 6d9fea3c9..675a934aa 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -4,7 +4,8 @@ {applications, [ kernel, stdlib, - hstreamdb_erl + hstreamdb_erl, + influxdb ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 37915d2e8..e343a64b0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -33,8 +33,8 @@ on_start(InstId, Config) -> on_stop(_InstId, #{client := Client}) -> influxdb:stop_client(Client). -on_query(_InstId, {send_message, _Data}, _AfterQuery, _State) -> - ok. +on_query(InstId, {send_message, Data}, AfterQuery, State) -> + do_query(InstId, {send_message, Data}, AfterQuery, State). on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) of @@ -184,18 +184,36 @@ start_client(InstId, Config) -> {error, R} end. -do_start_client(InstId, ClientConfig, Config = #{egress := #{payload := PayloadBin}}) -> +do_start_client( + InstId, + ClientConfig, + Config = #{ + egress := #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } + } +) -> case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client) of true -> - Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + State = #{ + client => Client, + measurement => emqx_plugin_libs_rule:preproc_tmpl(Measurement), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(Timestamp), + tags => to_tags_config(Tags), + fields => to_fields_config(Fields) + }, ?SLOG(info, #{ msg => "starting influxdb connector success", connector => InstId, - client => Client + client => Client, + state => State }), - #{client => Client, payload => Payload}; + {ok, State}; false -> ?SLOG(error, #{ msg => "starting influxdb connector failed", @@ -231,21 +249,15 @@ client_config( } ) -> [ - {host, Host}, + {host, binary_to_list(Host)}, {port, Port}, {pool_size, PoolSize}, {pool, binary_to_atom(InstId, utf8)}, - {precision, maps:get(precision, Config, ms)} + {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} ] ++ protocol_config(Config). +%% api v2 config protocol_config(#{ - protocol := udp -}) -> - [ - {protocol, udp} - ]; -protocol_config(#{ - protocol := api_v1, username := Username, password := Password, database := DB, @@ -254,13 +266,12 @@ protocol_config(#{ [ {protocol, http}, {version, v1}, - {username, Username}, - {password, Password}, - {database, DB}, - {ssl, SSL} + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {database, binary_to_list(DB)} ] ++ ssl_config(SSL); +%% api v1 config protocol_config(#{ - protocol := api_v2, bucket := Bucket, org := Org, token := Token, @@ -269,11 +280,15 @@ protocol_config(#{ [ {protocol, http}, {version, v2}, - {bucket, Bucket}, - {org, Org}, - {token, Token}, - {ssl, SSL} - ] ++ ssl_config(SSL). + {bucket, binary_to_list(Bucket)}, + {org, binary_to_list(Org)}, + {token, Token} + ] ++ ssl_config(SSL); +%% udp config +protocol_config(_) -> + [ + {protocol, udp} + ]. ssl_config(#{enable := false}) -> [ @@ -284,3 +299,110 @@ ssl_config(SSL = #{enable := true}) -> {https_enabled, true}, {transport, ssl} ] ++ maps:to_list(maps:remove(enable, SSL)). + +%% ------------------------------------------------------------------------------------------------- +%% Query + +do_query(InstId, {send_message, Data}, AfterQuery, State = #{client := Client}) -> + case data_to_point(Data, State) of + {ok, Point} -> + case influxdb:write(Client, [Point]) of + ok -> + ?SLOG(debug, #{ + msg => "influxdb write point success", + connector => InstId, + point => Point + }), + emqx_resource:query_success(AfterQuery); + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb write point failed", + connector => InstId, + reason => Reason + }), + emqx_resource:query_failed(AfterQuery) + end; + {error, Reason} -> + ?SLOG(error, #{ + msg => "influxdb trans point failed", + connector => InstId, + reason => Reason + }), + {error, Reason} + end. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Config Trans + +to_tags_config(Tags) -> + maps:fold(fun to_maps_config/3, #{}, Tags). + +to_fields_config(Fields) -> + maps:fold(fun to_maps_config/3, #{}, Fields). + +to_maps_config(K, [IntType, V], Res) when IntType == <<"int">> orelse IntType == <<"uint">> -> + NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + Res#{NK => {binary_to_atom(IntType, utf8), NV}}; +to_maps_config(K, V, Res) -> + NK = emqx_plugin_libs_rule:preproc_tmpl(bin(K)), + NV = emqx_plugin_libs_rule:preproc_tmpl(bin(V)), + Res#{NK => NV}. + +%% ------------------------------------------------------------------------------------------------- +%% Tags & Fields Data Trans +data_to_point( + Data, + #{ + measurement := Measurement, + timestamp := Timestamp, + tags := Tags, + fields := Fields + } +) -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + case emqx_plugin_libs_rule:proc_tmpl(Timestamp, Data, TransOptions) of + [TimestampInt] when is_integer(TimestampInt) -> + Point = #{ + measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data), + timestamp => TimestampInt, + tags => maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags), + fields => maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields) + }, + {ok, Point}; + BadTimestamp -> + {error, {bad_timestamp, BadTimestamp}} + end. + +maps_config_to_data(K, {IntType, V}, {Data, Res}) when IntType == int orelse IntType == uint -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), + NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), + case {NK, NV} of + {[undefined], _} -> + Res; + {_, [undefined]} -> + Res; + {_, [IntV]} when is_integer(IntV) -> + Res#{NK => {IntType, IntV}} + end; +maps_config_to_data(K, V, {Data, Res}) -> + TransOptions = #{return => rawlist, var_trans => fun data_filter/1}, + NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, TransOptions), + NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, TransOptions), + case {NK, NV} of + {[undefined], _} -> + Res; + {_, [undefined]} -> + Res; + _ -> + Res#{bin(NK) => NV} + end. + +data_filter(undefined) -> undefined; +data_filter(Int) when is_integer(Int) -> Int; +data_filter(Number) when is_number(Number) -> Number; +data_filter(Bool) when is_boolean(Bool) -> Bool; +data_filter(Data) -> bin(Data). + +bin(Data) -> emqx_plugin_libs_rule:bin(Data).