From 039e27a153422028e3d0e7d517a521a84787d4a8 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 29 Jun 2023 12:36:07 +0200 Subject: [PATCH] feat: add Sparkplug encode and decode functions to the rule engine Fixes: https://emqx.atlassian.net/browse/EMQX-10429 --- apps/emqx_rule_engine/src/emqx_rule_funcs.erl | 25 ++ .../include/emqx_ee_schema_registry.hrl | 3 + lib-ee/emqx_ee_schema_registry/priv/LICENCE | 277 ++++++++++++++++++ .../priv/sparkplug_b.proto | 229 +++++++++++++++ .../src/emqx_ee_schema_registry.app.src | 2 +- .../src/emqx_ee_schema_registry.erl | 61 +++- .../src/emqx_ee_schema_registry_serde.erl | 18 +- 7 files changed, 601 insertions(+), 14 deletions(-) create mode 100644 lib-ee/emqx_ee_schema_registry/priv/LICENCE create mode 100644 lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index de9bf0485..475880ed1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -19,6 +19,10 @@ -include("rule_engine.hrl"). -include_lib("emqx/include/logger.hrl"). +-if(?EMQX_RELEASE_EDITION == ee). +-include_lib("emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl"). +-endif. + -elvis([{elvis_style, god_modules, disable}]). %% IoT Funcs @@ -1128,10 +1132,31 @@ timezone_to_offset_seconds(TimeZone) -> %% @end -if(?EMQX_RELEASE_EDITION == ee). %% EE + +'$handle_undefined_function'(sparkplug_decode, [Data]) -> + '$handle_undefined_function'( + schema_decode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data, <<"Payload">>] + ); +'$handle_undefined_function'(sparkplug_decode, [Data | MoreArgs]) -> + '$handle_undefined_function'( + schema_decode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs] + ); '$handle_undefined_function'(schema_decode, [SchemaId, Data | MoreArgs]) -> emqx_ee_schema_registry_serde:decode(SchemaId, Data, MoreArgs); '$handle_undefined_function'(schema_decode, Args) -> error({args_count_error, {schema_decode, Args}}); +'$handle_undefined_function'(sparkplug_encode, [Term]) -> + '$handle_undefined_function'( + schema_encode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term, <<"Payload">>] + ); +'$handle_undefined_function'(sparkplug_encode, [Term | MoreArgs]) -> + '$handle_undefined_function'( + schema_encode, + [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs] + ); '$handle_undefined_function'(schema_encode, [SchemaId, Term | MoreArgs]) -> %% encode outputs iolists, but when the rule actions process those %% it might wrongly encode them as JSON lists, so we force them to diff --git a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl index 058abf007..86a3a55e0 100644 --- a/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl +++ b/lib-ee/emqx_ee_schema_registry/include/emqx_ee_schema_registry.hrl @@ -12,6 +12,9 @@ -define(SERDE_TAB, emqx_ee_schema_registry_serde_tab). -define(PROTOBUF_CACHE_TAB, emqx_ee_schema_registry_protobuf_cache_tab). +-define(EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, + <<"__CiYAWBja87PleCyKZ58h__SparkPlug_B_BUILT-IN">> +). -type schema_name() :: binary(). -type schema_source() :: binary(). diff --git a/lib-ee/emqx_ee_schema_registry/priv/LICENCE b/lib-ee/emqx_ee_schema_registry/priv/LICENCE new file mode 100644 index 000000000..d3087e4c5 --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/priv/LICENCE @@ -0,0 +1,277 @@ +Eclipse Public License - v 2.0 + + THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE + PUBLIC LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION + OF THE PROGRAM CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT. + +1. DEFINITIONS + +"Contribution" means: + + a) in the case of the initial Contributor, the initial content + Distributed under this Agreement, and + + b) in the case of each subsequent Contributor: + i) changes to the Program, and + ii) additions to the Program; + where such changes and/or additions to the Program originate from + and are Distributed by that particular Contributor. A Contribution + "originates" from a Contributor if it was added to the Program by + such Contributor itself or anyone acting on such Contributor's behalf. + Contributions do not include changes or additions to the Program that + are not Modified Works. + +"Contributor" means any person or entity that Distributes the Program. + +"Licensed Patents" mean patent claims licensable by a Contributor which +are necessarily infringed by the use or sale of its Contribution alone +or when combined with the Program. + +"Program" means the Contributions Distributed in accordance with this +Agreement. + +"Recipient" means anyone who receives the Program under this Agreement +or any Secondary License (as applicable), including Contributors. + +"Derivative Works" shall mean any work, whether in Source Code or other +form, that is based on (or derived from) the Program and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. + +"Modified Works" shall mean any work in Source Code or other form that +results from an addition to, deletion from, or modification of the +contents of the Program, including, for purposes of clarity any new file +in Source Code form that contains any contents of the Program. Modified +Works shall not include works that contain only declarations, +interfaces, types, classes, structures, or files of the Program solely +in each case in order to link to, bind by name, or subclass the Program +or Modified Works thereof. + +"Distribute" means the acts of a) distributing or b) making available +in any manner that enables the transfer of a copy. + +"Source Code" means the form of a Program preferred for making +modifications, including but not limited to software source code, +documentation source, and configuration files. + +"Secondary License" means either the GNU General Public License, +Version 2.0, or any later versions of that license, including any +exceptions or additional permissions as identified by the initial +Contributor. + +2. GRANT OF RIGHTS + + a) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free copyright + license to reproduce, prepare Derivative Works of, publicly display, + publicly perform, Distribute and sublicense the Contribution of such + Contributor, if any, and such Derivative Works. + + b) Subject to the terms of this Agreement, each Contributor hereby + grants Recipient a non-exclusive, worldwide, royalty-free patent + license under Licensed Patents to make, use, sell, offer to sell, + import and otherwise transfer the Contribution of such Contributor, + if any, in Source Code or other form. This patent license shall + apply to the combination of the Contribution and the Program if, at + the time the Contribution is added by the Contributor, such addition + of the Contribution causes such combination to be covered by the + Licensed Patents. The patent license shall not apply to any other + combinations which include the Contribution. No hardware per se is + licensed hereunder. + + c) Recipient understands that although each Contributor grants the + licenses to its Contributions set forth herein, no assurances are + provided by any Contributor that the Program does not infringe the + patent or other intellectual property rights of any other entity. + Each Contributor disclaims any liability to Recipient for claims + brought by any other entity based on infringement of intellectual + property rights or otherwise. As a condition to exercising the + rights and licenses granted hereunder, each Recipient hereby + assumes sole responsibility to secure any other intellectual + property rights needed, if any. For example, if a third party + patent license is required to allow Recipient to Distribute the + Program, it is Recipient's responsibility to acquire that license + before distributing the Program. + + d) Each Contributor represents that to its knowledge it has + sufficient copyright rights in its Contribution, if any, to grant + the copyright license set forth in this Agreement. + + e) Notwithstanding the terms of any Secondary License, no + Contributor makes additional grants to any Recipient (other than + those set forth in this Agreement) as a result of such Recipient's + receipt of the Program under the terms of a Secondary License + (if permitted under the terms of Section 3). + +3. REQUIREMENTS + +3.1 If a Contributor Distributes the Program in any form, then: + + a) the Program must also be made available as Source Code, in + accordance with section 3.2, and the Contributor must accompany + the Program with a statement that the Source Code for the Program + is available under this Agreement, and informs Recipients how to + obtain it in a reasonable manner on or through a medium customarily + used for software exchange; and + + b) the Contributor may Distribute the Program under a license + different than this Agreement, provided that such license: + i) effectively disclaims on behalf of all other Contributors all + warranties and conditions, express and implied, including + warranties or conditions of title and non-infringement, and + implied warranties or conditions of merchantability and fitness + for a particular purpose; + + ii) effectively excludes on behalf of all other Contributors all + liability for damages, including direct, indirect, special, + incidental and consequential damages, such as lost profits; + + iii) does not attempt to limit or alter the recipients' rights + in the Source Code under section 3.2; and + + iv) requires any subsequent distribution of the Program by any + party to be under a license that satisfies the requirements + of this section 3. + +3.2 When the Program is Distributed as Source Code: + + a) it must be made available under this Agreement, or if the + Program (i) is combined with other material in a separate file or + files made available under a Secondary License, and (ii) the initial + Contributor attached to the Source Code the notice described in + Exhibit A of this Agreement, then the Program may be made available + under the terms of such Secondary Licenses, and + + b) a copy of this Agreement must be included with each copy of + the Program. + +3.3 Contributors may not remove or alter any copyright, patent, +trademark, attribution notices, disclaimers of warranty, or limitations +of liability ("notices") contained within the Program from any copy of +the Program which they Distribute, provided that Contributors may add +their own appropriate notices. + +4. COMMERCIAL DISTRIBUTION + +Commercial distributors of software may accept certain responsibilities +with respect to end users, business partners and the like. While this +license is intended to facilitate the commercial use of the Program, +the Contributor who includes the Program in a commercial product +offering should do so in a manner which does not create potential +liability for other Contributors. Therefore, if a Contributor includes +the Program in a commercial product offering, such Contributor +("Commercial Contributor") hereby agrees to defend and indemnify every +other Contributor ("Indemnified Contributor") against any losses, +damages and costs (collectively "Losses") arising from claims, lawsuits +and other legal actions brought by a third party against the Indemnified +Contributor to the extent caused by the acts or omissions of such +Commercial Contributor in connection with its distribution of the Program +in a commercial product offering. The obligations in this section do not +apply to any claims or Losses relating to any actual or alleged +intellectual property infringement. In order to qualify, an Indemnified +Contributor must: a) promptly notify the Commercial Contributor in +writing of such claim, and b) allow the Commercial Contributor to control, +and cooperate with the Commercial Contributor in, the defense and any +related settlement negotiations. The Indemnified Contributor may +participate in any such claim at its own expense. + +For example, a Contributor might include the Program in a commercial +product offering, Product X. That Contributor is then a Commercial +Contributor. If that Commercial Contributor then makes performance +claims, or offers warranties related to Product X, those performance +claims and warranties are such Commercial Contributor's responsibility +alone. Under this section, the Commercial Contributor would have to +defend claims against the other Contributors related to those performance +claims and warranties, and if a court requires any other Contributor to +pay any damages as a result, the Commercial Contributor must pay +those damages. + +5. NO WARRANTY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, THE PROGRAM IS PROVIDED ON AN "AS IS" +BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR +IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF +TITLE, NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR +PURPOSE. Each Recipient is solely responsible for determining the +appropriateness of using and distributing the Program and assumes all +risks associated with its exercise of rights under this Agreement, +including but not limited to the risks and costs of program errors, +compliance with applicable laws, damage to or loss of data, programs +or equipment, and unavailability or interruption of operations. + +6. DISCLAIMER OF LIABILITY + +EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, AND TO THE EXTENT +PERMITTED BY APPLICABLE LAW, NEITHER RECIPIENT NOR ANY CONTRIBUTORS +SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST +PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE +EXERCISE OF ANY RIGHTS GRANTED HEREUNDER, EVEN IF ADVISED OF THE +POSSIBILITY OF SUCH DAMAGES. + +7. GENERAL + +If any provision of this Agreement is invalid or unenforceable under +applicable law, it shall not affect the validity or enforceability of +the remainder of the terms of this Agreement, and without further +action by the parties hereto, such provision shall be reformed to the +minimum extent necessary to make such provision valid and enforceable. + +If Recipient institutes patent litigation against any entity +(including a cross-claim or counterclaim in a lawsuit) alleging that the +Program itself (excluding combinations of the Program with other software +or hardware) infringes such Recipient's patent(s), then such Recipient's +rights granted under Section 2(b) shall terminate as of the date such +litigation is filed. + +All Recipient's rights under this Agreement shall terminate if it +fails to comply with any of the material terms or conditions of this +Agreement and does not cure such failure in a reasonable period of +time after becoming aware of such noncompliance. If all Recipient's +rights under this Agreement terminate, Recipient agrees to cease use +and distribution of the Program as soon as reasonably practicable. +However, Recipient's obligations under this Agreement and any licenses +granted by Recipient relating to the Program shall continue and survive. + +Everyone is permitted to copy and distribute copies of this Agreement, +but in order to avoid inconsistency the Agreement is copyrighted and +may only be modified in the following manner. The Agreement Steward +reserves the right to publish new versions (including revisions) of +this Agreement from time to time. No one other than the Agreement +Steward has the right to modify this Agreement. The Eclipse Foundation +is the initial Agreement Steward. The Eclipse Foundation may assign the +responsibility to serve as the Agreement Steward to a suitable separate +entity. Each new version of the Agreement will be given a distinguishing +version number. The Program (including Contributions) may always be +Distributed subject to the version of the Agreement under which it was +received. In addition, after a new version of the Agreement is published, +Contributor may elect to Distribute the Program (including its +Contributions) under the new version. + +Except as expressly stated in Sections 2(a) and 2(b) above, Recipient +receives no rights or licenses to the intellectual property of any +Contributor under this Agreement, whether expressly, by implication, +estoppel or otherwise. All rights in the Program not expressly granted +under this Agreement are reserved. Nothing in this Agreement is intended +to be enforceable by any entity that is not a Contributor or Recipient. +No third-party beneficiary rights are created under this Agreement. + +Exhibit A - Form of Secondary Licenses Notice + +"This Source Code may also be made available under the following +Secondary Licenses when the conditions for such availability set forth +in the Eclipse Public License, v. 2.0 are satisfied: {name license(s), +version(s), and exceptions or additional permissions here}." + + Simply including a copy of this Agreement, including this Exhibit A + is not sufficient to license the Source Code under Secondary Licenses. + + If it is not possible or desirable to put the notice in a particular + file, then You may include the notice in a location (such as a LICENSE + file in a relevant directory) where a recipient would be likely to + look for such a notice. + + You may add additional accurate notices of copyright ownership. diff --git a/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto b/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto new file mode 100644 index 000000000..914ce726f --- /dev/null +++ b/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto @@ -0,0 +1,229 @@ +// Downloaded from: https://github.com/eclipse/tahu/blob/46f25e79f34234e6145d11108660dfd9133ae50d/sparkplug_b/sparkplug_b.proto +// +// License for this file is located in the same directory as this file. +// +// * Copyright (c) 2015, 2018 Cirrus Link Solutions and others +// * +// * This program and the accompanying materials are made available under the +// * terms of the Eclipse Public License 2.0 which is available at +// * http://www.eclipse.org/legal/epl-2.0. +// * +// * SPDX-License-Identifier: EPL-2.0 +// * +// * Contributors: +// * Cirrus Link Solutions - initial implementation + +// +// To compile: +// cd client_libraries/java +// protoc --proto_path=../../ --java_out=src/main/java ../../sparkplug_b.proto +// + + +syntax = "proto2"; + +package org.eclipse.tahu.protobuf; + +option java_package = "org.eclipse.tahu.protobuf"; +option java_outer_classname = "SparkplugBProto"; + +enum DataType { + // Indexes of Data Types + + // Unknown placeholder for future expansion. + Unknown = 0; + + // Basic Types + Int8 = 1; + Int16 = 2; + Int32 = 3; + Int64 = 4; + UInt8 = 5; + UInt16 = 6; + UInt32 = 7; + UInt64 = 8; + Float = 9; + Double = 10; + Boolean = 11; + String = 12; + DateTime = 13; + Text = 14; + + // Additional Metric Types + UUID = 15; + DataSet = 16; + Bytes = 17; + File = 18; + Template = 19; + + // Additional PropertyValue Types + PropertySet = 20; + PropertySetList = 21; + + // Array Types + Int8Array = 22; + Int16Array = 23; + Int32Array = 24; + Int64Array = 25; + UInt8Array = 26; + UInt16Array = 27; + UInt32Array = 28; + UInt64Array = 29; + FloatArray = 30; + DoubleArray = 31; + BooleanArray = 32; + StringArray = 33; + DateTimeArray = 34; +} + +message Payload { + + message Template { + + message Parameter { + optional string name = 1; + optional uint32 type = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + ParameterValueExtension extension_value = 9; + } + + message ParameterValueExtension { + extensions 1 to max; + } + } + + optional string version = 1; // The version of the Template to prevent mismatches + repeated Metric metrics = 2; // Each metric includes a name, datatype, and optionally a value + repeated Parameter parameters = 3; + optional string template_ref = 4; // MUST be a reference to a template definition if this is an instance (i.e. the name of the template definition) - MUST be omitted for template definitions + optional bool is_definition = 5; + extensions 6 to max; + } + + message DataSet { + + message DataSetValue { + + oneof value { + uint32 int_value = 1; + uint64 long_value = 2; + float float_value = 3; + double double_value = 4; + bool boolean_value = 5; + string string_value = 6; + DataSetValueExtension extension_value = 7; + } + + message DataSetValueExtension { + extensions 1 to max; + } + } + + message Row { + repeated DataSetValue elements = 1; + extensions 2 to max; // For third party extensions + } + + optional uint64 num_of_columns = 1; + repeated string columns = 2; + repeated uint32 types = 3; + repeated Row rows = 4; + extensions 5 to max; // For third party extensions + } + + message PropertyValue { + + optional uint32 type = 1; + optional bool is_null = 2; + + oneof value { + uint32 int_value = 3; + uint64 long_value = 4; + float float_value = 5; + double double_value = 6; + bool boolean_value = 7; + string string_value = 8; + PropertySet propertyset_value = 9; + PropertySetList propertysets_value = 10; // List of Property Values + PropertyValueExtension extension_value = 11; + } + + message PropertyValueExtension { + extensions 1 to max; + } + } + + message PropertySet { + repeated string keys = 1; // Names of the properties + repeated PropertyValue values = 2; + extensions 3 to max; + } + + message PropertySetList { + repeated PropertySet propertyset = 1; + extensions 2 to max; + } + + message MetaData { + // Bytes specific metadata + optional bool is_multi_part = 1; + + // General metadata + optional string content_type = 2; // Content/Media type + optional uint64 size = 3; // File size, String size, Multi-part size, etc + optional uint64 seq = 4; // Sequence number for multi-part messages + + // File metadata + optional string file_name = 5; // File name + optional string file_type = 6; // File type (i.e. xml, json, txt, cpp, etc) + optional string md5 = 7; // md5 of data + + // Catchalls and future expansion + optional string description = 8; // Could be anything such as json or xml of custom properties + extensions 9 to max; + } + + message Metric { + + optional string name = 1; // Metric name - should only be included on birth + optional uint64 alias = 2; // Metric alias - tied to name on birth and included in all later DATA messages + optional uint64 timestamp = 3; // Timestamp associated with data acquisition time + optional uint32 datatype = 4; // DataType of the metric/tag value + optional bool is_historical = 5; // If this is historical data and should not update real time tag + optional bool is_transient = 6; // Tells consuming clients such as MQTT Engine to not store this as a tag + optional bool is_null = 7; // If this is null - explicitly say so rather than using -1, false, etc for some datatypes. + optional MetaData metadata = 8; // Metadata for the payload + optional PropertySet properties = 9; + + oneof value { + uint32 int_value = 10; + uint64 long_value = 11; + float float_value = 12; + double double_value = 13; + bool boolean_value = 14; + string string_value = 15; + bytes bytes_value = 16; // Bytes, File + DataSet dataset_value = 17; + Template template_value = 18; + MetricValueExtension extension_value = 19; + } + + message MetricValueExtension { + extensions 1 to max; + } + } + + optional uint64 timestamp = 1; // Timestamp at message sending time + repeated Metric metrics = 2; // Repeated forever - no limit in Google Protobufs + optional uint64 seq = 3; // Sequence number + optional string uuid = 4; // UUID to track message type in terms of schema definitions + optional bytes body = 5; // To optionally bypass the whole definition above + extensions 6 to max; // For third party extensions +} diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src index b71ed01e5..c0073c5c2 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_schema_registry, [ {description, "EMQX Schema Registry"}, - {vsn, "0.1.4"}, + {vsn, "0.1.5"}, {registered, [emqx_ee_schema_registry_sup]}, {mod, {emqx_ee_schema_registry_app, []}}, {applications, [ diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl index 90127e629..1d35d8ffa 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry.erl @@ -38,6 +38,11 @@ import_config/1 ]). +%% For testing +-export([ + ensure_serde_absent/1 +]). + -type schema() :: #{ type := serde_type(), source := binary(), @@ -232,11 +237,46 @@ create_tables() -> ok. do_build_serdes(Schemas) -> + %% We build a special serde for the Sparkplug B payload. This serde is used + %% by the rule engine functions sparkplug_decode/1 and sparkplug_encode/1. + maybe_build_sparkplug_b_serde(), %% TODO: use some kind of mutex to make each core build a %% different serde to avoid duplicate work. Maybe ekka_locker? maps:foreach(fun do_build_serde/2, Schemas), ?tp(schema_registry_serdes_built, #{}). +maybe_build_sparkplug_b_serde() -> + case get_schema(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) of + {error, not_found} -> + do_build_serde_no_check( + ?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, + #{ + type => protobuf, + source => get_schema_source(<<"sparkplug_b.proto">>) + } + ); + {ok, _} -> + ok + end. + +get_schema_source(Filename) -> + {ok, App} = application:get_application(), + FilePath = + case code:priv_dir(App) of + {error, bad_name} -> + erlang:error( + {error, <<"Could not find data directory (priv) for Schema Registry">>} + ); + Dir -> + filename:join(Dir, Filename) + end, + case file:read_file(FilePath) of + {ok, Content} -> + Content; + {error, Reason} -> + erlang:error({error, Reason}) + end. + build_serdes(Serdes) -> build_serdes(Serdes, []). @@ -251,9 +291,18 @@ build_serdes([{Name, Params} | Rest], Acc0) -> build_serdes([], _Acc) -> ok. -do_build_serde(Name0, #{type := Type, source := Source}) -> +do_build_serde(Name, Serde) when not is_binary(Name) -> + do_build_serde(to_bin(Name), Serde); +do_build_serde(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, _Serde) -> + {error, + erlang:iolist_to_binary( + io_lib:format("Illigal schema name ~s", [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME]) + )}; +do_build_serde(Name, Serde) -> + do_build_serde_no_check(Name, Serde). + +do_build_serde_no_check(Name, #{type := Type, source := Source}) -> try - Name = to_bin(Name0), {Serializer, Deserializer, Destructor} = emqx_ee_schema_registry_serde:make_serde(Type, Name, Source), Serde = #serde{ @@ -270,7 +319,7 @@ do_build_serde(Name0, #{type := Type, source := Source}) -> error, #{ msg => "error_building_serde", - name => Name0, + name => Name, type => Type, kind => Kind, error => Error, @@ -280,11 +329,15 @@ do_build_serde(Name0, #{type := Type, source := Source}) -> {error, Error} end. +ensure_serde_absent(Name) when not is_binary(Name) -> + ensure_serde_absent(to_bin(Name)); +% ensure_serde_absent(?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME) -> +% {error, <<"Cannot delete serde for Sparkplug B schema">>}; ensure_serde_absent(Name) -> case get_serde(Name) of {ok, #{destructor := Destructor}} -> Destructor(), - ok = mria:dirty_delete(?SERDE_TAB, to_bin(Name)); + ok = mria:dirty_delete(?SERDE_TAB, Name); {error, not_found} -> ok end. diff --git a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl index c65574032..fa3b66c22 100644 --- a/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl +++ b/lib-ee/emqx_ee_schema_registry/src/emqx_ee_schema_registry_serde.erl @@ -96,7 +96,7 @@ inject_avro_name(Name, Source0) -> -spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module(). make_protobuf_serde_mod(Name, Source) -> {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name), - case lazy_generate_protobuf_code(SerdeMod0, Source) of + case lazy_generate_protobuf_code(Name, SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> load_code(SerdeMod, SerdeModFileName, ModBinary), SerdeMod; @@ -121,30 +121,30 @@ protobuf_serde_mod_name(Name) -> SerdeModFileName = SerdeModName ++ ".memory", {SerdeMod, SerdeModFileName}. --spec lazy_generate_protobuf_code(module(), schema_source()) -> +-spec lazy_generate_protobuf_code(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. -lazy_generate_protobuf_code(SerdeMod0, Source) -> +lazy_generate_protobuf_code(Name, SerdeMod0, Source) -> %% We run this inside a transaction with locks to avoid running %% the compile on all nodes; only one will get the lock, compile %% the schema, and other nodes will simply read the final result. {atomic, Res} = mria:transaction( ?SCHEMA_REGISTRY_SHARD, - fun lazy_generate_protobuf_code_trans/2, - [SerdeMod0, Source] + fun lazy_generate_protobuf_code_trans/3, + [Name, SerdeMod0, Source] ), Res. --spec lazy_generate_protobuf_code_trans(module(), schema_source()) -> +-spec lazy_generate_protobuf_code_trans(schema_name(), module(), schema_source()) -> {ok, module(), binary()} | {error, #{error := term(), warnings := [term()]}}. -lazy_generate_protobuf_code_trans(SerdeMod0, Source) -> +lazy_generate_protobuf_code_trans(Name, SerdeMod0, Source) -> Fingerprint = erlang:md5(Source), _ = mnesia:lock({record, ?PROTOBUF_CACHE_TAB, Fingerprint}, write), case mnesia:read(?PROTOBUF_CACHE_TAB, Fingerprint) of [#protobuf_cache{module = SerdeMod, module_binary = ModBinary}] -> - ?tp(schema_registry_protobuf_cache_hit, #{}), + ?tp(schema_registry_protobuf_cache_hit, #{name => Name}), {ok, SerdeMod, ModBinary}; [] -> - ?tp(schema_registry_protobuf_cache_miss, #{}), + ?tp(schema_registry_protobuf_cache_miss, #{name => Name}), case generate_protobuf_code(SerdeMod0, Source) of {ok, SerdeMod, ModBinary} -> CacheEntry = #protobuf_cache{