From 417e01749815d41562ec991df7413a339891ba0c Mon Sep 17 00:00:00 2001 From: Dennis Zhuang Date: Tue, 9 May 2023 21:34:30 +0800 Subject: [PATCH] feat: begin to impl connector --- .../src/emqx_bridge_greptimedb.app.src | 26 ++-- .../src/emqx_bridge_greptimedb_connector.erl | 113 ++++++++++++++++++ 2 files changed, 126 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src index f63863d71..14d655763 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb.app.src @@ -1,14 +1,14 @@ -{application, emqx_bridge_greptimedb, - [{description, "An OTP library"}, - {vsn, "0.1.0"}, - {registered, []}, - {applications, - [kernel, - stdlib - ]}, - {env,[]}, - {modules, []}, +{application, emqx_bridge_greptimedb, [ + {description, "An OTP library"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {env, []}, + {modules, []}, - {licenses, ["Apache-2.0"]}, - {links, []} - ]}. + {licenses, ["Apache-2.0"]}, + {links, []} +]}. diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 8f7aa65e2..17c4d9a3c 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -1 +1,114 @@ -module(emqx_bridge_greptimedb_connector). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% `emqx_resource' API +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_get_status/2, + on_query/3, + on_query_async/4, + on_batch_query/3, + on_batch_query_async/4 +]). + +-define(GREPTIMEDB_DEFAULT_PORT, 4001). + +-define(GREPTIMEDB_HOST_OPTIONS, #{ + default_port => ?GREPTIMEDB_DEFAULT_PORT +}). + +%%------------------------------------------------------------------------------------- +%% `emqx_resource' API +%%------------------------------------------------------------------------------------- +callback_mode() -> async_if_possible. + +on_start(InstId, Config) -> + start_client(InstId, Config). + +on_stop(_InstId, #{client := Client}) -> + greptimedb:stop_client(Client). + +on_get_status(_InstId, _State) -> + %% FIXME + connected. + +on_query(_InstanceId, {send_message, _Message}, _State) -> + todo. + +on_query_async(_InstanceId, {send_message, _Message}, _ReplyFunAndArgs0, _State) -> + todo. + +on_batch_query( + _ResourceID, + _BatchReq, + _State +) -> + todo. + +on_batch_query_async( + _InstId, + _BatchData, + {_ReplyFun, _Args}, + _State +) -> + todo. + +%% internal functions + +start_client(InstId, Config) -> + ClientConfig = client_config(InstId, Config), + ?SLOG(info, #{ + msg => "starting GreptimeDB connector", + connector => InstId, + config => emqx_utils:redact(Config), + client_config => emqx_utils:redact(ClientConfig) + }), + try + case greptimedb:start_client(ClientConfig) of + {ok, Client} -> + {ok, #{client => Client}}; + {error, Reason} -> + ?tp(greptimedb_connector_start_failed, #{error => Reason}), + ?SLOG(warning, #{ + msg => "failed_to_start_greptimedb_connector", + connector => InstId, + reason => Reason + }), + {error, Reason} + end + catch + E:R:S -> + ?tp(greptimedb_connector_start_exception, #{error => {E, R}}), + ?SLOG(warning, #{ + msg => "start greptimedb connector error", + connector => InstId, + error => E, + reason => R, + stack => S + }), + {error, R} + end. + +client_config( + InstId, + _Config = #{ + server := Server + } +) -> + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?GREPTIMEDB_HOST_OPTIONS), + [ + {endpoints, [{http, str(Host), Port}]}, + {pool_size, erlang:system_info(schedulers)}, + {pool, InstId}, + {pool_type, random} + ]. + +str(A) when is_atom(A) -> + atom_to_list(A); +str(B) when is_binary(B) -> + binary_to_list(B); +str(S) when is_list(S) -> + S.