diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index 9cf7eb8f4..3b07acbe0 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -251,7 +251,7 @@ directly_setup_dynamo() -> directly_query(Query) -> directly_setup_dynamo(), - emqx_ee_connector_dynamo:execute(Query, ?TABLE_BIN). + emqx_ee_connector_dynamo_client:execute(Query, ?TABLE_BIN). directly_get_payload(Key) -> case directly_query({get_item, {<<"id">>, Key}}) of diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index 9a149b6f7..2170827d6 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -28,10 +28,7 @@ ]). -export([ - connect/1, - do_get_status/1, - do_async_reply/2, - worker_do_query/4 + connect/1 ]). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -40,10 +37,6 @@ default_port => 8000 }). --ifdef(TEST). --export([execute/2]). --endif. - %%===================================================================== %% Hocon schema roots() -> @@ -126,45 +119,39 @@ on_stop(InstanceId, #{poolname := PoolName} = _State) -> emqx_plugin_libs_pool:stop_pool(PoolName). on_query(InstanceId, Query, State) -> - do_query(InstanceId, Query, handover, State). + do_query(InstanceId, Query, sync, State). -on_query_async(InstanceId, Query, Reply, State) -> +on_query_async(InstanceId, Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ). %% we only support batch insert on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> - do_query(InstanceId, Query, handover, State); + do_query(InstanceId, Query, sync, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, Reply, State) -> +on_batch_query_async(InstanceId, [{send_message, _} | _] = Query, ReplyCtx, State) -> do_query( InstanceId, Query, - {handover_async, {?MODULE, do_async_reply, [Reply]}}, + {async, ReplyCtx}, State ); on_batch_query_async(_InstanceId, Query, _Reply, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. on_get_status(_InstanceId, #{poolname := Pool}) -> - Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), + Health = emqx_plugin_libs_pool:health_check_ecpool_workers( + Pool, {emqx_ee_connector_dynamo_client, is_connected, []} + ), status_result(Health). -do_get_status(_Conn) -> - %% because the dynamodb driver connection process is the ecpool worker self - %% so we must call the checker function inside the worker - case erlcloud_ddb2:list_tables() of - {ok, _} -> true; - _ -> false - end. - status_result(_Status = true) -> connected; status_result(_Status = false) -> connecting. @@ -185,8 +172,8 @@ do_query( ), Result = ecpool:pick_and_do( PoolName, - {?MODULE, worker_do_query, [Table, Query, Templates]}, - ApplyMode + {emqx_ee_connector_dynamo_client, query, [ApplyMode, Table, Query, Templates]}, + no_handover ), case Result of @@ -210,47 +197,10 @@ do_query( Result end. -worker_do_query(_Client, Table, Query0, Templates) -> - try - Query = apply_template(Query0, Templates), - execute(Query, Table) - catch - _Type:Reason -> - {error, {unrecoverable_error, {invalid_request, Reason}}} - end. - -%% some simple query commands for authn/authz or test -execute({insert_item, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute({delete_item, Key}, Table) -> - erlcloud_ddb2:delete_item(Table, Key); -execute({get_item, Key}, Table) -> - erlcloud_ddb2:get_item(Table, Key); -%% commands for data bridge query or batch query -execute({send_message, Msg}, Table) -> - Item = convert_to_item(Msg), - erlcloud_ddb2:put_item(Table, Item); -execute([{put, _} | _] = Msgs, Table) -> - %% type of batch_write_item argument :: batch_write_item_request_items() - %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) - %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} - %% batch_write_item_request() :: {put, item()} | {delete, key()} - erlcloud_ddb2:batch_write_item({Table, Msgs}). - connect(Opts) -> - #{ - aws_access_key_id := AccessKeyID, - aws_secret_access_key := SecretAccessKey, - host := Host, - port := Port, - schema := Schema - } = proplists:get_value(config, Opts), - erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), - - %% The dynamodb driver uses caller process as its connection process - %% so at here, the connection process is the ecpool worker self - {ok, self()}. + Options = proplists:get_value(config, Opts), + {ok, _Pid} = Result = emqx_ee_connector_dynamo_client:start_link(Options), + Result. parse_template(Config) -> Templates = @@ -283,54 +233,5 @@ get_host_schema("https://" ++ Server) -> get_host_schema(Server) -> {"http://", Server}. -apply_template({Key, Msg} = Req, Templates) -> - case maps:get(Key, Templates, undefined) of - undefined -> - Req; - Template -> - {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} - end; -%% now there is no batch delete, so -%% 1. we can simply replace the `send_message` to `put` -%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, -%% so we can reduce some list map cost -apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> - lists:map( - fun(Req) -> - {_, Msg} = apply_template(Req, Templates), - {put, convert_to_item(Msg)} - end, - Msgs - ). - -convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> - maps:fold( - fun - (_K, <<>>, AccIn) -> - AccIn; - (K, V, AccIn) -> - [{convert2binary(K), convert2binary(V)} | AccIn] - end, - [], - Msg - ); -convert_to_item(MsgBin) when is_binary(MsgBin) -> - Msg = emqx_utils_json:decode(MsgBin), - convert_to_item(Msg); -convert_to_item(Item) -> - erlang:throw({invalid_item, Item}). - -convert2binary(Value) when is_atom(Value) -> - erlang:atom_to_binary(Value, utf8); -convert2binary(Value) when is_binary(Value); is_number(Value) -> - Value; -convert2binary(Value) when is_list(Value) -> - unicode:characters_to_binary(Value); -convert2binary(Value) when is_map(Value) -> - emqx_utils_json:encode(Value). - -do_async_reply(Result, {ReplyFun, [Context]}) -> - ReplyFun(Context, Result). - redact(Data) -> emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl new file mode 100644 index 000000000..e0d8ee4bf --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo_client.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_dynamo_client). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + is_connected/1, + query/5, + query/4 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). + +-ifdef(TEST). +-export([execute/2]). +-endif. + +%%%=================================================================== +%%% API +%%%=================================================================== +is_connected(Pid) -> + try + gen_server:call(Pid, is_connected) + catch + _:_ -> + false + end. + +query(Pid, sync, Table, Query, Templates) -> + query(Pid, Table, Query, Templates); +query(Pid, {async, ReplyCtx}, Table, Query, Templates) -> + gen_server:cast(Pid, {query, Table, Query, Templates, ReplyCtx}). + +query(Pid, Table, Query, Templates) -> + gen_server:call(Pid, {query, Table, Query, Templates}, infinity). + +%%-------------------------------------------------------------------- +%% @doc +%% Starts Bridge which transfer data to DynamoDB +%% @endn +%%-------------------------------------------------------------------- +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% Initialize dynamodb data bridge +init(#{ + aws_access_key_id := AccessKeyID, + aws_secret_access_key := SecretAccessKey, + host := Host, + port := Port, + schema := Schema +}) -> + erlcloud_ddb2:configure(AccessKeyID, SecretAccessKey, Host, Port, Schema), + {ok, #{}}. + +handle_call(is_connected, _From, State) -> + _ = erlcloud_ddb2:list_tables(), + {reply, true, State}; +handle_call({query, Table, Query, Templates}, _From, State) -> + Result = do_query(Table, Query, Templates), + {reply, Result, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({query, Table, Query, Templates, {ReplyFun, [Context]}}, State) -> + Result = do_query(Table, Query, Templates), + ReplyFun(Context, Result), + {noreply, State}; +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +do_query(Table, Query0, Templates) -> + try + Query = apply_template(Query0, Templates), + execute(Query, Table) + catch + _Type:Reason -> + {error, {unrecoverable_error, {invalid_request, Reason}}} + end. + +%% some simple query commands for authn/authz or test +execute({insert_item, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute({delete_item, Key}, Table) -> + erlcloud_ddb2:delete_item(Table, Key); +execute({get_item, Key}, Table) -> + erlcloud_ddb2:get_item(Table, Key); +%% commands for data bridge query or batch query +execute({send_message, Msg}, Table) -> + Item = convert_to_item(Msg), + erlcloud_ddb2:put_item(Table, Item); +execute([{put, _} | _] = Msgs, Table) -> + %% type of batch_write_item argument :: batch_write_item_request_items() + %% batch_write_item_request_items() :: maybe_list(batch_write_item_request_item()) + %% batch_write_item_request_item() :: {table_name(), list(batch_write_item_request())} + %% batch_write_item_request() :: {put, item()} | {delete, key()} + erlcloud_ddb2:batch_write_item({Table, Msgs}). + +apply_template({Key, Msg} = Req, Templates) -> + case maps:get(Key, Templates, undefined) of + undefined -> + Req; + Template -> + {Key, emqx_plugin_libs_rule:proc_tmpl(Template, Msg)} + end; +%% now there is no batch delete, so +%% 1. we can simply replace the `send_message` to `put` +%% 2. convert the message to in_item() here, not at the time when calling `batch_write_items`, +%% so we can reduce some list map cost +apply_template([{send_message, _Msg} | _] = Msgs, Templates) -> + lists:map( + fun(Req) -> + {_, Msg} = apply_template(Req, Templates), + {put, convert_to_item(Msg)} + end, + Msgs + ). + +convert_to_item(Msg) when is_map(Msg), map_size(Msg) > 0 -> + maps:fold( + fun + (_K, <<>>, AccIn) -> + AccIn; + (K, V, AccIn) -> + [{convert2binary(K), convert2binary(V)} | AccIn] + end, + [], + Msg + ); +convert_to_item(MsgBin) when is_binary(MsgBin) -> + Msg = emqx_utils_json:decode(MsgBin), + convert_to_item(Msg); +convert_to_item(Item) -> + erlang:throw({invalid_item, Item}). + +convert2binary(Value) when is_atom(Value) -> + erlang:atom_to_binary(Value, utf8); +convert2binary(Value) when is_binary(Value); is_number(Value) -> + Value; +convert2binary(Value) when is_list(Value) -> + unicode:characters_to_binary(Value); +convert2binary(Value) when is_map(Value) -> + emqx_utils_json:encode(Value).