diff --git a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl index 2d4949919..6a69a20f3 100644 --- a/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl +++ b/apps/emqx_durable_storage/src/emqx_ds_storage_bitfield_lts.erl @@ -99,8 +99,27 @@ %% Limit on the number of wildcard levels in the learned topic trie: -define(WILDCARD_LIMIT, 10). +%% Persistent (durable) term representing `#message{}' record. Must +%% not change. +-type value_v1() :: + { + _Id :: binary(), + _Qos :: 0..2, + _From :: atom() | binary(), + _Flags :: emqx_types:flags(), + _Headsers :: emqx_types:headers(), + _Topic :: emqx_types:topic(), + _Payload :: emqx_types:payload(), + _Timestamp :: integer(), + _Extra :: term() + }. + -include("emqx_ds_bitmask.hrl"). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + %%================================================================================ %% API funcions %%================================================================================ @@ -389,11 +408,39 @@ hash_topic_level(TopicLevel) -> <> = erlang:md5(TopicLevel), Int. +-spec message_to_value_v1(emqx_types:message()) -> value_v1(). +message_to_value_v1(#message{ + id = Id, + qos = Qos, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Timestamp, + extra = Extra +}) -> + {Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}. + +-spec value_v1_to_message(value_v1()) -> emqx_types:message(). +value_v1_to_message({Id, Qos, From, Flags, Headers, Topic, Payload, Timestamp, Extra}) -> + #message{ + id = Id, + qos = Qos, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Timestamp, + extra = Extra + }. + serialize(Msg) -> - term_to_binary(Msg). + term_to_binary(message_to_value_v1(Msg)). deserialize(Blob) -> - binary_to_term(Blob). + value_v1_to_message(binary_to_term(Blob)). -define(BYTE_SIZE, 8). @@ -452,3 +499,21 @@ data_cf(GenId) -> -spec trie_cf(emqx_ds_storage_layer:gen_id()) -> [char()]. trie_cf(GenId) -> "emqx_ds_storage_bitfield_lts_trie" ++ integer_to_list(GenId). + +-ifdef(TEST). + +serialize_deserialize_test() -> + Msg = #message{ + id = <<"message_id_val">>, + qos = 2, + from = <<"from_val">>, + flags = #{sys => true, dup => true}, + headers = #{foo => bar}, + topic = <<"topic/value">>, + payload = [<<"foo">>, <<"bar">>], + timestamp = 42424242, + extra = "extra_val" + }, + ?assertEqual(Msg, deserialize(serialize(Msg))). + +-endif.