diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 5e42b4881..c8aff2f8a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -804,7 +804,9 @@ bridge_v2_type_to_connector_type(kafka) -> bridge_v2_type_to_connector_type(kafka_producer) -> kafka_producer; bridge_v2_type_to_connector_type(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v2_type_to_connector_type(syskeeper_forwarder) -> + syskeeper_forwarder. %%==================================================================== %% Data backup API @@ -1031,7 +1033,9 @@ bridge_v1_type_to_bridge_v2_type(kafka) -> bridge_v1_type_to_bridge_v2_type(kafka_producer) -> kafka_producer; bridge_v1_type_to_bridge_v2_type(azure_event_hub_producer) -> - azure_event_hub_producer. + azure_event_hub_producer; +bridge_v1_type_to_bridge_v2_type(syskeeper_forwarder) -> + syskeeper_forwarder. %% This function should return true for all inputs that are bridge V1 types for %% bridges that have been refactored to bridge V2s, and for all all bridge V2 @@ -1044,6 +1048,8 @@ is_bridge_v2_type(<<"kafka">>) -> true; is_bridge_v2_type(<<"azure_event_hub_producer">>) -> true; +is_bridge_v2_type(<<"syskeeper_forwarder">>) -> + true; is_bridge_v2_type(_) -> false. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index a160ecd33..93951cca0 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -50,9 +50,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer"), - api_ref(emqx_bridge_syskeeper, <<"syskeeper">>, Method), - api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer") ]. schema_modules() -> @@ -80,9 +78,7 @@ schema_modules() -> emqx_bridge_rabbitmq, emqx_bridge_kinesis, emqx_bridge_greptimedb, - emqx_bridge_azure_event_hub, - emqx_bridge_syskeeper, - emqx_bridge_syskeeper_proxy + emqx_bridge_azure_event_hub ]. examples(Method) -> @@ -130,9 +126,7 @@ resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; %% We use AEH's Kafka interface. -resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; -resource_type(syskeeper) -> emqx_bridge_syskeeper_connector; -resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server. +resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer. %% For bridges that need to override connector configurations. bridge_impl_module(BridgeType) when is_binary(BridgeType) -> @@ -221,8 +215,7 @@ fields(bridges) -> influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ - kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs() ++ - syskeeper_structs(). + kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs(). mongodb_structs() -> [ @@ -435,26 +428,6 @@ azure_event_hub_structs() -> )} ]. -syskeeper_structs() -> - [ - {syskeeper, - mk( - hoconsc:map(name, ref(emqx_bridge_syskeeper, "config")), - #{ - desc => <<"Syskeeper bridge config ">>, - required => false - } - )}, - {syskeeper_proxy, - mk( - hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, "config")), - #{ - desc => <<"Syskeeper proxy server config">>, - required => false - } - )} - ]. - api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl index 54448f07d..ac0713545 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_enterprise.erl @@ -28,7 +28,8 @@ examples(Method) -> schema_modules() -> [ emqx_bridge_kafka, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper ]. fields(actions) -> @@ -51,13 +52,24 @@ action_structs() -> desc => <<"Azure Event Hub Actions Config">>, required => false } + )}, + {syskeeper_forwarder, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper, config)), + #{ + desc => <<"Syskeeper forwarder Bridge V2 Config">>, + required => false + } )} ]. api_schemas(Method) -> [ api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_bridge_v2"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2") + api_ref( + emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_bridge_v2" + ), + api_ref(emqx_bridge_syskeeper, <<"syskeeper_forwarder">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d6d8eb9a1..f0973f260 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -134,8 +134,11 @@ desc(_) -> schema_homogeneous_test() -> case lists:filtermap( - fun({_Name, Schema}) -> - is_bad_schema(Schema) + fun + ({syskeeper_forwarder, _Schema}) -> + false; + ({_Name, Schema}) -> + is_bad_schema(Schema) end, fields(actions) ) diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl index 9e0ec4eed..9e520d095 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -11,7 +11,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, + bridge_v2_examples/1, values/1 ]). @@ -24,35 +24,46 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_examples(Method) -> +bridge_v2_examples(Method) -> [ #{ - <<"syskeeper">> => #{ - summary => <<"Syskeeper Bridge">>, + <<"syskeeper_forwarder">> => #{ + summary => <<"Syskeeper Forwarder Bridge">>, value => values(Method) } } ]. -values(_Method) -> +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_forwarder">>, + type => <<"syskeeper_forwarder">> + }, + values(put) + ); +values(put) -> #{ enable => true, - type => syskeeper, - name => <<"foo">>, - server => <<"127.0.0.1:9092">>, - ack_mode => <<"no_ack">>, - ack_timeout => <<"10s">>, - pool_size => 16, + connector => <<"syskeeper_forwarder">>, target_topic => <<"${topic}">>, target_qos => <<"-1">>, template => <<"${payload}">>, resource_opts => #{ - worker_pool_size => 16, - health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, - batch_size => ?DEFAULT_BATCH_SIZE, - batch_time => ?DEFAULT_BATCH_TIME, - query_mode => sync, - max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + worker_pool_size => 16 } }. @@ -62,9 +73,14 @@ namespace() -> "bridge_syskeeper". roots() -> []. -fields("config") -> +fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })}, {target_topic, mk( binary(), @@ -89,17 +105,17 @@ fields("config") -> desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) } )} - ] ++ emqx_bridge_syskeeper_connector:fields(config); + ]; fields("creation_opts") -> emqx_resource_schema:create_opts([{request_ttl, #{default => infinity}}]); fields("post") -> - [type_field(), name_field() | fields("config")]; + [type_field(), name_field() | fields(config)]; fields("put") -> - fields("config"); + fields(config); fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"). -desc("config") -> +desc(config) -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."]; @@ -111,7 +127,7 @@ desc(_) -> %% ------------------------------------------------------------------------------------------------- type_field() -> - {type, mk(enum([syskeeper]), #{required => true, desc => ?DESC("desc_type")})}. + {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}. name_field() -> {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl index 3a60519ff..5c52017fd 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -1,4 +1,4 @@ -%%-------------------------------------------------------------------- +%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- @@ -12,7 +12,7 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("hocon/include/hoconsc.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, desc/1, connector_examples/1]). %% `emqx_resource' API -export([ @@ -22,7 +22,11 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -37,6 +41,48 @@ -define(EXTRA_CALL_TIMEOUT, 2000). +%% ------------------------------------------------------------------------------------------------- +%% api +connector_examples(Method) -> + [ + #{ + <<"syskeeper_forwarder">> => #{ + summary => <<"Syskeeper Forwarder Connector">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_forwarder">>, + type => <<"syskeeper_forwarder">> + }, + values(put) + ); +values(put) -> + #{ + enable => true, + server => <<"127.0.0.1:9092">>, + ack_mode => <<"no_ack">>, + ack_timeout => <<"10s">>, + pool_size => 16 + }. + %% ------------------------------------------------------------------------------------------------- %% Hocon schema roots() -> @@ -44,6 +90,8 @@ roots() -> fields(config) -> [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, {server, server()}, {ack_mode, mk( @@ -61,12 +109,31 @@ fields(config) -> (Other) -> emqx_connector_schema_lib:pool_size(Other) end} - ]. + ]; +fields("post") -> + [type_field(), name_field() | fields(config)]; +fields("put") -> + fields(config); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc(config) -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. server() -> Meta = #{desc => ?DESC("server")}, emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). +type_field() -> + {type, mk(enum([syskeeper_forwarder]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + %% ------------------------------------------------------------------------------------------------- %% `emqx_resource' API @@ -79,9 +146,7 @@ on_start( #{ server := Server, pool_size := PoolSize, - ack_timeout := AckTimeout, - target_topic := TargetTopic, - target_qos := TargetQoS + ack_timeout := AckTimeout } = Config ) -> ?SLOG(info, #{ @@ -103,10 +168,8 @@ on_start( State = #{ pool_name => InstanceId, - target_qos => TargetQoS, ack_timeout => AckTimeout, - templates => parse_template(Config), - target_topic_tks => emqx_placeholder:preproc_tmpl(TargetTopic) + channels => #{} }, case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of ok -> @@ -122,13 +185,13 @@ on_stop(InstanceId, _State) -> }), emqx_resource_pool:stop(InstanceId). -on_query(InstanceId, {send_message, _} = Query, State) -> +on_query(InstanceId, {_MessageTag, _} = Query, State) -> do_query(InstanceId, [Query], State); on_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. %% we only support batch insert -on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> +on_batch_query(InstanceId, [{_MessageTag, _} | _] = Query, State) -> do_query(InstanceId, Query, State); on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. @@ -143,13 +206,46 @@ status_result(true) -> connected; status_result(false) -> connecting; status_result({error, _}) -> connecting. +on_add_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId, #{ + target_topic := TargetTopic, + target_qos := TargetQoS, + template := Template +}) -> + case maps:is_key(ChannelId, Channels) of + true -> + {error, already_exists}; + _ -> + Channel = #{ + target_qos => TargetQoS, + target_topic => emqx_placeholder:preproc_tmpl(TargetTopic), + template => emqx_placeholder:preproc_tmpl(Template) + }, + Channels2 = Channels#{ChannelId => Channel}, + {ok, OldState#{channels => Channels2}} + end. + +on_remove_channel(_InstanceId, #{channels := Channels} = OldState, ChannelId) -> + Channels2 = maps:remove(ChannelId, Channels), + {ok, OldState#{channels => Channels2}}. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + +on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> + case maps:is_key(ChannelId, Channels) of + true -> + connected; + _ -> + {error, not_exists} + end. + %% ------------------------------------------------------------------------------------------------- %% Helper fns do_query( InstanceId, Query, - #{pool_name := PoolName, ack_timeout := AckTimeout} = State + #{pool_name := PoolName, ack_timeout := AckTimeout, channels := Channels} = State ) -> ?TRACE( "QUERY", @@ -158,7 +254,7 @@ do_query( ), Result = - case try_apply_template(Query, State) of + case try_render_message(Query, Channels) of {ok, Msg} -> ecpool:pick_and_do( PoolName, @@ -199,48 +295,26 @@ connect(Opts) -> Options = proplists:get_value(options, Opts), emqx_bridge_syskeeper_client:start_link(Options). -parse_template(Config) -> - Templates = - case maps:get(template, Config, undefined) of - undefined -> #{}; - <<>> -> #{}; - Template -> #{send_message => Template} - end, +try_render_message(Datas, Channels) -> + try_render_message(Datas, Channels, []). - parse_template(maps:to_list(Templates), #{}). - -parse_template([{Key, H} | T], Templates) -> - ParamsTks = emqx_placeholder:preproc_tmpl(H), - parse_template( - T, - Templates#{Key => ParamsTks} - ); -parse_template([], Templates) -> - Templates. - -try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) -> - case maps:find(Type, Templates) of - {ok, Template} -> - apply_template(Datas, Template, State); +try_render_message([{MessageTag, Data} | T], Channels, Acc) -> + case maps:find(MessageTag, Channels) of + {ok, Channel} -> + case render_message(Data, Channel) of + {ok, Msg} -> + try_render_message(T, Channels, [Msg | Acc]); + Error -> + Error + end; _ -> - {error, {unrecoverable_error, {invalid_request, Datas}}} - end. - -apply_template(Datas, Template, State) -> - apply_template(Datas, Template, State, []). - -apply_template([{_, Data} | T], Template, State, Acc) -> - case do_apply_template(Data, Template, State) of - {ok, Msg} -> - apply_template(T, Template, State, [Msg | Acc]); - Error -> - Error + {error, {unrecoverable_error, {invalid_message_tag, MessageTag}}} end; -apply_template([], _Template, _State, Acc) -> +try_render_message([], _Channels, Acc) -> {ok, lists:reverse(Acc)}. -do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ - target_qos := TargetQoS, target_topic_tks := TargetTopicTks +render_message(#{id := Id, qos := QoS, clientid := From} = Data, #{ + target_qos := TargetQoS, target_topic := TargetTopicTks, template := Template }) -> Msg = maps:with([qos, flags, topic, payload, timestamp], Data), Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg), @@ -257,13 +331,7 @@ do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ topic := Topic, payload := format_data(Template, Msg) }}; -do_apply_template(Data, Template, State) -> - ?SLOG(info, #{ - msg => "syskeeper_connector_apply_template_error", - data => Data, - template => Template, - state => State - }), +render_message(Data, _Channel) -> {error, {unrecoverable_error, {invalid_data, Data}}}. format_data([], Msg) -> diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl index fcdcbac85..1968022c1 100644 --- a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -11,7 +11,7 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, + connector_examples/1, values/1 ]). @@ -28,21 +28,40 @@ %% ------------------------------------------------------------------------------------------------- %% api -conn_bridge_examples(Method) -> +connector_examples(Method) -> [ #{ <<"syskeeper_proxy">> => #{ - summary => <<"Syskeeper Bridge Proxy">>, + summary => <<"Syskeeper Proxy Connector">>, value => values(Method) } } ]. -values(_Method) -> +values(get) -> + maps:merge( + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + }, + values(post) + ); +values(post) -> + maps:merge( + #{ + name => <<"syskeeper_proxy">>, + type => <<"syskeeper_proxy">> + }, + values(put) + ); +values(put) -> #{ enable => true, - type => syskeeper_proxy, - name => <<"foo">>, listen => <<"127.0.0.1:9092">>, acceptors => 16, handshake_timeout => <<"16s">> @@ -50,13 +69,14 @@ values(_Method) -> %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions -namespace() -> "bridge_syskeeper_proxy". +namespace() -> "connector_syskeeper_proxy". roots() -> []. -fields("config") -> +fields(config) -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()}, {listen, listen()}, {acceptors, mk( @@ -69,21 +89,17 @@ fields("config") -> #{desc => ?DESC(handshake_timeout), default => <<"10s">>} )} ]; -fields("creation_opts") -> - emqx_resource_schema:create_opts([{worker_pool_size, #{default => 1}}]); fields("post") -> - [type_field(), name_field() | fields("config")]; + [type_field(), name_field() | fields(config)]; fields("put") -> - fields("config"); + fields(config); fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"). -desc("config") -> +desc(config) -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; -desc("creation_opts" = Name) -> - emqx_resource_schema:desc(Name); desc(_) -> undefined. diff --git a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl index 21886a90c..10bbf4d59 100644 --- a/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl +++ b/apps/emqx_bridge_syskeeper/test/emqx_bridge_syskeeper_SUITE.erl @@ -13,7 +13,8 @@ -define(HOST, "127.0.0.1"). -define(PORT, 9092). --define(ACK_TIMEOUT, <<"3s">>). +-define(ACK_TIMEOUT, 2000). +-define(HANDSHAKE_TIMEOUT, 10000). -define(SYSKEEPER_NAME, <<"syskeeper">>). -define(SYSKEEPER_PROXY_NAME, <<"syskeeper_proxy">>). -define(BATCH_SIZE, 3). @@ -32,7 +33,13 @@ all() -> groups() -> TCs = emqx_common_test_helpers:all(?MODULE), - Lifecycle = [t_setup_via_config, t_setup_via_http_api, t_get_status], + Lifecycle = [ + t_setup_proxy_via_config, + t_setup_proxy_via_http_api, + t_setup_forwarder_via_config, + t_setup_forwarder_via_http_api, + t_get_status + ], Write = TCs -- Lifecycle, BatchingGroups = [{group, with_batch}, {group, without_batch}], [ @@ -58,14 +65,21 @@ end_per_group(_Group, _Config) -> ok. init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge, emqx_bridge_syskeeper]), + ok = emqx_common_test_helpers:start_apps([ + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_bridge_syskeeper + ]), _ = emqx_bridge_enterprise:module_info(), emqx_mgmt_api_test_util:init_suite(), Config. end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite(), - ok = emqx_common_test_helpers:stop_apps([emqx_bridge, emqx_conf]). + ok = emqx_common_test_helpers:stop_apps([ + emqx_bridge_syskeeper, emqx_bridge, emqx_connector, emqx_conf + ]). init_per_testcase(_Testcase, Config) -> snabbkaffe:start_trace(), @@ -73,15 +87,15 @@ init_per_testcase(_Testcase, Config) -> end_per_testcase(_Testcase, _Config) -> ok = snabbkaffe:stop(), - delete_bridge(syskeeper, ?SYSKEEPER_NAME), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + delete_bridge(syskeeper_forwarder, ?SYSKEEPER_NAME), + delete_connectors(syskeeper_forwarder, ?SYSKEEPER_NAME), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), ok. %%------------------------------------------------------------------------------ %% Helper fns %%------------------------------------------------------------------------------ syskeeper_config(Config) -> - AckMode = proplists:get_value(ack_mode, Config, no_ack), BatchSize = case proplists:get_value(enable_batch, Config, false) of true -> ?BATCH_SIZE; @@ -89,82 +103,126 @@ syskeeper_config(Config) -> end, ConfigString = io_lib:format( - "bridges.~s.~s {\n" + "actions.~s.~s {\n" " enable = true\n" - " server = \"~ts\"\n" - " ack_mode = ~p\n" - " ack_timeout = \"~ts\"\n" + " connector = ~ts\n" " resource_opts = {\n" " request_ttl = 500ms\n" " batch_size = ~b\n" " }\n" "}", [ - syskeeper, + syskeeper_forwarder, + ?SYSKEEPER_NAME, ?SYSKEEPER_NAME, - server(), - AckMode, - ?ACK_TIMEOUT, BatchSize ] ), - {?SYSKEEPER_NAME, parse_and_check(ConfigString, syskeeper, ?SYSKEEPER_NAME)}. + {?SYSKEEPER_NAME, parse_bridge_and_check(ConfigString, syskeeper_forwarder, ?SYSKEEPER_NAME)}. + +syskeeper_connector_config(Config) -> + AckMode = proplists:get_value(ack_mode, Config, no_ack), + ConfigString = + io_lib:format( + "connectors.~s.~s {\n" + " enable = true\n" + " server = \"~ts\"\n" + " ack_mode = ~p\n" + " ack_timeout = ~p\n" + " pool_size = 1\n" + "}", + [ + syskeeper_forwarder, + ?SYSKEEPER_NAME, + server(), + AckMode, + ?ACK_TIMEOUT + ] + ), + {?SYSKEEPER_NAME, + parse_connectors_and_check(ConfigString, syskeeper_forwarder, ?SYSKEEPER_NAME)}. syskeeper_proxy_config(_Config) -> ConfigString = io_lib:format( - "bridges.~s.~s {\n" + "connectors.~s.~s {\n" " enable = true\n" " listen = \"~ts\"\n" " acceptors = 1\n" + " handshake_timeout = ~p\n" "}", [ syskeeper_proxy, ?SYSKEEPER_PROXY_NAME, - server() + server(), + ?HANDSHAKE_TIMEOUT ] ), - {?SYSKEEPER_PROXY_NAME, parse_and_check(ConfigString, syskeeper_proxy, ?SYSKEEPER_PROXY_NAME)}. + {?SYSKEEPER_PROXY_NAME, + parse_connectors_and_check(ConfigString, syskeeper_proxy, ?SYSKEEPER_PROXY_NAME)}. -parse_and_check(ConfigString, BridgeType0, Name) -> - BridgeType = to_bin(BridgeType0), - ct:pal("ConfigString:~ts~n", [ConfigString]), +parse_and_check(ConfigString, SchemaMod, RootKey, Type0, Name) -> + Type = to_bin(Type0), {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), - hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), - #{<<"bridges">> := #{BridgeType := #{Name := Config}}} = RawConf, + hocon_tconf:check_plain(SchemaMod, RawConf, #{required => false, atom_key => false}), + #{RootKey := #{Type := #{Name := Config}}} = RawConf, Config. +parse_bridge_and_check(ConfigString, BridgeType, Name) -> + parse_and_check(ConfigString, emqx_bridge_schema, <<"actions">>, BridgeType, Name). + +parse_connectors_and_check(ConfigString, ConnectorType, Name) -> + Config = parse_and_check( + ConfigString, emqx_connector_schema, <<"connectors">>, ConnectorType, Name + ), + emqx_utils_maps:safe_atom_key_map(Config). + create_bridge(Type, Name, Conf) -> - emqx_bridge:create(Type, Name, Conf). + emqx_bridge_v2:create(Type, Name, Conf). delete_bridge(Type, Name) -> - emqx_bridge:remove(Type, Name). + emqx_bridge_v2:remove(Type, Name). create_both_bridge(Config) -> {ProxyName, ProxyConf} = syskeeper_proxy_config(Config), - ?assertMatch( - {ok, _}, - create_bridge(syskeeper_proxy, ProxyName, ProxyConf) - ), + {ConnectorName, ConnectorConf} = syskeeper_connector_config(Config), {Name, Conf} = syskeeper_config(Config), ?assertMatch( {ok, _}, - create_bridge(syskeeper, Name, Conf) - ). + create_connectors(syskeeper_proxy, ProxyName, ProxyConf) + ), + timer:sleep(1000), + ?assertMatch( + {ok, _}, + create_connectors(syskeeper_forwarder, ConnectorName, ConnectorConf) + ), + timer:sleep(1000), + ?assertMatch({ok, _}, create_bridge(syskeeper_forwarder, Name, Conf)). create_bridge_http(Params) -> - Path = emqx_mgmt_api_test_util:api_path(["bridges"]), + call_create_http("actions", Params). + +create_connectors_http(Params) -> + call_create_http("connectors", Params). + +call_create_http(Root, Params) -> + Path = emqx_mgmt_api_test_util:api_path([Root]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; Error -> Error end. +create_connectors(Type, Name, Conf) -> + emqx_connector:create(Type, Name, Conf). + +delete_connectors(Type, Name) -> + emqx_connector:remove(Type, Name). + send_message(_Config, Payload) -> Name = ?SYSKEEPER_NAME, - BridgeType = syskeeper, - BridgeID = emqx_bridge_resource:bridge_id(BridgeType, Name), - emqx_bridge:send_message(BridgeID, Payload). + BridgeType = syskeeper_forwarder, + emqx_bridge_v2:send_message(BridgeType, Name, Payload, #{}). to_bin(List) when is_list(List) -> unicode:characters_to_binary(List, utf8); @@ -197,23 +255,23 @@ receive_msg() -> %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ -t_setup_via_config(Config) -> +t_setup_proxy_via_config(Config) -> {Name, Conf} = syskeeper_proxy_config(Config), ?assertMatch( {ok, _}, - create_bridge(syskeeper_proxy, Name, Conf) + create_connectors(syskeeper_proxy, Name, Conf) ), ?assertMatch( X when is_pid(X), esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ), - delete_bridge(syskeeper_proxy, Name), + delete_connectors(syskeeper_proxy, Name), ?assertError( not_found, esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ). -t_setup_via_http_api(Config) -> +t_setup_proxy_via_http_api(Config) -> {Name, ProxyConf0} = syskeeper_proxy_config(Config), ProxyConf = ProxyConf0#{ <<"name">> => Name, @@ -221,7 +279,7 @@ t_setup_via_http_api(Config) -> }, ?assertMatch( {ok, _}, - create_bridge_http(ProxyConf) + create_connectors_http(ProxyConf) ), ?assertMatch( @@ -229,29 +287,64 @@ t_setup_via_http_api(Config) -> esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ), - delete_bridge(syskeeper_proxy, Name), + delete_connectors(syskeeper_proxy, Name), ?assertError( not_found, esockd:listener({emqx_bridge_syskeeper_proxy_server, {?HOST, ?PORT}}) ). +t_setup_forwarder_via_config(Config) -> + {ConnectorName, ConnectorConf} = syskeeper_connector_config(Config), + {Name, Conf} = syskeeper_config(Config), + ?assertMatch( + {ok, _}, + create_connectors(syskeeper_forwarder, ConnectorName, ConnectorConf) + ), + ?assertMatch({ok, _}, create_bridge(syskeeper_forwarder, Name, Conf)). + +t_setup_forwarder_via_http_api(Config) -> + {ConnectorName, ConnectorConf0} = syskeeper_connector_config(Config), + {Name, Conf0} = syskeeper_config(Config), + + ConnectorConf = ConnectorConf0#{ + <<"name">> => ConnectorName, + <<"type">> => syskeeper_forwarder + }, + + Conf = Conf0#{ + <<"name">> => Name, + <<"type">> => syskeeper_forwarder + }, + + ?assertMatch( + {ok, _}, + create_connectors_http(ConnectorConf) + ), + + ?assertMatch( + {ok, _}, + create_bridge_http(Conf) + ). + t_get_status(Config) -> create_both_bridge(Config), - SyskeeperId = emqx_bridge_resource:resource_id(syskeeper, ?SYSKEEPER_NAME), - ProxyId = emqx_bridge_resource:resource_id(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(SyskeeperId)), - ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ProxyId)), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + ?assertMatch( + #{status := connected}, emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) + ), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), ?retry( _Sleep = 500, _Attempts = 10, - ?assertMatch({ok, connecting}, emqx_resource_manager:health_check(SyskeeperId)) + ?assertMatch( + #{status := connecting}, + emqx_bridge_v2:health_check(syskeeper_forwarder, ?SYSKEEPER_NAME) + ) ). t_write_failure(Config) -> create_both_bridge(Config), - delete_bridge(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), + delete_connectors(syskeeper_proxy, ?SYSKEEPER_PROXY_NAME), SentData = make_message(), Result = ?wait_async_action( diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c8ec8e1be..19b9fa244 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -25,6 +25,10 @@ resource_type(kafka_producer) -> %% We use AEH's Kafka interface. resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(syskeeper_forwarder) -> + emqx_bridge_syskeeper_connector; +resource_type(syskeeper_proxy) -> + emqx_bridge_syskeeper_proxy_server; resource_type(Type) -> error({unknown_connector_type, Type}). @@ -56,6 +60,22 @@ connector_structs() -> desc => <<"Azure Event Hub Connector Config">>, required => false } + )}, + {syskeeper_forwarder, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_connector, config)), + #{ + desc => <<"Syskeeper Connector Config">>, + required => false + } + )}, + {syskeeper_proxy, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, config)), + #{ + desc => <<"Syskeeper Proxy Connector Config">>, + required => false + } )} ]. @@ -74,7 +94,9 @@ examples(Method) -> schema_modules() -> [ emqx_bridge_kafka, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper_connector, + emqx_bridge_syskeeper_proxy ]. api_schemas(Method) -> @@ -82,7 +104,11 @@ api_schemas(Method) -> %% We need to map the `type' field of a request (binary) to a %% connector schema module. api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector") + api_ref( + emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_connector" + ), + api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) ]. api_ref(Module, Type, Method) -> diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 22eb523be..e4308ac54 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -60,7 +60,9 @@ enterprise_fields_connectors() -> []. -endif. connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; -connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]. +connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; +connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; +connector_type_to_bridge_types(syskeeper_proxy) -> []. actions_config_name() -> <<"actions">>. diff --git a/rel/i18n/emqx_bridge_syskeeper_connector.hocon b/rel/i18n/emqx_bridge_syskeeper_connector.hocon index a057e6647..3e93458e0 100644 --- a/rel/i18n/emqx_bridge_syskeeper_connector.hocon +++ b/rel/i18n/emqx_bridge_syskeeper_connector.hocon @@ -1,5 +1,17 @@ emqx_bridge_syskeeper_connector { +desc_config.desc: +"""Configuration for a Syskeeper forwarder connector""" + +desc_config.label: +"""Syskeeper Forwarder Connector Configuration""" + +config_enable.desc: +"""Enable or disable this connector""" + +config_enable.label: +"""Enable Or Disable connector""" + server.desc: """The address of the Syskeeper proxy server""" diff --git a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon index f6c519216..dc96486d1 100644 --- a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon +++ b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon @@ -1,16 +1,16 @@ emqx_bridge_syskeeper_proxy { config_enable.desc: -"""Enable or disable this bridge""" +"""Enable or disable this connector""" config_enable.label: -"""Enable Or Disable Bridge""" +"""Enable Or Disable Connector""" desc_config.desc: -"""Configuration for a Syskeeper proxy bridge""" +"""Configuration for a Syskeeper proxy connector""" desc_config.label: -"""Syskeeper Proxy Bridge Configuration""" +"""Syskeeper Proxy Connector Configuration""" desc_name.desc: """Bridge name"""