Merge remote-tracking branch 'origin/master' into release-56

commit 6a2731f2da
@@ -112,8 +112,8 @@ jobs:
       fail-fast: false
       matrix:
         profile:
-          - ${{ inputs.profile }}
-          - ${{ inputs.profile }}-elixir
+          - ["${{ inputs.profile }}", "${{ inputs.profile == 'emqx' && 'docker.io,public.ecr.aws' || 'docker.io' }}"]
+          - ["${{ inputs.profile }}-elixir", "${{ inputs.profile == 'emqx' && 'docker.io,public.ecr.aws' || 'docker.io' }}"]

     steps:
       - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1

@@ -121,7 +121,7 @@ jobs:
           ref: ${{ github.event.inputs.ref }}
       - uses: actions/download-artifact@eaceaf801fd36c7dee90939fad912460b18a1ffe # v4.1.2
         with:
-          pattern: "${{ matrix.profile }}-*.tar.gz"
+          pattern: "${{ matrix.profile[0] }}-*.tar.gz"
           path: _packages
           merge-multiple: true

@@ -158,8 +158,8 @@ jobs:

       - name: Build docker image
         env:
-          PROFILE: ${{ matrix.profile }}
-          DOCKER_REGISTRY: 'docker.io,public.ecr.aws'
+          PROFILE: ${{ matrix.profile[0] }}
+          DOCKER_REGISTRY: ${{ matrix.profile[1] }}
           DOCKER_ORG: ${{ github.repository_owner }}
           DOCKER_LATEST: ${{ inputs.latest }}
           DOCKER_PUSH: false

@@ -497,7 +497,7 @@ do_t_session_expiration(_Config, Opts) ->
     ok.

 t_session_gc(Config) ->
-    [Node1, Node2, _Node3] = Nodes = ?config(nodes, Config),
+    [Node1, _Node2, _Node3] = Nodes = ?config(nodes, Config),
     [
         Port1,
         Port2,

@@ -2,7 +2,7 @@
 {application, emqx, [
     {id, "emqx"},
     {description, "EMQX Core"},
-    {vsn, "5.1.20"},
+    {vsn, "5.2.0"},
     {modules, []},
     {registered, []},
     {applications, [

@@ -721,8 +721,8 @@ do_get_chann_conn_mod(ClientId, ChanPid) ->
     end.

 mark_channel_connected(ChanPid) ->
-    ?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
     ets:insert_new(?CHAN_LIVE_TAB, {ChanPid, true}),
+    ?tp(emqx_cm_connected_client_count_inc, #{chan_pid => ChanPid}),
     ok.

 mark_channel_disconnected(ChanPid) ->

@@ -497,6 +497,15 @@ fill_defaults(RawConf, Opts) ->
     ).

 -spec fill_defaults(module(), raw_config(), hocon_tconf:opts()) -> map().
+fill_defaults(_SchemaMod, RawConf = #{<<"durable_storage">> := _}, _) ->
+    %% FIXME: kludge to prevent `emqx_config' module from filling in
+    %% the default values for backends and layouts. These records are
+    %% inside unions, and adding default values there will add
+    %% incompatible fields.
+    %%
+    %% Note: this function is called for each individual conf root, so
+    %% this clause only affects this particular subtree.
+    RawConf;
 fill_defaults(SchemaMod, RawConf, Opts0) ->
     Opts = maps:merge(#{required => false, make_serializable => true}, Opts0),
     hocon_tconf:check_plain(

@@ -0,0 +1,266 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+%% @doc Schema for EMQX_DS databases.
+-module(emqx_ds_schema).
+
+%% API:
+-export([schema/0, translate_builtin/1]).
+
+%% Behavior callbacks:
+-export([fields/1, desc/1, namespace/0]).
+
+-include("emqx_schema.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("hocon/include/hocon_types.hrl").
+
+%%================================================================================
+%% Type declarations
+%%================================================================================
+
+%%================================================================================
+%% API
+%%================================================================================
+
+translate_builtin(#{
+    backend := builtin,
+    n_shards := NShards,
+    replication_factor := ReplFactor,
+    layout := Layout
+}) ->
+    Storage =
+        case Layout of
+            #{
+                type := wildcard_optimized,
+                bits_per_topic_level := BitsPerTopicLevel,
+                epoch_bits := EpochBits,
+                topic_index_bytes := TIBytes
+            } ->
+                {emqx_ds_storage_bitfield_lts, #{
+                    bits_per_topic_level => BitsPerTopicLevel,
+                    topic_index_bytes => TIBytes,
+                    epoch_bits => EpochBits
+                }};
+            #{type := reference} ->
+                {emqx_ds_storage_reference, #{}}
+        end,
+    #{
+        backend => builtin,
+        n_shards => NShards,
+        replication_factor => ReplFactor,
+        storage => Storage
+    }.
+
+%%================================================================================
+%% Behavior callbacks
+%%================================================================================
+
+namespace() ->
+    durable_storage.
+
+schema() ->
+    [
+        {messages,
+            ds_schema(#{
+                default =>
+                    #{
+                        <<"backend">> => builtin
+                    },
+                importance => ?IMPORTANCE_MEDIUM,
+                desc => ?DESC(messages)
+            })}
+    ].
+
+fields(builtin) ->
+    %% Schema for the builtin backend:
+    [
+        {backend,
+            sc(
+                builtin,
+                #{
+                    'readOnly' => true,
+                    default => builtin,
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(builtin_backend)
+                }
+            )},
+        {'_config_handler',
+            sc(
+                {module(), atom()},
+                #{
+                    'readOnly' => true,
+                    default => {?MODULE, translate_builtin},
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {data_dir,
+            sc(
+                string(),
+                #{
+                    mapping => "emqx_durable_storage.db_data_dir",
+                    required => false,
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(builtin_data_dir)
+                }
+            )},
+        {n_shards,
+            sc(
+                pos_integer(),
+                #{
+                    default => 16,
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(builtin_n_shards)
+                }
+            )},
+        {replication_factor,
+            sc(
+                pos_integer(),
+                #{
+                    default => 3,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {local_write_buffer,
+            sc(
+                ref(builtin_local_write_buffer),
+                #{
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(builtin_local_write_buffer)
+                }
+            )},
+        {layout,
+            sc(
+                hoconsc:union(builtin_layouts()),
+                #{
+                    desc => ?DESC(builtin_layout),
+                    importance => ?IMPORTANCE_MEDIUM,
+                    default =>
+                        #{
+                            <<"type">> => wildcard_optimized
+                        }
+                }
+            )}
+    ];
+fields(builtin_local_write_buffer) ->
+    [
+        {max_items,
+            sc(
+                pos_integer(),
+                #{
+                    default => 1000,
+                    mapping => "emqx_durable_storage.egress_batch_size",
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(builtin_local_write_buffer_max_items)
+                }
+            )},
+        {flush_interval,
+            sc(
+                emqx_schema:timeout_duration_ms(),
+                #{
+                    default => 100,
+                    mapping => "emqx_durable_storage.egress_flush_interval",
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(builtin_local_write_buffer_flush_interval)
+                }
+            )}
+    ];
+fields(layout_builtin_wildcard_optimized) ->
+    [
+        {type,
+            sc(
+                wildcard_optimized,
+                #{
+                    'readOnly' => true,
+                    default => wildcard_optimized,
+                    desc => ?DESC(layout_builtin_wildcard_optimized_type)
+                }
+            )},
+        {bits_per_topic_level,
+            sc(
+                range(1, 64),
+                #{
+                    default => 64,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )},
+        {epoch_bits,
+            sc(
+                range(0, 64),
+                #{
+                    default => 10,
+                    importance => ?IMPORTANCE_HIDDEN,
+                    desc => ?DESC(wildcard_optimized_epoch_bits)
+                }
+            )},
+        {topic_index_bytes,
+            sc(
+                pos_integer(),
+                #{
+                    default => 4,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ];
+fields(layout_builtin_reference) ->
+    [
+        {type,
+            sc(
+                reference,
+                #{
+                    'readOnly' => true,
+                    importance => ?IMPORTANCE_HIDDEN
+                }
+            )}
+    ].
+
+desc(builtin) ->
+    ?DESC(builtin);
+desc(builtin_local_write_buffer) ->
+    ?DESC(builtin_local_write_buffer);
+desc(layout_builtin_wildcard_optimized) ->
+    ?DESC(layout_builtin_wildcard_optimized);
+desc(_) ->
+    undefined.
+
+%%================================================================================
+%% Internal functions
+%%================================================================================
+
+ds_schema(Options) ->
+    sc(
+        hoconsc:union([
+            ref(builtin)
+            | emqx_schema_hooks:injection_point('durable_storage.backends', [])
+        ]),
+        Options
+    ).
+
+-ifndef(TEST).
+builtin_layouts() ->
+    [ref(layout_builtin_wildcard_optimized)].
+-else.
+builtin_layouts() ->
+    %% Reference layout stores everything in one stream, so it's not
+    %% suitable for production use. However, it's very simple and
+    %% produces a very predictable replay order, which can be useful
+    %% for testing and debugging:
+    [ref(layout_builtin_wildcard_optimized), ref(layout_builtin_reference)].
+-endif.
+
+sc(Type, Meta) -> hoconsc:mk(Type, Meta).
+
+ref(StructName) -> hoconsc:ref(?MODULE, StructName).

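(A sketch, not part of the diff: feeding the defaults declared above through
translate_builtin/1 yields the options map consumed by the durable-storage
layer. All values below follow the schema defaults.)

    Opts = emqx_ds_schema:translate_builtin(#{
        backend => builtin,
        n_shards => 16,
        replication_factor => 3,
        layout => #{
            type => wildcard_optimized,
            bits_per_topic_level => 64,
            epoch_bits => 10,
            topic_index_bytes => 4
        }
    }),
    %% Opts =:= #{backend => builtin, n_shards => 16, replication_factor => 3,
    %%            storage => {emqx_ds_storage_bitfield_lts,
    %%                        #{bits_per_topic_level => 64,
    %%                          topic_index_bytes => 4,
    %%                          epoch_bits => 10}}}
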
@@ -52,7 +52,7 @@ is_persistence_enabled() ->

 -spec storage_backend() -> emqx_ds:create_db_opts().
 storage_backend() ->
-    storage_backend(emqx_config:get([session_persistence, storage])).
+    storage_backend([durable_storage, messages]).

 %% Dev-only option: force all messages to go through
 %% `emqx_persistent_session_ds':

@@ -60,23 +60,9 @@ storage_backend() ->
 force_ds() ->
     emqx_config:get([session_persistence, force_persistence]).

-storage_backend(#{
-    builtin := #{
-        enable := true,
-        n_shards := NShards,
-        replication_factor := ReplicationFactor
-    }
-}) ->
-    #{
-        backend => builtin,
-        storage => {emqx_ds_storage_bitfield_lts, #{}},
-        n_shards => NShards,
-        replication_factor => ReplicationFactor
-    };
-storage_backend(#{
-    fdb := #{enable := true} = FDBConfig
-}) ->
-    FDBConfig#{backend => fdb}.
+storage_backend(Path) ->
+    ConfigTree = #{'_config_handler' := {Module, Function}} = emqx_config:get(Path),
+    apply(Module, Function, [ConfigTree]).

 %%--------------------------------------------------------------------

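(A sketch, not part of the diff: how the new storage_backend/1 dispatch
resolves, assuming the default '_config_handler' value {emqx_ds_schema,
translate_builtin} declared in the new schema module above.)

    ConfigTree = emqx_config:get([durable_storage, messages]),
    {Module, Function} = maps:get('_config_handler', ConfigTree),
    %% With defaults: Module =:= emqx_ds_schema, Function =:= translate_builtin
    Opts = apply(Module, Function, [ConfigTree]).
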
@@ -733,7 +733,7 @@ fetch_new_messages(Session = #{s := S}, ClientInfo) ->
 fetch_new_messages([], Session, _ClientInfo) ->
     Session;
 fetch_new_messages([I | Streams], Session0 = #{inflight := Inflight}, ClientInfo) ->
-    BatchSize = emqx_config:get([session_persistence, max_batch_size]),
+    BatchSize = emqx_config:get([session_persistence, batch_size]),
     case emqx_persistent_session_ds_inflight:n_buffered(all, Inflight) >= BatchSize of
         true ->
             %% Buffer is full:

@@ -254,6 +254,14 @@ roots(medium) ->
             sc(
                 ref("overload_protection"),
                 #{importance => ?IMPORTANCE_HIDDEN}
             )},
+        {durable_storage,
+            sc(
+                ref(durable_storage),
+                #{
+                    importance => ?IMPORTANCE_MEDIUM,
+                    desc => ?DESC(durable_storage)
+                }
+            )}
     ];
 roots(low) ->

@@ -295,16 +303,6 @@ roots(low) ->
                 converter => fun flapping_detect_converter/2
             }
         )},
-        {persistent_session_store,
-            sc(
-                ref("persistent_session_store"),
-                #{
-                    %% NOTE
-                    %% Due to some quirks in interaction between `emqx_config` and
-                    %% `hocon_tconf`, schema roots cannot currently be deprecated.
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
         {session_persistence,
             sc(
                 ref("session_persistence"),

@@ -324,111 +322,6 @@ roots(low) ->
         )}
     ].

-fields("persistent_session_store") ->
-    Deprecated = #{deprecated => {since, "5.4.0"}},
-    [
-        {"enabled",
-            sc(
-                boolean(),
-                Deprecated#{
-                    default => false,
-                    %% TODO(5.2): change field name to 'enable' and keep 'enabled' as an alias
-                    aliases => [enable],
-                    desc => ?DESC(persistent_session_store_enabled)
-                }
-            )},
-        {"ds",
-            sc(
-                boolean(),
-                Deprecated#{
-                    default => false,
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"on_disc",
-            sc(
-                boolean(),
-                Deprecated#{
-                    default => true,
-                    desc => ?DESC(persistent_store_on_disc)
-                }
-            )},
-        {"ram_cache",
-            sc(
-                boolean(),
-                Deprecated#{
-                    default => false,
-                    desc => ?DESC(persistent_store_ram_cache)
-                }
-            )},
-        {"backend",
-            sc(
-                hoconsc:union([ref("persistent_session_builtin")]),
-                Deprecated#{
-                    default => #{
-                        <<"type">> => <<"builtin">>,
-                        <<"session">> =>
-                            #{<<"ram_cache">> => true},
-                        <<"session_messages">> =>
-                            #{<<"ram_cache">> => true},
-                        <<"messages">> =>
-                            #{<<"ram_cache">> => false}
-                    },
-                    desc => ?DESC(persistent_session_store_backend)
-                }
-            )},
-        {"max_retain_undelivered",
-            sc(
-                duration(),
-                Deprecated#{
-                    default => <<"1h">>,
-                    desc => ?DESC(persistent_session_store_max_retain_undelivered)
-                }
-            )},
-        {"message_gc_interval",
-            sc(
-                duration(),
-                Deprecated#{
-                    default => <<"1h">>,
-                    desc => ?DESC(persistent_session_store_message_gc_interval)
-                }
-            )},
-        {"session_message_gc_interval",
-            sc(
-                duration(),
-                Deprecated#{
-                    default => <<"1m">>,
-                    desc => ?DESC(persistent_session_store_session_message_gc_interval)
-                }
-            )}
-    ];
-fields("persistent_table_mria_opts") ->
-    [
-        {"ram_cache",
-            sc(
-                boolean(),
-                #{
-                    default => true,
-                    desc => ?DESC(persistent_store_ram_cache)
-                }
-            )}
-    ];
-fields("persistent_session_builtin") ->
-    [
-        {"type", sc(hoconsc:enum([builtin]), #{default => builtin, desc => ""})},
-        {"session",
-            sc(ref("persistent_table_mria_opts"), #{
-                desc => ?DESC(persistent_session_builtin_session_table)
-            })},
-        {"session_messages",
-            sc(ref("persistent_table_mria_opts"), #{
-                desc => ?DESC(persistent_session_builtin_sess_msg_table)
-            })},
-        {"messages",
-            sc(ref("persistent_table_mria_opts"), #{
-                desc => ?DESC(persistent_session_builtin_messages_table)
-            })}
-    ];
 fields("stats") ->
     [
         {"enable",

@@ -1769,30 +1662,13 @@ fields("session_persistence") ->
                     default => false
                 }
             )},
-        {"storage",
-            sc(
-                ref("session_storage_backend"), #{
-                    desc => ?DESC(session_persistence_storage),
-                    validator => fun validate_backend_enabled/1,
-                    default => #{
-                        <<"builtin">> => #{}
-                    }
-                }
-            )},
-        {"max_batch_size",
+        {"batch_size",
             sc(
                 pos_integer(),
                 #{
                     default => 100,
-                    desc => ?DESC(session_ds_max_batch_size)
-                }
-            )},
-        {"min_batch_size",
-            sc(
-                pos_integer(),
-                #{
-                    default => 100,
-                    desc => ?DESC(session_ds_min_batch_size)
+                    desc => ?DESC(session_ds_batch_size),
+                    importance => ?IMPORTANCE_MEDIUM
                 }
             )},
         {"idle_poll_interval",

@@ -1854,69 +1730,8 @@ fields("session_persistence") ->
             }
         )}
     ];
-fields("session_storage_backend") ->
-    [
-        {"builtin",
-            sc(ref("session_storage_backend_builtin"), #{
-                desc => ?DESC(session_storage_backend_builtin),
-                required => {false, recursively}
-            })}
-    ] ++ emqx_schema_hooks:injection_point('session_persistence.storage_backends', []);
-fields("session_storage_backend_builtin") ->
-    [
-        {"enable",
-            sc(
-                boolean(),
-                #{
-                    desc => ?DESC(session_storage_backend_enable),
-                    default => true
-                }
-            )},
-        {"data_dir",
-            sc(
-                string(),
-                #{
-                    desc => ?DESC(session_builtin_data_dir),
-                    mapping => "emqx_durable_storage.db_data_dir",
-                    required => false,
-                    importance => ?IMPORTANCE_LOW
-                }
-            )},
-        {"n_shards",
-            sc(
-                pos_integer(),
-                #{
-                    desc => ?DESC(session_builtin_n_shards),
-                    default => 16
-                }
-            )},
-        {"replication_factor",
-            sc(
-                pos_integer(),
-                #{
-                    default => 3,
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"egress_batch_size",
-            sc(
-                pos_integer(),
-                #{
-                    default => 1000,
-                    mapping => "emqx_durable_storage.egress_batch_size",
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )},
-        {"egress_flush_interval",
-            sc(
-                timeout_duration_ms(),
-                #{
-                    default => 100,
-                    mapping => "emqx_durable_storage.egress_flush_interval",
-                    importance => ?IMPORTANCE_HIDDEN
-                }
-            )}
-    ].
+fields(durable_storage) ->
+    emqx_ds_schema:schema().

 mqtt_listener(Bind) ->
     base_listener(Bind) ++

@@ -2170,6 +1985,8 @@ desc("crl_cache") ->
     "Global CRL cache options.";
 desc("session_persistence") ->
     "Settings governing durable sessions persistence.";
+desc(durable_storage) ->
+    ?DESC(durable_storage);
 desc(_) ->
     undefined.

@@ -2192,17 +2009,6 @@ ensure_list(V) ->
 filter(Opts) ->
     [{K, V} || {K, V} <- Opts, V =/= undefined].

-validate_backend_enabled(Config) ->
-    Enabled = maps:filter(fun(_, #{<<"enable">> := E}) -> E end, Config),
-    case maps:to_list(Enabled) of
-        [{_Type, _BackendConfig}] ->
-            ok;
-        _Conflicts = [_ | _] ->
-            {error, multiple_enabled_backends};
-        _None = [] ->
-            {error, no_enabled_backend}
-    end.
-
 %% @private This function defines the SSL opts which are commonly used by
 %% SSL listener and client.
 -spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().

@@ -559,45 +559,70 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
     %% we spawn several clients simultaneously to cause the race
     %% condition for the client id lock
     NumClients = 20,
+    ConnectSuccessCntr = counters:new(1, []),
+    ConnectFailCntr = counters:new(1, []),
+    ConnectFun =
+        fun() ->
+            process_flag(trap_exit, true),
+            try
+                {ok, ConnPid} =
+                    emqtt:start_link([
+                        {clean_start, true},
+                        {clientid, ClientID}
+                        | Config
+                    ]),
+                {ok, _} = emqtt:ConnFun(ConnPid),
+                counters:add(ConnectSuccessCntr, 1, 1)
+            catch
+                _:_ ->
+                    counters:add(ConnectFailCntr, 1, 1)
+            end
+        end,
     {ok, {ok, [_, _]}} =
         wait_for_events(
             fun() ->
                 lists:foreach(
                     fun(_) ->
-                        spawn(
-                            fun() ->
-                                {ok, ConnPid} =
-                                    emqtt:start_link([
-                                        {clean_start, true},
-                                        {clientid, ClientID}
-                                        | Config
-                                    ]),
-                                %% don't assert the result: most of them fail
-                                %% during the race
-                                emqtt:ConnFun(ConnPid),
-                                ok
-                            end
-                        ),
-                        ok
+                        spawn(ConnectFun)
                     end,
                     lists:seq(1, NumClients)
                 )
            ),
            ok
        end,
-        %% there can be only one channel that wins the race for the
-        %% lock for this client id. we also expect a decrement
-        %% event because the client dies along with the ephemeral
-        %% process.
+        %% At least one channel acquires the lock for this client id. We
+        %% also expect a decrement event because the client dies along with
+        %% the ephemeral process.
         [
             emqx_cm_connected_client_count_inc,
-            emqx_cm_connected_client_count_dec
+            emqx_cm_connected_client_count_dec_done
         ],
-        1000
+        _Timeout = 10000
     ),
-    %% Since more than one pair of inc/dec may be emitted, we need to
-    %% wait for full stabilization
-    timer:sleep(100),
-    %% It must be 0 again because we spawn-linked the clients in
-    %% ephemeral processes above, and all should be dead now.
+    ?retry(
+        _Sleep = 100,
+        _Retries = 100,
+        begin
+            ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
+            ConnectFailCnt = counters:get(ConnectFailCntr, 1),
+            NumClients = ConnectSuccessCnt + ConnectFailCnt
+        end
+    ),
+    ConnectSuccessCnt = counters:get(ConnectSuccessCntr, 1),
+    ?assert(ConnectSuccessCnt > 0),
+    EventsThatShouldHaveHappened = lists:flatten(
+        lists:duplicate(
+            ConnectSuccessCnt,
+            [
+                emqx_cm_connected_client_count_inc,
+                emqx_cm_connected_client_count_dec_done
+            ]
+        )
+    ),
+    wait_for_events(fun() -> ok end, EventsThatShouldHaveHappened, 10000, infinity),
+    %% It must be 0 again because we got enough
+    %% emqx_cm_connected_client_count_dec_done events
     ?assertEqual(0, emqx_cm:get_connected_client_count()),
     %% connecting again
     {ok, ConnPid1} = emqtt:start_link([

@@ -608,7 +633,8 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
     {{ok, _}, {ok, [_]}} =
         wait_for_events(
             fun() -> emqtt:ConnFun(ConnPid1) end,
-            [emqx_cm_connected_client_count_inc]
+            [emqx_cm_connected_client_count_inc],
+            _Timeout = 10000
         ),
     ?assertEqual(1, emqx_cm:get_connected_client_count()),
     %% abnormal exit of channel process

@@ -620,9 +646,10 @@ t_connected_client_count_transient_takeover(Config) when is_list(Config) ->
             ok
         end,
         [
-            emqx_cm_connected_client_count_dec,
+            emqx_cm_connected_client_count_dec_done,
             emqx_cm_process_down
-        ]
+        ],
+        _Timeout = 10000
     ),
     ?assertEqual(0, emqx_cm:get_connected_client_count()),
     ok;

@@ -735,11 +762,14 @@ wait_for_events(Action, Kinds) ->
     wait_for_events(Action, Kinds, 500).

 wait_for_events(Action, Kinds, Timeout) ->
+    wait_for_events(Action, Kinds, Timeout, 0).
+
+wait_for_events(Action, Kinds, Timeout, BackInTime) ->
     Predicate = fun(#{?snk_kind := K}) ->
         lists:member(K, Kinds)
     end,
     N = length(Kinds),
-    {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, 0),
+    {ok, Sub} = snabbkaffe_collector:subscribe(Predicate, N, Timeout, BackInTime),
     Res = Action(),
     case snabbkaffe_collector:receive_events(Sub) of
         {timeout, _} ->

@@ -50,7 +50,7 @@ init_per_testcase(t_message_gc = TestCase, Config) ->
     Opts = #{
         extra_emqx_conf =>
             "\n session_persistence.message_retention_period = 1s"
-            "\n session_persistence.storage.builtin.n_shards = 3"
+            "\n durable_storage.messages.n_shards = 3"
     },
     common_init_per_testcase(TestCase, [{n_shards, 3} | Config], Opts);
 init_per_testcase(TestCase, Config) ->

@@ -312,7 +312,7 @@ get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
     Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
     ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
     Res = request(get, Path, Params),
-    ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
+    ct:pal("get bridge ~p result: ~p", [{BridgeKind, BridgeType, BridgeName}, Res]),
     Res.

 create_bridge_api(Config) ->

@@ -33,6 +33,7 @@ all() ->
     emqx_common_test_helpers:all(?MODULE).

 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     GCPEmulatorHost = os:getenv("GCP_EMULATOR_HOST", "toxiproxy"),
     GCPEmulatorPortStr = os:getenv("GCP_EMULATOR_PORT", "8085"),
     GCPEmulatorPort = list_to_integer(GCPEmulatorPortStr),

@@ -208,3 +208,48 @@ t_consume(Config) ->
         }
     ),
     ok.
+
+t_update_topic(Config) ->
+    %% Tests that, if a bridge originally has the legacy field `topic_mapping' filled in
+    %% and later is updated using v2 APIs, then the legacy field is cleared and the new
+    %% `topic' field is used.
+    ConnectorConfig = ?config(connector_config, Config),
+    SourceConfig = ?config(source_config, Config),
+    Name = ?config(source_name, Config),
+    V1Config0 = emqx_action_info:connector_action_config_to_bridge_v1_config(
+        ?SOURCE_TYPE_BIN,
+        ConnectorConfig,
+        SourceConfig
+    ),
+    V1Config = emqx_utils_maps:deep_put(
+        [<<"consumer">>, <<"topic_mapping">>],
+        V1Config0,
+        [
+            #{
+                <<"pubsub_topic">> => <<"old_topic">>,
+                <<"mqtt_topic">> => <<"">>,
+                <<"qos">> => 2,
+                <<"payload_template">> => <<"template">>
+            }
+        ]
+    ),
+    %% Note: using v1 API
+    {ok, {{_, 201, _}, _, _}} = emqx_bridge_testlib:create_bridge_api(
+        ?SOURCE_TYPE_BIN,
+        Name,
+        V1Config
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"old_topic">>}}}},
+        emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
+    ),
+    %% Note: we don't add `topic_mapping' again here to the parameters.
+    {ok, {{_, 200, _}, _, _}} = emqx_bridge_v2_testlib:update_bridge_api(
+        Config,
+        #{<<"parameters">> => #{<<"topic">> => <<"new_topic">>}}
+    ),
+    ?assertMatch(
+        {ok, {{_, 200, _}, _, #{<<"parameters">> := #{<<"topic">> := <<"new_topic">>}}}},
+        emqx_bridge_v2_testlib:get_source_api(?SOURCE_TYPE_BIN, Name)
+    ),
+    ok.

@@ -1,6 +1,6 @@
 {application, emqx_bridge_http, [
     {description, "EMQX HTTP Bridge and Connector Application"},
-    {vsn, "0.2.3"},
+    {vsn, "0.2.4"},
     {registered, []},
     {applications, [kernel, stdlib, emqx_resource, ehttpc]},
     {env, [{emqx_action_info_modules, [emqx_bridge_http_action_info]}]},

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_bridge_iotdb, [
     {description, "EMQX Enterprise Apache IoTDB Bridge"},
-    {vsn, "0.1.6"},
+    {vsn, "0.1.7"},
     {modules, [
         emqx_bridge_iotdb,
         emqx_bridge_iotdb_connector

@@ -1033,7 +1033,7 @@ fields("log_throttling") ->
     [
         {time_window,
             sc(
-                emqx_schema:duration_s(),
+                emqx_schema:timeout_duration_s(),
                 #{
                     default => <<"1m">>,
                     desc => ?DESC("log_throttling_time_window"),

@@ -1,7 +1,7 @@
 %% -*- mode: erlang -*-
 {application, emqx_connector, [
     {description, "EMQX Data Integration Connectors"},
-    {vsn, "0.1.39"},
+    {vsn, "0.2.0"},
     {registered, []},
     {mod, {emqx_connector_app, []}},
     {applications, [

@@ -150,7 +150,16 @@

 -type message_store_opts() ::
     #{
-        sync => boolean()
+        %% Whether to wait until the message storage has been acknowledged to return from
+        %% `store_batch'.
+        %% Default: `true'.
+        sync => boolean(),
+        %% Whether the whole batch given to `store_batch' should be inserted atomically as
+        %% a unit. Note: the whole batch must be crafted so that it belongs to a single
+        %% shard (if applicable to the backend), as the batch will be split accordingly
+        %% even if this flag is `true'.
+        %% Default: `false'.
+        atomic => boolean()
     }.

 -type generic_db_opts() ::

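(A usage sketch, not part of the diff; it mirrors the t_09_atomic_store_batch
test added later in this commit.)

    %% Insert the batch as a unit and block until the write is acknowledged:
    ok = emqx_ds:store_batch(DB, Msgs, #{atomic => true, sync => true}).
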
@@ -263,7 +272,7 @@ list_generations_with_lifetimes(DB) ->
     Mod = ?module(DB),
     call_if_implemented(Mod, list_generations_with_lifetimes, [DB], #{}).

--spec drop_generation(db(), generation_rank()) -> ok | {error, _}.
+-spec drop_generation(db(), ds_specific_generation_rank()) -> ok | {error, _}.
 drop_generation(DB, GenId) ->
     Mod = ?module(DB),
     case erlang:function_exported(Mod, drop_generation, 2) of

@@ -51,6 +51,7 @@
 -define(flush, flush).

 -record(enqueue_req, {message :: emqx_types:message(), sync :: boolean()}).
+-record(enqueue_atomic_req, {batch :: [emqx_types:message()], sync :: boolean()}).

 %%================================================================================
 %% API functions

@@ -64,13 +65,34 @@ start_link(DB, Shard) ->
     ok.
 store_batch(DB, Messages, Opts) ->
     Sync = maps:get(sync, Opts, true),
-    lists:foreach(
-        fun(Message) ->
-            Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
-            gen_server:call(?via(DB, Shard), #enqueue_req{message = Message, sync = Sync})
-        end,
-        Messages
-    ).
+    case maps:get(atomic, Opts, false) of
+        false ->
+            lists:foreach(
+                fun(Message) ->
+                    Shard = emqx_ds_replication_layer:shard_of_message(DB, Message, clientid),
+                    gen_server:call(?via(DB, Shard), #enqueue_req{
+                        message = Message,
+                        sync = Sync
+                    })
+                end,
+                Messages
+            );
+        true ->
+            maps:foreach(
+                fun(Shard, Batch) ->
+                    gen_server:call(?via(DB, Shard), #enqueue_atomic_req{
+                        batch = Batch,
+                        sync = Sync
+                    })
+                end,
+                maps:groups_from_list(
+                    fun(Message) ->
+                        emqx_ds_replication_layer:shard_of_message(DB, Message, clientid)
+                    end,
+                    Messages
+                )
+            )
+    end.

 %%================================================================================
 %% behavior callbacks

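(A sketch, not part of the diff: what the maps:groups_from_list/2 call above
produces. That function requires OTP 25+, and this grouping explains the
caveat in message_store_opts() that an "atomic" batch is still split per
shard.)

    maps:groups_from_list(
        fun(Msg) -> emqx_ds_replication_layer:shard_of_message(DB, Msg, clientid) end,
        [Msg1, Msg2, Msg3]
    ).
    %% => #{ShardA => [Msg1, Msg3], ShardB => [Msg2]}
    %% One #enqueue_atomic_req{} is then sent per shard.
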
@@ -101,6 +123,9 @@ init([DB, Shard]) ->

 handle_call(#enqueue_req{message = Msg, sync = Sync}, From, S) ->
     do_enqueue(From, Sync, Msg, S);
+handle_call(#enqueue_atomic_req{batch = Batch, sync = Sync}, From, S) ->
+    Len = length(Batch),
+    do_enqueue(From, Sync, {atomic, Len, Batch}, S);
 handle_call(_Call, _From, S) ->
     {reply, {error, unknown_call}, S}.

@@ -131,7 +156,7 @@ do_flush(
     Batch = #{?tag => ?BATCH, ?batch_messages => lists:reverse(Messages)},
     ok = emqx_ds_proto_v2:store_batch(Leader, DB, Shard, Batch, #{}),
     [gen_server:reply(From, ok) || From <- lists:reverse(Replies)],
-    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard}),
+    ?tp(emqx_ds_replication_layer_egress_flush, #{db => DB, shard => Shard, batch => Messages}),
     erlang:garbage_collect(),
     S#s{
         n = 0,

@@ -140,9 +165,15 @@ do_flush(
         tref = start_timer()
     }.

-do_enqueue(From, Sync, Msg, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
+do_enqueue(From, Sync, MsgOrBatch, S0 = #s{n = N, batch = Batch, pending_replies = Replies}) ->
     NMax = application:get_env(emqx_durable_storage, egress_batch_size, 1000),
-    S1 = S0#s{n = N + 1, batch = [Msg | Batch]},
+    S1 =
+        case MsgOrBatch of
+            {atomic, NumMsgs, Msgs} ->
+                S0#s{n = N + NumMsgs, batch = Msgs ++ Batch};
+            Msg ->
+                S0#s{n = N + 1, batch = [Msg | Batch]}
+        end,
     S2 =
         case N >= NMax of
             true ->

@@ -230,6 +230,19 @@ drop(_Shard, DBHandle, GenId, CFRefs, #s{}) ->
     emqx_ds_storage_layer:shard_id(), s(), [emqx_types:message()], emqx_ds:message_store_opts()
 ) ->
     emqx_ds:store_batch_result().
+store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            {Key, _} = make_key(S, Msg),
+            Val = serialize(Msg),
+            rocksdb:batch_put(Batch, Data, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, S = #s{db = DB, data = Data}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

@@ -90,6 +90,20 @@ drop(_ShardId, DBHandle, _GenId, _CFRefs, #s{cf = CFHandle}) ->
     ok = rocksdb:drop_column_family(DBHandle, CFHandle),
     ok.

+store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options = #{atomic := true}) ->
+    {ok, Batch} = rocksdb:batch(),
+    lists:foreach(
+        fun(Msg) ->
+            Id = erlang:unique_integer([monotonic]),
+            Key = <<Id:64>>,
+            Val = term_to_binary(Msg),
+            rocksdb:batch_put(Batch, CF, Key, Val)
+        end,
+        Messages
+    ),
+    Res = rocksdb:write_batch(DB, Batch, _WriteOptions = []),
+    rocksdb:release_batch(Batch),
+    Res;
 store_batch(_ShardId, #s{db = DB, cf = CF}, Messages, _Options) ->
     lists:foreach(
         fun(Msg) ->

@@ -307,6 +307,71 @@ t_08_smoke_list_drop_generation(_Config) ->
     ),
     ok.

+t_09_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_10_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            ?assertMatch(ok, emqx_ds:open_db(DB, opts())),
+            Msgs = [
+                message(<<"1">>, <<"1">>, 0),
+                message(<<"2">>, <<"2">>, 1),
+                message(<<"3">>, <<"3">>, 2)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 t_drop_generation_with_never_used_iterator(_Config) ->
     %% This test checks how the iterator behaves when:
     %% 1) it's created at generation 1 and not consumed from.

@@ -549,6 +614,7 @@ iterate(DB, It0, BatchSize, Acc) ->
 all() -> emqx_common_test_helpers:all(?MODULE).

 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [mria, emqx_durable_storage],
         #{work_dir => ?config(priv_dir, Config)}

@@ -219,6 +219,69 @@ t_replay(_Config) ->
     ?assert(check(?SHARD, <<"#">>, 0, Messages)),
     ok.

+t_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => true,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Must contain exactly one flush with all messages.
+            ?assertMatch(
+                [#{batch := [_, _, _]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
+t_non_atomic_store_batch(_Config) ->
+    DB = ?FUNCTION_NAME,
+    ?check_trace(
+        begin
+            application:set_env(emqx_durable_storage, egress_batch_size, 1),
+            Msgs = [
+                make_message(0, <<"1">>, <<"1">>),
+                make_message(1, <<"2">>, <<"2">>),
+                make_message(2, <<"3">>, <<"3">>)
+            ],
+            %% Non-atomic batches may be split.
+            ?assertEqual(
+                ok,
+                emqx_ds:store_batch(DB, Msgs, #{
+                    atomic => false,
+                    sync => true
+                })
+            ),
+
+            ok
+        end,
+        fun(Trace) ->
+            %% Should contain one flush per message.
+            ?assertMatch(
+                [#{batch := [_]}, #{batch := [_]}, #{batch := [_]}],
+                ?of_kind(emqx_ds_replication_layer_egress_flush, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 check(Shard, TopicFilter, StartTime, ExpectedMessages) ->
     ExpectedFiltered = lists:filter(
         fun(#message{topic = Topic, timestamp = TS}) ->

@@ -418,6 +481,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
 suite() -> [{timetrap, {seconds, 20}}].

 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Apps = emqx_cth_suite:start(
         [emqx_durable_storage],
         #{work_dir => emqx_cth_suite:work_dir(Config)}

@@ -1,7 +1,7 @@
 %% -*- mode: erlang; -*-

 {deps, [
-    {jesse, "1.7.0"},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {emqx, {path, "../../apps/emqx"}},
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_gateway, {path, "../../apps/emqx_gateway"}}

@@ -2,7 +2,7 @@
 {application, emqx_management, [
     {description, "EMQX Management API and CLI"},
     % strict semver, bump manually!
-    {vsn, "5.0.38"},
+    {vsn, "5.1.0"},
     {modules, []},
     {registered, [emqx_management_sup]},
     {applications, [

@@ -2,7 +2,7 @@
 {application, emqx_prometheus, [
     {description, "Prometheus for EMQX"},
     % strict semver, bump manually!
-    {vsn, "5.0.20"},
+    {vsn, "5.1.0"},
     {modules, []},
     {registered, [emqx_prometheus_sup]},
     {applications, [kernel, stdlib, prometheus, emqx, emqx_auth, emqx_resource, emqx_management]},

@@ -1,57 +1,32 @@
-EMQX Retainer
-==============
+# Retainer

-The retainer plugin is responsible for storing retained MQTT messages.
+The `emqx_retainer` application is responsible for storing retained MQTT messages.

-Configuration
--------------
+The retained messages are messages associated with a topic that are stored on the broker and delivered to any new subscribers to that topic.

-etc/emqx_retainer.conf:
+More information about retained messages can be found in the following resources:
+* [MQTT specification 3.3.1.3](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html).
+* Retained message concept in [EMQX documentation](https://www.emqx.io/docs/en/v5.0/messaging/mqtt-concepts.html#retained-message).
+* Instructions for publishing retained messages in [EMQX documentation](https://www.emqx.io/docs/en/v5.0/messaging/mqtt-retained-message.html#publish-retained-message-with-mqttx-client).
+* [The Beginner's Guide to MQTT Retained Messages](https://www.emqx.com/en/blog/mqtt5-features-retain-message).

-```
-## Where to store the retained messages.
-## Notice that all nodes in a cluster are to have the same storage_type.
-##
-## Value: ram | disc
-## - ram: memory only
-## - disc: both memory and disc
-##
-## Default: ram
-retainer.storage_type = ram
+## Usage

-## Maximum number of retained messages allowed.
-##
-## Value: Number >= 0
-retainer.max_retained_messages = 1000000
+The `emqx_retainer` application is enabled by default. To turn it off, add the following configuration to the `emqx.conf` file:

-## Maximum payload size of a retained message.
-##
-## Value: Bytes
-retainer.max_payload_size = 64KB
-
-## Expiration interval of the retained messages. Never expire if the value is 0.
-##
-## Value: Duration
-## - h: hour
-## - m: minute
-## - s: second
-##
-## Examples:
-## - 2h: 2 hours
-## - 30m: 30 minutes
-## - 20s: 20 seconds
-##
-## Default: 0
-retainer.expiry_interval = 0
-```
+```
+retainer {
+    enable = false
+}
+```

-License
--------
+For other options, see the [configuration](https://www.emqx.io/docs/en/v5.2/configuration/configuration-manual.html#retainer) documentation.

-Apache License Version 2.0
+## Contributing

-Author
-------
+Please see our [contributing.md](../../CONTRIBUTING.md).

-EMQX Team
+## License
+
+See [LICENSE](../../APL.txt)

@@ -157,7 +157,7 @@ t_store_and_clean(_) ->

     ok = emqtt:disconnect(C1).

-t_retain_handling(_) ->
+t_retain_handling(Config) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),

@@ -173,11 +173,12 @@ t_retain_handling(_) ->
     ?assertEqual(0, length(receive_messages(1))),
     {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),

-    emqtt:publish(
+    publish(
         C1,
         <<"retained">>,
         <<"this is a retained message">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 0}, {retain, true}],
+        Config
     ),

     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),

@@ -205,7 +206,7 @@ t_retain_handling(_) ->
     emqtt:publish(C1, <<"retained">>, <<"">>, [{qos, 0}, {retain, true}]),
     ok = emqtt:disconnect(C1).

-t_wildcard_subscription(_) ->
+t_wildcard_subscription(Config) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(

@@ -226,17 +227,19 @@ t_wildcard_subscription(_) ->
         <<"this is a retained message 2">>,
         [{qos, 0}, {retain, true}]
     ),
-    emqtt:publish(
+    publish(
         C1,
         <<"/x/y/z">>,
         <<"this is a retained message 3">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 0}, {retain, true}],
+        Config
     ),

     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"/+/y/#">>, 0),
-    ?assertEqual(4, length(receive_messages(4))),
+    Msgs = receive_messages(4),
+    ?assertEqual(4, length(Msgs), #{msgs => Msgs}),

     emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]),
     emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]),

@@ -244,7 +247,7 @@ t_wildcard_subscription(_) ->
     emqtt:publish(C1, <<"/x/y/z">>, <<"">>, [{qos, 0}, {retain, true}]),
     ok = emqtt:disconnect(C1).

-t_message_expiry(_) ->
+t_message_expiry(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{<<"delivery_rate">> := <<"infinity">>}
     end,

@@ -279,11 +282,12 @@ t_message_expiry(_) ->
         <<"don't expire">>,
         [{qos, 0}, {retain, true}]
     ),
-    emqtt:publish(
+    publish(
         C1,
         <<"$SYS/retained/4">>,
         <<"don't expire">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 0}, {retain, true}],
+        Config
     ),

     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),

@@ -307,14 +311,14 @@ t_message_expiry(_) ->
     end,
     with_conf(ConfMod, Case).

-t_message_expiry_2(_) ->
+t_message_expiry_2(Config) ->
     ConfMod = fun(Conf) ->
         Conf#{<<"msg_expiry_interval">> := <<"2s">>}
     end,
     Case = fun() ->
         {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
         {ok, _} = emqtt:connect(C1),
-        emqtt:publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}]),
+        publish(C1, <<"retained">>, <<"expire">>, [{qos, 0}, {retain, true}], Config),

         {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
         ?assertEqual(1, length(receive_messages(1))),

@@ -348,7 +352,7 @@ t_table_full(_) ->
     end,
     with_conf(ConfMod, Case).

-t_clean(_) ->
+t_clean(Config) ->
     {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
     {ok, _} = emqtt:connect(C1),
     emqtt:publish(

@@ -363,11 +367,12 @@ t_clean(_) ->
         <<"this is a retained message 1">>,
         [{qos, 0}, {retain, true}]
     ),
-    emqtt:publish(
+    publish(
         C1,
         <<"retained/test/0">>,
         <<"this is a retained message 2">>,
-        [{qos, 0}, {retain, true}]
+        [{qos, 0}, {retain, true}],
+        Config
     ),
     {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
     ?assertEqual(3, length(receive_messages(3))),

@@ -871,3 +876,36 @@ make_limiter_json(Rate) ->
         <<"initial">> => 0,
         <<"burst">> => <<"0">>
     }.
+
+publish(Client, Topic, Payload, Opts, TCConfig) ->
+    PublishOpts = publish_opts(TCConfig),
+    do_publish(Client, Topic, Payload, Opts, PublishOpts).
+
+publish_opts(TCConfig) ->
+    Timeout = proplists:get_value(publish_wait_timeout, TCConfig, undefined),
+    Predicate =
+        case proplists:get_value(publish_wait_predicate, TCConfig, undefined) of
+            undefined -> undefined;
+            {NEvents, Pred} -> {predicate, {NEvents, Pred, Timeout}};
+            Pred -> {predicate, {1, Pred, Timeout}}
+        end,
+    Sleep =
+        case proplists:get_value(sleep_after_publish, TCConfig, undefined) of
+            undefined -> undefined;
+            Time -> {sleep, Time}
+        end,
+    emqx_maybe:define(Predicate, Sleep).
+
+do_publish(Client, Topic, Payload, Opts, undefined) ->
+    emqtt:publish(Client, Topic, Payload, Opts);
+do_publish(Client, Topic, Payload, Opts, {predicate, {NEvents, Predicate, Timeout}}) ->
+    %% Do not delete this clause: it's used by other retainer implementation tests
+    {ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, Timeout),
+    Res = emqtt:publish(Client, Topic, Payload, Opts),
+    {ok, _} = snabbkaffe:receive_events(SRef0),
+    Res;
+do_publish(Client, Topic, Payload, Opts, {sleep, Time}) ->
+    %% Do not delete this clause: it's used by other retainer implementation tests
+    Res = emqtt:publish(Client, Topic, Payload, Opts),
+    ct:sleep(Time),
+    Res.

@@ -96,6 +96,8 @@
     bytesize/1,
     subbits/2,
     subbits/3,
+    subbits/4,
+    subbits/5,
     subbits/6
 ]).

@@ -556,6 +558,16 @@ subbits(Bits, Len) when is_integer(Len), is_bitstring(Bits) ->
 subbits(Bits, Start, Len) when is_integer(Start), is_integer(Len), is_bitstring(Bits) ->
     get_subbits(Bits, Start, Len, <<"integer">>, <<"unsigned">>, <<"big">>).

+subbits(Bits, Start, Len, Type) when
+    is_integer(Start), is_integer(Len), is_bitstring(Bits)
+->
+    get_subbits(Bits, Start, Len, Type, <<"unsigned">>, <<"big">>).
+
+subbits(Bits, Start, Len, Type, Signedness) when
+    is_integer(Start), is_integer(Len), is_bitstring(Bits)
+->
+    get_subbits(Bits, Start, Len, Type, Signedness, <<"big">>).
+
 subbits(Bits, Start, Len, Type, Signedness, Endianness) when
     is_integer(Start), is_integer(Len), is_bitstring(Bits)
 ->

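(A sketch, not part of the diff: how the new arities cascade their defaults.
The module name emqx_rule_funcs is my assumption based on the surrounding
suite; the values mirror the tests added below.)

    %% subbits/4 assumes unsigned, big-endian:
    Float = emqx_rule_funcs:subbits(<<5.3:64/float>>, 1, 64, <<"float">>),
    %% Float is approximately 5.3
    %% subbits/5 assumes big-endian:
    456 = emqx_rule_funcs:subbits(<<456:32/integer>>, 1, 32, <<"integer">>, <<"unsigned">>).
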
@@ -911,6 +911,17 @@ t_subbits2_float(_) ->
     ct:pal(";;;;~p", [R2]),
     ?assert((RL2 >= 0 andalso RL2 < 0.0001) orelse (RL2 =< 0 andalso RL2 > -0.0001)).

+t_subbits_4_args(_) ->
+    R = apply_func(subbits, [<<5.3:64/float>>, 1, 64, <<"float">>]),
+    RL = (5.3 - R),
+    ?assert((RL >= 0 andalso RL < 0.0001) orelse (RL =< 0 andalso RL > -0.0001)).
+
+t_subbits_5_args(_) ->
+    ?assertEqual(
+        456,
+        apply_func(subbits, [<<456:32/integer>>, 1, 32, <<"integer">>, <<"unsigned">>])
+    ).
+
 %%------------------------------------------------------------------------------
 %% Test cases for Hash funcs
 %%------------------------------------------------------------------------------

@@ -26,13 +26,15 @@
 -type encoded_data() :: iodata().
 -type decoded_data() :: map().

--type serde_type() :: avro | protobuf.
+-type serde_type() :: avro | protobuf | json.
 -type serde_opts() :: map().

 -record(serde, {
     name :: schema_name(),
     type :: serde_type(),
-    eval_context :: term()
+    eval_context :: term(),
+    %% for future use
+    extra = []
 }).
 -type serde() :: #serde{}.

@@ -6,6 +6,7 @@
     {emqx_utils, {path, "../emqx_utils"}},
     {emqx_rule_engine, {path, "../emqx_rule_engine"}},
     {erlavro, {git, "https://github.com/emqx/erlavro.git", {tag, "2.10.0"}}},
+    {jesse, {git, "https://github.com/emqx/jesse.git", {tag, "1.7.12"}}},
     {gpb, "4.19.9"}
 ]}.

@@ -10,7 +10,8 @@
         kernel,
         stdlib,
         erlavro,
-        gpb
+        gpb,
+        jesse
     ]},
     {env, []},
     {modules, []},

@@ -218,7 +218,10 @@ terminate(_Reason, _State) ->
 %%-------------------------------------------------------------------------------------------------

 create_tables() ->
-    ok = emqx_utils_ets:new(?SERDE_TAB, [public, {keypos, #serde.name}]),
+    ok = emqx_utils_ets:new(?SERDE_TAB, [public, ordered_set, {keypos, #serde.name}]),
+    %% have to create the table for jesse_database otherwise the on-demand table will disappear
+    %% when the caller process dies
+    ok = emqx_utils_ets:new(jesse_ets, [public, ordered_set]),
     ok = mria:create_table(?PROTOBUF_CACHE_TAB, [
         {type, set},
         {rlog_shard, ?SCHEMA_REGISTRY_SHARD},

@@ -312,8 +315,9 @@ ensure_serde_absent(Name) when not is_binary(Name) ->
 ensure_serde_absent(Name) ->
     case get_serde(Name) of
         {ok, Serde} ->
-            ok = emqx_schema_registry_serde:destroy(Serde),
             _ = ets:delete(?SERDE_TAB, Name),
-            ok;
+            ok = emqx_schema_registry_serde:destroy(Serde);
         {error, not_found} ->
             ok
     end.

@@ -52,34 +52,48 @@ fields(?CONF_KEY_ROOT) ->
     ];
 fields(avro) ->
     [
-        {type, mk(avro, #{required => true, desc => ?DESC("schema_type")})},
-        {source,
-            mk(emqx_schema:json_binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(avro, #{required => true, desc => ?DESC("schema_type_avro")})}
+        | common_fields(emqx_schema:json_binary())
     ];
 fields(protobuf) ->
     [
-        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type")})},
-        {source, mk(binary(), #{required => true, desc => ?DESC("schema_source")})},
-        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+        {type, mk(protobuf, #{required => true, desc => ?DESC("schema_type_protobuf")})}
+        | common_fields(binary())
+    ];
+fields(json) ->
+    [
+        {type, mk(json, #{required => true, desc => ?DESC("schema_type_json")})}
+        | common_fields(emqx_schema:json_binary())
     ];
 fields("get_avro") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(avro)];
 fields("get_protobuf") ->
     [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(protobuf)];
+fields("get_json") ->
+    [{name, mk(binary(), #{required => true, desc => ?DESC("schema_name")})} | fields(json)];
 fields("put_avro") ->
     fields(avro);
 fields("put_protobuf") ->
     fields(protobuf);
+fields("put_json") ->
+    fields(json);
 fields("post_" ++ Type) ->
     fields("get_" ++ Type).

+common_fields(SourceType) ->
+    [
+        {source, mk(SourceType, #{required => true, desc => ?DESC("schema_source")})},
+        {description, mk(binary(), #{default => <<>>, desc => ?DESC("schema_description")})}
+    ].
+
 desc(?CONF_KEY_ROOT) ->
     ?DESC("schema_registry_root");
 desc(avro) ->
     ?DESC("avro_type");
 desc(protobuf) ->
     ?DESC("protobuf_type");
+desc(json) ->
+    ?DESC("json_type");
 desc(_) ->
     undefined.

@@ -121,7 +135,7 @@ mk(Type, Meta) -> hoconsc:mk(Type, Meta).
 ref(Name) -> hoconsc:ref(?MODULE, Name).

 supported_serde_types() ->
-    [avro, protobuf].
+    [avro, protobuf, json].

 refs() ->
     [ref(Type) || Type <- supported_serde_types()].

@@ -132,6 +146,8 @@ refs(#{<<"type">> := <<"avro">>}) ->
     [ref(avro)];
 refs(#{<<"type">> := <<"protobuf">>}) ->
     [ref(protobuf)];
+refs(#{<<"type">> := <<"json">>}) ->
+    [ref(json)];
 refs(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{

@@ -140,7 +156,7 @@ refs(_) ->
     }).

 refs_get_api() ->
-    [ref("get_avro"), ref("get_protobuf")].
+    [ref("get_avro"), ref("get_protobuf"), ref("get_json")].

 refs_get_api(#{<<"type">> := TypeAtom} = Value) when is_atom(TypeAtom) ->
     refs(Value#{<<"type">> := atom_to_binary(TypeAtom)});

@@ -148,6 +164,8 @@ refs_get_api(#{<<"type">> := <<"avro">>}) ->
     [ref("get_avro")];
 refs_get_api(#{<<"type">> := <<"protobuf">>}) ->
     [ref("get_protobuf")];
+refs_get_api(#{<<"type">> := <<"json">>}) ->
+    [ref("get_json")];
 refs_get_api(_) ->
     Expected = lists:join(" | ", [atom_to_list(T) || T <- supported_serde_types()]),
     throw(#{

@@ -11,20 +11,32 @@

 %% API
 -export([
-    decode/2,
-    decode/3,
-    encode/2,
-    encode/3,
     make_serde/3,
     handle_rule_function/2,
     destroy/1
 ]).

 %% Tests
 -export([
+    decode/2,
+    decode/3,
+    encode/2,
+    encode/3,
     eval_decode/2,
     eval_encode/2
 ]).

+-define(BOOL(SerdeName, EXPR),
+    try
+        _ = EXPR,
+        true
+    catch
+        error:Reason ->
+            ?SLOG(debug, #{msg => "schema_check_failed", schema => SerdeName, reason => Reason}),
+            false
+    end
+).
+
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------

@@ -40,10 +52,6 @@ handle_rule_function(sparkplug_decode, [Data | MoreArgs]) ->
         schema_decode,
         [?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Data | MoreArgs]
     );
-handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
-    decode(SchemaId, Data, MoreArgs);
-handle_rule_function(schema_decode, Args) ->
-    error({args_count_error, {schema_decode, Args}});
 handle_rule_function(sparkplug_encode, [Term]) ->
     handle_rule_function(
         schema_encode,

@ -54,6 +62,10 @@ handle_rule_function(sparkplug_encode, [Term | MoreArgs]) ->
|
|||
schema_encode,
|
||||
[?EMQX_SCHEMA_REGISTRY_SPARKPLUGB_SCHEMA_NAME, Term | MoreArgs]
|
||||
);
|
||||
handle_rule_function(schema_decode, [SchemaId, Data | MoreArgs]) ->
|
||||
decode(SchemaId, Data, MoreArgs);
|
||||
handle_rule_function(schema_decode, Args) ->
|
||||
error({args_count_error, {schema_decode, Args}});
|
||||
handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
|
||||
%% encode outputs iolists, but when the rule actions process those
|
||||
%% it might wrongly encode them as JSON lists, so we force them to
|
||||
|
@ -62,33 +74,56 @@ handle_rule_function(schema_encode, [SchemaId, Term | MoreArgs]) ->
    iolist_to_binary(IOList);
handle_rule_function(schema_encode, Args) ->
    error({args_count_error, {schema_encode, Args}});
handle_rule_function(schema_check, [SchemaId, Data | MoreArgs]) ->
    schema_check(SchemaId, Data, MoreArgs);
handle_rule_function(_, _) ->
    {error, no_match_for_function}.

-spec schema_check(schema_name(), decoded_data() | encoded_data(), [term()]) -> boolean().
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_binary(Data) ->
    with_serde(
        SerdeName,
        fun(Serde) ->
            ?BOOL(SerdeName, eval_decode(Serde, [Data | VarArgs]))
        end
    );
schema_check(SerdeName, Data, VarArgs) when is_list(VarArgs), is_map(Data) ->
    with_serde(
        SerdeName,
        fun(Serde) ->
            ?BOOL(SerdeName, eval_encode(Serde, [Data | VarArgs]))
        end
    ).
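%% Usage sketch (the serde name <<"my_schema">> is hypothetical; assume it
%% names a registered JSON serde whose schema requires an integer field "i").
%% The ?BOOL macro above turns any decode/encode exception into `false`, so
%% `schema_check` never raises for a bad payload, only for a missing serde:
%%
%%   true = emqx_schema_registry_serde:handle_rule_function(
%%       schema_check, [<<"my_schema">>, #{<<"i">> => 1}]
%%   ),
%%   false = emqx_schema_registry_serde:handle_rule_function(
%%       schema_check, [<<"my_schema">>, <<"{\"i\": \"not an integer\"}">>]
%%   ).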
-spec decode(schema_name(), encoded_data()) -> decoded_data().
decode(SerdeName, RawData) ->
    decode(SerdeName, RawData, []).

-spec decode(schema_name(), encoded_data(), [term()]) -> decoded_data().
decode(SerdeName, RawData, VarArgs) when is_list(VarArgs) ->
    case emqx_schema_registry:get_serde(SerdeName) of
        {error, not_found} ->
            error({serde_not_found, SerdeName});
        {ok, Serde} ->
            eval_decode(Serde, [RawData | VarArgs])
    end.
    with_serde(SerdeName, fun(Serde) ->
        eval_decode(Serde, [RawData | VarArgs])
    end).

-spec encode(schema_name(), decoded_data()) -> encoded_data().
encode(SerdeName, RawData) ->
    encode(SerdeName, RawData, []).

-spec encode(schema_name(), decoded_data(), [term()]) -> encoded_data().
encode(SerdeName, EncodedData, VarArgs) when is_list(VarArgs) ->
    case emqx_schema_registry:get_serde(SerdeName) of
        {error, not_found} ->
            error({serde_not_found, SerdeName});
encode(SerdeName, Data, VarArgs) when is_list(VarArgs) ->
    with_serde(
        SerdeName,
        fun(Serde) ->
            eval_encode(Serde, [Data | VarArgs])
        end
    ).

with_serde(Name, F) ->
    case emqx_schema_registry:get_serde(Name) of
        {ok, Serde} ->
            eval_encode(Serde, [EncodedData | VarArgs])
            F(Serde);
        {error, not_found} ->
            error({serde_not_found, Name})
    end.
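%% With both decode/3 and encode/3 routed through with_serde/2, a missing
%% schema now fails uniformly with {serde_not_found, Name}. A round-trip
%% sketch (the serde name my_schema is hypothetical and assumed registered;
%% for avro and json serdes the decoded term should equal the original):
%%
%%   Encoded = emqx_schema_registry_serde:encode(my_schema, #{<<"i">> => 1}),
%%   Decoded = emqx_schema_registry_serde:decode(my_schema, Encoded).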
-spec make_serde(serde_type(), schema_name(), schema_source()) -> serde().

@ -108,7 +143,17 @@ make_serde(protobuf, Name, Source) ->
        name = Name,
        type = protobuf,
        eval_context = SerdeMod
    }.
    };
make_serde(json, Name, Source) ->
    case json_decode(Source) of
        SchemaObj when is_map(SchemaObj) ->
            %% jesse:add_schema adds any map() without further validation
            %% if it's not a map, then case_clause
            ok = jesse_add_schema(Name, SchemaObj),
            #serde{name = Name, type = json};
        _NotMap ->
            error({invalid_json_schema, bad_schema_object})
    end.

eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
    Opts = avro:make_decoder_options([{map_type, map}, {record_type, map}]),

@ -116,14 +161,29 @@ eval_decode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
eval_decode(#serde{type = protobuf, eval_context = SerdeMod}, [EncodedData, MessageName0]) ->
    MessageName = binary_to_existing_atom(MessageName0, utf8),
    Decoded = apply(SerdeMod, decode_msg, [EncodedData, MessageName]),
    emqx_utils_maps:binary_key_map(Decoded).
    emqx_utils_maps:binary_key_map(Decoded);
eval_decode(#serde{type = json, name = Name}, [Data]) ->
    true = is_binary(Data),
    Term = json_decode(Data),
    {ok, NewTerm} = jesse_validate(Name, Term),
    NewTerm.

eval_encode(#serde{type = avro, name = Name, eval_context = Store}, [Data]) ->
    avro_binary_encoder:encode(Store, Name, Data);
eval_encode(#serde{type = protobuf, eval_context = SerdeMod}, [DecodedData0, MessageName0]) ->
    DecodedData = emqx_utils_maps:safe_atom_key_map(DecodedData0),
    MessageName = binary_to_existing_atom(MessageName0, utf8),
    apply(SerdeMod, encode_msg, [DecodedData, MessageName]).
    apply(SerdeMod, encode_msg, [DecodedData, MessageName]);
eval_encode(#serde{type = json, name = Name}, [Map]) ->
    %% The input Map may not be a valid JSON term for jesse
    Data = iolist_to_binary(emqx_utils_json:encode(Map)),
    NewMap = json_decode(Data),
    case jesse_validate(Name, NewMap) of
        {ok, _} ->
            Data;
        {error, Reason} ->
            error(Reason)
    end.
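%% The JSON encode path validates by round-tripping through the JSON
%% serializer, so jesse always sees canonical binary-keyed terms no matter
%% how the rule built the map. Equivalent standalone steps (illustrative):
%%
%%   Map = #{foo => 1},                 %% atom keys, as a rule might produce
%%   Data = iolist_to_binary(emqx_utils_json:encode(Map)),
%%   #{<<"foo">> := 1} = emqx_utils_json:decode(Data, [return_maps]).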
destroy(#serde{type = avro, name = _Name}) ->
    ?tp(serde_destroyed, #{type => avro, name => _Name}),

@ -131,12 +191,31 @@ destroy(#serde{type = avro, name = _Name}) ->
destroy(#serde{type = protobuf, name = _Name, eval_context = SerdeMod}) ->
    unload_code(SerdeMod),
    ?tp(serde_destroyed, #{type => protobuf, name => _Name}),
    ok;
destroy(#serde{type = json, name = Name}) ->
    ok = jesse_del_schema(Name),
    ?tp(serde_destroyed, #{type => json, name => Name}),
    ok.

%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

json_decode(Data) ->
    emqx_utils_json:decode(Data, [return_maps]).

jesse_add_schema(Name, Obj) ->
    jesse:add_schema(jesse_name(Name), Obj).

jesse_del_schema(Name) ->
    jesse:del_schema(jesse_name(Name)).

jesse_validate(Name, Map) ->
    jesse:validate(jesse_name(Name), Map, []).

jesse_name(Str) ->
    unicode:characters_to_list(Str).
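%% These helpers thinly wrap the `jesse` library, which keys registered
%% schemas by string name. The same flow, standalone (the schema, its name
%% "demo", and the data are illustrative only):
%%
%%   Schema = #{
%%       <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>,
%%       <<"type">> => <<"object">>,
%%       <<"required">> => [<<"foo">>]
%%   },
%%   ok = jesse:add_schema("demo", Schema),
%%   {ok, _} = jesse:validate("demo", #{<<"foo">> => 1}, []),
%%   {error, _} = jesse:validate("demo", #{<<"bar">> => 1}, []),
%%   ok = jesse:del_schema("demo").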
-spec make_protobuf_serde_mod(schema_name(), schema_source()) -> module().
make_protobuf_serde_mod(Name, Source) ->
    {SerdeMod0, SerdeModFileName} = protobuf_serde_mod_name(Name),
@ -23,14 +23,15 @@
all() ->
    [
        {group, avro},
        {group, protobuf}
        {group, protobuf},
        {group, json}
    ] ++ sparkplug_tests().

groups() ->
    AllTCsExceptSP = emqx_common_test_helpers:all(?MODULE) -- sparkplug_tests(),
    ProtobufOnlyTCs = protobuf_only_tcs(),
    TCs = AllTCsExceptSP -- ProtobufOnlyTCs,
    [{avro, TCs}, {protobuf, AllTCsExceptSP}].
    [{avro, TCs}, {json, TCs}, {protobuf, AllTCsExceptSP}].

protobuf_only_tcs() ->
    [

@ -57,6 +58,8 @@ end_per_suite(_Config) ->
init_per_group(avro, Config) ->
    [{serde_type, avro} | Config];
init_per_group(json, Config) ->
    [{serde_type, json} | Config];
init_per_group(protobuf, Config) ->
    [{serde_type, protobuf} | Config];
init_per_group(_Group, Config) ->

@ -140,6 +143,18 @@ schema_params(avro) ->
        },
    SourceBin = emqx_utils_json:encode(Source),
    #{type => avro, source => SourceBin};
schema_params(json) ->
    Source =
        #{
            type => object,
            properties => #{
                i => #{type => integer},
                s => #{type => string}
            },
            required => [<<"i">>, <<"s">>]
        },
    SourceBin = emqx_utils_json:encode(Source),
    #{type => json, source => SourceBin};
schema_params(protobuf) ->
    SourceBin =
        <<

@ -162,7 +177,7 @@ create_serde(SerdeType, SerdeName) ->
    ok = emqx_schema_registry:add_schema(SerdeName, Schema),
    ok.

test_params_for(avro, encode_decode1) ->
test_params_for(Type, encode_decode1) when Type =:= avro; Type =:= json ->
    SQL =
        <<
            "select\n"

@ -186,7 +201,7 @@ test_params_for(avro, encode_decode1) ->
        expected_rule_output => ExpectedRuleOutput,
        extra_args => ExtraArgs
    };
test_params_for(avro, encode1) ->
test_params_for(Type, encode1) when Type =:= avro; Type =:= json ->
    SQL =
        <<
            "select\n"

@ -202,7 +217,7 @@ test_params_for(avro, encode1) ->
        payload_template => PayloadTemplate,
        extra_args => ExtraArgs
    };
test_params_for(avro, decode1) ->
test_params_for(Type, decode1) when Type =:= avro; Type =:= json ->
    SQL =
        <<
            "select\n"

@ -503,13 +518,18 @@ t_encode(Config) ->
    PayloadBin = emqx_utils_json:encode(Payload),
    emqx:publish(emqx_message:make(<<"t">>, PayloadBin)),
    Published = receive_published(?LINE),
    ?assertMatch(
        #{payload := P} when is_binary(P),
        Published
    ),
    #{payload := Encoded} = Published,
    {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
    ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs])),
    case SerdeType of
        json ->
            %% we should have received a binary, but since it is valid
            %% JSON it got 'safe_decode'-decoded in receive_published
            ?assertMatch(#{payload := #{<<"i">> := _, <<"s">> := _}}, Published);
        _ ->
            ?assertMatch(#{payload := B} when is_binary(B), Published),
            #{payload := Encoded} = Published,
            {ok, Serde} = emqx_schema_registry:get_serde(SerdeName),
            ?assertEqual(Payload, eval_decode(Serde, [Encoded | ExtraArgs]))
    end,
    ok.

t_decode(Config) ->

@ -607,8 +627,13 @@ t_protobuf_union_decode(Config) ->
t_fail_rollback(Config) ->
    SerdeType = ?config(serde_type, Config),
    OkSchema = emqx_utils_maps:binary_key_map(schema_params(SerdeType)),
    BrokenSchema = OkSchema#{<<"source">> := <<"{}">>},
    BrokenSchema =
        case SerdeType of
            json ->
                OkSchema#{<<"source">> := <<"not a json value">>};
            _ ->
                OkSchema#{<<"source">> := <<"{}">>}
        end,
    ?assertMatch(
        {ok, _},
        emqx_conf:update(
@ -23,14 +23,16 @@
all() ->
    [
        {group, avro},
        {group, protobuf}
        {group, protobuf},
        {group, json}
    ].

groups() ->
    AllTCs = emqx_common_test_helpers:all(?MODULE),
    [
        {avro, AllTCs},
        {protobuf, AllTCs}
        {protobuf, AllTCs},
        {json, AllTCs}
    ].

init_per_suite(Config) ->

@ -80,6 +82,23 @@ init_per_group(protobuf, Config) ->
        {schema_source, SourceBin},
        {invalid_schema_source, InvalidSourceBin}
        | Config
    ];
init_per_group(json, Config) ->
    Source =
        #{
            properties => #{
                foo => #{},
                bar => #{}
            },
            required => [<<"foo">>]
        },
    SourceBin = emqx_utils_json:encode(Source),
    InvalidSourceBin = <<"\"not an object\"">>,
    [
        {serde_type, json},
        {schema_source, SourceBin},
        {invalid_schema_source, InvalidSourceBin}
        | Config
    ].

end_per_group(_Group, _Config) ->

@ -279,7 +298,7 @@ t_crud(Config) ->
            <<"code">> := <<"BAD_REQUEST">>,
            <<"message">> :=
                #{
                    <<"expected">> := <<"avro | protobuf">>,
                    <<"expected">> := <<"avro | protobuf | json">>,
                    <<"field_name">> := <<"type">>
                }
        }},

@ -302,7 +321,7 @@ t_crud(Config) ->
            <<"code">> := <<"BAD_REQUEST">>,
            <<"message">> :=
                #{
                    <<"expected">> := <<"avro | protobuf">>,
                    <<"expected">> := <<"avro | protobuf | json">>,
                    <<"field_name">> := <<"type">>
                }
        }},
@ -15,6 +15,10 @@
-import(emqx_common_test_helpers, [on_exit/1]).

-define(APPS, [emqx_conf, emqx_rule_engine, emqx_schema_registry]).
-define(INVALID_JSON, #{
    reason := #{expected := "emqx_schema:json_binary()"},
    kind := validation_error
}).

%%------------------------------------------------------------------------------
%% CT boilerplate

@ -79,7 +83,21 @@ schema_params(protobuf) ->
        " }\n"
        " "
        >>,
    #{type => protobuf, source => SourceBin}.
    #{type => protobuf, source => SourceBin};
schema_params(json) ->
    Source =
        #{
            <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>,
            <<"$id">> => <<"http://json-schema.org/draft-06/schema#">>,
            type => object,
            properties => #{
                foo => #{type => integer},
                bar => #{type => integer}
            },
            required => [<<"foo">>]
        },
    SourceBin = emqx_utils_json:encode(Source),
    #{type => json, source => SourceBin}.

assert_roundtrip(SerdeName, Original) ->
    Encoded = emqx_schema_registry_serde:encode(SerdeName, Original),

@ -109,10 +127,7 @@ t_avro_invalid_json_schema(_Config) ->
    SerdeName = my_serde,
    Params = schema_params(avro),
    WrongParams = Params#{source := <<"{">>},
    ?assertMatch(
        {error, #{reason := #{expected := _}}},
        emqx_schema_registry:add_schema(SerdeName, WrongParams)
    ),
    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, WrongParams)),
    ok.

t_avro_invalid_schema(_Config) ->

@ -128,14 +143,27 @@ t_avro_invalid_schema(_Config) ->
t_serde_not_found(_Config) ->
    %% for coverage
    NonexistentSerde = <<"nonexistent">>,
    Original = #{},
    EncodeData = #{},
    DecodeData = <<"data">>,
    ?assertError(
        {serde_not_found, NonexistentSerde},
        emqx_schema_registry_serde:encode(NonexistentSerde, Original)
        emqx_schema_registry_serde:encode(NonexistentSerde, EncodeData)
    ),
    ?assertError(
        {serde_not_found, NonexistentSerde},
        emqx_schema_registry_serde:decode(NonexistentSerde, Original)
        emqx_schema_registry_serde:decode(NonexistentSerde, DecodeData)
    ),
    ?assertError(
        {serde_not_found, NonexistentSerde},
        emqx_schema_registry_serde:handle_rule_function(schema_check, [
            NonexistentSerde, EncodeData
        ])
    ),
    ?assertError(
        {serde_not_found, NonexistentSerde},
        emqx_schema_registry_serde:handle_rule_function(schema_check, [
            NonexistentSerde, DecodeData
        ])
    ),
    ok.

@ -171,3 +199,44 @@ t_protobuf_invalid_schema(_Config) ->
        emqx_schema_registry:add_schema(SerdeName, WrongParams)
    ),
    ok.

t_json_invalid_schema(_Config) ->
    SerdeName = invalid_json,
    Params = schema_params(json),
    BadParams1 = Params#{source := <<"not valid json value">>},
    BadParams2 = Params#{source := <<"\"not an object\"">>},
    BadParams3 = Params#{source := <<"{\"foo\": 1}">>},
    ?assertMatch({error, ?INVALID_JSON}, emqx_schema_registry:add_schema(SerdeName, BadParams1)),
    ?assertMatch(
        {error, {post_config_update, _, {invalid_json_schema, bad_schema_object}}},
        emqx_schema_registry:add_schema(SerdeName, BadParams2)
    ),
    ?assertMatch(
        ok,
        emqx_schema_registry:add_schema(SerdeName, BadParams3)
    ),
    ok.

t_roundtrip_json(_Config) ->
    SerdeName = my_json_schema,
    Params = schema_params(json),
    ok = emqx_schema_registry:add_schema(SerdeName, Params),
    Original = #{<<"foo">> => 1, <<"bar">> => 2},
    assert_roundtrip(SerdeName, Original),
    ok.

t_json_validation(_Config) ->
    SerdeName = my_json_schema,
    Params = schema_params(json),
    ok = emqx_schema_registry:add_schema(SerdeName, Params),
    F = fun(Fn, Data) ->
        emqx_schema_registry_serde:handle_rule_function(Fn, [SerdeName, Data])
    end,
    OK = #{<<"foo">> => 1, <<"bar">> => 2},
    NotOk = #{<<"bar">> => 2},
    ?assert(F(schema_check, OK)),
    ?assert(F(schema_check, <<"{\"foo\": 1, \"bar\": 2}">>)),
    ?assertNot(F(schema_check, NotOk)),
    ?assertNot(F(schema_check, <<"{\"bar\": 2}">>)),
    ?assertNot(F(schema_check, <<"{\"foo\": \"notinteger\", \"bar\": 2}">>)),
    ok.
@ -2,7 +2,7 @@
{application, emqx_utils, [
    {description, "Miscellaneous utilities for EMQX apps"},
    % strict semver, bump manually!
    {vsn, "5.0.16"},
    {vsn, "5.1.0"},
    {modules, [
        emqx_utils,
        emqx_utils_api,
@ -470,10 +470,8 @@ make_docker() {
    )
    :> ./.emqx_docker_image_tags
    for r in "${DOCKER_REGISTRIES[@]}"; do
        if ! is_ecr_and_enterprise "$r" "$PROFILE"; then
            DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_IMAGE_TAG}")
            echo "$r/${EMQX_IMAGE_TAG}" >> ./.emqx_docker_image_tags
        fi
        DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_IMAGE_TAG}")
        echo "$r/${EMQX_IMAGE_TAG}" >> ./.emqx_docker_image_tags
    done
    if [ "${DOCKER_BUILD_NOCACHE:-false}" = true ]; then
        DOCKER_BUILDX_ARGS+=(--no-cache)

@ -483,14 +481,12 @@ make_docker() {
    fi
    if [ "${DOCKER_LATEST:-false}" = true ]; then
        for r in "${DOCKER_REGISTRIES[@]}"; do
            if ! is_ecr_and_enterprise "$r" "$PROFILE"; then
                DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:latest${SUFFIX}")
                echo "$r/${EMQX_BASE_DOCKER_TAG}:latest${SUFFIX}" >> ./.emqx_docker_image_tags
                DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}${SUFFIX}")
                echo "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}${SUFFIX}" >> ./.emqx_docker_image_tags
                DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}.${VSN_PATCH}${SUFFIX}")
                echo "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}.${VSN_PATCH}${SUFFIX}" >> ./.emqx_docker_image_tags
            fi
            DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:latest${SUFFIX}")
            echo "$r/${EMQX_BASE_DOCKER_TAG}:latest${SUFFIX}" >> ./.emqx_docker_image_tags
            DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}${SUFFIX}")
            echo "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}${SUFFIX}" >> ./.emqx_docker_image_tags
            DOCKER_BUILDX_ARGS+=(--tag "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}.${VSN_PATCH}${SUFFIX}")
            echo "$r/${EMQX_BASE_DOCKER_TAG}:${VSN_MAJOR}.${VSN_MINOR}.${VSN_PATCH}${SUFFIX}" >> ./.emqx_docker_image_tags
        done
    fi
    if [ "${DOCKER_PLATFORMS:-default}" != 'default' ]; then
@ -0,0 +1,3 @@
Add a new configuration root: `durable_storage`.

This configuration tree contains the settings related to the new persistent session feature.
@ -0,0 +1 @@
The `subbits` functions with 4 and 5 parameters were documented but did not exist in the implementation. These functions have now been added.
@ -8,8 +8,6 @@
- [#12471](https://github.com/emqx/emqx/pull/12471) Fixed an issue that data integration configurations failed to load correctly during upgrades from EMQX version 5.0.2 to newer releases.

- [#12542](https://github.com/emqx/emqx/pull/12542) Redacted authorization headers to exclude basic authorization credentials from debug logs in the HTTP Server connector, mitigating potential security risks.

- [#12598](https://github.com/emqx/emqx/pull/12598) Fixed an issue that users were unable to subscribe to or unsubscribe from shared topic filters via HTTP API.

  The affected APIs include:

@ -24,6 +22,10 @@
- [#12606](https://github.com/emqx/emqx/pull/12606) The Prometheus API experienced crashes when the specified SSL certificate file did not exist in the given path. Now, when an SSL certificate file is missing, the `emqx_cert_expiry_at` metric will report a value of 0, indicating the non-existence of the certificate.

- [#12620](https://github.com/emqx/emqx/pull/12620) Redacted sensitive information in HTTP headers to exclude authentication and authorization credentials from `debug` level logs in the HTTP Server connector, mitigating potential security risks.

- [#12632](https://github.com/emqx/emqx/pull/12632) Fixed an issue where the rule engine's SQL built-in function `date_to_unix_ts` produced incorrect results for dates starting from March 1st on leap years.

- [#12608](https://github.com/emqx/emqx/pull/12608) Fixed a `function_clause` error in the IoTDB action caused by the absence of a `payload` field in query data.

- [#12610](https://github.com/emqx/emqx/pull/12610) Fixed an issue where connections to the LDAP connector could unexpectedly disconnect after a certain period of time.
@ -0,0 +1,3 @@
Add JSON schema to schema registry.

JSON Schema supports [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03), [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00).
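For illustration, registering a JSON schema and checking a payload against it might look like the following sketch (the schema name `my_json` and the field layout are hypothetical; the calls mirror the test suites in this commit):

    Source = emqx_utils_json:encode(#{
        <<"$schema">> => <<"http://json-schema.org/draft-06/schema#">>,
        type => object,
        properties => #{i => #{type => integer}},
        required => [<<"i">>]
    }),
    ok = emqx_schema_registry:add_schema(my_json, #{type => json, source => Source}),
    true = emqx_schema_registry_serde:handle_rule_function(schema_check, [my_json, #{<<"i">> => 1}]).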
@ -4,8 +4,6 @@
- [#12471](https://github.com/emqx/emqx/pull/12471) Fixed an issue that data integration configurations failed to load correctly during upgrades from EMQX version 5.0.2 to newer releases.

- [#12542](https://github.com/emqx/emqx/pull/12542) Redacted authorization headers to exclude basic authorization credentials from debug logs in the HTTP Server connector, mitigating potential security risks.

- [#12598](https://github.com/emqx/emqx/pull/12598) Fixed an issue that users were unable to subscribe to or unsubscribe from shared topic filters via HTTP API.

  The affected APIs include:

@ -19,3 +17,8 @@
- [#12601](https://github.com/emqx/emqx/pull/12601) Fixed an issue where logs of the LDAP driver were not being captured. Now, all logs are recorded at the `info` level.

- [#12606](https://github.com/emqx/emqx/pull/12606) The Prometheus API experienced crashes when the specified SSL certificate file did not exist in the given path. Now, when an SSL certificate file is missing, the `emqx_cert_expiry_at` metric will report a value of 0, indicating the non-existence of the certificate.

- [#12620](https://github.com/emqx/emqx/pull/12620) Redacted sensitive information in HTTP headers to exclude authentication and authorization credentials from `debug` level logs in the HTTP Server connector, mitigating potential security risks.

- [#12632](https://github.com/emqx/emqx/pull/12632) Fixed an issue where the rule engine's SQL built-in function `date_to_unix_ts` produced incorrect results for dates starting from March 1st on leap years.
@ -50,8 +50,7 @@ deps(Config) ->
overrides() ->
    [
        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]},
        {add, jesse, [{erl_opts, [nowarn_match_float_zero]}]}
        {add, [{extra_src_dirs, [{"etc", [{recursive, true}]}]}]}
    ] ++ snabbkaffe_overrides().

%% Temporary workaround for a rebar3 erl_opts duplication
@ -0,0 +1,91 @@
emqx_ds_schema {

messages.label: "MQTT message storage"
messages.desc:
  """~
  Configuration related to the durable storage of MQTT messages.~"""

builtin.label: "Builtin backend"
builtin.desc:
  """~
  Builtin session storage backend utilizing embedded RocksDB key-value store.~"""

builtin_backend.label: "Backend type"
builtin_backend.desc:
  """~
  Built-in backend.~"""

builtin_data_dir.label: "Database location"
builtin_data_dir.desc:
  """~
  File system directory where the database is located.

  By default, it is equal to `node.data_dir`.~"""

builtin_n_shards.label: "Number of shards"
builtin_n_shards.desc:
  """~
  The built-in durable storage partitions data into shards.
  This configuration parameter defines the number of shards.
  Please note that it takes effect only during the initialization of the durable storage database.
  Changing this configuration parameter after the database has already been created won't have any effect.~"""

builtin_local_write_buffer.label: "Local write buffer"
builtin_local_write_buffer.desc:
  """~
  Configuration related to the buffering of messages sent from the local node to the shard leader.

  EMQX accumulates PUBLISH messages from the local clients in a write buffer before committing them to the durable storage.
  This helps to hide network latency between EMQX nodes and improves write throughput.~"""

builtin_local_write_buffer_max_items.label: "Max items"
builtin_local_write_buffer_max_items.desc:
  """~
  This configuration parameter defines the maximum number of messages stored in the local write buffer.~"""

builtin_local_write_buffer_flush_interval.label: "Flush interval"
builtin_local_write_buffer_flush_interval.desc:
  """~
  Maximum linger time for the buffered messages.
  The local write buffer will be flushed _at least_ as often as `flush_interval`.

  Larger values of `flush_interval` may lead to higher throughput and better overall performance, but may increase end-to-end latency.~"""

builtin_layout.label: "Storage layout"
builtin_layout.desc:
  """~
  Storage layout is a method of arranging messages from various topics and clients on disc.

  Depending on the type of workload and the topic structure, different strategies for storing the data can be employed to maximize the efficiency of reading messages from the durable storage.~"""

layout_builtin_wildcard_optimized.label: "Wildcard-optimized storage layout"
layout_builtin_wildcard_optimized.desc:
  """~
  _Wildcard-optimized_ layout is designed to maximize the throughput of wildcard subscriptions covering large numbers of topics.

  For example, it can handle scenarios where a very large number of clients publish data to the topics containing their client ID, such as: `sensor/%device-version%/%clientid%/temperature`, `sensor/%device-version%/%clientid%/pressure`, etc.
  This layout will automatically group such topics into a single stream, so a client subscribing to a topic filter containing wildcards (such as `sensor/+/+/temperature`) will be able to consume messages published by all devices as a single batch.

  This layout is efficient for non-wildcard subscriptions as well.~"""

layout_builtin_wildcard_optimized_type.label: "Layout type"
layout_builtin_wildcard_optimized_type.desc:
  """~
  Wildcard-optimized layout type.~"""

wildcard_optimized_epoch_bits.label: "Epoch bits"
wildcard_optimized_epoch_bits.desc:
  """~
  Wildcard-optimized layout partitions messages recorded at different times into "epochs".
  Reading messages from a single epoch can be done very efficiently, so larger epochs improve the throughput of subscribers, but may increase end-to-end latency.

  Time span covered by each epoch grows exponentially with the value of `epoch_bits`:

  - `epoch_bits = 1`: epoch time = 1 millisecond
  - `epoch_bits = 2`: 2 milliseconds
  ...
  - `epoch_bits = 10`: 1024 milliseconds
  - `epoch_bits = 13`: ~8 seconds
  ...~"""

}
@ -148,12 +148,6 @@ mqtt_max_subscriptions.desc:
mqtt_max_subscriptions.label:
"""Max Subscriptions"""

persistent_session_builtin_messages_table.desc:
"""Performance tuning options for built-in messages table."""

persistent_session_builtin_messages_table.label:
"""Persistent messages"""

sysmon_os_cpu_low_watermark.desc:
"""The threshold, as percentage of system CPU load,
for how much system cpu can be used before the corresponding alarm is cleared. Disabled on Windows platform"""

@ -370,12 +364,6 @@ sysmon_top_num_items.desc:
sysmon_top_num_items.label:
"""Top num items"""

persistent_session_builtin_session_table.desc:
"""Performance tuning options for built-in session table."""

persistent_session_builtin_session_table.label:
"""Persistent session"""

mqtt_upgrade_qos.desc:
"""Force upgrade of QoS level according to subscription."""

@ -518,14 +506,6 @@ mqtt_max_inflight.desc:
mqtt_max_inflight.label:
"""Max Inflight"""

persistent_session_store_enabled.desc:
"""Use the database to store information about persistent sessions.
This makes it possible to migrate a client connection to another
cluster node if a node is stopped."""

persistent_session_store_enabled.label:
"""Enable persistent session store"""

fields_deflate_opts_level.desc:
"""Compression level."""

@ -544,14 +524,6 @@ fields_mqtt_quic_listener_load_balancing_mode.desc:
fields_mqtt_quic_listener_load_balancing_mode.label:
"""Load balancing mode"""

persistent_session_store_session_message_gc_interval.desc:
"""The starting interval for garbage collection of transient data for
persistent session messages. This does not affect the lifetime length
of persistent session messages."""

persistent_session_store_session_message_gc_interval.label:
"""Session message GC interval"""

server_ssl_opts_schema_ocsp_refresh_http_timeout.desc:
"""The timeout for the HTTP request when checking OCSP responses."""

@ -612,12 +584,6 @@ broker_session_locking_strategy.desc:
- `quorum`: select some nodes to lock the session
- `all`: lock the session on all the nodes in the cluster"""

persistent_store_ram_cache.desc:
"""Maintain a copy of the data in RAM for faster access."""

persistent_store_ram_cache.label:
"""RAM cache"""

fields_mqtt_quic_listener_stream_recv_window_default.desc:
"""Initial stream receive window size. Default: 32678"""

@ -834,14 +800,6 @@ force_shutdown_max_heap_size.desc:
force_shutdown_max_heap_size.label:
"""Total heap size"""

persistent_store_on_disc.desc:
"""Save information about the persistent sessions on disc.
If this option is enabled, persistent sessions will survive full restart of the cluster.
Otherwise, all the data will be stored in RAM, and it will be lost when all the nodes in the cluster are stopped."""

persistent_store_on_disc.label:
"""Persist on disc"""

mqtt_ignore_loop_deliver.desc:
"""Whether the messages sent by the MQTT v3.1.1/v3.1.0 client will be looped back to the publisher itself, similar to <code>No Local</code> in MQTT 5.0."""

@ -1051,13 +1009,6 @@ base_listener_limiter.desc:
base_listener_limiter.label:
"""Type of the rate limit."""

persistent_session_store_backend.desc:
"""Database management system used to store information about persistent sessions and messages.
- `builtin`: Use the embedded database (mria)"""

persistent_session_store_backend.label:
"""Backend"""

alarm_validity_period.desc:
"""Retention time of deactivated alarms. Alarms are not deleted immediately
when deactivated, but after the retention time."""

@ -1095,14 +1046,6 @@ To disable this feature, input <code>""</code> in the text box below. Only appli
mqtt_response_information.label:
"""Response Information"""

persistent_session_store_max_retain_undelivered.desc:
"""The time messages that was not delivered to a persistent session
is stored before being garbage collected if the node the previous
session was handled on restarts of is stopped."""

persistent_session_store_max_retain_undelivered.label:
"""Max retain undelivered"""

fields_mqtt_quic_listener_migration_enabled.desc:
"""Enable clients to migrate IP addresses and tuples. Requires a cooperative load-balancer, or no load-balancer. Default: 1 (Enabled)"""

@ -1199,12 +1142,6 @@ until the subscriber disconnects.
- `local`: send to a random local subscriber. If local
subscriber was not found, send to a random subscriber cluster-wide"""

persistent_session_builtin_sess_msg_table.desc:
"""Performance tuning options for built-in session messages table."""

persistent_session_builtin_sess_msg_table.label:
"""Persistent session messages"""

mqtt_mqueue_store_qos0.desc:
"""Specifies whether to store QoS 0 messages in the message queue while the connection is down but the session remains."""

@ -1389,14 +1326,6 @@ Supported configurations are the following:
mqtt_peer_cert_as_clientid.label:
"""Use Peer Certificate as Client ID"""

persistent_session_store_message_gc_interval.desc:
"""The starting interval for garbage collection of undelivered messages to
a persistent session. This affects how often the "max_retain_undelivered"
is checked for removal."""

persistent_session_store_message_gc_interval.label:
"""Message GC interval"""

broker_shared_dispatch_ack_enabled.desc:
"""Deprecated.
This was designed to avoid dispatching messages to a shared-subscription session which has the client disconnected.

@ -1602,41 +1531,46 @@ resource_tags.label:
resource_tags.desc:
"""Tags to annotate this config entry."""

session_persistence_enable.label:
"""Enable session persistence"""

session_persistence_enable.desc:
"""Use durable storage for client sessions persistence.
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime."""
If enabled, sessions configured to outlive client connections, along with their corresponding messages, will be durably stored and survive broker downtime.

session_persistence_storage.desc:
"""Durable storage backend to use for session persistence."""
:::warning
This feature is currently experimental. Please don't enable it in the production environments that contain valuable data.
:::"""

session_storage_backend_enable.desc:
"""Enable this backend."""

session_builtin_n_shards.desc:
"""Number of shards used for storing the messages."""

session_storage_backend_builtin.desc:
"""Builtin session storage backend utilizing embedded RocksDB key-value store."""
session_ds_session_gc_interval.label:
"""Session garbage collection interval"""

session_ds_session_gc_interval.desc:
"""The interval at which session garbage collection is executed for persistent sessions."""

session_ds_session_gc_batch_size.label:
"""Session garbage collection batch size"""

session_ds_session_gc_batch_size.desc:
"""The size of each batch of expired persistent sessions to be garbage collected per iteration."""

session_ds_max_batch_size.desc:
session_ds_batch_size.label:
"""Batch size"""

session_ds_batch_size.desc:
"""This value affects the flow control for the persistent sessions.
The session queries the DB for the new messages in batches.
Size of the batch doesn't exceed this value or `ReceiveMaximum`, whichever is smaller."""
Persistent session queries the durable message storage in batches.
This value specifies size of the batch.

session_ds_min_batch_size.desc:
"""This value affects the flow control for the persistent sessions.
The session will query the DB for the new messages when the value of `FreeSpace` variable is larger than this value or `ReceiveMaximum` / 2, whichever is smaller.
Note: larger batches generally improve the throughput and overall performance of the system, but increase RAM usage per client."""

`FreeSpace` is calculated as `ReceiveMaximum` for the session - number of inflight messages."""
durable_storage.label:
"""Durable storage"""

session_ds_message_retention_period.desc:
"""The minimum amount of time that messages should be retained for. After messages have been in storage for at least this period of time, they'll be dropped."""
durable_storage.desc:
"""Configuration related to the EMQX durable storages.

EMQX uses durable storages to offload various data, such as MQTT messages, to disc."""

}
@ -12,6 +12,14 @@ protobuf_type.desc:
protobuf_type.label:
"""Protocol Buffers"""

json_type.desc: """~
  Supports JSON Schema
  [Draft 03](http://tools.ietf.org/html/draft-zyp-json-schema-03)
  [Draft 04](http://tools.ietf.org/html/draft-zyp-json-schema-04) and
  [Draft 06](https://datatracker.ietf.org/doc/html/draft-wright-json-schema-00)."""

json_type.label: "JSON Schema"

schema_description.desc:
"""A description for this schema."""

@ -42,10 +50,22 @@ schema_source.desc:
schema_source.label:
"""Schema source"""

schema_type.desc:
"""Schema type."""
schema_type_avro.desc:
"""Must be `avro` for Avro schema."""

schema_type.label:
"""Schema type"""
schema_type_avro.label:
"""Avro Schema"""

schema_type_protobuf.desc:
"""Must be `protobuf` for protobuf schema."""

schema_type_protobuf.label:
"""Protobuf Schema"""

schema_type_json.desc:
"""Must be `json` for JSON schema."""

schema_type_json.label:
"""JSON Schema"""

}
@ -2,6 +2,10 @@
set -euo pipefail

if [ -n "${FORCE:-}" ]; then
    exit 0
fi

OPT="${1:--c}"

# mix format check is quite fast
@ -198,6 +198,7 @@ procs
progname
prometheus
proto
protobuf
ps
psk
pubsub

@ -285,9 +286,11 @@ TDengine
clickhouse
FormatType
RocketMQ
RocksDB
Keyspace
OpenTSDB
saml
storages
idp
ocpp
OCPP