diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 4a2cb2ea1..f0640c7dc 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -84,6 +84,12 @@ import_config/1 ]). +%% For setting and getting extra rule engine SQL functions module +-export([ + extra_functions_module/0, + set_extra_functions_module/1 +]). + -define(RULE_ENGINE, ?MODULE). -define(T_CALL, infinity). @@ -542,3 +548,21 @@ get_egress_bridges(Actions) -> emqx_bridge_resource:bridge_id(BridgeType, BridgeName) || {bridge, BridgeType, BridgeName, _ResId} <- Actions ]. + +%% For allowing an external application to add extra "built-in" functions to the +%% rule engine SQL-like language. The module set by +%% set_extra_functions_module/1 should export a function called +%% handle_rule_function with two parameters (the first being an atom for the +%% function name and the second a list of arguments). The function should +%% return the result or {error, no_match_for_function} if it cannot +%% handle the function. See '$handle_undefined_function' in the emqx_rule_funcs +%% module. See also the callback function declaration in emqx_rule_funcs.erl. + +-spec extra_functions_module() -> module() | undefined. +extra_functions_module() -> + persistent_term:get({?MODULE, extra_functions}, undefined). + +-spec set_extra_functions_module(module()) -> ok. +set_extra_functions_module(Mod) -> + persistent_term:put({?MODULE, extra_functions}, Mod), + ok. diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 954c8fa88..fc18ed4f0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -232,6 +232,10 @@ timezone_to_offset_seconds/1 ]).
+%% See extra_functions_module/0 and set_extra_functions_module/1 in the +%% emqx_rule_engine module +-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. + %% MongoDB specific date functions. These functions return a date tuple. The %% MongoDB bridge converts such date tuples to a MongoDB date type. The %% following functions are therefore only useful for rules with at least one @@ -1141,35 +1145,27 @@ timezone_to_second(TimeZone) -> timezone_to_offset_seconds(TimeZone) -> emqx_calendar:offset_second(TimeZone). -%% @doc This is for sql funcs that should be handled in the specific modules. -%% Here the emqx_rule_funcs module acts as a proxy, forwarding -%% the function handling to the worker module. -%% @end --if(?EMQX_RELEASE_EDITION == ee). -%% EE -'$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) -> - emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs); -'$handle_undefined_function'(schema_decode, Args) -> - error({args_count_error, {schema_decode, Args}}); -'$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> - %% encode outputs iolists, but when the rule actions process those - %% it might wrongly encode them as JSON lists, so we force them to - %% binaries here. - IOList = emqx_ee_schema_registry_serde:encode(SchemaId, Term, MoreArgs), - iolist_to_binary(IOList); -'$handle_undefined_function'(schema_encode, Args) -> - error({args_count_error, {schema_encode, Args}}); '$handle_undefined_function'(sprintf, [Format | Args]) -> erlang:apply(fun sprintf_s/2, [Format, Args]); -'$handle_undefined_function'(Fun, Args) -> - error({sql_function_not_supported, function_literal(Fun, Args)}). --else. -%% CE -'$handle_undefined_function'(sprintf, [Format | Args]) -> - erlang:apply(fun sprintf_s/2, [Format, Args]); -'$handle_undefined_function'(Fun, Args) -> - error({sql_function_not_supported, function_literal(Fun, Args)}). --endif. 
+%% This is for functions that should be handled in another module +%% (currently this module is emqx_ee_schema_registry_serde in the case of EE but +%% could be changed to another module in the future). +'$handle_undefined_function'(FunctionName, Args) -> + case emqx_rule_engine:extra_functions_module() of + undefined -> + throw_sql_function_not_supported(FunctionName, Args); + Mod -> + case Mod:handle_rule_function(FunctionName, Args) of + {error, no_match_for_function} -> + throw_sql_function_not_supported(FunctionName, Args); + Result -> + Result + end + end. + +-spec throw_sql_function_not_supported(atom(), list()) -> no_return(). +throw_sql_function_not_supported(FunctionName, Args) -> + error({sql_function_not_supported, function_literal(FunctionName, Args)}). map_path(Key) -> {path, [{key, P} || P <- string:split(Key, ".", all)]}. diff --git a/changes/ee/feat-11169.en.md b/changes/ee/feat-11169.en.md new file mode 100644 index 000000000..25c758449 --- /dev/null +++ b/changes/ee/feat-11169.en.md @@ -0,0 +1,26 @@ +Two new built-in functions `sparkplug_decode` and `sparkplug_encode` have been added to the rule engine SQL-like language. These functions are used to decode and encode Sparkplug B messages. The functions are used as follows: + +Decode a Sparkplug B message: + +```sql +select + sparkplug_decode(payload) as decoded +from t + +``` + +Encode a Sparkplug B message: + +```sql +select + sparkplug_encode(json_decode(payload)) as encoded +from t +``` + +One can also specify a Sparkplug B message type by specifying it as the second argument to the `sparkplug_decode` and `sparkplug_encode` functions.
The default is `Payload`: + +```sql +select + sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded +from t +``` diff --git a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl index 058abf007..86a3a55e0 100644 --- a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl +++ b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl @@ -12,6 +12,9 @@ -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab). -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab). +-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, + <<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">> +). -type schema_name() :: binary(). -type schema_source() :: binary(). diff --git a/lib-ee/emqx_ee_schema_registry/priv/LICENSE b/lib-ee/emqx_ee_schema_registry/priv/LICENSE new file mode 100644 index 000000000..d3087e4c5 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/priv/LICENSE @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. 
+ +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. 
GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. 
+ + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. 
+ +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. 
In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. 
DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. 
+ +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. 
+ + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto b/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto new file mode 100644 index 000000000..914ce726f --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto @@ -0,0 +1,229 @@ +// Downloaded from: https://github.com/eclipse/tahu/blob/46f25e79f34234e6145d11108660dfd9133ae50d/sparkplug_b/sparkplug_b.proto +// +// License for this file is located in the same directory as this file. +// +// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others +// * +// * This program and the accompanying materials are made available under the +// * terms of the Eclipse Public License 2.0 which is available at +// * http://www.eclipse.org/legal/epl-2.0. +// * +// * SPDX-License-Identifier: EPL-2.0 +// * +// * Contributors: +// * Cirrus Link Solutions - initial implementation + +// +// To compile: +// cd client_libraries/java +// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto +// + + +syntax = "proto2"; + +package org.eclipse.tahu.protobuf; + +option java_package = "org.eclipse.tahu.protobuf"; +option java_outer_classname = "SparkplugBProto"; + +enum DataType { + // Indexes of Data Types + + // Unknown placeholder for future expansion. 
+ Unknown = 0; + + // Basic Types + Int8 = 1; + Int16 = 2; + Int32 = 3; + Int64 = 4; + UInt8 = 5; + UInt16 = 6; + UInt32 = 7; + UInt64 = 8; + Float = 9; + Double = 10; + Boolean = 11; + String = 12; + DateTime = 13; + Text = 14; + + // Additional Metric Types + UUID = 15; + DataSet = 16; + Bytes = 17; + File = 18; + Template = 19; + + // Additional PropertyValue Types + PropertySet = 20; + PropertySetList = 21; + + // Array Types + Int8Array = 22; + Int16Array = 23; + Int32Array = 24; + Int64Array = 25; + UInt8Array = 26; + UInt16Array = 27; + UInt32Array = 28; + UInt64Array = 29; + FloatArray = 30; + DoubleArray = 31; + BooleanArray = 32; + StringArray = 33; + DateTimeArray = 34; +} + +message Payload { + + message Template { + + message Parameter { + optional string name = 1; + optional uint32 type = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + ParameterValueExtension extension_value = 9; + } + + message ParameterValueExtension { + extensions 1 to max; + } + } + + optional string version = 1; // The version of the Template to prevent mismatches + repeated Metric metrics = 2; // Each metric includes a name, datatype, and optionally a value + repeated Parameter parameters = 3; + optional string template_ref = 4; // MUST be a reference to a template definition if this is an instance (i.e. 
the name of the template definition) - MUST be omitted for template definitions + optional bool is_definition = 5; + extensions 6 to max; + } + + message DataSet { + + message DataSetValue { + + oneof value { + uint32 int_value = 1; + uint64 long_value = 2; + float float_value = 3; + double double_value = 4; + bool boolean_value = 5; + string string_value = 6; + DataSetValueExtension extension_value = 7; + } + + message DataSetValueExtension { + extensions 1 to max; + } + } + + message Row { + repeated DataSetValue elements = 1; + extensions 2 to max; // For third party extensions + } + + optional uint64 num_of_columns = 1; + repeated string columns = 2; + repeated uint32 types = 3; + repeated Row rows = 4; + extensions 5 to max; // For third party extensions + } + + message PropertyValue { + + optional uint32 type = 1; + optional bool is_null = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + PropertySet propertyset_value = 9; + PropertySetList propertysets_value = 10; // List of Property Values + PropertyValueExtension extension_value = 11; + } + + message PropertyValueExtension { + extensions 1 to max; + } + } + + message PropertySet { + repeated string keys = 1; // Names of the properties + repeated PropertyValue values = 2; + extensions 3 to max; + } + + message PropertySetList { + repeated PropertySet propertyset = 1; + extensions 2 to max; + } + + message MetaData { + // Bytes specific metadata + optional bool is_multi_part = 1; + + // General metadata + optional string content_type = 2; // Content/Media type + optional uint64 size = 3; // File size, String size, Multi-part size, etc + optional uint64 seq = 4; // Sequence number for multi-part messages + + // File metadata + optional string file_name = 5; // File name + optional string file_type = 6; // File type (i.e. 
xml, json, txt, cpp, etc) + optional string md5 = 7; // md5 of data + + // Catchalls and future expansion + optional string description = 8; // Could be anything such as json or xml of custom properties + extensions 9 to max; + } + + message Metric { + + optional string name = 1; // Metric name - should only be included on birth + optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages + optional uint64 timestamp = 3; // Timestamp associated with data acquisition time + optional uint32 datatype = 4; // DataType of the metric/tag value + optional bool is_historical = 5; // If this is historical data and should not update real time tag + optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag + optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. + optional MetaData metadata = 8; // Metadata for the payload + optional PropertySet properties = 9; + + oneof value { + uint32 int_value = 10; + uint64 long_value = 11; + float float_value = 12; + double double_value = 13; + bool boolean_value = 14; + string string_value = 15; + bytes bytes_value = 16; // Bytes, File + DataSet dataset_value = 17; + Template template_value = 18; + MetricValueExtension extension_value = 19; + } + + message MetricValueExtension { + extensions 1 to max; + } + } + + optional uint64 timestamp = 1; // Timestamp at message sending time + repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs + optional uint64 seq = 3; // Sequence number + optional string uuid = 4; // UUID to track message type in terms of schema definitions + optional bytes body = 5; // To optionally bypass the whole definition above + extensions 6 to max; // For third party extensions +} diff --git a/lib-ee/emqx_ee_schema_registry/rebar.config b/lib-ee/emqx_ee_schema_registry/rebar.config index e42ff7278..309d9cdf8 100644 --- 
a/lib-ee/emqx_ee_schema_registry/rebar.config +++ b/lib-ee/emqx_ee_schema_registry/rebar.config @@ -4,6 +4,7 @@ {deps, [ {emqx, {path, "../../apps/emqx"}}, {emqx_utils, {path, "../../apps/emqx_utils"}}, + {emqx_rule_engine, {path, "../../apps/emqx_rule_engine"}}, {erlavro, {git, "https://github.com/klarna/erlavro.git", {tag, "2.9.8"}}}, {gpb, "4.19.7"} ]}. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index b71ed01e5..3dda869ba 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -1,8 +1,11 @@ {application, emqx_ee_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, [emqx_ee_schema_registry_sup]}, {mod, {emqx_ee_schema_registry_app, []}}, + {included_applications, [ + emqx_rule_engine + ]}, {applications, [ kernel, stdlib, diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 90127e629..31153805c 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -232,11 +232,46 @@ create_tables() -> ok. do_build_serdes(Schemas) -> + %% We build a special serde for the Sparkplug B payload. This serde is used + %% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1. + ok = maybe_build_sparkplug_b_serde(), %% TODO: use some kind of mutex to make each core build a %% different serde to avoid duplicate work. Maybe ekka_locker? maps:foreach(fun do_build_serde/2, Schemas), ?tp(schema_registry_serdes_built, #{}). 
+maybe_build_sparkplug_b_serde() -> + case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of + {error, not_found} -> + do_build_serde( + ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, + #{ + type => protobuf, + source => get_schema_source(<<"sparkplug_b.proto">>) + } + ); + {ok, _} -> + ok + end. + +get_schema_source(Filename) -> + {ok, App} = application:get_application(), + FilePath = + case code:priv_dir(App) of + {error, bad_name} -> + erlang:error( + {error, <<"Could not find data directory (priv) for Schema Registry">>} + ); + Dir -> + filename:join(Dir, Filename) + end, + case file:read_file(FilePath) of + {ok, Content} -> + Content; + {error, Reason} -> + erlang:error({error, Reason}) + end. + build_serdes(Serdes) -> build_serdes(Serdes, []). @@ -251,9 +286,10 @@ build_serdes([{Name, Params} | Rest], Acc0) -> build_serdes([], _Acc) -> ok. -do_build_serde(Name0, #{type := Type, source := Source}) -> +do_build_serde(Name, Serde) when not is_binary(Name) -> + do_build_serde(to_bin(Name), Serde); +do_build_serde(Name, #{type := Type, source := Source}) -> try - Name = to_bin(Name0), {Serializer, Deserializer, Destructor} = emqx_ee_schema_registry_serde:make_serde(Type, Name, Source), Serde = #serde{ @@ -270,7 +306,7 @@ do_build_serde(Name0, #{type := Type, source := Source}) -> error, #{ msg => "error_building_serde", - name => Name0, + name => Name, type => Type, kind => Kind, error => Error, @@ -280,11 +316,13 @@ do_build_serde(Name0, #{type := Type, source := Source}) -> {error, Error} end. +ensure_serde_absent(Name) when not is_binary(Name) -> + ensure_serde_absent(to_bin(Name)); ensure_serde_absent(Name) -> case get_serde(Name) of {ok, #{destructor := Destructor}} -> Destructor(), - ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name)); + ok = mria:dirty_delete(?SERDE_TAB, Name); {error, not_found} -> ok end. 
diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl index 85d35be1f..f9cd5810e 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_app.erl @@ -10,6 +10,9 @@ -export([start/2, stop/1]). start(_StartType, _StartArgs) -> + %% Register rule engine extra functions module so that we can handle decode + %% and encode functions called from the rule engine SQL like language + ok = emqx_rule_engine:set_extra_functions_module(emqx_ee_schema_registry_serde), ok = mria_rlog:wait_for_shards([?SCHEMA_REGISTRY_SHARD], infinity), %% HTTP API handler emqx_conf:add_handler([?CONF_KEY_ROOT, schemas, '?'], emqx_ee_schema_registry), diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl index 237ec706f..d641cf593 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_schema.erl @@ -42,7 +42,8 @@ fields(?CONF_KEY_ROOT) -> ), #{ default => #{}, - desc => ?DESC("schema_registry_schemas") + desc => ?DESC("schema_registry_schemas"), + validator => fun validate_name/1 } )} ]; @@ -89,6 +90,15 @@ union_member_selector_get_api(all_union_members) -> union_member_selector_get_api({value, V}) -> refs_get_api(V). +validate_name(NameSchemaMap) -> + case maps:is_key(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, NameSchemaMap) of + true -> + {error, + <<"Illegal schema name ", ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME/binary>>}; + false -> + ok + end. 
+
 %%------------------------------------------------------------------------------
 %% `minirest_trails' "APIs"
 %%------------------------------------------------------------------------------
diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
index c65574032..15a254e8f 100644
--- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
+++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl
@@ -3,6 +3,8 @@
 %%--------------------------------------------------------------------
 -module(emqx_ee_schema_registry_serde).
 
+-behaviour(emqx_rule_funcs).
+
 -include("emqx_ee_schema_registry.hrl").
 -include_lib("emqx/include/logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -15,13 +17,64 @@
     decode/3,
     encode/2,
     encode/3,
-    make_serde/3
+    make_serde/3,
+    handle_rule_function/2
 ]).
 
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
 
+%% Handles the extra rule engine SQL functions provided by the schema registry
+%% (the `emqx_rule_funcs' behaviour callback): `schema_decode', `schema_encode',
+%% `sparkplug_decode' and `sparkplug_encode'. The Sparkplug variants use the
+%% Sparkplug B schema name and default the message name to `Payload'.
+%% Returns `{error, no_match_for_function}' for any other function name and
+%% raises `{args_count_error, {Function, Args}}' when a known function is called
+%% with too few arguments.
+-spec handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
+handle_rule_function(sparkplug_decode, [Data]) ->
+    handle_rule_function(
+        schema_decode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>]
+    );
+handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
+    handle_rule_function(
+        schema_decode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
+    );
+handle_rule_function(sparkplug_decode, Args) ->
+    %% Only reachable with no arguments; report it like schema_decode does
+    %% instead of pretending the function does not exist.
+    error({args_count_error, {sparkplug_decode, Args}});
+handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
+    decode(SchemaId, Data, MoreArgs);
+handle_rule_function(schema_decode, Args) ->
+    error({args_count_error, {schema_decode, Args}});
+handle_rule_function(sparkplug_encode, [Term]) ->
+    handle_rule_function(
+        schema_encode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>]
+    );
+handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
+    handle_rule_function(
+        schema_encode,
+        [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
+    );
+handle_rule_function(sparkplug_encode, Args) ->
+    %% Same as for sparkplug_decode: an empty argument list is a usage error.
+    error({args_count_error, {sparkplug_encode, Args}});
+handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
+    %% encode outputs iolists, but when the rule actions process those
+    %% it might wrongly encode them as JSON lists, so we force them to
+    %% binaries here.
+    IOList = encode(SchemaId, Term, MoreArgs),
+    iolist_to_binary(IOList);
+handle_rule_function(schema_encode, Args) ->
+    error({args_count_error, {schema_encode, Args}});
+handle_rule_function(_, _) ->
+    {error, no_match_for_function}.
+
 -spec decode(schema_name(), encoded_data()) -> decoded_data().
 decode(SerdeName, RawData) ->
     decode(SerdeName, RawData, []).
@@ -96,7 +149,7 @@ inject_avro_name(Name, Source0) ->
 -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), - case lazy_generate_protobuf_code(SerdeMod0, Source) of + case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> load_code(SerdeMod, SerdeModFileName, ModBinary), SerdeMod; @@ -121,30 +160,30 @@ protobuf_serde_mod_name(Name) -> SerdeModFileName = SerdeModName ++ ".memory", {SerdeMod, SerdeModFileName}. --spec lazy_generate_protobuf_code(module(), schema_source()) -> +-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. -lazy_generate_protobuf_code(SerdeMod0, Source) -> +lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> %% We run this inside a transaction with locks to avoid running %% the compile on all nodes; only one will get the lock, compile %% the schema, and other nodes will simply read the final result. {atomic, Res} = mria:transaction( ?SCHEMA_REGISTRY_SHARD, - fun lazy_generate_protobuf_code_trans/2, - [SerdeMod0, Source] + fun lazy_generate_protobuf_code_trans/3, + [Name, SerdeMod0, Source] ), Res. --spec lazy_generate_protobuf_code_trans(module(), schema_source()) -> +-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. 
-lazy_generate_protobuf_code_trans(SerdeMod0, Source) -> +lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> Fingerprint = erlang:md5(Source), _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write), case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] -> - ?tp(schema_registry_protobuf_cache_hit, #{}), + ?tp(schema_registry_protobuf_cache_hit, #{name => Name}), {ok, SerdeMod, ModBinary}; [] -> - ?tp(schema_registry_protobuf_cache_miss, #{}), + ?tp(schema_registry_protobuf_cache_miss, #{name => Name}), case generate_protobuf_code(SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> CacheEntry = #protobuf_cache{ diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 9167fed9e..484b89996 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -21,13 +21,16 @@ %%------------------------------------------------------------------------------ all() -> - [{group, avro}, {group, protobuf}]. + [ + {group, avro}, + {group, protobuf} + ] ++ sparkplug_tests(). groups() -> - AllTCs = emqx_common_test_helpers:all(?MODULE), + AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(), ProtobufOnlyTCs = protobuf_only_tcs(), - TCs = AllTCs -- ProtobufOnlyTCs, - [{avro, TCs}, {protobuf, AllTCs}]. + TCs = AllTCsExceptSP -- ProtobufOnlyTCs, + [{avro, TCs}, {protobuf, AllTCsExceptSP}]. protobuf_only_tcs() -> [ @@ -35,6 +38,13 @@ protobuf_only_tcs() -> t_protobuf_union_decode ]. +sparkplug_tests() -> + [ + t_sparkplug_decode, + t_sparkplug_encode, + t_sparkplug_decode_encode_with_message_name + ]. 
+
 init_per_suite(Config) ->
     emqx_config:save_schema_mod_and_names(emqx_ee_schema_registry_schema),
     emqx_mgmt_api_test_util:init_suite(?APPS),
@@ -411,13 +421,18 @@ serde_deletion_calls_destructor_spec(#{serde_type := SerdeType}, Trace) ->
 
 protobuf_unique_cache_hit_spec(#{serde_type := protobuf} = Res, Trace) ->
     #{nodes := Nodes} = Res,
-    CacheEvents = ?of_kind(
+    CacheEvents0 = ?of_kind(
         [
             schema_registry_protobuf_cache_hit,
             schema_registry_protobuf_cache_miss
         ],
         Trace
     ),
+    CacheEvents = [
+        Event
+     || Event <- CacheEvents0,
+        maps:get(name, Event, no_name) =/= ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME
+    ],
     ?assertMatch(
         [
             schema_registry_protobuf_cache_hit,
@@ -731,3 +746,79 @@ t_import_config(_Config) ->
         {ok, #{root_key => schema_registry, changed => [Path]}},
         emqx_ee_schema_registry:import_config(RawConf1)
     ).
+
+%% Base64 text of a Sparkplug B encoded message; t_sparkplug_decode/1 expects
+%% it to decode to the term returned by sparkplug_example_data/0.
+sparkplug_example_data_base64() ->
+    <<"CPHh67HrMBIqChxjb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xc2VjGPXh67HrMCACUKgDEikKHGNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxXzVzZWMY9eHrseswIAJQVBIqCh1jb3VudGVyX2dyb3VwMS9jb3VudGVyMV8xMHNlYxj14eux6zAgAlAqEigKG2NvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3J1bhj14eux6zAgBVABEioKHWNvdW50ZXJfZ3JvdXAxL2NvdW50ZXIxX3Jlc2V0GPXh67HrMCAFUAAYWA==">>.
+
+%% The decoded form of the message in sparkplug_example_data_base64/0.
+sparkplug_example_data() ->
+    Metric = fun(Name, DataType, IntValue) ->
+        #{
+            <<"datatype">> => DataType,
+            <<"int_value">> => IntValue,
+            <<"name">> => Name,
+            <<"timestamp">> => 1678094561525
+        }
+    end,
+    #{
+        <<"metrics">> => [
+            Metric(<<"counter_group1/counter1_1sec">>, 2, 424),
+            Metric(<<"counter_group1/counter1_5sec">>, 2, 84),
+            Metric(<<"counter_group1/counter1_10sec">>, 2, 42),
+            Metric(<<"counter_group1/counter1_run">>, 5, 1),
+            Metric(<<"counter_group1/counter1_reset">>, 5, 0)
+        ],
+        <<"seq">> => 88,
+        <<"timestamp">> => 1678094561521
+    }.
+
+%% Repeats the lookup via ?retry until the serde table holds exactly one entry
+%% for the Sparkplug B schema name.
+%% NOTE(review): the two 100s are presumably interval and attempt count -
+%% confirm against the ?retry definition.
+wait_for_sparkplug_schema_registered() ->
+    SerdeName = ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME,
+    ?retry(100, 100, [_] = ets:lookup(?SERDE_TAB, SerdeName)).
+
+%% Shared body of the Sparkplug rule tests: creates a rule from SQL, waits for
+%% the Sparkplug B serde, publishes Payload on topic `t' and asserts that the
+%% result of receive_action_results/0 carries ExpectedData under `data'.
+check_sparkplug_rule(SQL, Payload, ExpectedData) ->
+    {ok, _} = create_rule_http(#{sql => SQL}),
+    wait_for_sparkplug_schema_registered(),
+    emqx:publish(emqx_message:make(<<"t">>, Payload)),
+    Res = receive_action_results(),
+    ?assertMatch(#{data := ExpectedData}, Res),
+    ok.
+
+t_sparkplug_decode(_Config) ->
+    SQL = <<
+        "select\n"
+        " sparkplug_decode(payload) as decoded\n"
+        "from t\n"
+    >>,
+    Payload = base64:decode(sparkplug_example_data_base64()),
+    Expected = #{<<"decoded">> => sparkplug_example_data()},
+    check_sparkplug_rule(SQL, Payload, Expected).
+
+t_sparkplug_encode(_Config) ->
+    %% Default message name field is 'Payload'
+    SQL = <<
+        "select\n"
+        " sparkplug_encode(json_decode(payload)) as encoded\n"
+        "from t\n"
+    >>,
+    Payload = emqx_utils_json:encode(sparkplug_example_data()),
+    Expected = #{<<"encoded">> => base64:decode(sparkplug_example_data_base64())},
+    check_sparkplug_rule(SQL, Payload, Expected).
+
+t_sparkplug_decode_encode_with_message_name(_Config) ->
+    SQL = <<
+        "select\n"
+        " sparkplug_encode(sparkplug_decode(payload, 'Payload'), 'Payload') as encoded\n"
+        "from t\n"
+    >>,
+    Payload = base64:decode(sparkplug_example_data_base64()),
+    check_sparkplug_rule(SQL, Payload, #{<<"encoded">> => Payload}).
diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl
index ee6a693db..f3fdcd52f 100644
--- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl
+++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_http_api_SUITE.erl
@@ -12,6 +12,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
+-include("emqx_ee_schema_registry.hrl").
+
 -define(APPS, [emqx_conf, emqx_ee_schema_registry]).
%%------------------------------------------------------------------------------ @@ -182,7 +184,6 @@ t_crud(Config) -> }}, request({delete, SchemaName}) ), - %% create a schema ?assertMatch( {ok, 201, #{ @@ -193,6 +194,18 @@ t_crud(Config) -> }}, request({post, Params}) ), + %% Test that we can't create a schema with the special Sparkplug B name + %% (the special Sparkplug B name contains a random sequence of chars so + %% should be very unlikely that users try to do this) + ParmsWithForbiddenName = maps:put( + <<"name">>, ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Params + ), + ?assertMatch( + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">> + }}, + request({post, ParmsWithForbiddenName}) + ), ?assertMatch( {ok, 200, #{ <<"type">> := SerdeTypeBin,