diff --git a/etc/emq.conf b/etc/emq.conf index a266ba003..d6cb87838 100644 --- a/etc/emq.conf +++ b/etc/emq.conf @@ -166,6 +166,38 @@ mqtt.plugins.etc_dir = etc/plugins/ ## File to store loaded plugin names. mqtt.plugins.loaded_file = data/loaded_plugins +##------------------------------------------------------------------- +## MQTT Modules +##------------------------------------------------------------------- + +## Enable retainer module +mqtt.module.retainer = on + +## disc: disc_copies, ram: ram_copies +mqtt.module.retainer.storage_type = ram + +## Max number of retained messages +mqtt.module.retainer.max_message_num = 100000 + +## Max Payload Size of retained message +mqtt.module.retainer.max_payload_size = 64KB + +## Expired after seconds, never expired if 0 +mqtt.module.retainer.expired_after = 0 + +## Enable presence module +## Client presence management module. Publish presence messages when +## client connected or disconnected. +mqtt.module.presence = on + +mqtt.module.presence.qos = 0 + +## Enable subscription module +## Subscribe topics automatically when client connected +mqtt.module.subscription = on + +mqtt.module.subscription.topics = $client/%c=1,$user/%u=1 + ##-------------------------------------------------------------------- ## MQTT Listeners ##-------------------------------------------------------------------- diff --git a/priv/emq.schema b/priv/emq.schema index 16deda53f..1e9f1ebc4 100644 --- a/priv/emq.schema +++ b/priv/emq.schema @@ -605,6 +605,83 @@ end}. [Listeners(tcp), Listeners(ssl), Listeners(http), Listeners(https)] end}. +%%-------------------------------------------------------------------- +%% MQTT Modules +%%-------------------------------------------------------------------- + +{mapping, "mqtt.module.retainer", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.retainer.storage_type", "emqttd.modules", [ + {default, ram}, + {datatype, {enum, [disc, ram]}} +]}. + +{mapping, "mqtt.module.retainer.max_message_num", "emqttd.modules", [ + {default, 100000}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.retainer.max_payload_size", "emqttd.modules", [ + {default, "64KB"}, + {datatype, bytesize} +]}. + +{mapping, "mqtt.module.retainer.expired_after", "emqttd.modules", [ + {default, 0}, + {datatype, integer} +]}. + +{mapping, "mqtt.module.presence", "emqttd.modules", [ + {default, on}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.presence.qos", "emqttd.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "mqtt.module.subscription", "emqttd.modules", [ + {default, off}, + {datatype, flag} +]}. + +{mapping, "mqtt.module.subscription.topics", "emqttd.modules", [ + {default, undefined}, + {datatype, string} +]}. + +{translation, "emqttd.modules", fun(Conf) -> + WithMod = fun(Name, OptsF) -> + Key = "mqtt.module." ++ atom_to_list(Name), + case cuttlefish:conf_get(Key, Conf, false) of + true -> [{Name, OptsF(Key)}]; + false -> [] + end + end, + RetainOpts = fun(Prefix) -> + [{storage_type, cuttlefish:conf_get(Prefix ++ ".storage_type", Conf, ram)}, + {max_message_num, cuttlefish:conf_get(Prefix ++ ".max_message_num", Conf, undefined)}, + {max_payload_size, cuttlefish:conf_get(Prefix ++ ".max_payload_size", Conf, undefined)}, + {expired_after, cuttlefish:conf_get(Prefix ++ ".expired_after", Conf, 0)}] + end, + PresOpts = fun(Prefix) -> + [{qos, cuttlefish:conf_get(Prefix ++ ".qos", Conf, 0)}] + end, + ParseFun = fun(undefined) -> []; + (Topics) -> [begin + [Topic, Qos] = string:tokens(S, "="), + {list_to_binary(Topic), list_to_integer(Qos)} + end || S <- string:tokens(Topics, ",")] + end, + SubOpts = fun(Prefix) -> [{topics, ParseFun(cuttlefish:conf_get(Prefix ++ ".topics", Conf))}] end, + lists:append([WithMod(retainer, RetainOpts), WithMod(presence, PresOpts), WithMod(subscription, SubOpts)]) +end}. + %%-------------------------------------------------------------------- %% System Monitor %%-------------------------------------------------------------------- diff --git a/src/emqttd_app.erl b/src/emqttd_app.erl index f303274f6..57d348025 100644 --- a/src/emqttd_app.erl +++ b/src/emqttd_app.erl @@ -143,12 +143,11 @@ worker_spec(M, F, A) -> %% Load Modules %%-------------------------------------------------------------------- -%% @doc load all modules +%% @doc Load all modules load_all_mods() -> - ok. - %%lists:foreach(fun load_mod/1, gen_conf:list(emqttd, module)). + lists:foreach(fun load_mod/1, emqttd:env(modules, [])). -load_mod({module, Name, Opts}) -> +load_mod({Name, Opts}) -> Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), case catch Mod:load(Opts) of ok -> lager:info("Load module ~s successfully", [Name]); @@ -158,7 +157,7 @@ load_mod({module, Name, Opts}) -> %% @doc Is module enabled? -spec(is_mod_enabled(Name :: atom()) -> boolean()). -is_mod_enabled(Name) -> lists:keyfind(Name, 2, gen_conf:list(emqttd, module)). +is_mod_enabled(Name) -> lists:keyfind(Name, 1, emqttd:env(modules, [])). %%-------------------------------------------------------------------- %% Start Listeners