diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
index 93951cca0..a160ecd33 100644
--- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
+++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl
@@ -50,7 +50,9 @@ api_schemas(Method) ->
api_ref(emqx_bridge_rabbitmq, <<"rabbitmq">>, Method),
api_ref(emqx_bridge_kinesis, <<"kinesis_producer">>, Method ++ "_producer"),
api_ref(emqx_bridge_greptimedb, <<"greptimedb">>, Method ++ "_grpc_v1"),
- api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer")
+ api_ref(emqx_bridge_azure_event_hub, <<"azure_event_hub_producer">>, Method ++ "_producer"),
+ api_ref(emqx_bridge_syskeeper, <<"syskeeper">>, Method),
+ api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method)
].
schema_modules() ->
@@ -78,7 +80,9 @@ schema_modules() ->
emqx_bridge_rabbitmq,
emqx_bridge_kinesis,
emqx_bridge_greptimedb,
- emqx_bridge_azure_event_hub
+ emqx_bridge_azure_event_hub,
+ emqx_bridge_syskeeper,
+ emqx_bridge_syskeeper_proxy
].
examples(Method) ->
@@ -126,7 +130,9 @@ resource_type(rabbitmq) -> emqx_bridge_rabbitmq_connector;
resource_type(kinesis_producer) -> emqx_bridge_kinesis_impl_producer;
resource_type(greptimedb) -> emqx_bridge_greptimedb_connector;
%% We use AEH's Kafka interface.
-resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer.
+resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer;
+resource_type(syskeeper) -> emqx_bridge_syskeeper_connector;
+resource_type(syskeeper_proxy) -> emqx_bridge_syskeeper_proxy_server.
%% For bridges that need to override connector configurations.
bridge_impl_module(BridgeType) when is_binary(BridgeType) ->
@@ -215,7 +221,8 @@ fields(bridges) ->
influxdb_structs() ++
redis_structs() ++
pgsql_structs() ++ clickhouse_structs() ++ sqlserver_structs() ++ rabbitmq_structs() ++
- kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs().
+ kinesis_structs() ++ greptimedb_structs() ++ azure_event_hub_structs() ++
+ syskeeper_structs().
mongodb_structs() ->
[
@@ -428,6 +435,26 @@ azure_event_hub_structs() ->
)}
].
+syskeeper_structs() ->
+ [
+ {syskeeper,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_syskeeper, "config")),
+ #{
+ desc => <<"Syskeeper bridge config ">>,
+ required => false
+ }
+ )},
+ {syskeeper_proxy,
+ mk(
+ hoconsc:map(name, ref(emqx_bridge_syskeeper_proxy, "config")),
+ #{
+ desc => <<"Syskeeper proxy server config">>,
+ required => false
+ }
+ )}
+ ].
+
api_ref(Module, Type, Method) ->
{Type, ref(Module, Method)}.
diff --git a/apps/emqx_bridge_syskeeper/BSL.txt b/apps/emqx_bridge_syskeeper/BSL.txt
new file mode 100644
index 000000000..0acc0e696
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/BSL.txt
@@ -0,0 +1,94 @@
+Business Source License 1.1
+
+Licensor: Hangzhou EMQ Technologies Co., Ltd.
+Licensed Work: EMQX Enterprise Edition
+ The Licensed Work is (c) 2023
+ Hangzhou EMQ Technologies Co., Ltd.
+Additional Use Grant: Students and educators are granted right to copy,
+ modify, and create derivative work for research
+ or education.
+Change Date: 2027-02-01
+Change License: Apache License, Version 2.0
+
+For information about alternative licensing arrangements for the Software,
+please contact Licensor: https://www.emqx.com/en/contact
+
+Notice
+
+The Business Source License (this document, or the “License”) is not an Open
+Source license. However, the Licensed Work will eventually be made available
+under an Open Source License, as stated in this License.
+
+License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
+“Business Source License” is a trademark of MariaDB Corporation Ab.
+
+-----------------------------------------------------------------------------
+
+Business Source License 1.1
+
+Terms
+
+The Licensor hereby grants you the right to copy, modify, create derivative
+works, redistribute, and make non-production use of the Licensed Work. The
+Licensor may make an Additional Use Grant, above, permitting limited
+production use.
+
+Effective on the Change Date, or the fourth anniversary of the first publicly
+available distribution of a specific version of the Licensed Work under this
+License, whichever comes first, the Licensor hereby grants you rights under
+the terms of the Change License, and the rights granted in the paragraph
+above terminate.
+
+If your use of the Licensed Work does not comply with the requirements
+currently in effect as described in this License, you must purchase a
+commercial license from the Licensor, its affiliated entities, or authorized
+resellers, or you must refrain from using the Licensed Work.
+
+All copies of the original and modified Licensed Work, and derivative works
+of the Licensed Work, are subject to this License. This License applies
+separately for each version of the Licensed Work and the Change Date may vary
+for each version of the Licensed Work released by Licensor.
+
+You must conspicuously display this License on each original or modified copy
+of the Licensed Work. If you receive the Licensed Work in original or
+modified form from a third party, the terms and conditions set forth in this
+License apply to your use of that work.
+
+Any use of the Licensed Work in violation of this License will automatically
+terminate your rights under this License for the current and all other
+versions of the Licensed Work.
+
+This License does not grant you any right in any trademark or logo of
+Licensor or its affiliates (provided that you may use a trademark or logo of
+Licensor as expressly required by this License).
+
+TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
+AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
+EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
+TITLE.
+
+MariaDB hereby grants you permission to use this License’s text to license
+your works, and to refer to it using the trademark “Business Source License”,
+as long as you comply with the Covenants of Licensor below.
+
+Covenants of Licensor
+
+In consideration of the right to use this License’s text and the “Business
+Source License” name and trademark, Licensor covenants to MariaDB, and to all
+other recipients of the licensed work to be provided by Licensor:
+
+1. To specify as the Change License the GPL Version 2.0 or any later version,
+ or a license that is compatible with GPL Version 2.0 or a later version,
+ where “compatible” means that software provided under the Change License can
+ be included in a program with software provided under GPL Version 2.0 or a
+ later version. Licensor may specify additional Change Licenses without
+ limitation.
+
+2. To either: (a) specify an additional grant of rights to use that does not
+ impose any additional restriction on the right granted in this License, as
+ the Additional Use Grant; or (b) insert the text “None”.
+
+3. To specify a Change Date.
+
+4. Not to modify this License in any other way.
diff --git a/apps/emqx_bridge_syskeeper/README.md b/apps/emqx_bridge_syskeeper/README.md
new file mode 100644
index 000000000..328fd488e
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/README.md
@@ -0,0 +1,30 @@
+# EMQX Syskeeper Bridge
+
+Nari Syskeeper 2000 is a one-way Physical Isolation Net Gap.
+
+The application is used to connect EMQX and Syskeeper.
+Users can create a rule and quickly ingest IoT data to the Syskeeper by leveraging
+[EMQX Rules](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html).
+
+# Documentation
+
+- Refer to [Rules engine](https://docs.emqx.com/en/enterprise/v5.0/data-integration/rules.html)
+ for the EMQX rules engine introduction.
+
+# HTTP APIs
+
+- Several APIs are provided for bridge management, which includes create bridge,
+ update bridge, get bridge, stop or restart bridge and list bridges etc.
+
+ Refer to [API Docs - Bridges](https://docs.emqx.com/en/enterprise/v5.0/admin/api-docs.html#tag/Bridges)
+ for more detailed information.
+
+
+# Contributing
+
+Please see our [contributing.md](../../CONTRIBUTING.md).
+
+
+# License
+
+EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).
diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.md b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md
new file mode 100644
index 000000000..ca73c300d
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.md
@@ -0,0 +1,370 @@
+
+# Table of Contents
+
+1. [Packet Format](#orgb2a43d1)
+2. [Common Header](#org5ca4c69)
+ 1. [Types](#org240efb3)
+ 2. [Shared Flags](#org804fcce)
+3. [Handshake Packet](#org6a73ea8)
+4. [Forward Packet](#org39c753e)
+ 1. [Flags](#org5177d26)
+ 2. [Payload](#orgb29cbd7)
+ 1. [Message Content map structure](#org75acfe6)
+5. [Heartbeat Packet](#org388b69a)
+
+
+
+
+# Packet Format
+
+
+
+
+
+ bytes
+ |
+
+ 0
+ |
+
+ 1
+ |
+
+ 2
+ |
+
+ 3
+ |
+
+ 5
+ |
+
+ 6 .. end
+ |
+
+
+
+
+ |
+
+ variable length
+ |
+
+ common header
+ |
+
+ payload
+ |
+
+
+
+The length of the remaining part(common header + payload) is indicated by the Length Header of each packet
+
+
+
+
+# Common Header
+
+
+
+
+
+ bits
+ |
+
+ 0
+ |
+
+ 1
+ |
+
+ 2
+ |
+
+ 3
+ |
+
+ 4
+ |
+
+ 5
+ |
+
+ 6
+ |
+
+ 7
+ |
+
+
+
+
+ |
+
+ packet type
+ |
+
+ shared flags
+ |
+
+
+
+
+
+
+## Types
+
+
+
+
+
+ type
+ |
+
+ usage
+ |
+
+
+
+ 0
+ |
+
+ handshake
+ |
+
+
+
+ 1
+ |
+
+ forward
+ |
+
+
+
+ 2
+ |
+
+ heartbeat
+ |
+
+
+
+
+
+
+## Shared Flags
+
+The usage of each bit is determined by the type of packet
+
+
+
+
+# Handshake Packet
+
+
+
+
+
+ bytes
+ |
+
+ 0
+ |
+
+ 1
+ |
+
+
+
+
+ |
+
+ common header
+ |
+
+ version
+ |
+
+
+
+
+
+
+# Forward Packet
+
+
+
+
+
+ bits
+ |
+
+ 0
+ |
+
+ 1
+ |
+
+ 2
+ |
+
+ 3
+ |
+
+ 4
+ |
+
+ 5
+ |
+
+ 6
+ |
+
+ 7
+ |
+
+ ...
+ |
+
+
+
+
+
+
+ |
+
+
+ packet type
+
+ |
+
+
+ |
+
+ ACK
+ |
+
+
+ payload
+
+ |
+
+
+
+ forward flags
+ |
+
+
+
+
+
+
+## Flags
+
+
+
+
+
+ flag
+ |
+
+ usage
+ |
+
+
+
+ ACK
+ |
+
+ This packet need a ACK response
+ |
+
+
+
+
+
+
+## Payload
+
+
+
+
+
+ bytes
+ |
+
+ 0
+ |
+
+ ..
+ |
+
+ n
+ |
+
+ n+1
+ |
+
+ ..
+ |
+
+ x
+ |
+
+
+
+
+ |
+
+ Content Length
+ |
+
+ Message Content
+ |
+
+
+
+- Content length is a variable length number.
+- Message content is a list in an opaque binary format whose element is a map structure
+
+
+
+
+### Message Content map structure
+
+ {
+ id: "0006081CCFF3D48F03C10000058B0000", // unique message id
+ qos: 1,
+ flags: {dup: false, retain: false},
+ from: "clientid",
+ topic: "t/1",
+ payload: "hello, world",
+ timestamp: 1697786555281
+ }
+
+
+
+
+# Heartbeat Packet
+
+
+
+
+
+ bytes
+ |
+
+ 0
+ |
+
+
+
+
+ |
+
+ common header
+ |
+
+
+
diff --git a/apps/emqx_bridge_syskeeper/doc/protocol_v1.org b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org
new file mode 100644
index 000000000..12d0fe850
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/doc/protocol_v1.org
@@ -0,0 +1,80 @@
+* Packet Format
+ +-------+-----+-----+-----+-----+-----------------+----------------+
+ | bytes | 0 | 1 | 2 | 3 | 5 | 6 .. end |
+ +-------+-----+-----+-----+-----+-----------------+----------------+
+ | | variable length | common header | payload |
+ +-------+-----------------------+-----------------+----------------+
+
+ The length of the remaining part(common header + payload) is indicated by the Length Header of each packet
+
+* Common Header
+ +------+-----+-----+-----+-----+-----+-----+-----+-----+
+ | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
+ +------+-----+-----+-----+-----+-----+-----+-----+-----+
+ | | packet type | shared flags |
+ +------+-----------------------+-----------------------+
+** Types
+ +----------+-----------+
+ | type | usage |
+ +----------+-----------+
+ | 0 | handshake |
+ +----------+-----------+
+ | 1 | forward |
+ +----------+-----------+
+ | 2 | heartbeat |
+ +----------+-----------+
+** Shared Flags
+ The usage of each bit is determined by the type of packet
+* Handshake Packet
+ +-------+---------------+---------------+
+ | bytes | 0 | 1 |
+ +-------+---------------+---------------+
+ | | common header | version |
+ +-------+---------------+---------------+
+* Forward Packet
+ +------+---+---+---+---+---+---+---+-----+-----------+
+ | bits | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | ... |
+ +------+---+---+---+---+---+---+---+-----+-----------+
+ | | | | ACK | |
+ | | packet type +-----------+-----+ payload |
+ | | | forward flags | |
+ +------+---------------+-----------------+-----------+
+
+** Flags
+ +------+-------------------------------------------+
+ | flag | usage |
+ +------+-------------------------------------------+
+ | ACK | This packet need a ACK response |
+ +------+-------------------------------------------+
+
+** Payload
+ +-------+-----+-------+-----+-----+-----+-----+
+ | bytes | 0 | .. | n | n+1 | .. | x |
+ +-------+-----+-------+-----+-----+-----+-----+
+ | | Content Length | Message Content |
+ +-------+-------------------+-----------------+
+
+ + Content length is a variable length number.
+ + Message content is a list in an opaque binary format whose element is a map structure
+
+*** Message Content map structure
+
+#+begin_src json
+ {
+ id: "0006081CCFF3D48F03C10000058B0000", // unique message id
+ qos: 1,
+ flags: {dup: false, retain: false},
+ from: "clientid",
+ topic: "t/1",
+ payload: "hello, world",
+ timestamp: 1697786555281
+ }
+#+end_src
+
+* Heartbeat Packet
+
+ +-------+---------------+
+ | bytes | 0 |
+ +-------+---------------+
+ | | common header |
+ +-------+---------------+
diff --git a/apps/emqx_bridge_syskeeper/docker-ct b/apps/emqx_bridge_syskeeper/docker-ct
new file mode 100644
index 000000000..80f0d394b
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/docker-ct
@@ -0,0 +1 @@
+toxiproxy
diff --git a/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl
new file mode 100644
index 000000000..b381ebf50
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/include/emqx_bridge_syskeeper.hrl
@@ -0,0 +1,15 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-ifndef(EMQX_BRIDGE_SYSKEEPER).
+-define(EMQX_BRIDGE_SYSKEEPER, true).
+
+-define(TYPE_HANDSHAKE, 0).
+-define(TYPE_FORWARD, 1).
+-define(TYPE_HEARTBEAT, 2).
+
+-type packet_type() :: handshake | forward | heartbeat.
+-type packet_data() :: none | binary() | [binary()].
+-type packet_type_val() :: ?TYPE_HANDSHAKE..?TYPE_HEARTBEAT.
+
+-endif.
diff --git a/apps/emqx_bridge_syskeeper/rebar.config b/apps/emqx_bridge_syskeeper/rebar.config
new file mode 100644
index 000000000..31879d9ce
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/rebar.config
@@ -0,0 +1,6 @@
+%% -*- mode: erlang; -*-
+{erl_opts, [debug_info]}.
+{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}}
+ , {emqx_resource, {path, "../../apps/emqx_resource"}}
+ , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
+ ]}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src
new file mode 100644
index 000000000..a8f533867
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.app.src
@@ -0,0 +1,13 @@
+{application, emqx_bridge_syskeeper, [
+ {description, "EMQX Enterprise Bridge"},
+ {vsn, "0.1.0"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib,
+ emqx_resource
+ ]},
+ {env, []},
+ {modules, []},
+ {links, []}
+]}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl
new file mode 100644
index 000000000..9e0ec4eed
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper.erl
@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_syskeeper).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+ conn_bridge_examples/1,
+ values/1
+]).
+
+-export([
+ namespace/0,
+ roots/0,
+ fields/1,
+ desc/1
+]).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+conn_bridge_examples(Method) ->
+ [
+ #{
+ <<"syskeeper">> => #{
+ summary => <<"Syskeeper Bridge">>,
+ value => values(Method)
+ }
+ }
+ ].
+
+values(_Method) ->
+ #{
+ enable => true,
+ type => syskeeper,
+ name => <<"foo">>,
+ server => <<"127.0.0.1:9092">>,
+ ack_mode => <<"no_ack">>,
+ ack_timeout => <<"10s">>,
+ pool_size => 16,
+ target_topic => <<"${topic}">>,
+ target_qos => <<"-1">>,
+ template => <<"${payload}">>,
+ resource_opts => #{
+ worker_pool_size => 16,
+ health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+ batch_size => ?DEFAULT_BATCH_SIZE,
+ batch_time => ?DEFAULT_BATCH_TIME,
+ query_mode => sync,
+ max_buffer_bytes => ?DEFAULT_BUFFER_BYTES
+ }
+ }.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_syskeeper".
+
+roots() -> [].
+
+fields("config") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {target_topic,
+ mk(
+ binary(),
+ #{desc => ?DESC("target_topic"), default => <<"${topic}">>}
+ )},
+ {target_qos,
+ mk(
+ range(-1, 2),
+ #{desc => ?DESC("target_qos"), default => -1}
+ )},
+ {template,
+ mk(
+ binary(),
+ #{desc => ?DESC("template"), default => <<"${payload}">>}
+ )},
+ {resource_opts,
+ mk(
+ ref(?MODULE, "creation_opts"),
+ #{
+ required => false,
+ default => #{},
+ desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+ }
+ )}
+ ] ++ emqx_bridge_syskeeper_connector:fields(config);
+fields("creation_opts") ->
+ emqx_resource_schema:create_opts([{request_ttl, #{default => infinity}}]);
+fields("post") ->
+ [type_field(), name_field() | fields("config")];
+fields("put") ->
+ fields("config");
+fields("get") ->
+ emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc("config") ->
+ ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+ ["Configuration for Syskeeper using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+ emqx_resource_schema:desc(Name);
+desc(_) ->
+ undefined.
+
+%% -------------------------------------------------------------------------------------------------
+
+type_field() ->
+ {type, mk(enum([syskeeper]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+ {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl
new file mode 100644
index 000000000..18822886f
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_client.erl
@@ -0,0 +1,180 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_syskeeper_client).
+
+-behaviour(gen_server).
+
+%% API
+-export([
+ start_link/1,
+ forward/3,
+ heartbeat/2
+]).
+
+%% gen_server callbacks
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3,
+ format_status/2
+]).
+
+-include("emqx_bridge_syskeeper.hrl").
+
+-type state() :: #{
+ ack_mode := need_ack | no_ack,
+ ack_timeout := timer:time(),
+ socket := undefined | inet:socket(),
+ frame_state := emqx_bridge_syskeeper_frame:state(),
+ last_error := undefined | tuple()
+}.
+
+-type send_result() :: {ok, state()} | {error, term()}.
+
+%% -------------------------------------------------------------------------------------------------
+%% API
+forward(Pid, Msg, Timeout) ->
+ call(Pid, {?FUNCTION_NAME, Msg}, Timeout).
+
+heartbeat(Pid, Timeout) ->
+ ok =:= call(Pid, ?FUNCTION_NAME, Timeout).
+
+%% -------------------------------------------------------------------------------------------------
+%% Starts Bridge which transfer data to Syskeeper
+
+start_link(Options) ->
+ gen_server:start_link(?MODULE, Options, []).
+
+%% -------------------------------------------------------------------------------------------------
+%%% gen_server callbacks
+
+%% Initialize syskeeper client
+init(#{ack_timeout := AckTimeout, ack_mode := AckMode} = Options) ->
+ erlang:process_flag(trap_exit, true),
+ connect(Options, #{
+ ack_timeout => AckTimeout,
+ ack_mode => AckMode,
+ socket => undefined,
+ last_error => undefined,
+ frame_state => emqx_bridge_syskeeper_frame:make_state_with_conf(Options)
+ }).
+
+handle_call({forward, Msgs}, _From, State) ->
+ Result = send_packet(forward, Msgs, State),
+ handle_reply_result(Result, State);
+handle_call(heartbeat, _From, State) ->
+ Result = send_ack_packet(heartbeat, none, State),
+ handle_reply_result(Result, State);
+handle_call(_Request, _From, State) ->
+ {reply, ok, State}.
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info({tcp_closed, _} = Reason, State) ->
+ {noreply, State#{socket := undefined, last_error := Reason}};
+handle_info({last_error, _, _} = Reason, State) ->
+ {noreply, State#{socket := undefined, last_error := Reason}};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, #{socket := Socket} = _State) ->
+ close_socket(Socket),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+-spec format_status(
+ Opt :: normal | terminate,
+ Status :: list()
+) -> Status :: term().
+format_status(_Opt, Status) ->
+ Status.
+
+%% ------------------------------------------------------------------------------------------------
+connect(
+ #{
+ hostname := Host,
+ port := Port
+ },
+ State
+) ->
+ case
+ gen_tcp:connect(Host, Port, [
+ {active, true},
+ {mode, binary},
+ {nodelay, true}
+ ])
+ of
+ {ok, Socket} ->
+ send_ack_packet(handshake, none, State#{socket := Socket});
+ {error, Reason} ->
+ {stop, Reason}
+ end.
+
+-spec send_ack_packet(packet_type(), packet_data(), state()) -> send_result().
+send_ack_packet(Type, Data, State) ->
+ send_packet(Type, Data, State, true).
+
+-spec send_packet(packet_type(), packet_data(), state()) -> send_result().
+send_packet(Type, Data, State) ->
+ send_packet(Type, Data, State, false).
+
+-spec send_packet(packet_type(), packet_data(), state(), boolean()) -> send_result().
+send_packet(_Type, _Data, #{socket := undefined, last_error := Reason}, _Force) ->
+ {error, Reason};
+send_packet(Type, Data, #{frame_state := FrameState} = State, Force) ->
+ Packet = emqx_bridge_syskeeper_frame:encode(Type, Data, FrameState),
+ case socket_send(Packet, State) of
+ ok ->
+ wait_ack(State, Force);
+ {error, _} = Error ->
+ Error
+ end.
+
+-spec socket_send(binary() | [binary()], state()) -> ok | {error, _Reason}.
+socket_send(Bin, State) when is_binary(Bin) ->
+ socket_send([Bin], State);
+socket_send(Bins, #{socket := Socket}) ->
+ Map = fun(Data) ->
+ Len = erlang:byte_size(Data),
+ VarLen = emqx_bridge_syskeeper_frame:serialize_variable_byte_integer(Len),
+ <>
+ end,
+ gen_tcp:send(Socket, lists:map(Map, Bins)).
+
+-spec wait_ack(state(), boolean()) -> send_result().
+wait_ack(#{ack_timeout := AckTimeout, ack_mode := AckMode} = State, Force) when
+ AckMode =:= need_ack; Force
+->
+ receive
+ {tcp, _Socket, <<16#FF>>} ->
+ {ok, State};
+ {tcp_closed, _} = Reason ->
+ {error, Reason};
+ {tcp_error, _, _} = Reason ->
+ {error, Reason}
+ after AckTimeout ->
+ {error, wait_ack_timeout}
+ end;
+wait_ack(State, _Force) ->
+ {ok, State}.
+
+close_socket(undefined) ->
+ ok;
+close_socket(Socket) ->
+ catch gen_tcp:close(Socket),
+ ok.
+
+call(Pid, Msg, Timeout) ->
+ gen_server:call(Pid, Msg, Timeout).
+
+handle_reply_result({ok, _}, State) ->
+ {reply, ok, State};
+handle_reply_result({error, Reason}, State) ->
+ {reply, {error, {recoverable_error, Reason}}, State#{last_error := Reason}}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
new file mode 100644
index 000000000..219d4d0d2
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_connector.erl
@@ -0,0 +1,262 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_syskeeper_connector).
+
+-behaviour(emqx_resource).
+
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+
+-export([roots/0, fields/1]).
+
+%% `emqx_resource' API
+-export([
+ callback_mode/0,
+ query_mode/1,
+ on_start/2,
+ on_stop/2,
+ on_query/3,
+ on_batch_query/3,
+ on_get_status/2
+]).
+
+-export([
+ connect/1
+]).
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-define(SYSKEEPER_HOST_OPTIONS, #{
+ default_port => 9092
+}).
+
+-define(EXTRA_CALL_TIMEOUT, 2000).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon schema
+roots() ->
+ [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+ [
+ {server, server()},
+ {ack_mode,
+ mk(
+ enum([need_ack, no_ack]),
+ #{desc => ?DESC(ack_mode), default => <<"no_ack">>}
+ )},
+ {ack_timeout,
+ mk(
+ emqx_schema:timeout_duration_ms(),
+ #{desc => ?DESC(ack_timeout), default => <<"10s">>}
+ )},
+ {pool_size, fun
+ (default) ->
+ 16;
+ (Other) ->
+ emqx_connector_schema_lib:pool_size(Other)
+ end}
+ ].
+
+server() ->
+ Meta = #{desc => ?DESC("server")},
+ emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
+
+%% -------------------------------------------------------------------------------------------------
+%% `emqx_resource' API
+
+callback_mode() -> always_sync.
+
+query_mode(_) -> sync.
+
+on_start(
+ InstanceId,
+ #{
+ server := Server,
+ pool_size := PoolSize,
+ ack_timeout := AckTimeout,
+ target_topic := TargetTopic,
+ target_qos := TargetQoS
+ } = Config
+) ->
+ ?SLOG(info, #{
+ msg => "starting_syskeeper_connector",
+ connector => InstanceId,
+ config => redact(Config)
+ }),
+
+ HostCfg = emqx_schema:parse_server(Server, ?SYSKEEPER_HOST_OPTIONS),
+
+ Options = [
+ {options,
+ maps:merge(
+ HostCfg,
+ maps:with([ack_mode, ack_timeout], Config)
+ )},
+ {pool_size, PoolSize}
+ ],
+
+ State = #{
+ pool_name => InstanceId,
+ target_qos => TargetQoS,
+ ack_timeout => AckTimeout,
+ templates => parse_template(Config),
+ target_topic_tks => emqx_placeholder:preproc_tmpl(TargetTopic)
+ },
+ case emqx_resource_pool:start(InstanceId, ?MODULE, Options) of
+ ok ->
+ {ok, State};
+ Error ->
+ Error
+ end.
+
+on_stop(InstanceId, _State) ->
+ ?SLOG(info, #{
+ msg => "stopping_syskeeper_connector",
+ connector => InstanceId
+ }),
+ emqx_resource_pool:stop(InstanceId).
+
+on_query(InstanceId, {send_message, _} = Query, State) ->
+ do_query(InstanceId, [Query], State);
+on_query(_InstanceId, Query, _State) ->
+ {error, {unrecoverable_error, {invalid_request, Query}}}.
+
+%% we only support batch insert
+on_batch_query(InstanceId, [{send_message, _} | _] = Query, State) ->
+ do_query(InstanceId, Query, State);
+on_batch_query(_InstanceId, Query, _State) ->
+ {error, {unrecoverable_error, {invalid_request, Query}}}.
+
+on_get_status(_InstanceId, #{pool_name := Pool, ack_timeout := AckTimeout}) ->
+ Health = emqx_resource_pool:health_check_workers(
+ Pool, {emqx_bridge_syskeeper_client, heartbeat, [AckTimeout + ?EXTRA_CALL_TIMEOUT]}
+ ),
+ status_result(Health).
+
+status_result(true) -> connected;
+status_result(false) -> connecting;
+status_result({error, _}) -> connecting.
+
+%% -------------------------------------------------------------------------------------------------
+%% Helper fns
+
+do_query(
+ InstanceId,
+ Query,
+ #{pool_name := PoolName, ack_timeout := AckTimeout} = State
+) ->
+ ?TRACE(
+ "QUERY",
+ "syskeeper_connector_received",
+ #{connector => InstanceId, query => Query, state => State}
+ ),
+
+ Result =
+ case try_apply_template(Query, State) of
+ {ok, Msg} ->
+ ecpool:pick_and_do(
+ PoolName,
+ {emqx_bridge_syskeeper_client, forward, [Msg, AckTimeout + ?EXTRA_CALL_TIMEOUT]},
+ no_handover
+ );
+ Error ->
+ Error
+ end,
+
+ case Result of
+ {error, Reason} ->
+ ?tp(
+ syskeeper_connector_query_return,
+ #{error => Reason}
+ ),
+ %% ?SLOG(error, #{
+ %% msg => "syskeeper_connector_do_query_failed",
+ %% connector => InstanceId,
+ %% query => Query,
+ %% reason => Reason
+ %% }),
+ case Reason of
+ ecpool_empty ->
+ {error, {recoverable_error, Reason}};
+ _ ->
+ Result
+ end;
+ _ ->
+ %% ?tp(
+ %% syskeeper_connector_query_return,
+ %% #{result => Result}
+ %% ),
+ Result
+ end.
+
+connect(Opts) ->
+ Options = proplists:get_value(options, Opts),
+ emqx_bridge_syskeeper_client:start_link(Options).
+
+parse_template(Config) ->
+ Templates =
+ case maps:get(template, Config, undefined) of
+ undefined -> #{};
+ <<>> -> #{};
+ Template -> #{send_message => Template}
+ end,
+
+ parse_template(maps:to_list(Templates), #{}).
+
+parse_template([{Key, H} | T], Templates) ->
+ ParamsTks = emqx_placeholder:preproc_tmpl(H),
+ parse_template(
+ T,
+ Templates#{Key => ParamsTks}
+ );
+parse_template([], Templates) ->
+ Templates.
+
+try_apply_template([{Type, _} | _] = Datas, #{templates := Templates} = State) ->
+ case maps:find(Type, Templates) of
+ {ok, Template} ->
+ {ok, apply_template(Datas, Template, State)};
+ _ ->
+ {error, {unrecoverable_error, {invalid_request, Datas}}}
+ end.
+
+apply_template(Datas, Template, State) ->
+ lists:map(
+ fun({_, Data}) ->
+ do_apply_template(Data, Template, State)
+ end,
+ Datas
+ ).
+
+do_apply_template(#{id := Id, qos := QoS, clientid := From} = Data, Template, #{
+ target_qos := TargetQoS, target_topic_tks := TargetTopicTks
+}) ->
+ Msg = maps:with([qos, flags, topic, payload, timestamp], Data),
+ Topic = emqx_placeholder:proc_tmpl(TargetTopicTks, Msg),
+ Msg#{
+ id => emqx_guid:from_hexstr(Id),
+ qos :=
+ case TargetQoS of
+ -1 ->
+ QoS;
+ _ ->
+ TargetQoS
+ end,
+ from => From,
+ topic := Topic,
+ payload := format_data(Template, Msg)
+ }.
+
+format_data([], Msg) ->
+ emqx_utils_json:encode(Msg);
+format_data(Tokens, Msg) ->
+ emqx_placeholder:proc_tmpl(Tokens, Msg).
+
+redact(Data) ->
+ emqx_utils:redact(Data, fun(Any) -> Any =:= aws_secret_access_key end).
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl
new file mode 100644
index 000000000..d2f8febb9
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame.erl
@@ -0,0 +1,163 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% @doc EMQ X Bridge Sysk Frame
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_syskeeper_frame).
+
+%% API
+-export([
+ versions/0,
+ current_version/0,
+ make_state_with_conf/1,
+ make_state/1,
+ encode/3,
+ parse/2,
+ parse_handshake/1
+]).
+
+-export([
+ bool2int/1,
+ int2bool/1,
+ marshaller/1,
+ serialize_variable_byte_integer/1,
+ parse_variable_byte_integer/1
+]).
+
+-export_type([state/0, versions/0, handshake/0, forward/0, packet/0]).
+
+-include("emqx_bridge_syskeeper.hrl").
+
+-type state() :: #{
+ handler := atom(),
+ version := versions(),
+ ack => boolean()
+}.
+
+-type versions() :: 1.
+
+-type handshake() :: #{type := handshake, version := versions()}.
+-type forward() :: #{type := forward, ack := boolean(), messages := list(map())}.
+-type heartbeat() :: #{type := heartbeat}.
+
+-type packet() ::
+ handshake()
+ | forward()
+ | heartbeat().
+
+-callback version() -> versions().
+-callback encode(packet_type_val(), packet_data(), state()) -> binary().
+-callback parse(packet_type(), binary(), state()) -> packet().
+
+-define(HIGHBIT, 2#10000000).
+-define(LOWBITS, 2#01111111).
+-define(MULTIPLIER_MAX, 16#200000).
+
+-export_type([packet_type/0]).
+
+%%-------------------------------------------------------------------
+%%% API
+%%-------------------------------------------------------------------
+-spec versions() -> list(versions()).
+versions() ->
+ [1].
+
+-spec current_version() -> versions().
+current_version() ->
+ 1.
+
+-spec make_state_with_conf(map()) -> state().
+make_state_with_conf(#{ack_mode := Mode}) ->
+ State = make_state(current_version()),
+ State#{ack => Mode =:= need_ack}.
+
+-spec make_state(versions()) -> state().
+make_state(Version) ->
+ case lists:member(Version, versions()) of
+ true ->
+ Handler = erlang:list_to_existing_atom(
+ io_lib:format("emqx_bridge_syskeeper_frame_v~B", [Version])
+ ),
+ #{
+ handler => Handler,
+ version => Version
+ };
+ _ ->
+ erlang:throw({unsupport_version, Version})
+ end.
+
+-spec encode(packet_type(), term(), state()) -> binary().
+encode(Type, Data, #{handler := Handler} = State) ->
+ Handler:encode(packet_type_val(Type), Data, State).
+
+-spec parse(binary(), state()) -> _.
+parse(<> = Bin, #{handler := Handler} = State) ->
+ Type = to_packet_type(TypeVal),
+ Handler:parse(Type, Bin, State).
+
+parse_handshake(Data) ->
+ State = make_state(1),
+ parse_handshake(Data, State).
+
+parse_handshake(Data, #{version := Version} = State) ->
+ case parse(Data, State) of
+ {ok, #{type := handshake, version := Version} = Shake} ->
+ {ok, {State, Shake}};
+ {ok, #{type := handshake, version := NewVersion}} ->
+ State2 = make_state(NewVersion),
+ parse_handshake(Data, State2);
+ Error ->
+ Error
+ end.
+
+bool2int(true) ->
+ 1;
+bool2int(_) ->
+ 0.
+
+int2bool(1) ->
+ true;
+int2bool(_) ->
+ false.
+
+marshaller(Item) when is_binary(Item) ->
+ erlang:binary_to_term(Item);
+marshaller(Item) ->
+ erlang:term_to_binary(Item).
+
+serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
+ <<0:1, N:7>>;
+serialize_variable_byte_integer(N) ->
+ <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
+
+parse_variable_byte_integer(Bin) ->
+ parse_variable_byte_integer(Bin, 1, 0).
+
+%%-------------------------------------------------------------------
+%%% Internal functions
+%%-------------------------------------------------------------------
+to_packet_type(?TYPE_HANDSHAKE) ->
+ handshake;
+to_packet_type(?TYPE_FORWARD) ->
+ forward;
+to_packet_type(?TYPE_HEARTBEAT) ->
+ heartbeat.
+
+packet_type_val(handshake) ->
+ ?TYPE_HANDSHAKE;
+packet_type_val(forward) ->
+ ?TYPE_FORWARD;
+packet_type_val(heartbeat) ->
+ ?TYPE_HEARTBEAT.
+
+parse_variable_byte_integer(<<1:1, _Len:7, _Rest/binary>>, Multiplier, _Value) when
+ Multiplier > ?MULTIPLIER_MAX
+->
+ {error, malformed_variable_byte_integer};
+parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
+ parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
+parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
+ {ok, Value + Len * Multiplier, Rest};
+parse_variable_byte_integer(<<>>, _Multiplier, _Value) ->
+ {error, incomplete}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl
new file mode 100644
index 000000000..b1c35c68b
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_frame_v1.erl
@@ -0,0 +1,70 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% @doc EMQ X Bridge Sysk Frame version 1
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_syskeeper_frame_v1).
+
+%% API
+-export([
+ version/0,
+ encode/3,
+ parse/3
+]).
+
+-behaviour(emqx_bridge_syskeeper_frame).
+
+-include("emqx_bridge_syskeeper.hrl").
+
+-define(B2I(X), emqx_bridge_syskeeper_frame:bool2int((X))).
+-define(I2B(X), emqx_bridge_syskeeper_frame:int2bool((X))).
+
+-import(emqx_bridge_syskeeper_frame, [
+ serialize_variable_byte_integer/1, parse_variable_byte_integer/1, marshaller/1
+]).
+
+%%-------------------------------------------------------------------
+%%% API
+%%-------------------------------------------------------------------
+version() ->
+ 1.
+
+encode(?TYPE_HANDSHAKE = Type, _, _) ->
+ Version = version(),
+ <>;
+encode(?TYPE_FORWARD = Type, Messages, #{ack := Ack}) ->
+ encode_forward(Messages, Type, Ack);
+encode(?TYPE_HEARTBEAT = Type, _, _) ->
+ <>.
+
+-dialyzer({nowarn_function, parse/3}).
+parse(handshake, <<_:4, _:4, Version:8>>, _) ->
+ {ok, #{type => handshake, version => Version}};
+parse(forward, Bin, _) ->
+ parse_forward(Bin);
+parse(heartbeat, <<_:4, _:4>>, _) ->
+ {ok, #{type => heartbeat}}.
+
+%%-------------------------------------------------------------------
+%%% Internal functions
+%%-------------------------------------------------------------------
+encode_forward(Messages, Type, Ack) ->
+ AckVal = ?B2I(Ack),
+ Data = marshaller(Messages),
+ Len = erlang:byte_size(Data),
+ LenVal = serialize_variable_byte_integer(Len),
+ <>.
+
+parse_forward(<<_:4, AckVal:4, Bin/binary>>) ->
+ case parse_variable_byte_integer(Bin) of
+ {ok, Len, Rest} ->
+ <> = Rest,
+ {ok, #{
+ type => forward,
+ ack => ?I2B(AckVal),
+ messages => emqx_bridge_syskeeper_frame:marshaller(MsgBin)
+ }};
+ Error ->
+ Error
+ end.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl
new file mode 100644
index 000000000..fcdcbac85
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy.erl
@@ -0,0 +1,100 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_bridge_syskeeper_proxy).
+
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+ conn_bridge_examples/1,
+ values/1
+]).
+
+-export([
+ namespace/0,
+ roots/0,
+ fields/1,
+ desc/1
+]).
+
+-define(SYSKEEPER_HOST_OPTIONS, #{
+ default_port => 9092
+}).
+
+%% -------------------------------------------------------------------------------------------------
+%% api
+conn_bridge_examples(Method) ->
+ [
+ #{
+ <<"syskeeper_proxy">> => #{
+ summary => <<"Syskeeper Bridge Proxy">>,
+ value => values(Method)
+ }
+ }
+ ].
+
+values(_Method) ->
+ #{
+ enable => true,
+ type => syskeeper_proxy,
+ name => <<"foo">>,
+ listen => <<"127.0.0.1:9092">>,
+ acceptors => 16,
+ handshake_timeout => <<"16s">>
+ }.
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+namespace() -> "bridge_syskeeper_proxy".
+
+roots() -> [].
+
+fields("config") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {listen, listen()},
+ {acceptors,
+ mk(
+ non_neg_integer(),
+ #{desc => ?DESC("acceptors"), default => 16}
+ )},
+ {handshake_timeout,
+ mk(
+ emqx_schema:timeout_duration_ms(),
+ #{desc => ?DESC(handshake_timeout), default => <<"10s">>}
+ )}
+ ];
+fields("creation_opts") ->
+ emqx_resource_schema:create_opts([{worker_pool_size, #{default => 1}}]);
+fields("post") ->
+ [type_field(), name_field() | fields("config")];
+fields("put") ->
+ fields("config");
+fields("get") ->
+ emqx_bridge_schema:status_fields() ++ fields("post").
+
+desc("config") ->
+ ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+ ["Configuration for Syskeeper Proxy using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+ emqx_resource_schema:desc(Name);
+desc(_) ->
+ undefined.
+
+listen() ->
+ Meta = #{desc => ?DESC("listen")},
+ emqx_schema:servers_sc(Meta, ?SYSKEEPER_HOST_OPTIONS).
+
+%% -------------------------------------------------------------------------------------------------
+
+type_field() ->
+ {type, mk(enum([syskeeper_proxy]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+ {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
diff --git a/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl
new file mode 100644
index 000000000..50a49a0f3
--- /dev/null
+++ b/apps/emqx_bridge_syskeeper/src/emqx_bridge_syskeeper_proxy_server.erl
@@ -0,0 +1,251 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_bridge_syskeeper_proxy_server).
+
+-behaviour(gen_statem).
+
+-include_lib("emqx/include/logger.hrl").
+
+-elvis([{elvis_style, invalid_dynamic_call, disable}]).
+
+%% `emqx_resource' API
+-export([
+ query_mode/1,
+ on_start/2,
+ on_stop/2,
+ on_get_status/2
+]).
+
+%% API
+-export([start_link/3]).
+
+%% gen_statem callbacks
+-export([callback_mode/0, init/1, terminate/3, code_change/4]).
+-export([handle_event/4]).
+
+-type state() :: wait_ready | handshake | running.
+-type data() :: #{
+ transport := atom(),
+ socket := inet:socket(),
+ frame_state :=
+ undefined
+ | emqx_bridge_sysk_frame:state(),
+ buffer := binary(),
+ conf := map()
+}.
+
+-define(DEFAULT_PORT, 9092).
+
+%% -------------------------------------------------------------------------------------------------
+%% emqx_resource
+
+query_mode(_) ->
+ no_queries.
+
+on_start(
+ InstanceId,
+ #{
+ listen := Server,
+ acceptors := Acceptors
+ } = Config
+) ->
+ ?SLOG(info, #{
+ msg => "starting_syskeeper_connector",
+ connector => InstanceId,
+ config => Config
+ }),
+
+ #{hostname := Host, port := Port} = emqx_schema:parse_server(Server, #{
+ default_port => ?DEFAULT_PORT
+ }),
+ ListenOn = {Host, Port},
+
+ Options = [
+ {acceptors, Acceptors},
+ {tcp_options, [{mode, binary}, {reuseaddr, true}, {nodelay, true}]}
+ ],
+ MFArgs = {?MODULE, start_link, [maps:with([handshake_timeout], Config)]},
+ ok = emqx_resource:allocate_resource(InstanceId, listen_on, ListenOn),
+
+ case esockd:open(?MODULE, ListenOn, Options, MFArgs) of
+ {ok, _} ->
+ {ok, #{listen_on => ListenOn}};
+ Error ->
+ Error
+ end.
+
+on_stop(InstanceId, _State) ->
+ ?SLOG(info, #{
+ msg => "stopping_syskeeper_connector",
+ connector => InstanceId
+ }),
+ case emqx_resource:get_allocated_resources(InstanceId) of
+ #{listen_on := ListenOn} ->
+ esockd:close(?MODULE, ListenOn);
+ _ ->
+ ok
+ end.
+
+on_get_status(_InstanceId, #{listen_on := ListenOn}) ->
+ try
+ _ = esockd:listener({?MODULE, ListenOn}),
+ connected
+ catch
+ _:_ ->
+ disconnected
+ end.
+
+%% -------------------------------------------------------------------------------------------------
+-spec start_link(atom(), inet:socket(), map()) ->
+ {ok, Pid :: pid()}
+ | ignore
+ | {error, Error :: term()}.
+start_link(Transport, Socket, Conf) ->
+ gen_statem:start_link(?MODULE, [Transport, Socket, Conf], []).
+
+%% -------------------------------------------------------------------------------------------------
+%% gen_statem callbacks
+
+-spec callback_mode() -> gen_statem:callback_mode_result().
+callback_mode() -> handle_event_function.
+
+%% -------------------------------------------------------------------------------------------------
+-spec init(Args :: term()) ->
+ gen_statem:init_result(term()).
+init([Transport, Socket, Conf]) ->
+ {ok, wait_ready,
+ #{
+ transport => Transport,
+ socket => Socket,
+ conf => Conf,
+ buffer => <<>>,
+ frame_state => undefined
+ },
+ {next_event, internal, wait_ready}}.
+
+handle_event(internal, wait_ready, wait_ready, Data) ->
+ wait_ready(Data);
+handle_event(state_timeout, handshake_timeout, handshake, _Data) ->
+ %% ?LOG(error, "Handshake tiemout~n", []),
+ {stop, normal};
+handle_event(internal, try_parse, running, Data) ->
+ try_parse(running, Data);
+handle_event(info, {tcp, _Socket, Bin}, State, Data) ->
+ try_parse(State, combine_buffer(Bin, Data));
+handle_event(info, {tcp_closed, _}, _State, _Data) ->
+ {stop, normal};
+handle_event(info, {tcp_error, _, _Reason}, _State, _Data) ->
+ %% ?LOG(warning, "TCP error, reason:~p~n", [Reason]),
+ {stop, normal};
+handle_event(_Event, _Content, _State, _Data) ->
+ %% ?LOG(warning, "Unexpected event:~p, Context:~p, State:~p~n", [Event, Content, State]),
+ keep_state_and_data.
+
+-spec terminate(Reason :: term(), State :: state(), Data :: data()) ->
+ any().
+terminate(_Reason, _State, _Data) ->
+ ok.
+
+code_change(_OldVsn, State, Data, _Extra) ->
+ {ok, State, Data}.
+
+%% -------------------------------------------------------------------------------------------------
+%%% Internal functions
+send(#{transport := Transport, socket := Socket}, Bin) ->
+ Transport:send(Socket, Bin).
+
+ack(Data) ->
+ ack(Data, true).
+
+ack(Data, false) ->
+ send(Data, <<0>>);
+ack(Data, true) ->
+ send(Data, <<16#FF>>).
+
+wait_ready(
+ #{
+ transport := Transport,
+ socket := RawSocket,
+ conf := #{handshake_timeout := Timeout}
+ } =
+ Data
+) ->
+ case Transport:wait(RawSocket) of
+ {ok, Socket} ->
+ Transport:setopts(Socket, [{active, true}]),
+ {next_state, handshake,
+ Data#{
+ socket => Socket,
+ frame_state => undefined
+ },
+ {state_timeout, Timeout, handshake_timeout}};
+ {error, Reason} ->
+ ok = Transport:fast_close(RawSocket),
+ {stop, Reason}
+ end.
+
+combine_buffer(Bin, #{buffer := Buffer} = Data) ->
+ Data#{buffer := <>}.
+
+try_parse(State, #{buffer := Bin} = Data) ->
+ case emqx_bridge_syskeeper_frame:parse_variable_byte_integer(Bin) of
+ {ok, Len, Rest} ->
+ case Rest of
+ <> ->
+ Data2 = Data#{buffer := Rest2},
+ Result = parse(Payload, Data2),
+ handle_parse_result(Result, State, Data2);
+ _ ->
+ {keep_state, Data}
+ end;
+ {error, incomplete} ->
+ {keep_state, Data};
+ {error, _Reason} ->
+ %% ?LOG(warning, "Parse error, reason:~p, buffer:~p~n", [Reason, Bin]),
+ {stop, parse_error}
+ end.
+
+%% maybe handshake
+parse(Bin, #{frame_state := undefined}) ->
+ emqx_bridge_syskeeper_frame:parse_handshake(Bin);
+parse(Bin, #{frame_state := State}) ->
+ emqx_bridge_syskeeper_frame:parse(Bin, State).
+
+do_forward(Ack, Messages, Data) ->
+ lists:foreach(
+ fun(Message) ->
+ Msg = emqx_message:from_map(Message#{headers => #{}, extra => #{}}),
+ _ = emqx_broker:safe_publish(Msg)
+ end,
+ Messages
+ ),
+ case Ack of
+ true ->
+ ack(Data);
+ _ ->
+ ok
+ end.
+
+handle_parse_result({ok, Msg}, State, Data) ->
+ handle_packet(Msg, State, Data);
+handle_parse_result({error, _Reason} = Error, State, Data) ->
+ handle_parse_error(Error, State, #{buffer := _Bin} = Data),
+ %% ?LOG(warning, "Parse error, state:~p, reason:~p, buffer:~p~n", [State, Reason, Bin]),
+ {stop, parse_error}.
+
+handle_parse_error(_, handshake, Data) ->
+ ack(Data, false);
+handle_parse_error(_, _, _) ->
+ ok.
+
+handle_packet({FrameState, _Shake}, handshake, Data) ->
+ ack(Data),
+ {next_state, running, Data#{frame_state := FrameState}, {next_event, internal, try_parse}};
+handle_packet(#{type := forward, ack := Ack, messages := Messages}, running, Data) ->
+ do_forward(Ack, Messages, Data),
+ try_parse(running, Data);
+handle_packet(#{type := heartbeat}, running, Data) ->
+ ack(Data),
+ try_parse(running, Data).
diff --git a/apps/emqx_machine/priv/reboot_lists.eterm b/apps/emqx_machine/priv/reboot_lists.eterm
index bb9fc91a6..190ef1afa 100644
--- a/apps/emqx_machine/priv/reboot_lists.eterm
+++ b/apps/emqx_machine/priv/reboot_lists.eterm
@@ -127,7 +127,8 @@
emqx_dashboard_sso,
emqx_audit,
emqx_gateway_gbt32960,
- emqx_gateway_ocpp
+ emqx_gateway_ocpp,
+ emqx_bridge_syskeeper
],
%% must always be of type `load'
ce_business_apps =>
diff --git a/mix.exs b/mix.exs
index 1e6b37d18..23cef80e3 100644
--- a/mix.exs
+++ b/mix.exs
@@ -217,7 +217,8 @@ defmodule EMQXUmbrella.MixProject do
:emqx_dashboard_sso,
:emqx_audit,
:emqx_gateway_gbt32960,
- :emqx_gateway_ocpp
+ :emqx_gateway_ocpp,
+ :emqx_bridge_syskeeper
])
end
diff --git a/rebar.config.erl b/rebar.config.erl
index 54ce0d6c3..93ad9fa99 100644
--- a/rebar.config.erl
+++ b/rebar.config.erl
@@ -113,6 +113,7 @@ is_community_umbrella_app("apps/emqx_dashboard_sso") -> false;
is_community_umbrella_app("apps/emqx_audit") -> false;
is_community_umbrella_app("apps/emqx_gateway_gbt32960") -> false;
is_community_umbrella_app("apps/emqx_gateway_ocpp") -> false;
+is_community_umbrella_app("apps/emqx_bridge_syskeeper") -> false;
is_community_umbrella_app(_) -> true.
is_jq_supported() ->
diff --git a/rel/i18n/emqx_bridge_syskeeper.hocon b/rel/i18n/emqx_bridge_syskeeper.hocon
new file mode 100644
index 000000000..7cf195317
--- /dev/null
+++ b/rel/i18n/emqx_bridge_syskeeper.hocon
@@ -0,0 +1,45 @@
+emqx_bridge_syskeeper {
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+desc_config.desc:
+"""Configuration for a Syskeeper bridge"""
+
+desc_config.label:
+"""Syskeeper Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name."""
+
+desc_name.label:
+"""Bridge Name"""
+
+desc_type.desc:
+"""The Bridge Type"""
+
+desc_type.label:
+"""Bridge Type"""
+
+template.desc:
+"""Template"""
+
+template.label:
+"""Template"""
+
+target_topic.desc:
+"""The topic for the forwarded message"""
+
+target_topic.label:
+"""Target Topic"""
+
+target_qos.desc:
+"""The QoS for the forwarded message, -1 is for the original topic"""
+
+target_qos.label:
+"""Target QoS"""
+
+}
diff --git a/rel/i18n/emqx_bridge_syskeeper_connector.hocon b/rel/i18n/emqx_bridge_syskeeper_connector.hocon
new file mode 100644
index 000000000..a057e6647
--- /dev/null
+++ b/rel/i18n/emqx_bridge_syskeeper_connector.hocon
@@ -0,0 +1,21 @@
+emqx_bridge_syskeeper_connector {
+
+server.desc:
+"""The address of the Syskeeper proxy server"""
+
+server.label:
+"""Server"""
+
+ack_mode.desc:
+"""Specify whether the proxy server should reply with an acknowledgement for the message forwarding, can be:
- need_ack
- no_ack
"""
+
+ack_mode.label:
+"""Acknowledgement Mode"""
+
+ack_timeout.desc:
+"""The maximum time to wait for an acknowledgement from the proxy server"""
+
+ack_timeout.label:
+"""Acknowledgement Timeout"""
+
+}
diff --git a/rel/i18n/emqx_bridge_syskeeper_proxy.hocon b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon
new file mode 100644
index 000000000..f6c519216
--- /dev/null
+++ b/rel/i18n/emqx_bridge_syskeeper_proxy.hocon
@@ -0,0 +1,45 @@
+emqx_bridge_syskeeper_proxy {
+
+config_enable.desc:
+"""Enable or disable this bridge"""
+
+config_enable.label:
+"""Enable Or Disable Bridge"""
+
+desc_config.desc:
+"""Configuration for a Syskeeper proxy bridge"""
+
+desc_config.label:
+"""Syskeeper Proxy Bridge Configuration"""
+
+desc_name.desc:
+"""Bridge name"""
+
+desc_name.label:
+"""Bridge Name"""
+
+desc_type.desc:
+"""The Bridge Type"""
+
+desc_type.label:
+"""Bridge Type"""
+
+listen.desc:
+"""The listening address for this Syskeeper proxy server"""
+
+listen.label:
+"""Listen Address"""
+
+acceptors.desc:
+"""The number of the acceptors"""
+
+acceptors.label:
+"""Acceptors"""
+
+handshake_timeout.desc:
+"""The maximum to wait for the handshake when a connection is created"""
+
+handshake_timeout.label:
+"""Handshake Timeout"""
+
+}