From ce83079c6b855b2a579e4cff3e1538e8b19821f1 Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 10 Nov 2023 09:51:43 +0800 Subject: [PATCH] feat(sysk): integrated Nari Syskeeper 2000 as a new bridge backend --- .../src/schema/emqx_bridge_enterprise.erl | 35 +- apps/emqx_bridge_syskeeper/BSL.txt | 94 +++++ apps/emqx_bridge_syskeeper/README.md | 30 ++ apps/emqx_bridge_syskeeper/doc/protocol_v1.md | 370 ++++++++++++++++++ .../emqx_bridge_syskeeper/doc/protocol_v1.org | 80 ++++ apps/emqx_bridge_syskeeper/docker-ct | 1 + .../include/emqx_bridge_syskeeper.hrl | 15 + apps/emqx_bridge_syskeeper/rebar.config | 6 + .../src/emqx_bridge_syskeeper.app.src | 13 + .../src/emqx_bridge_syskeeper.erl | 117 ++++++ .../src/emqx_bridge_syskeeper_client.erl | 180 +++++++++ .../src/emqx_bridge_syskeeper_connector.erl | 262 +++++++++++++ .../src/emqx_bridge_syskeeper_frame.erl | 163 ++++++++ .../src/emqx_bridge_syskeeper_frame_v1.erl | 70 ++++ .../src/emqx_bridge_syskeeper_proxy.erl | 100 +++++ .../emqx_bridge_syskeeper_proxy_server.erl | 251 ++++++++++++ apps/emqx_machine/priv/reboot_lists.eterm | 3 +- mix.exs | 3 +- rebar.config.erl | 1 + rel/i18n/emqx_bridge_syskeeper.hocon | 45 +++ .../emqx_bridge_syskeeper_connector.hocon | 21 + rel/i18n/emqx_bridge_syskeeper_proxy.hocon | 45 +++ 22 files changed, 1899 insertions(+), 6 deletions(-) create mode 100644 apps/emqx_bridge_syskeeper/BSL.txt create mode 100644 apps/emqx_bridge_syskeeper/README.md create mode 100644 apps/emqx_bridge_syskeeper/doc/protocol_v1.md create mode 100644 apps/emqx_bridge_syskeeper/doc/protocol_v1.org create mode 100644 apps/emqx_bridge_syskeeper/docker-ct create mode 100644 apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl create mode 100644 apps/emqx_bridge_syskeeper/rebar.config create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl create mode 100644 apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl create mode 100644 rel/i18n/emqx_bridge_syskeeper.hocon create mode 100644 rel/i18n/emqx_bridge_syskeeper_connector.hocon create mode 100644 rel/i18n/emqx_bridge_syskeeper_proxy.hocon diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 93951cca0..a160ecd33 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -50,7 +50,9 @@ api_schemas(Method) -> api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method), api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"), api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1"), - api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer") + api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer"), + api_ref(emqx_bridge_syskeeper, <<"syskeeper">>, Method), + api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method) ]. schema_modules() -> @@ -78,7 +80,9 @@ schema_modules() -> emqx_bridge_rabbitmq, emqx_bridge_kinesis, emqx_bridge_greptimedb, - emqx_bridge_azure_event_hub + emqx_bridge_azure_event_hub, + emqx_bridge_syskeeper, + emqx_bridge_syskeeper_proxy ]. examples(Method) -> @@ -126,7 +130,9 @@ resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector; resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer; resource_type(greptimedb) -> emqx_bridge_greptimedb_connector; %% We use AEH's Kafka interface. -resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer. +resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(syskeeper) -> emqx_bridge_syskeeper_connector; +resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server. %% For bridges that need to override connector configurations. bridge_impl_module(BridgeType) when is_binary(BridgeType) -> @@ -215,7 +221,8 @@ fields(bridges) -> influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++ - kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs(). + kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs() ++ + syskeeper_structs(). mongodb_structs() -> [ @@ -428,6 +435,26 @@ azure_event_hub_structs() -> )} ]. +syskeeper_structs() -> + [ + {syskeeper, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper, "config")), + #{ + desc => <<"Syskeeper bridge config ">>, + required => false + } + )}, + {syskeeper_proxy, + mk( + hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, "config")), + #{ + desc => <<"Syskeeper proxy server config">>, + required => false + } + )} + ]. + api_ref(Module, Type, Method) -> {Type, ref(Module, Method)}. diff --git a/apps/emqx_bridge_syskeeper/BSL.txt b/apps/emqx_bridge_syskeeper/BSL.txt new file mode 100644 index 000000000..0acc0e696 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/BSL.txt @@ -0,0 +1,94 @@ +Business Source License 1.1 + +Licensor: Hangzhou EMQ Technologies Co., Ltd. +Licensed Work: EMQX Enterprise Edition + The Licensed Work is (c) 2023 + Hangzhou EMQ Technologies Co., Ltd. +Additional Use Grant: Students and educators are granted right to copy, + modify, and create derivative work for research + or education. +Change Date: 2027-02-01 +Change License: Apache License, Version 2.0 + +For information about alternative licensing arrangements for the Software, +please contact Licensor: https://www.emqx.com/en/contact + +Notice + +The Business Source License (this document, or the “License”) is not an Open +Source license. However, the Licensed Work will eventually be made available +under an Open Source License, as stated in this License. + +License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved. +“Business Source License” is a trademark of MariaDB Corporation Ab. + +----------------------------------------------------------------------------- + +Business Source License 1.1 + +Terms + +The Licensor hereby grants you the right to copy, modify, create derivative +works, redistribute, and make non-production use of the Licensed Work. The +Licensor may make an Additional Use Grant, above, permitting limited +production use. + +Effective on the Change Date, or the fourth anniversary of the first publicly +available distribution of a specific version of the Licensed Work under this +License, whichever comes first, the Licensor hereby grants you rights under +the terms of the Change License, and the rights granted in the paragraph +above terminate. + +If your use of the Licensed Work does not comply with the requirements +currently in effect as described in this License, you must purchase a +commercial license from the Licensor, its affiliated entities, or authorized +resellers, or you must refrain from using the Licensed Work. + +All copies of the original and modified Licensed Work, and derivative works +of the Licensed Work, are subject to this License. This License applies +separately for each version of the Licensed Work and the Change Date may vary +for each version of the Licensed Work released by Licensor. + +You must conspicuously display this License on each original or modified copy +of the Licensed Work. If you receive the Licensed Work in original or +modified form from a third party, the terms and conditions set forth in this +License apply to your use of that work. + +Any use of the Licensed Work in violation of this License will automatically +terminate your rights under this License for the current and all other +versions of the Licensed Work. + +This License does not grant you any right in any trademark or logo of +Licensor or its affiliates (provided that you may use a trademark or logo of +Licensor as expressly required by this License). + +TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON +AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS, +EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND +TITLE. + +MariaDB hereby grants you permission to use this License’s text to license +your works, and to refer to it using the trademark “Business Source License”, +as long as you comply with the Covenants of Licensor below. + +Covenants of Licensor + +In consideration of the right to use this License’s text and the “Business +Source License” name and trademark, Licensor covenants to MariaDB, and to all +other recipients of the licensed work to be provided by Licensor: + +1. To specify as the Change License the GPL Version 2.0 or any later version, + or a license that is compatible with GPL Version 2.0 or a later version, + where “compatible” means that software provided under the Change License can + be included in a program with software provided under GPL Version 2.0 or a + later version. Licensor may specify additional Change Licenses without + limitation. + +2. To either: (a) specify an additional grant of rights to use that does not + impose any additional restriction on the right granted in this License, as + the Additional Use Grant; or (b) insert the text “None”. + +3. To specify a Change Date. + +4. Not to modify this License in any other way. diff --git a/apps/emqx_bridge_syskeeper/README.md b/apps/emqx_bridge_syskeeper/README.md new file mode 100644 index 000000000..328fd488e --- /dev/null +++ b/apps/emqx_bridge_syskeeper/README.md @@ -0,0 +1,30 @@ +# EMQX Syskeeper Bridge + +Nari Syskeeper 2000 is a one-way Physical Isolation Net Gap. + +The application is used to connect EMQX and Syskeeper. +Users can create a rule and quickly ingest IoT data to the Syskeeper by leveraging +[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html). + +# Documentation + +- Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html) + for the EMQX rules engine introduction. + +# HTTP APIs + +- Several APIs are provided for bridge management, which includes create bridge, + update bridge, get bridge, stop or restart bridge and list bridges etc. + + Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges) + for more detailed information. + + +# Contributing + +Please see our [contributing.md](../../CONTRIBUTING.md). + + +# License + +EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt). diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.md b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md new file mode 100644 index 000000000..ca73c300d --- /dev/null +++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md @@ -0,0 +1,370 @@ + +# Table of Contents + +1. [Packet Format](#orgb2a43d1) +2. [Common Header](#org5ca4c69) + 1. [Types](#org240efb3) + 2. [Shared Flags](#org804fcce) +3. [Handshake Packet](#org6a73ea8) +4. [Forward Packet](#org39c753e) + 1. [Flags](#org5177d26) + 2. [Payload](#orgb29cbd7) + 1. [Message Content map structure](#org75acfe6) +5. [Heartbeat Packet](#org388b69a) + + + + +# Packet Format + + + + + + + + + + + + + + + + + + +
+  bytes  + +   0   + +   1   + +   2   + +   3   + +         5         + +     6 .. end     +
+         + +     variable length     + +   common header   + +     payload      +
+ +The length of the remaining part(common header + payload) is indicated by the Length Header of each packet + + + + +# Common Header + + + + + + + + + + + + + + + + + + + +
+  bits  + +   0   + +   1   + +   2   + +   3   + +   4   + +   5   + +   6   + +   7   +
+        + +       packet type       + +      shared flags       +
+ + + + +## Types + + + + + + + + + + + + + + + + + + + +
+    type    + +    usage    +
+     0      + +  handshake  +
+     1      + +   forward   +
+     2      + +  heartbeat  +
+ + + + +## Shared Flags + +The usage of each bit is determined by the type of packet + + + + +# Handshake Packet + + + + + + + + + + + + + +
+  bytes  + +        0        + +        1        +
+         + +  common header  + +     version     +
+ + + + +# Forward Packet + + + + + + + + + + + + + + + + + + + + + + + + + +
+  bits  + +  0  + +  1  + +  2  + +  3  + +  4  + +  5  + +  6  + +   7   + +     ...     +
+       
+       
+        +
+                
+   packet type  
+                 +
+             + +  ACK  + +            
+   payload  
+             +
+   forward flags   +
+ + + + +## Flags + + + + + + + + + + + +
+  flag  + +                    usage                    +
+  ACK   + +       This packet need a ACK response       +
+ + + + +## Payload + + + + + + + + + + + + + + + + + +
+  bytes  + +   0   + +   ..    + +   n   + +  n+1  + +  ..   + +   x   +
+         + +   Content Length    + +  Message Content  +
+ +- Content length is a variable length number. +- Message content is a list in an opaque binary format whose element is a map structure + + + + +### Message Content map structure + + { + id: "0006081CCFF3D48F03C10000058B0000", // unique message id + qos: 1, + flags: {dup: false, retain: false}, + from: "clientid", + topic: "t/1", + payload: "hello, world", + timestamp: 1697786555281 + } + + + + +# Heartbeat Packet + + + + + + + + + + + +
+  bytes  + +        0        +
+         + +  common header  +
+ diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.org b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org new file mode 100644 index 000000000..12d0fe850 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org @@ -0,0 +1,80 @@ +* Packet Format + +-------+-----+-----+-----+-----+-----------------+----------------+ + | bytes | 0 | 1 | 2 | 3 | 5 | 6 .. end | + +-------+-----+-----+-----+-----+-----------------+----------------+ + | | variable length | common header | payload | + +-------+-----------------------+-----------------+----------------+ + + The length of the remaining part(common header + payload) is indicated by the Length Header of each packet + +* Common Header + +------+-----+-----+-----+-----+-----+-----+-----+-----+ + | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | + +------+-----+-----+-----+-----+-----+-----+-----+-----+ + | | packet type | shared flags | + +------+-----------------------+-----------------------+ +** Types + +----------+-----------+ + | type | usage | + +----------+-----------+ + | 0 | handshake | + +----------+-----------+ + | 1 | forward | + +----------+-----------+ + | 2 | heartbeat | + +----------+-----------+ +** Shared Flags + The usage of each bit is determined by the type of packet +* Handshake Packet + +-------+---------------+---------------+ + | bytes | 0 | 1 | + +-------+---------------+---------------+ + | | common header | version | + +-------+---------------+---------------+ +* Forward Packet + +------+---+---+---+---+---+---+---+-----+-----------+ + | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... | + +------+---+---+---+---+---+---+---+-----+-----------+ + | | | | ACK | | + | | packet type +-----------+-----+ payload | + | | | forward flags | | + +------+---------------+-----------------+-----------+ + +** Flags + +------+-------------------------------------------+ + | flag | usage | + +------+-------------------------------------------+ + | ACK | This packet need a ACK response | + +------+-------------------------------------------+ + +** Payload + +-------+-----+-------+-----+-----+-----+-----+ + | bytes | 0 | .. | n | n+1 | .. | x | + +-------+-----+-------+-----+-----+-----+-----+ + | | Content Length | Message Content | + +-------+-------------------+-----------------+ + + + Content length is a variable length number. + + Message content is a list in an opaque binary format whose element is a map structure + +*** Message Content map structure + +#+begin_src json + { + id: "0006081CCFF3D48F03C10000058B0000", // unique message id + qos: 1, + flags: {dup: false, retain: false}, + from: "clientid", + topic: "t/1", + payload: "hello, world", + timestamp: 1697786555281 + } +#+end_src + +* Heartbeat Packet + + +-------+---------------+ + | bytes | 0 | + +-------+---------------+ + | | common header | + +-------+---------------+ diff --git a/apps/emqx_bridge_syskeeper/docker-ct b/apps/emqx_bridge_syskeeper/docker-ct new file mode 100644 index 000000000..80f0d394b --- /dev/null +++ b/apps/emqx_bridge_syskeeper/docker-ct @@ -0,0 +1 @@ +toxiproxy diff --git a/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl new file mode 100644 index 000000000..b381ebf50 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl @@ -0,0 +1,15 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-ifndef(EMQX_BRIDGE_SYSKEEPER). +-define(EMQX_BRIDGE_SYSKEEPER, true). + +-define(TYPE_HANDSHAKE, 0). +-define(TYPE_FORWARD, 1). +-define(TYPE_HEARTBEAT, 2). + +-type packet_type() :: handshake | forward | heartbeat. +-type packet_data() :: none | binary() | [binary()]. +-type packet_type_val() :: ?TYPE_HANDSHAKE..?TYPE_HEARTBEAT. + +-endif. diff --git a/apps/emqx_bridge_syskeeper/rebar.config b/apps/emqx_bridge_syskeeper/rebar.config new file mode 100644 index 000000000..31879d9ce --- /dev/null +++ b/apps/emqx_bridge_syskeeper/rebar.config @@ -0,0 +1,6 @@ +%% -*- mode: erlang; -*- +{erl_opts, [debug_info]}. +{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} + , {emqx_resource, {path, "../../apps/emqx_resource"}} + , {emqx_bridge, {path, "../../apps/emqx_bridge"}} + ]}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src new file mode 100644 index 000000000..a8f533867 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src @@ -0,0 +1,13 @@ +{application, emqx_bridge_syskeeper, [ + {description, "EMQX Enterprise Bridge"}, + {vsn, "0.1.0"}, + {registered, []}, + {applications, [ + kernel, + stdlib, + emqx_resource + ]}, + {env, []}, + {modules, []}, + {links, []} +]}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl new file mode 100644 index 000000000..9e0ec4eed --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"syskeeper">> => #{ + summary => <<"Syskeeper Bridge">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => syskeeper, + name => <<"foo">>, + server => <<"127.0.0.1:9092">>, + ack_mode => <<"no_ack">>, + ack_timeout => <<"10s">>, + pool_size => 16, + target_topic => <<"${topic}">>, + target_qos => <<"-1">>, + template => <<"${payload}">>, + resource_opts => #{ + worker_pool_size => 16, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => sync, + max_buffer_bytes => ?DEFAULT_BUFFER_BYTES + } + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_syskeeper". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {target_topic, + mk( + binary(), + #{desc => ?DESC("target_topic"), default => <<"${topic}">>} + )}, + {target_qos, + mk( + range(-1, 2), + #{desc => ?DESC("target_qos"), default => -1} + )}, + {template, + mk( + binary(), + #{desc => ?DESC("template"), default => <<"${payload}">>} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ emqx_bridge_syskeeper_connector:fields(config); +fields("creation_opts") -> + emqx_resource_schema:create_opts([{request_ttl, #{default => infinity}}]); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([syskeeper]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl new file mode 100644 index 000000000..18822886f --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl @@ -0,0 +1,180 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper_client). + +-behaviour(gen_server). + +%% API +-export([ + start_link/1, + forward/3, + heartbeat/2 +]). + +%% gen_server callbacks +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3, + format_status/2 +]). + +-include("emqx_bridge_syskeeper.hrl"). + +-type state() :: #{ + ack_mode := need_ack | no_ack, + ack_timeout := timer:time(), + socket := undefined | inet:socket(), + frame_state := emqx_bridge_syskeeper_frame:state(), + last_error := undefined | tuple() +}. + +-type send_result() :: {ok, state()} | {error, term()}. + +%% ------------------------------------------------------------------------------------------------- +%% API +forward(Pid, Msg, Timeout) -> + call(Pid, {?FUNCTION_NAME, Msg}, Timeout). + +heartbeat(Pid, Timeout) -> + ok =:= call(Pid, ?FUNCTION_NAME, Timeout). + +%% ------------------------------------------------------------------------------------------------- +%% Starts Bridge which transfer data to Syskeeper + +start_link(Options) -> + gen_server:start_link(?MODULE, Options, []). + +%% ------------------------------------------------------------------------------------------------- +%%% gen_server callbacks + +%% Initialize syskeeper client +init(#{ack_timeout := AckTimeout, ack_mode := AckMode} = Options) -> + erlang:process_flag(trap_exit, true), + connect(Options, #{ + ack_timeout => AckTimeout, + ack_mode => AckMode, + socket => undefined, + last_error => undefined, + frame_state => emqx_bridge_syskeeper_frame:make_state_with_conf(Options) + }). + +handle_call({forward, Msgs}, _From, State) -> + Result = send_packet(forward, Msgs, State), + handle_reply_result(Result, State); +handle_call(heartbeat, _From, State) -> + Result = send_ack_packet(heartbeat, none, State), + handle_reply_result(Result, State); +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({tcp_closed, _} = Reason, State) -> + {noreply, State#{socket := undefined, last_error := Reason}}; +handle_info({last_error, _, _} = Reason, State) -> + {noreply, State#{socket := undefined, last_error := Reason}}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, #{socket := Socket} = _State) -> + close_socket(Socket), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +-spec format_status( + Opt :: normal | terminate, + Status :: list() +) -> Status :: term(). +format_status(_Opt, Status) -> + Status. + +%% ------------------------------------------------------------------------------------------------ +connect( + #{ + hostname := Host, + port := Port + }, + State +) -> + case + gen_tcp:connect(Host, Port, [ + {active, true}, + {mode, binary}, + {nodelay, true} + ]) + of + {ok, Socket} -> + send_ack_packet(handshake, none, State#{socket := Socket}); + {error, Reason} -> + {stop, Reason} + end. + +-spec send_ack_packet(packet_type(), packet_data(), state()) -> send_result(). +send_ack_packet(Type, Data, State) -> + send_packet(Type, Data, State, true). + +-spec send_packet(packet_type(), packet_data(), state()) -> send_result(). +send_packet(Type, Data, State) -> + send_packet(Type, Data, State, false). + +-spec send_packet(packet_type(), packet_data(), state(), boolean()) -> send_result(). +send_packet(_Type, _Data, #{socket := undefined, last_error := Reason}, _Force) -> + {error, Reason}; +send_packet(Type, Data, #{frame_state := FrameState} = State, Force) -> + Packet = emqx_bridge_syskeeper_frame:encode(Type, Data, FrameState), + case socket_send(Packet, State) of + ok -> + wait_ack(State, Force); + {error, _} = Error -> + Error + end. + +-spec socket_send(binary() | [binary()], state()) -> ok | {error, _Reason}. +socket_send(Bin, State) when is_binary(Bin) -> + socket_send([Bin], State); +socket_send(Bins, #{socket := Socket}) -> + Map = fun(Data) -> + Len = erlang:byte_size(Data), + VarLen = emqx_bridge_syskeeper_frame:serialize_variable_byte_integer(Len), + <> + end, + gen_tcp:send(Socket, lists:map(Map, Bins)). + +-spec wait_ack(state(), boolean()) -> send_result(). +wait_ack(#{ack_timeout := AckTimeout, ack_mode := AckMode} = State, Force) when + AckMode =:= need_ack; Force +-> + receive + {tcp, _Socket, <<16#FF>>} -> + {ok, State}; + {tcp_closed, _} = Reason -> + {error, Reason}; + {tcp_error, _, _} = Reason -> + {error, Reason} + after AckTimeout -> + {error, wait_ack_timeout} + end; +wait_ack(State, _Force) -> + {ok, State}. + +close_socket(undefined) -> + ok; +close_socket(Socket) -> + catch gen_tcp:close(Socket), + ok. + +call(Pid, Msg, Timeout) -> + gen_server:call(Pid, Msg, Timeout). + +handle_reply_result({ok, _}, State) -> + {reply, ok, State}; +handle_reply_result({error, Reason}, State) -> + {reply, {error, {recoverable_error, Reason}}, State#{last_error := Reason}}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl new file mode 100644 index 000000000..219d4d0d2 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl @@ -0,0 +1,262 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_connector). + +-behaviour(emqx_resource). + +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-export([roots/0, fields/1]). + +%% `emqx_resource' API +-export([ + callback_mode/0, + query_mode/1, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +-export([ + connect/1 +]). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-define(SYSKEEPER_HOST_OPTIONS, #{ + default_port => 9092 +}). + +-define(EXTRA_CALL_TIMEOUT, 2000). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon schema +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {server, server()}, + {ack_mode, + mk( + enum([need_ack, no_ack]), + #{desc => ?DESC(ack_mode), default => <<"no_ack">>} + )}, + {ack_timeout, + mk( + emqx_schema:timeout_duration_ms(), + #{desc => ?DESC(ack_timeout), default => <<"10s">>} + )}, + {pool_size, fun + (default) -> + 16; + (Other) -> + emqx_connector_schema_lib:pool_size(Other) + end} + ]. + +server() -> + Meta = #{desc => ?DESC("server")}, + emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). + +%% ------------------------------------------------------------------------------------------------- +%% `emqx_resource' API + +callback_mode() -> always_sync. + +query_mode(_) -> sync. + +on_start( + InstanceId, + #{ + server := Server, + pool_size := PoolSize, + ack_timeout := AckTimeout, + target_topic := TargetTopic, + target_qos := TargetQoS + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_syskeeper_connector", + connector => InstanceId, + config => redact(Config) + }), + + HostCfg = emqx_schema:parse_server(Server, ?SYSKEEPER_HOST_OPTIONS), + + Options = [ + {options, + maps:merge( + HostCfg, + maps:with([ack_mode, ack_timeout], Config) + )}, + {pool_size, PoolSize} + ], + + State = #{ + pool_name => InstanceId, + target_qos => TargetQoS, + ack_timeout => AckTimeout, + templates => parse_template(Config), + target_topic_tks => emqx_placeholder:preproc_tmpl(TargetTopic) + }, + case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of + ok -> + {ok, State}; + Error -> + Error + end. + +on_stop(InstanceId, _State) -> + ?SLOG(info, #{ + msg => "stopping_syskeeper_connector", + connector => InstanceId + }), + emqx_resource_pool:stop(InstanceId). + +on_query(InstanceId, {send_message, _} = Query, State) -> + do_query(InstanceId, [Query], State); +on_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +%% we only support batch insert +on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) -> + do_query(InstanceId, Query, State); +on_batch_query(_InstanceId, Query, _State) -> + {error, {unrecoverable_error, {invalid_request, Query}}}. + +on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) -> + Health = emqx_resource_pool:health_check_workers( + Pool, {emqx_bridge_syskeeper_client, heartbeat, [AckTimeout + ?EXTRA_CALL_TIMEOUT]} + ), + status_result(Health). + +status_result(true) -> connected; +status_result(false) -> connecting; +status_result({error, _}) -> connecting. + +%% ------------------------------------------------------------------------------------------------- +%% Helper fns + +do_query( + InstanceId, + Query, + #{pool_name := PoolName, ack_timeout := AckTimeout} = State +) -> + ?TRACE( + "QUERY", + "syskeeper_connector_received", + #{connector => InstanceId, query => Query, state => State} + ), + + Result = + case try_apply_template(Query, State) of + {ok, Msg} -> + ecpool:pick_and_do( + PoolName, + {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]}, + no_handover + ); + Error -> + Error + end, + + case Result of + {error, Reason} -> + ?tp( + syskeeper_connector_query_return, + #{error => Reason} + ), + %% ?SLOG(error, #{ + %% msg => "syskeeper_connector_do_query_failed", + %% connector => InstanceId, + %% query => Query, + %% reason => Reason + %% }), + case Reason of + ecpool_empty -> + {error, {recoverable_error, Reason}}; + _ -> + Result + end; + _ -> + %% ?tp( + %% syskeeper_connector_query_return, + %% #{result => Result} + %% ), + Result + end. + +connect(Opts) -> + Options = proplists:get_value(options, Opts), + emqx_bridge_syskeeper_client:start_link(Options). + +parse_template(Config) -> + Templates = + case maps:get(template, Config, undefined) of + undefined -> #{}; + <<>> -> #{}; + Template -> #{send_message => Template} + end, + + parse_template(maps:to_list(Templates), #{}). + +parse_template([{Key, H} | T], Templates) -> + ParamsTks = emqx_placeholder:preproc_tmpl(H), + parse_template( + T, + Templates#{Key => ParamsTks} + ); +parse_template([], Templates) -> + Templates. + +try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) -> + case maps:find(Type, Templates) of + {ok, Template} -> + {ok, apply_template(Datas, Template, State)}; + _ -> + {error, {unrecoverable_error, {invalid_request, Datas}}} + end. + +apply_template(Datas, Template, State) -> + lists:map( + fun({_, Data}) -> + do_apply_template(Data, Template, State) + end, + Datas + ). + +do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{ + target_qos := TargetQoS, target_topic_tks := TargetTopicTks +}) -> + Msg = maps:with([qos, flags, topic, payload, timestamp], Data), + Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg), + Msg#{ + id => emqx_guid:from_hexstr(Id), + qos := + case TargetQoS of + -1 -> + QoS; + _ -> + TargetQoS + end, + from => From, + topic := Topic, + payload := format_data(Template, Msg) + }. + +format_data([], Msg) -> + emqx_utils_json:encode(Msg); +format_data(Tokens, Msg) -> + emqx_placeholder:proc_tmpl(Tokens, Msg). + +redact(Data) -> + emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end). diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl new file mode 100644 index 000000000..d2f8febb9 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl @@ -0,0 +1,163 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% @doc EMQ X Bridge Sysk Frame +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_frame). + +%% API +-export([ + versions/0, + current_version/0, + make_state_with_conf/1, + make_state/1, + encode/3, + parse/2, + parse_handshake/1 +]). + +-export([ + bool2int/1, + int2bool/1, + marshaller/1, + serialize_variable_byte_integer/1, + parse_variable_byte_integer/1 +]). + +-export_type([state/0, versions/0, handshake/0, forward/0, packet/0]). + +-include("emqx_bridge_syskeeper.hrl"). + +-type state() :: #{ + handler := atom(), + version := versions(), + ack => boolean() +}. + +-type versions() :: 1. + +-type handshake() :: #{type := handshake, version := versions()}. +-type forward() :: #{type := forward, ack := boolean(), messages := list(map())}. +-type heartbeat() :: #{type := heartbeat}. + +-type packet() :: + handshake() + | forward() + | heartbeat(). + +-callback version() -> versions(). +-callback encode(packet_type_val(), packet_data(), state()) -> binary(). +-callback parse(packet_type(), binary(), state()) -> packet(). + +-define(HIGHBIT, 2#10000000). +-define(LOWBITS, 2#01111111). +-define(MULTIPLIER_MAX, 16#200000). + +-export_type([packet_type/0]). + +%%------------------------------------------------------------------- +%%% API +%%------------------------------------------------------------------- +-spec versions() -> list(versions()). +versions() -> + [1]. + +-spec current_version() -> versions(). +current_version() -> + 1. + +-spec make_state_with_conf(map()) -> state(). +make_state_with_conf(#{ack_mode := Mode}) -> + State = make_state(current_version()), + State#{ack => Mode =:= need_ack}. + +-spec make_state(versions()) -> state(). +make_state(Version) -> + case lists:member(Version, versions()) of + true -> + Handler = erlang:list_to_existing_atom( + io_lib:format("emqx_bridge_syskeeper_frame_v~B", [Version]) + ), + #{ + handler => Handler, + version => Version + }; + _ -> + erlang:throw({unsupport_version, Version}) + end. + +-spec encode(packet_type(), term(), state()) -> binary(). +encode(Type, Data, #{handler := Handler} = State) -> + Handler:encode(packet_type_val(Type), Data, State). + +-spec parse(binary(), state()) -> _. +parse(<> = Bin, #{handler := Handler} = State) -> + Type = to_packet_type(TypeVal), + Handler:parse(Type, Bin, State). + +parse_handshake(Data) -> + State = make_state(1), + parse_handshake(Data, State). + +parse_handshake(Data, #{version := Version} = State) -> + case parse(Data, State) of + {ok, #{type := handshake, version := Version} = Shake} -> + {ok, {State, Shake}}; + {ok, #{type := handshake, version := NewVersion}} -> + State2 = make_state(NewVersion), + parse_handshake(Data, State2); + Error -> + Error + end. + +bool2int(true) -> + 1; +bool2int(_) -> + 0. + +int2bool(1) -> + true; +int2bool(_) -> + false. + +marshaller(Item) when is_binary(Item) -> + erlang:binary_to_term(Item); +marshaller(Item) -> + erlang:term_to_binary(Item). + +serialize_variable_byte_integer(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialize_variable_byte_integer(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. + +parse_variable_byte_integer(Bin) -> + parse_variable_byte_integer(Bin, 1, 0). + +%%------------------------------------------------------------------- +%%% Internal functions +%%------------------------------------------------------------------- +to_packet_type(?TYPE_HANDSHAKE) -> + handshake; +to_packet_type(?TYPE_FORWARD) -> + forward; +to_packet_type(?TYPE_HEARTBEAT) -> + heartbeat. + +packet_type_val(handshake) -> + ?TYPE_HANDSHAKE; +packet_type_val(forward) -> + ?TYPE_FORWARD; +packet_type_val(heartbeat) -> + ?TYPE_HEARTBEAT. + +parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) when + Multiplier > ?MULTIPLIER_MAX +-> + {error, malformed_variable_byte_integer}; +parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> + parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> + {ok, Value + Len * Multiplier, Rest}; +parse_variable_byte_integer(<<>>, _Multiplier, _Value) -> + {error, incomplete}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl new file mode 100644 index 000000000..b1c35c68b --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl @@ -0,0 +1,70 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% @doc EMQ X Bridge Sysk Frame version 1 +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_frame_v1). + +%% API +-export([ + version/0, + encode/3, + parse/3 +]). + +-behaviour(emqx_bridge_syskeeper_frame). + +-include("emqx_bridge_syskeeper.hrl"). + +-define(B2I(X), emqx_bridge_syskeeper_frame:bool2int((X))). +-define(I2B(X), emqx_bridge_syskeeper_frame:int2bool((X))). + +-import(emqx_bridge_syskeeper_frame, [ + serialize_variable_byte_integer/1, parse_variable_byte_integer/1, marshaller/1 +]). + +%%------------------------------------------------------------------- +%%% API +%%------------------------------------------------------------------- +version() -> + 1. + +encode(?TYPE_HANDSHAKE = Type, _, _) -> + Version = version(), + <>; +encode(?TYPE_FORWARD = Type, Messages, #{ack := Ack}) -> + encode_forward(Messages, Type, Ack); +encode(?TYPE_HEARTBEAT = Type, _, _) -> + <>. + +-dialyzer({nowarn_function, parse/3}). +parse(handshake, <<_:4, _:4, Version:8>>, _) -> + {ok, #{type => handshake, version => Version}}; +parse(forward, Bin, _) -> + parse_forward(Bin); +parse(heartbeat, <<_:4, _:4>>, _) -> + {ok, #{type => heartbeat}}. + +%%------------------------------------------------------------------- +%%% Internal functions +%%------------------------------------------------------------------- +encode_forward(Messages, Type, Ack) -> + AckVal = ?B2I(Ack), + Data = marshaller(Messages), + Len = erlang:byte_size(Data), + LenVal = serialize_variable_byte_integer(Len), + <>. + +parse_forward(<<_:4, AckVal:4, Bin/binary>>) -> + case parse_variable_byte_integer(Bin) of + {ok, Len, Rest} -> + <> = Rest, + {ok, #{ + type => forward, + ack => ?I2B(AckVal), + messages => emqx_bridge_syskeeper_frame:marshaller(MsgBin) + }}; + Error -> + Error + end. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl new file mode 100644 index 000000000..fcdcbac85 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl @@ -0,0 +1,100 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_syskeeper_proxy). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1, + values/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(SYSKEEPER_HOST_OPTIONS, #{ + default_port => 9092 +}). + +%% ------------------------------------------------------------------------------------------------- +%% api +conn_bridge_examples(Method) -> + [ + #{ + <<"syskeeper_proxy">> => #{ + summary => <<"Syskeeper Bridge Proxy">>, + value => values(Method) + } + } + ]. + +values(_Method) -> + #{ + enable => true, + type => syskeeper_proxy, + name => <<"foo">>, + listen => <<"127.0.0.1:9092">>, + acceptors => 16, + handshake_timeout => <<"16s">> + }. + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_syskeeper_proxy". + +roots() -> []. + +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {listen, listen()}, + {acceptors, + mk( + non_neg_integer(), + #{desc => ?DESC("acceptors"), default => 16} + )}, + {handshake_timeout, + mk( + emqx_schema:timeout_duration_ms(), + #{desc => ?DESC(handshake_timeout), default => <<"10s">>} + )} + ]; +fields("creation_opts") -> + emqx_resource_schema:create_opts([{worker_pool_size, #{default => 1}}]); +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +listen() -> + Meta = #{desc => ?DESC("listen")}, + emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS). + +%% ------------------------------------------------------------------------------------------------- + +type_field() -> + {type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl new file mode 100644 index 000000000..50a49a0f3 --- /dev/null +++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl @@ -0,0 +1,251 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_syskeeper_proxy_server). + +-behaviour(gen_statem). + +-include_lib("emqx/include/logger.hrl"). + +-elvis([{elvis_style, invalid_dynamic_call, disable}]). + +%% `emqx_resource' API +-export([ + query_mode/1, + on_start/2, + on_stop/2, + on_get_status/2 +]). + +%% API +-export([start_link/3]). + +%% gen_statem callbacks +-export([callback_mode/0, init/1, terminate/3, code_change/4]). +-export([handle_event/4]). + +-type state() :: wait_ready | handshake | running. +-type data() :: #{ + transport := atom(), + socket := inet:socket(), + frame_state := + undefined + | emqx_bridge_sysk_frame:state(), + buffer := binary(), + conf := map() +}. + +-define(DEFAULT_PORT, 9092). + +%% ------------------------------------------------------------------------------------------------- +%% emqx_resource + +query_mode(_) -> + no_queries. + +on_start( + InstanceId, + #{ + listen := Server, + acceptors := Acceptors + } = Config +) -> + ?SLOG(info, #{ + msg => "starting_syskeeper_connector", + connector => InstanceId, + config => Config + }), + + #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{ + default_port => ?DEFAULT_PORT + }), + ListenOn = {Host, Port}, + + Options = [ + {acceptors, Acceptors}, + {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]} + ], + MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]}, + ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn), + + case esockd:open(?MODULE, ListenOn, Options, MFArgs) of + {ok, _} -> + {ok, #{listen_on => ListenOn}}; + Error -> + Error + end. + +on_stop(InstanceId, _State) -> + ?SLOG(info, #{ + msg => "stopping_syskeeper_connector", + connector => InstanceId + }), + case emqx_resource:get_allocated_resources(InstanceId) of + #{listen_on := ListenOn} -> + esockd:close(?MODULE, ListenOn); + _ -> + ok + end. + +on_get_status(_InstanceId, #{listen_on := ListenOn}) -> + try + _ = esockd:listener({?MODULE, ListenOn}), + connected + catch + _:_ -> + disconnected + end. + +%% ------------------------------------------------------------------------------------------------- +-spec start_link(atom(), inet:socket(), map()) -> + {ok, Pid :: pid()} + | ignore + | {error, Error :: term()}. +start_link(Transport, Socket, Conf) -> + gen_statem:start_link(?MODULE, [Transport, Socket, Conf], []). + +%% ------------------------------------------------------------------------------------------------- +%% gen_statem callbacks + +-spec callback_mode() -> gen_statem:callback_mode_result(). +callback_mode() -> handle_event_function. + +%% ------------------------------------------------------------------------------------------------- +-spec init(Args :: term()) -> + gen_statem:init_result(term()). +init([Transport, Socket, Conf]) -> + {ok, wait_ready, + #{ + transport => Transport, + socket => Socket, + conf => Conf, + buffer => <<>>, + frame_state => undefined + }, + {next_event, internal, wait_ready}}. + +handle_event(internal, wait_ready, wait_ready, Data) -> + wait_ready(Data); +handle_event(state_timeout, handshake_timeout, handshake, _Data) -> + %% ?LOG(error, "Handshake tiemout~n", []), + {stop, normal}; +handle_event(internal, try_parse, running, Data) -> + try_parse(running, Data); +handle_event(info, {tcp, _Socket, Bin}, State, Data) -> + try_parse(State, combine_buffer(Bin, Data)); +handle_event(info, {tcp_closed, _}, _State, _Data) -> + {stop, normal}; +handle_event(info, {tcp_error, _, _Reason}, _State, _Data) -> + %% ?LOG(warning, "TCP error, reason:~p~n", [Reason]), + {stop, normal}; +handle_event(_Event, _Content, _State, _Data) -> + %% ?LOG(warning, "Unexpected event:~p, Context:~p, State:~p~n", [Event, Content, State]), + keep_state_and_data. + +-spec terminate(Reason :: term(), State :: state(), Data :: data()) -> + any(). +terminate(_Reason, _State, _Data) -> + ok. + +code_change(_OldVsn, State, Data, _Extra) -> + {ok, State, Data}. + +%% ------------------------------------------------------------------------------------------------- +%%% Internal functions +send(#{transport := Transport, socket := Socket}, Bin) -> + Transport:send(Socket, Bin). + +ack(Data) -> + ack(Data, true). + +ack(Data, false) -> + send(Data, <<0>>); +ack(Data, true) -> + send(Data, <<16#FF>>). + +wait_ready( + #{ + transport := Transport, + socket := RawSocket, + conf := #{handshake_timeout := Timeout} + } = + Data +) -> + case Transport:wait(RawSocket) of + {ok, Socket} -> + Transport:setopts(Socket, [{active, true}]), + {next_state, handshake, + Data#{ + socket => Socket, + frame_state => undefined + }, + {state_timeout, Timeout, handshake_timeout}}; + {error, Reason} -> + ok = Transport:fast_close(RawSocket), + {stop, Reason} + end. + +combine_buffer(Bin, #{buffer := Buffer} = Data) -> + Data#{buffer := <>}. + +try_parse(State, #{buffer := Bin} = Data) -> + case emqx_bridge_syskeeper_frame:parse_variable_byte_integer(Bin) of + {ok, Len, Rest} -> + case Rest of + <> -> + Data2 = Data#{buffer := Rest2}, + Result = parse(Payload, Data2), + handle_parse_result(Result, State, Data2); + _ -> + {keep_state, Data} + end; + {error, incomplete} -> + {keep_state, Data}; + {error, _Reason} -> + %% ?LOG(warning, "Parse error, reason:~p, buffer:~p~n", [Reason, Bin]), + {stop, parse_error} + end. + +%% maybe handshake +parse(Bin, #{frame_state := undefined}) -> + emqx_bridge_syskeeper_frame:parse_handshake(Bin); +parse(Bin, #{frame_state := State}) -> + emqx_bridge_syskeeper_frame:parse(Bin, State). + +do_forward(Ack, Messages, Data) -> + lists:foreach( + fun(Message) -> + Msg = emqx_message:from_map(Message#{headers => #{}, extra => #{}}), + _ = emqx_broker:safe_publish(Msg) + end, + Messages + ), + case Ack of + true -> + ack(Data); + _ -> + ok + end. + +handle_parse_result({ok, Msg}, State, Data) -> + handle_packet(Msg, State, Data); +handle_parse_result({error, _Reason} = Error, State, Data) -> + handle_parse_error(Error, State, #{buffer := _Bin} = Data), + %% ?LOG(warning, "Parse error, state:~p, reason:~p, buffer:~p~n", [State, Reason, Bin]), + {stop, parse_error}. + +handle_parse_error(_, handshake, Data) -> + ack(Data, false); +handle_parse_error(_, _, _) -> + ok. + +handle_packet({FrameState, _Shake}, handshake, Data) -> + ack(Data), + {next_state, running, Data#{frame_state := FrameState}, {next_event, internal, try_parse}}; +handle_packet(#{type := forward, ack := Ack, messages := Messages}, running, Data) -> + do_forward(Ack, Messages, Data), + try_parse(running, Data); +handle_packet(#{type := heartbeat}, running, Data) -> + ack(Data), + try_parse(running, Data). diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm index bb9fc91a6..190ef1afa 100644 --- a/apps/emqx_machine/priv/reboot_lists.eterm +++ b/apps/emqx_machine/priv/reboot_lists.eterm @@ -127,7 +127,8 @@ emqx_dashboard_sso, emqx_audit, emqx_gateway_gbt32960, - emqx_gateway_ocpp + emqx_gateway_ocpp, + emqx_bridge_syskeeper ], %% must always be of type `load' ce_business_apps => diff --git a/mix.exs b/mix.exs index 1e6b37d18..23cef80e3 100644 --- a/mix.exs +++ b/mix.exs @@ -217,7 +217,8 @@ defmodule EMQXUmbrella.MixProject do :emqx_dashboard_sso, :emqx_audit, :emqx_gateway_gbt32960, - :emqx_gateway_ocpp + :emqx_gateway_ocpp, + :emqx_bridge_syskeeper ]) end diff --git a/rebar.config.erl b/rebar.config.erl index 54ce0d6c3..93ad9fa99 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false; is_community_umbrella_app("apps/emqx_audit") -> false; is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false; is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false; +is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false; is_community_umbrella_app(_) -> true. is_jq_supported() -> diff --git a/rel/i18n/emqx_bridge_syskeeper.hocon b/rel/i18n/emqx_bridge_syskeeper.hocon new file mode 100644 index 000000000..7cf195317 --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper.hocon @@ -0,0 +1,45 @@ +emqx_bridge_syskeeper { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for a Syskeeper bridge""" + +desc_config.label: +"""Syskeeper Bridge Configuration""" + +desc_name.desc: +"""Bridge name.""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type""" + +desc_type.label: +"""Bridge Type""" + +template.desc: +"""Template""" + +template.label: +"""Template""" + +target_topic.desc: +"""The topic for the forwarded message""" + +target_topic.label: +"""Target Topic""" + +target_qos.desc: +"""The QoS for the forwarded message, -1 is for the original topic""" + +target_qos.label: +"""Target QoS""" + +} diff --git a/rel/i18n/emqx_bridge_syskeeper_connector.hocon b/rel/i18n/emqx_bridge_syskeeper_connector.hocon new file mode 100644 index 000000000..a057e6647 --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper_connector.hocon @@ -0,0 +1,21 @@ +emqx_bridge_syskeeper_connector { + +server.desc: +"""The address of the Syskeeper proxy server""" + +server.label: +"""Server""" + +ack_mode.desc: +"""Specify whether the proxy server should reply with an acknowledgement for the message forwarding, can be:
- need_ack
- no_ack
""" + +ack_mode.label: +"""Acknowledgement Mode""" + +ack_timeout.desc: +"""The maximum time to wait for an acknowledgement from the proxy server""" + +ack_timeout.label: +"""Acknowledgement Timeout""" + +} diff --git a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon new file mode 100644 index 000000000..f6c519216 --- /dev/null +++ b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon @@ -0,0 +1,45 @@ +emqx_bridge_syskeeper_proxy { + +config_enable.desc: +"""Enable or disable this bridge""" + +config_enable.label: +"""Enable Or Disable Bridge""" + +desc_config.desc: +"""Configuration for a Syskeeper proxy bridge""" + +desc_config.label: +"""Syskeeper Proxy Bridge Configuration""" + +desc_name.desc: +"""Bridge name""" + +desc_name.label: +"""Bridge Name""" + +desc_type.desc: +"""The Bridge Type""" + +desc_type.label: +"""Bridge Type""" + +listen.desc: +"""The listening address for this Syskeeper proxy server""" + +listen.label: +"""Listen Address""" + +acceptors.desc: +"""The number of the acceptors""" + +acceptors.label: +"""Acceptors""" + +handshake_timeout.desc: +"""The maximum to wait for the handshake when a connection is created""" + +handshake_timeout.label: +"""Handshake Timeout""" + +}