feat(ds): Don't store #message record in the DB
This commit is contained in:
parent
ee191803ea
commit
c69b82455e
|
@ -99,8 +99,27 @@
|
|||
%% Limit on the number of wildcard levels in the learned topic trie:
|
||||
-define(WILDCARD_LIMIT, 10).
|
||||
|
||||
%% Persistent (durable) term representing `#message{}' record. Must
|
||||
%% not change.
|
||||
-type value_v1() ::
|
||||
{
|
||||
_Id :: binary(),
|
||||
_Qos :: 0..2,
|
||||
_From :: atom() | binary(),
|
||||
_Flags :: emqx_types:flags(),
|
||||
_Headsers :: emqx_types:headers(),
|
||||
_Topic :: emqx_types:topic(),
|
||||
_Payload :: emqx_types:payload(),
|
||||
_Timestamp :: integer(),
|
||||
_Extra :: term()
|
||||
}.
|
||||
|
||||
-include("emqx_ds_bitmask.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-endif.
|
||||
|
||||
%%================================================================================
|
||||
%% API funcions
|
||||
%%================================================================================
|
||||
|
@ -389,11 +408,39 @@ hash_topic_level(TopicLevel) ->
|
|||
<<Int:64, _/binary>> = erlang:md5(TopicLevel),
|
||||
Int.
|
||||
|
||||
-spec message_to_value_v1(emqx_types:message()) -> value_v1().
|
||||
message_to_value_v1(#message{
|
||||
id = Id,
|
||||
qos = Qos,
|
||||
from = From,
|
||||
flags = Flags,
|
||||
headers = Headers,
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp,
|
||||
extra = Extra
|
||||
}) ->
|
||||
{Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}.
|
||||
|
||||
-spec value_v1_to_message(value_v1()) -> emqx_types:message().
|
||||
value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}) ->
|
||||
#message{
|
||||
id = Id,
|
||||
qos = Qos,
|
||||
from = From,
|
||||
flags = Flags,
|
||||
headers = Headers,
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
timestamp = Timestamp,
|
||||
extra = Extra
|
||||
}.
|
||||
|
||||
serialize(Msg) ->
|
||||
term_to_binary(Msg).
|
||||
term_to_binary(message_to_value_v1(Msg)).
|
||||
|
||||
deserialize(Blob) ->
|
||||
binary_to_term(Blob).
|
||||
value_v1_to_message(binary_to_term(Blob)).
|
||||
|
||||
-define(BYTE_SIZE, 8).
|
||||
|
||||
|
@ -452,3 +499,21 @@ data_cf(GenId) ->
|
|||
-spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()].
|
||||
trie_cf(GenId) ->
|
||||
"emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
||||
serialize_deserialize_test() ->
|
||||
Msg = #message{
|
||||
id = <<"message_id_val">>,
|
||||
qos = 2,
|
||||
from = <<"from_val">>,
|
||||
flags = #{sys => true, dup => true},
|
||||
headers = #{foo => bar},
|
||||
topic = <<"topic/value">>,
|
||||
payload = [<<"foo">>, <<"bar">>],
|
||||
timestamp = 42424242,
|
||||
extra = "extra_val"
|
||||
},
|
||||
?assertEqual(Msg, deserialize(serialize(Msg))).
|
||||
|
||||
-endif.
|
||||
|
|
Loading…
Reference in New Issue