diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index ef79f78fe..87da71449 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -29,7 +29,8 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% callbacks of ecpool @@ -459,6 +460,11 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + %%-------------------------------------------------------------------- %% utils diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 2c824aa95..f6888cad5 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -38,7 +38,8 @@ on_get_channels/1, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% callbacks for ecpool @@ -519,6 +520,13 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> to_error_tuple(ClickhouseErrorResult) end. +on_format_query_result(ok) -> + #{result => ok, message => <<"">>}; +on_format_query_result({ok, Message}) -> + #{result => ok, message => Message}; +on_format_query_result(Result) -> + Result. + to_recoverable_error({error, Reason}) -> {error, {recoverable_error, Reason}}; to_recoverable_error(Error) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index f89786929..4e974a8a9 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -26,7 +26,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -184,6 +185,11 @@ on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) -> on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + health_check_timeout() -> 2500. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index fc3aa6d3b..d94ce8e15 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -23,7 +23,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -288,6 +289,9 @@ on_query_async( InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State ). +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_add_channel( InstanceId, #{channels := Channels} = State0, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 12d5d1f2f..48e50c416 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -53,7 +53,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([reply_delegator/2]). @@ -489,6 +490,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) -> handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) -> Result. +on_format_query_result({ok, Info}) -> + #{result => ok, info => Info}; +on_format_query_result(Result) -> + Result. + reply_delegator(ReplyFunAndArgs, Response) -> case Response of {error, Reason} when diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 97eedf3f6..963f0efd0 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -27,7 +27,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). -export([reply_callback/2]). @@ -453,6 +454,11 @@ do_query(InstId, Channel, Client, Points) -> end end. +on_format_query_result({ok, {affected_rows, Rows}}) -> + #{result => ok, affected_rows => Rows}; +on_format_query_result(Result) -> + Result. + do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "greptimedb_write_point_async", diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index f239d3735..88065b7b3 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -27,7 +27,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). -export([reply_callback/2]). @@ -209,6 +210,9 @@ on_batch_query_async( {error, {unrecoverable_error, Reason}} end. +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of true -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d26b47f73..65fbda936 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -26,7 +26,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -390,6 +391,9 @@ on_batch_query( Error end. +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_add_channel( InstanceId, #{iotdb_version := Version, channels := Channels} = OldState0, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 8744dfd71..95d193d92 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -39,7 +39,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -318,6 +319,11 @@ handle_result({error, Reason} = Error, Requests, InstanceId) -> }), Error. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + parse_template(Config) -> #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config, Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 69c2242e4..6b6db358a 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -18,7 +18,8 @@ on_get_status/2, on_query/3, on_start/2, - on_stop/2 + on_stop/2, + on_format_query_result/1 ]). %%======================================================================================== @@ -85,6 +86,11 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> emqx_mongodb:on_query(InstanceId, Request, ConnectorState). +on_format_query_result({{Result, Info}, Documents}) -> + #{result => Result, info => Info, documents => Documents}; +on_format_query_result(Result) -> + Result. + on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> NewState = State#{channels => maps:remove(ChannelId, Channels)}, {ok, NewState}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 509d53284..19e117a0d 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -27,7 +27,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([connector_examples/1]). @@ -175,6 +176,11 @@ on_batch_query( Error end. +on_format_query_result({ok, StatusCode, BodyMap}) -> + #{result => ok, status_code => StatusCode, body => BodyMap}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{server := Server}) -> Result = case opentsdb_connectivity(Server) of diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 0cddfab66..9d269493d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -20,7 +20,8 @@ on_get_status/2, on_get_channel_status/3, on_query/3, - on_query_async/4 + on_query_async/4, + on_format_query_result/1 ]). -type pulsar_client_id() :: atom(). @@ -234,6 +235,11 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> }), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). +on_format_query_result({ok, Info}) -> + #{result => ok, info => Info}; +on_format_query_result(Result) -> + Result. + %%------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 683551316..726d2656a 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -39,7 +39,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). %% callbacks for ecpool @@ -320,6 +321,11 @@ on_batch_query(ResourceId, BatchRequests, State) -> ), do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State). +on_format_query_result({ok, Rows}) -> + #{result => ok, rows => Rows}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 67b0e77bc..324694edc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -28,7 +28,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([connector_examples/1]). @@ -215,6 +216,11 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. +on_format_query_result({ok, ResultMap}) -> + #{result => ok, info => ResultMap}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index ff851558a..3ad2fb564 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -30,7 +30,8 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% ecpool connect & reconnect @@ -214,6 +215,13 @@ on_batch_query( }), {error, {unrecoverable_error, invalid_request}}. +on_format_query_result({ok, ColumnNames, Rows}) -> + #{result => ok, column_names => ColumnNames, rows => Rows}; +on_format_query_result({ok, DataList}) -> + #{result => ok, column_names_rows_list => DataList}; +on_format_query_result(Result) -> + Result. + mysql_function(sql) -> query; mysql_function(prepared_query) ->