feat: add Sparkplug encode and decode functions to the rule engine
Fixes: https://emqx.atlassian.net/browse/EMQX-10429
This commit is contained in:
parent
ca52310f2b
commit
039e27a153
|
@ -19,6 +19,10 @@
|
|||
-include("rule_engine.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
-include_lib("emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl").
|
||||
-endif.
|
||||
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
%% IoT Funcs
|
||||
|
@ -1128,10 +1132,31 @@ timezone_to_offset_seconds(TimeZone) ->
|
|||
%% @end
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
%% EE
|
||||
|
||||
'$handle_undefined_function'(sparkplug_decode, [Data]) ->
|
||||
'$handle_undefined_function'(
|
||||
schema_decode,
|
||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
|
||||
);
|
||||
'$handle_undefined_function'(sparkplug_decode, [Data | MoreArgs]) ->
|
||||
'$handle_undefined_function'(
|
||||
schema_decode,
|
||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
|
||||
);
|
||||
'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) ->
|
||||
emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs);
|
||||
'$handle_undefined_function'(schema_decode, Args) ->
|
||||
error({args_count_error, {schema_decode, Args}});
|
||||
'$handle_undefined_function'(sparkplug_encode, [Term]) ->
|
||||
'$handle_undefined_function'(
|
||||
schema_encode,
|
||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
|
||||
);
|
||||
'$handle_undefined_function'(sparkplug_encode, [Term | MoreArgs]) ->
|
||||
'$handle_undefined_function'(
|
||||
schema_encode,
|
||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
|
||||
);
|
||||
'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) ->
|
||||
%% encode outputs iolists, but when the rule actions process those
|
||||
%% it might wrongly encode them as JSON lists, so we force them to
|
||||
|
|
|
@ -12,6 +12,9 @@
|
|||
-define(SERDE_TAB, emqx_ee_schema_registry_serde_tab).
|
||||
-define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab).
|
||||
|
||||
-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
|
||||
<<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">>
|
||||
).
|
||||
-type schema_name() :: binary().
|
||||
-type schema_source() :: binary().
|
||||
|
||||
|
|
|
@ -0,0 +1,277 @@
|
|||
Eclipse Public License - v 2.0
|
||||
|
||||
THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE
|
||||
PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION
|
||||
OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
|
||||
|
||||
1. DEFINITIONS
|
||||
|
||||
"Contribution" means:
|
||||
|
||||
a) in the case of the initial Contributor, the initial content
|
||||
Distributed under this Agreement, and
|
||||
|
||||
b) in the case of each subsequent Contributor:
|
||||
i) changes to the Program, and
|
||||
ii) additions to the Program;
|
||||
where such changes and/or additions to the Program originate from
|
||||
and are Distributed by that particular Contributor. A Contribution
|
||||
"originates" from a Contributor if it was added to the Program by
|
||||
such Contributor itself or anyone acting on such Contributor's behalf.
|
||||
Contributions do not include changes or additions to the Program that
|
||||
are not Modified Works.
|
||||
|
||||
"Contributor" means any person or entity that Distributes the Program.
|
||||
|
||||
"Licensed Patents" mean patent claims licensable by a Contributor which
|
||||
are necessarily infringed by the use or sale of its Contribution alone
|
||||
or when combined with the Program.
|
||||
|
||||
"Program" means the Contributions Distributed in accordance with this
|
||||
Agreement.
|
||||
|
||||
"Recipient" means anyone who receives the Program under this Agreement
|
||||
or any Secondary License (as applicable), including Contributors.
|
||||
|
||||
"Derivative Works" shall mean any work, whether in Source Code or other
|
||||
form, that is based on (or derived from) the Program and for which the
|
||||
editorial revisions, annotations, elaborations, or other modifications
|
||||
represent, as a whole, an original work of authorship.
|
||||
|
||||
"Modified Works" shall mean any work in Source Code or other form that
|
||||
results from an addition to, deletion from, or modification of the
|
||||
contents of the Program, including, for purposes of clarity any new file
|
||||
in Source Code form that contains any contents of the Program. Modified
|
||||
Works shall not include works that contain only declarations,
|
||||
interfaces, types, classes, structures, or files of the Program solely
|
||||
in each case in order to link to, bind by name, or subclass the Program
|
||||
or Modified Works thereof.
|
||||
|
||||
"Distribute" means the acts of a) distributing or b) making available
|
||||
in any manner that enables the transfer of a copy.
|
||||
|
||||
"Source Code" means the form of a Program preferred for making
|
||||
modifications, including but not limited to software source code,
|
||||
documentation source, and configuration files.
|
||||
|
||||
"Secondary License" means either the GNU General Public License,
|
||||
Version 2.0, or any later versions of that license, including any
|
||||
exceptions or additional permissions as identified by the initial
|
||||
Contributor.
|
||||
|
||||
2. GRANT OF RIGHTS
|
||||
|
||||
a) Subject to the terms of this Agreement, each Contributor hereby
|
||||
grants Recipient a non-exclusive, worldwide, royalty-free copyright
|
||||
license to reproduce, prepare Derivative Works of, publicly display,
|
||||
publicly perform, Distribute and sublicense the Contribution of such
|
||||
Contributor, if any, and such Derivative Works.
|
||||
|
||||
b) Subject to the terms of this Agreement, each Contributor hereby
|
||||
grants Recipient a non-exclusive, worldwide, royalty-free patent
|
||||
license under Licensed Patents to make, use, sell, offer to sell,
|
||||
import and otherwise transfer the Contribution of such Contributor,
|
||||
if any, in Source Code or other form. This patent license shall
|
||||
apply to the combination of the Contribution and the Program if, at
|
||||
the time the Contribution is added by the Contributor, such addition
|
||||
of the Contribution causes such combination to be covered by the
|
||||
Licensed Patents. The patent license shall not apply to any other
|
||||
combinations which include the Contribution. No hardware per se is
|
||||
licensed hereunder.
|
||||
|
||||
c) Recipient understands that although each Contributor grants the
|
||||
licenses to its Contributions set forth herein, no assurances are
|
||||
provided by any Contributor that the Program does not infringe the
|
||||
patent or other intellectual property rights of any other entity.
|
||||
Each Contributor disclaims any liability to Recipient for claims
|
||||
brought by any other entity based on infringement of intellectual
|
||||
property rights or otherwise. As a condition to exercising the
|
||||
rights and licenses granted hereunder, each Recipient hereby
|
||||
assumes sole responsibility to secure any other intellectual
|
||||
property rights needed, if any. For example, if a third party
|
||||
patent license is required to allow Recipient to Distribute the
|
||||
Program, it is Recipient's responsibility to acquire that license
|
||||
before distributing the Program.
|
||||
|
||||
d) Each Contributor represents that to its knowledge it has
|
||||
sufficient copyright rights in its Contribution, if any, to grant
|
||||
the copyright license set forth in this Agreement.
|
||||
|
||||
e) Notwithstanding the terms of any Secondary License, no
|
||||
Contributor makes additional grants to any Recipient (other than
|
||||
those set forth in this Agreement) as a result of such Recipient's
|
||||
receipt of the Program under the terms of a Secondary License
|
||||
(if permitted under the terms of Section 3).
|
||||
|
||||
3. REQUIREMENTS
|
||||
|
||||
3.1 If a Contributor Distributes the Program in any form, then:
|
||||
|
||||
a) the Program must also be made available as Source Code, in
|
||||
accordance with section 3.2, and the Contributor must accompany
|
||||
the Program with a statement that the Source Code for the Program
|
||||
is available under this Agreement, and informs Recipients how to
|
||||
obtain it in a reasonable manner on or through a medium customarily
|
||||
used for software exchange; and
|
||||
|
||||
b) the Contributor may Distribute the Program under a license
|
||||
different than this Agreement, provided that such license:
|
||||
i) effectively disclaims on behalf of all other Contributors all
|
||||
warranties and conditions, express and implied, including
|
||||
warranties or conditions of title and non-infringement, and
|
||||
implied warranties or conditions of merchantability and fitness
|
||||
for a particular purpose;
|
||||
|
||||
ii) effectively excludes on behalf of all other Contributors all
|
||||
liability for damages, including direct, indirect, special,
|
||||
incidental and consequential damages, such as lost profits;
|
||||
|
||||
iii) does not attempt to limit or alter the recipients' rights
|
||||
in the Source Code under section 3.2; and
|
||||
|
||||
iv) requires any subsequent distribution of the Program by any
|
||||
party to be under a license that satisfies the requirements
|
||||
of this section 3.
|
||||
|
||||
3.2 When the Program is Distributed as Source Code:
|
||||
|
||||
a) it must be made available under this Agreement, or if the
|
||||
Program (i) is combined with other material in a separate file or
|
||||
files made available under a Secondary License, and (ii) the initial
|
||||
Contributor attached to the Source Code the notice described in
|
||||
Exhibit A of this Agreement, then the Program may be made available
|
||||
under the terms of such Secondary Licenses, and
|
||||
|
||||
b) a copy of this Agreement must be included with each copy of
|
||||
the Program.
|
||||
|
||||
3.3 Contributors may not remove or alter any copyright, patent,
|
||||
trademark, attribution notices, disclaimers of warranty, or limitations
|
||||
of liability ("notices") contained within the Program from any copy of
|
||||
the Program which they Distribute, provided that Contributors may add
|
||||
their own appropriate notices.
|
||||
|
||||
4. COMMERCIAL DISTRIBUTION
|
||||
|
||||
Commercial distributors of software may accept certain responsibilities
|
||||
with respect to end users, business partners and the like. While this
|
||||
license is intended to facilitate the commercial use of the Program,
|
||||
the Contributor who includes the Program in a commercial product
|
||||
offering should do so in a manner which does not create potential
|
||||
liability for other Contributors. Therefore, if a Contributor includes
|
||||
the Program in a commercial product offering, such Contributor
|
||||
("Commercial Contributor") hereby agrees to defend and indemnify every
|
||||
other Contributor ("Indemnified Contributor") against any losses,
|
||||
damages and costs (collectively "Losses") arising from claims, lawsuits
|
||||
and other legal actions brought by a third party against the Indemnified
|
||||
Contributor to the extent caused by the acts or omissions of such
|
||||
Commercial Contributor in connection with its distribution of the Program
|
||||
in a commercial product offering. The obligations in this section do not
|
||||
apply to any claims or Losses relating to any actual or alleged
|
||||
intellectual property infringement. In order to qualify, an Indemnified
|
||||
Contributor must: a) promptly notify the Commercial Contributor in
|
||||
writing of such claim, and b) allow the Commercial Contributor to control,
|
||||
and cooperate with the Commercial Contributor in, the defense and any
|
||||
related settlement negotiations. The Indemnified Contributor may
|
||||
participate in any such claim at its own expense.
|
||||
|
||||
For example, a Contributor might include the Program in a commercial
|
||||
product offering, Product X. That Contributor is then a Commercial
|
||||
Contributor. If that Commercial Contributor then makes performance
|
||||
claims, or offers warranties related to Product X, those performance
|
||||
claims and warranties are such Commercial Contributor's responsibility
|
||||
alone. Under this section, the Commercial Contributor would have to
|
||||
defend claims against the other Contributors related to those performance
|
||||
claims and warranties, and if a court requires any other Contributor to
|
||||
pay any damages as a result, the Commercial Contributor must pay
|
||||
those damages.
|
||||
|
||||
5. NO WARRANTY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
|
||||
PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS"
|
||||
BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR
|
||||
IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF
|
||||
TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR
|
||||
PURPOSE. Each Recipient is solely responsible for determining the
|
||||
appropriateness of using and distributing the Program and assumes all
|
||||
risks associated with its exercise of rights under this Agreement,
|
||||
including but not limited to the risks and costs of program errors,
|
||||
compliance with applicable laws, damage to or loss of data, programs
|
||||
or equipment, and unavailability or interruption of operations.
|
||||
|
||||
6. DISCLAIMER OF LIABILITY
|
||||
|
||||
EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT
|
||||
PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS
|
||||
SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
|
||||
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST
|
||||
PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
||||
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
||||
ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE
|
||||
EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGES.
|
||||
|
||||
7. GENERAL
|
||||
|
||||
If any provision of this Agreement is invalid or unenforceable under
|
||||
applicable law, it shall not affect the validity or enforceability of
|
||||
the remainder of the terms of this Agreement, and without further
|
||||
action by the parties hereto, such provision shall be reformed to the
|
||||
minimum extent necessary to make such provision valid and enforceable.
|
||||
|
||||
If Recipient institutes patent litigation against any entity
|
||||
(including a cross-claim or counterclaim in a lawsuit) alleging that the
|
||||
Program itself (excluding combinations of the Program with other software
|
||||
or hardware) infringes such Recipient's patent(s), then such Recipient's
|
||||
rights granted under Section 2(b) shall terminate as of the date such
|
||||
litigation is filed.
|
||||
|
||||
All Recipient's rights under this Agreement shall terminate if it
|
||||
fails to comply with any of the material terms or conditions of this
|
||||
Agreement and does not cure such failure in a reasonable period of
|
||||
time after becoming aware of such noncompliance. If all Recipient's
|
||||
rights under this Agreement terminate, Recipient agrees to cease use
|
||||
and distribution of the Program as soon as reasonably practicable.
|
||||
However, Recipient's obligations under this Agreement and any licenses
|
||||
granted by Recipient relating to the Program shall continue and survive.
|
||||
|
||||
Everyone is permitted to copy and distribute copies of this Agreement,
|
||||
but in order to avoid inconsistency the Agreement is copyrighted and
|
||||
may only be modified in the following manner. The Agreement Steward
|
||||
reserves the right to publish new versions (including revisions) of
|
||||
this Agreement from time to time. No one other than the Agreement
|
||||
Steward has the right to modify this Agreement. The Eclipse Foundation
|
||||
is the initial Agreement Steward. The Eclipse Foundation may assign the
|
||||
responsibility to serve as the Agreement Steward to a suitable separate
|
||||
entity. Each new version of the Agreement will be given a distinguishing
|
||||
version number. The Program (including Contributions) may always be
|
||||
Distributed subject to the version of the Agreement under which it was
|
||||
received. In addition, after a new version of the Agreement is published,
|
||||
Contributor may elect to Distribute the Program (including its
|
||||
Contributions) under the new version.
|
||||
|
||||
Except as expressly stated in Sections 2(a) and 2(b) above, Recipient
|
||||
receives no rights or licenses to the intellectual property of any
|
||||
Contributor under this Agreement, whether expressly, by implication,
|
||||
estoppel or otherwise. All rights in the Program not expressly granted
|
||||
under this Agreement are reserved. Nothing in this Agreement is intended
|
||||
to be enforceable by any entity that is not a Contributor or Recipient.
|
||||
No third-party beneficiary rights are created under this Agreement.
|
||||
|
||||
Exhibit A - Form of Secondary Licenses Notice
|
||||
|
||||
"This Source Code may also be made available under the following
|
||||
Secondary Licenses when the conditions for such availability set forth
|
||||
in the Eclipse Public License, v. 2.0 are satisfied: {name license(s),
|
||||
version(s), and exceptions or additional permissions here}."
|
||||
|
||||
Simply including a copy of this Agreement, including this Exhibit A
|
||||
is not sufficient to license the Source Code under Secondary Licenses.
|
||||
|
||||
If it is not possible or desirable to put the notice in a particular
|
||||
file, then You may include the notice in a location (such as a LICENSE
|
||||
file in a relevant directory) where a recipient would be likely to
|
||||
look for such a notice.
|
||||
|
||||
You may add additional accurate notices of copyright ownership.
|
|
@ -0,0 +1,229 @@
|
|||
// Downloaded from: https://github.com/eclipse/tahu/blob/46f25e79f34234e6145d11108660dfd9133ae50d/sparkplug_b/sparkplug_b.proto
|
||||
//
|
||||
// License for this file is located in the same directory as this file.
|
||||
//
|
||||
// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others
|
||||
// *
|
||||
// * This program and the accompanying materials are made available under the
|
||||
// * terms of the Eclipse Public License 2.0 which is available at
|
||||
// * http://www.eclipse.org/legal/epl-2.0.
|
||||
// *
|
||||
// * SPDX-License-Identifier: EPL-2.0
|
||||
// *
|
||||
// * Contributors:
|
||||
// * Cirrus Link Solutions - initial implementation
|
||||
|
||||
//
|
||||
// To compile:
|
||||
// cd client_libraries/java
|
||||
// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto
|
||||
//
|
||||
|
||||
|
||||
syntax = "proto2";
|
||||
|
||||
package org.eclipse.tahu.protobuf;
|
||||
|
||||
option java_package = "org.eclipse.tahu.protobuf";
|
||||
option java_outer_classname = "SparkplugBProto";
|
||||
|
||||
enum DataType {
|
||||
// Indexes of Data Types
|
||||
|
||||
// Unknown placeholder for future expansion.
|
||||
Unknown = 0;
|
||||
|
||||
// Basic Types
|
||||
Int8 = 1;
|
||||
Int16 = 2;
|
||||
Int32 = 3;
|
||||
Int64 = 4;
|
||||
UInt8 = 5;
|
||||
UInt16 = 6;
|
||||
UInt32 = 7;
|
||||
UInt64 = 8;
|
||||
Float = 9;
|
||||
Double = 10;
|
||||
Boolean = 11;
|
||||
String = 12;
|
||||
DateTime = 13;
|
||||
Text = 14;
|
||||
|
||||
// Additional Metric Types
|
||||
UUID = 15;
|
||||
DataSet = 16;
|
||||
Bytes = 17;
|
||||
File = 18;
|
||||
Template = 19;
|
||||
|
||||
// Additional PropertyValue Types
|
||||
PropertySet = 20;
|
||||
PropertySetList = 21;
|
||||
|
||||
// Array Types
|
||||
Int8Array = 22;
|
||||
Int16Array = 23;
|
||||
Int32Array = 24;
|
||||
Int64Array = 25;
|
||||
UInt8Array = 26;
|
||||
UInt16Array = 27;
|
||||
UInt32Array = 28;
|
||||
UInt64Array = 29;
|
||||
FloatArray = 30;
|
||||
DoubleArray = 31;
|
||||
BooleanArray = 32;
|
||||
StringArray = 33;
|
||||
DateTimeArray = 34;
|
||||
}
|
||||
|
||||
message Payload {
|
||||
|
||||
message Template {
|
||||
|
||||
message Parameter {
|
||||
optional string name = 1;
|
||||
optional uint32 type = 2;
|
||||
|
||||
oneof value {
|
||||
uint32 int_value = 3;
|
||||
uint64 long_value = 4;
|
||||
float float_value = 5;
|
||||
double double_value = 6;
|
||||
bool boolean_value = 7;
|
||||
string string_value = 8;
|
||||
ParameterValueExtension extension_value = 9;
|
||||
}
|
||||
|
||||
message ParameterValueExtension {
|
||||
extensions 1 to max;
|
||||
}
|
||||
}
|
||||
|
||||
optional string version = 1; // The version of the Template to prevent mismatches
|
||||
repeated Metric metrics = 2; // Each metric includes a name, datatype, and optionally a value
|
||||
repeated Parameter parameters = 3;
|
||||
optional string template_ref = 4; // MUST be a reference to a template definition if this is an instance (i.e. the name of the template definition) - MUST be omitted for template definitions
|
||||
optional bool is_definition = 5;
|
||||
extensions 6 to max;
|
||||
}
|
||||
|
||||
message DataSet {
|
||||
|
||||
message DataSetValue {
|
||||
|
||||
oneof value {
|
||||
uint32 int_value = 1;
|
||||
uint64 long_value = 2;
|
||||
float float_value = 3;
|
||||
double double_value = 4;
|
||||
bool boolean_value = 5;
|
||||
string string_value = 6;
|
||||
DataSetValueExtension extension_value = 7;
|
||||
}
|
||||
|
||||
message DataSetValueExtension {
|
||||
extensions 1 to max;
|
||||
}
|
||||
}
|
||||
|
||||
message Row {
|
||||
repeated DataSetValue elements = 1;
|
||||
extensions 2 to max; // For third party extensions
|
||||
}
|
||||
|
||||
optional uint64 num_of_columns = 1;
|
||||
repeated string columns = 2;
|
||||
repeated uint32 types = 3;
|
||||
repeated Row rows = 4;
|
||||
extensions 5 to max; // For third party extensions
|
||||
}
|
||||
|
||||
message PropertyValue {
|
||||
|
||||
optional uint32 type = 1;
|
||||
optional bool is_null = 2;
|
||||
|
||||
oneof value {
|
||||
uint32 int_value = 3;
|
||||
uint64 long_value = 4;
|
||||
float float_value = 5;
|
||||
double double_value = 6;
|
||||
bool boolean_value = 7;
|
||||
string string_value = 8;
|
||||
PropertySet propertyset_value = 9;
|
||||
PropertySetList propertysets_value = 10; // List of Property Values
|
||||
PropertyValueExtension extension_value = 11;
|
||||
}
|
||||
|
||||
message PropertyValueExtension {
|
||||
extensions 1 to max;
|
||||
}
|
||||
}
|
||||
|
||||
message PropertySet {
|
||||
repeated string keys = 1; // Names of the properties
|
||||
repeated PropertyValue values = 2;
|
||||
extensions 3 to max;
|
||||
}
|
||||
|
||||
message PropertySetList {
|
||||
repeated PropertySet propertyset = 1;
|
||||
extensions 2 to max;
|
||||
}
|
||||
|
||||
message MetaData {
|
||||
// Bytes specific metadata
|
||||
optional bool is_multi_part = 1;
|
||||
|
||||
// General metadata
|
||||
optional string content_type = 2; // Content/Media type
|
||||
optional uint64 size = 3; // File size, String size, Multi-part size, etc
|
||||
optional uint64 seq = 4; // Sequence number for multi-part messages
|
||||
|
||||
// File metadata
|
||||
optional string file_name = 5; // File name
|
||||
optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc)
|
||||
optional string md5 = 7; // md5 of data
|
||||
|
||||
// Catchalls and future expansion
|
||||
optional string description = 8; // Could be anything such as json or xml of custom properties
|
||||
extensions 9 to max;
|
||||
}
|
||||
|
||||
message Metric {
|
||||
|
||||
optional string name = 1; // Metric name - should only be included on birth
|
||||
optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages
|
||||
optional uint64 timestamp = 3; // Timestamp associated with data acquisition time
|
||||
optional uint32 datatype = 4; // DataType of the metric/tag value
|
||||
optional bool is_historical = 5; // If this is historical data and should not update real time tag
|
||||
optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag
|
||||
optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes.
|
||||
optional MetaData metadata = 8; // Metadata for the payload
|
||||
optional PropertySet properties = 9;
|
||||
|
||||
oneof value {
|
||||
uint32 int_value = 10;
|
||||
uint64 long_value = 11;
|
||||
float float_value = 12;
|
||||
double double_value = 13;
|
||||
bool boolean_value = 14;
|
||||
string string_value = 15;
|
||||
bytes bytes_value = 16; // Bytes, File
|
||||
DataSet dataset_value = 17;
|
||||
Template template_value = 18;
|
||||
MetricValueExtension extension_value = 19;
|
||||
}
|
||||
|
||||
message MetricValueExtension {
|
||||
extensions 1 to max;
|
||||
}
|
||||
}
|
||||
|
||||
optional uint64 timestamp = 1; // Timestamp at message sending time
|
||||
repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs
|
||||
optional uint64 seq = 3; // Sequence number
|
||||
optional string uuid = 4; // UUID to track message type in terms of schema definitions
|
||||
optional bytes body = 5; // To optionally bypass the whole definition above
|
||||
extensions 6 to max; // For third party extensions
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_ee_schema_registry, [
|
||||
{description, "EMQX Schema Registry"},
|
||||
{vsn, "0.1.4"},
|
||||
{vsn, "0.1.5"},
|
||||
{registered, [emqx_ee_schema_registry_sup]},
|
||||
{mod, {emqx_ee_schema_registry_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -38,6 +38,11 @@
|
|||
import_config/1
|
||||
]).
|
||||
|
||||
%% For testing
|
||||
-export([
|
||||
ensure_serde_absent/1
|
||||
]).
|
||||
|
||||
-type schema() :: #{
|
||||
type := serde_type(),
|
||||
source := binary(),
|
||||
|
@ -232,11 +237,46 @@ create_tables() ->
|
|||
ok.
|
||||
|
||||
do_build_serdes(Schemas) ->
|
||||
%% We build a special serde for the Sparkplug B payload. This serde is used
|
||||
%% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1.
|
||||
maybe_build_sparkplug_b_serde(),
|
||||
%% TODO: use some kind of mutex to make each core build a
|
||||
%% different serde to avoid duplicate work. Maybe ekka_locker?
|
||||
maps:foreach(fun do_build_serde/2, Schemas),
|
||||
?tp(schema_registry_serdes_built, #{}).
|
||||
|
||||
maybe_build_sparkplug_b_serde() ->
|
||||
case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of
|
||||
{error, not_found} ->
|
||||
do_build_serde_no_check(
|
||||
?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
|
||||
#{
|
||||
type => protobuf,
|
||||
source => get_schema_source(<<"sparkplug_b.proto">>)
|
||||
}
|
||||
);
|
||||
{ok, _} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
get_schema_source(Filename) ->
|
||||
{ok, App} = application:get_application(),
|
||||
FilePath =
|
||||
case code:priv_dir(App) of
|
||||
{error, bad_name} ->
|
||||
erlang:error(
|
||||
{error, <<"Could not find data directory (priv) for Schema Registry">>}
|
||||
);
|
||||
Dir ->
|
||||
filename:join(Dir, Filename)
|
||||
end,
|
||||
case file:read_file(FilePath) of
|
||||
{ok, Content} ->
|
||||
Content;
|
||||
{error, Reason} ->
|
||||
erlang:error({error, Reason})
|
||||
end.
|
||||
|
||||
build_serdes(Serdes) ->
|
||||
build_serdes(Serdes, []).
|
||||
|
||||
|
@ -251,9 +291,18 @@ build_serdes([{Name, Params} | Rest], Acc0) ->
|
|||
build_serdes([], _Acc) ->
|
||||
ok.
|
||||
|
||||
do_build_serde(Name0, #{type := Type, source := Source}) ->
|
||||
do_build_serde(Name, Serde) when not is_binary(Name) ->
|
||||
do_build_serde(to_bin(Name), Serde);
|
||||
do_build_serde(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, _Serde) ->
|
||||
{error,
|
||||
erlang:iolist_to_binary(
|
||||
io_lib:format("Illigal schema name ~s", [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME])
|
||||
)};
|
||||
do_build_serde(Name, Serde) ->
|
||||
do_build_serde_no_check(Name, Serde).
|
||||
|
||||
do_build_serde_no_check(Name, #{type := Type, source := Source}) ->
|
||||
try
|
||||
Name = to_bin(Name0),
|
||||
{Serializer, Deserializer, Destructor} =
|
||||
emqx_ee_schema_registry_serde:make_serde(Type, Name, Source),
|
||||
Serde = #serde{
|
||||
|
@ -270,7 +319,7 @@ do_build_serde(Name0, #{type := Type, source := Source}) ->
|
|||
error,
|
||||
#{
|
||||
msg => "error_building_serde",
|
||||
name => Name0,
|
||||
name => Name,
|
||||
type => Type,
|
||||
kind => Kind,
|
||||
error => Error,
|
||||
|
@ -280,11 +329,15 @@ do_build_serde(Name0, #{type := Type, source := Source}) ->
|
|||
{error, Error}
|
||||
end.
|
||||
|
||||
ensure_serde_absent(Name) when not is_binary(Name) ->
|
||||
ensure_serde_absent(to_bin(Name));
|
||||
% ensure_serde_absent(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) ->
|
||||
% {error, <<"Cannot delete serde for Sparkplug B schema">>};
|
||||
ensure_serde_absent(Name) ->
|
||||
case get_serde(Name) of
|
||||
{ok, #{destructor := Destructor}} ->
|
||||
Destructor(),
|
||||
ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name));
|
||||
ok = mria:dirty_delete(?SERDE_TAB, Name);
|
||||
{error, not_found} ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -96,7 +96,7 @@ inject_avro_name(Name, Source0) ->
|
|||
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
|
||||
make_protobuf_serde_mod(Name, Source) ->
|
||||
{SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
|
||||
case lazy_generate_protobuf_code(SerdeMod0, Source) of
|
||||
case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of
|
||||
{ok, SerdeMod, ModBinary} ->
|
||||
load_code(SerdeMod, SerdeModFileName, ModBinary),
|
||||
SerdeMod;
|
||||
|
@ -121,30 +121,30 @@ protobuf_serde_mod_name(Name) ->
|
|||
SerdeModFileName = SerdeModName ++ ".memory",
|
||||
{SerdeMod, SerdeModFileName}.
|
||||
|
||||
-spec lazy_generate_protobuf_code(module(), schema_source()) ->
|
||||
-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) ->
|
||||
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
|
||||
lazy_generate_protobuf_code(SerdeMod0, Source) ->
|
||||
lazy_generate_protobuf_code(Name, SerdeMod0, Source) ->
|
||||
%% We run this inside a transaction with locks to avoid running
|
||||
%% the compile on all nodes; only one will get the lock, compile
|
||||
%% the schema, and other nodes will simply read the final result.
|
||||
{atomic, Res} = mria:transaction(
|
||||
?SCHEMA_REGISTRY_SHARD,
|
||||
fun lazy_generate_protobuf_code_trans/2,
|
||||
[SerdeMod0, Source]
|
||||
fun lazy_generate_protobuf_code_trans/3,
|
||||
[Name, SerdeMod0, Source]
|
||||
),
|
||||
Res.
|
||||
|
||||
-spec lazy_generate_protobuf_code_trans(module(), schema_source()) ->
|
||||
-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) ->
|
||||
{ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}.
|
||||
lazy_generate_protobuf_code_trans(SerdeMod0, Source) ->
|
||||
lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) ->
|
||||
Fingerprint = erlang:md5(Source),
|
||||
_ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write),
|
||||
case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of
|
||||
[#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] ->
|
||||
?tp(schema_registry_protobuf_cache_hit, #{}),
|
||||
?tp(schema_registry_protobuf_cache_hit, #{name => Name}),
|
||||
{ok, SerdeMod, ModBinary};
|
||||
[] ->
|
||||
?tp(schema_registry_protobuf_cache_miss, #{}),
|
||||
?tp(schema_registry_protobuf_cache_miss, #{name => Name}),
|
||||
case generate_protobuf_code(SerdeMod0, Source) of
|
||||
{ok, SerdeMod, ModBinary} ->
|
||||
CacheEntry = #protobuf_cache{
|
||||
|
|
Loading…
Reference in New Issue