Mosca lib/server.js

    Description

    Copyright (c) 2013-2014 Matteo Collina, http://matteocollina.com

    Permission is hereby granted, free of charge, to any person
    obtaining a copy of this software and associated documentation
    files (the "Software"), to deal in the Software without
    restriction, including without limitation the rights to use,
    copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following
    conditions:

    The above copyright notice and this permission notice shall be
    included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
    EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
    OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
    NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
    HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
    WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
    OTHER DEALINGS IN THE SOFTWARE.

    Source

    "use strict";
    
    var mqtt = require("mqtt");
    var mows = require("mows");
    var http = require("http");
    var https = require("https");
    var async = require("async");
    var fs    = require("fs");
    var ascoltatori = require("ascoltatori");
    var EventEmitter = require("events").EventEmitter;
    var bunyan = require("bunyan");
    var extend = require("extend");
    var Client = require("./client");
    var Stats = require("./stats");
    var express = require("express");
    var uuid = require("node-uuid");
    var browserify = require('browserify-middleware');
    var defaults = {
      port: 1883,
      backend: {
        json: false
      },
      stats: true,
      baseRetryTimeout: 1000,
      logger: {
        name: "mosca",
        level: 40,
        serializers: {
          client: clientSerializer,
          packet: packetSerializer
        }
      }
    };
    var nop = function() {};

    Server

    function
    Server()
    • @param: {Object}optsThe option object
    • @param: {Function}callbackThe ready callback

    Description

    The Mosca Server is a very simple MQTT server that
    provides a simple event-based API to craft your own MQTT logic
    It supports QoS 0 & 1, without external storage.
    It is backed by Ascoltatori, and it descends from
    EventEmitter.

    Options:
    - port, the port where to create the server.
    - backend, all the options for creating the Ascoltatore
    that will power this server.
    - ascoltatore, the ascoltatore to use (instead of backend).
    - baseRetryTimeout, the retry timeout for the exponential
    backoff algorithm (default is 1s).
    - logger, the options for Bunyan.
    - logger.childOf, the parent Bunyan logger.
    - persistence, the options for the persistence.
    A sub-key factory is used to specify what persistence
    to use.
    - secure, an object that includes three properties:
    - port, the port that will be used to open the secure server
    - keyPath, the path to the key
    - certPath, the path to the certificate
    - allowNonSecure, starts both the secure and the unsecure sevrver.
    - http, an object that includes the properties:
    - port, the port that will be used to open the http server
    - bundle, serve the bundled mqtt client
    - static, serve a directory through the static express
    middleware

    Events:
    - clientConnected, when a client is connected;
    the client is passed as a parameter.
    - clientDisconnecting, when a client is being disconnected;
    the client is passed as a parameter.
    - clientDisconnected, when a client is disconnected;
    the client is passed as a parameter.
    - published, when a new message is published;
    the packet and the client are passed as parameters.
    - subscribed, when a client is subscribed to a topic;
    the topic and the client are passed as parameters.
    - unsubscribed, when a client is unsubscribed to a topic;
    the topic and the client are passed as parameters.

    Source

    function Server(opts, callback) {
    
      if (!(this instanceof Server)) {
        return new Server(opts, callback);
      }
    
      EventEmitter.call(this);
    
      this.opts = extend(true, {}, defaults, opts);
    
      if (this.opts.persistence && this.opts.persistence.factory) {
        this.opts.persistence.factory(this.opts.persistence).wire(this);
      }
    
      callback = callback || function() {};
    
      this._dedupId = 0;
      this.clients = {};
    
      if (this.opts.logger.childOf) {
        this.logger = this.opts.logger.childOf;
        delete this.opts.logger.childOf;
        delete this.opts.logger.name;
        this.logger = this.logger.child(this.opts.logger);
      } else {
        this.logger = bunyan.createLogger(this.opts.logger);
      }
    
      if(this.opts.stats) {
        new Stats().wire(this);
      }
    
      var that = this;
    
      var serveWrap = function(connection) {
        // disable Nagle algorithm
        connection.stream.setNoDelay(true);
        new Client(connection, that);
      };
    
      // each Server has a dummy id for logging purposes
      this.id = this.opts.id || uuid().split("-")[0];
    
      this.ascoltatore = opts.ascoltatore || ascoltatori.build(this.opts.backend);
      this.ascoltatore.on("error", this.emit.bind(this));
    
      // initialize servers list
      this.servers = [];
    
      that.once("ready", function() {
        callback(null, that);
      });
    
      async.series([
        function(cb) {
          that.ascoltatore.on("ready", cb);
        },
        function(cb) {
          var server = null;
          var app = express();
          if (that.opts.http) {
            server = http.createServer(app);
    
            if (that.opts.http.bundle) {
              that.serveBundle(app);
            }
    
            if (that.opts.http.static) {
              app.use(express.static(that.opts.http.static));
            }
    
            that.attachHttpServer(server);
            that.servers.push(server);
            that.opts.http.port = that.opts.http.port || 3000;
            server.listen(that.opts.http.port, cb);
          } else {
            cb();
          }
        },
        function(cb) {
          var server = null;
          var app = express();
          if (that.opts.https) {
            server = https.createServer({
              key: fs.readFileSync(that.opts.secure.keyPath),
              cert: fs.readFileSync(that.opts.secure.certPath)
            }, app);
    
            if (that.opts.https.bundle) {
              that.serveBundle(app);
            }
    
            if (that.opts.https.static) {
              app.use(express.static(that.opts.https.static));
            }
    
            that.attachHttpServer(server);
            that.servers.push(server);
            that.opts.https.port = that.opts.https.port || 3001;
            server.listen(that.opts.https.port, cb);
          } else {
            cb();
          }
        },
        function(cb) {
          var server = null;
          if (that.opts.secure && !that.opts.onlyHttp) {
            server = mqtt.createSecureServer(that.opts.secure.keyPath, that.opts.secure.certPath, serveWrap);
            that.servers.push(server);
            that.opts.secure.port = that.opts.secure.port || 8883;
            server.listen(that.opts.secure.port, cb);
          } else {
            cb();
          }
        }, function(cb) {
          if ((typeof that.opts.secure === 'undefined' || that.opts.allowNonSecure) && !that.opts.onlyHttp) {
            var server = mqtt.createServer(serveWrap);
            that.servers.push(server);
            server.listen(that.opts.port, cb);
          } else {
            cb();
          }
        }, function(cb) {
          var logInfo = {
            port: (that.opts.onlyHttp ? undefined : that.opts.port),
            securePort: (that.opts.secure || {}).port,
            httpPort: (that.opts.http || {}).port,
            httpsPort: (that.opts.https || {}).port
          };
    
          that.logger.info(logInfo, "server started");
    
          that.servers.forEach(function(server) {
            server.maxConnections = 100000;
          });
          that.emit("ready");
        }
      ]);
    
      that.on("clientConnected", function(client) {
        this.clients[client.id] = client;
      });
    
      that.on("clientDisconnected", function(client) {
        delete this.clients[client.id];
      });
    }
    
    module.exports = Server;
    
    Server.prototype = Object.create(EventEmitter.prototype);
    
    Server.prototype.toString = function() {
      return 'mosca.Server';
    };

    publish

    method
    Server.prototype.publish()
    • @param: {Object}packetThe MQTT packet, it should include the
    • @: topic, payload, qos, and retain keys.
    • @param: {Object}clientThe client object (internal)
    • @param: {String}passwordThe password

    Description

    Publishes a packet on the MQTT broker.

    Source

    Server.prototype.publish = function publish(packet, client, callback) {
    
      var that = this;
      var logger = this.logger;
    
      if (typeof client === 'function') {
        callback = client;
        client = null;
      } else if (client) {
        logger = client.logger;
      }
    
      if (!callback) {
        callback = nop;
      }
    
      var options = {
        qos: packet.qos,
        mosca: {
          client: client, // the client object
          packet: packet  // the packet being sent
        }
      };
    
      this.ascoltatore.publish(
        packet.topic,
        packet.payload,
        options,
        function() {
          that.storePacket(packet, function() {
            that.published(packet, client, function() {
              logger.info({ packet: packet }, "published packet");
              that.emit("published", packet, client);
              callback();
            });
          });
        }
      );
    };

    authenticate

    method
    Server.prototype.authenticate()
    • @param: {Object}clientThe MQTTConnection that is a client
    • @param: {String}usernameThe username
    • @param: {String}passwordThe password
    • @param: {Function}callbackThe callback to return the verdict

    Description

    The function that will be used to authenticate users.
    This default implementation authenticate everybody.
    Override at will.

    Source

    Server.prototype.authenticate = function(client, username, password, callback) {
      callback(null, true);
    };

    published

    method
    Server.prototype.published()
    • @param: {Object}packetThe MQTT packet
    • @param: {Object}clientThe MQTTConnection that is a client
    • @param: {Function}callbackThe callback to send the puback

    Description

    The function that is called before after receiving a publish message but before
    answering with puback for QoS 1 packet.
    This default implementation does nothing
    Override at will

    Source

    Server.prototype.published = function(packet, client, callback) {
      callback(null);
    };

    authorizePublish

    method
    Server.prototype.authorizePublish()
    • @param: {Object}clientThe MQTTConnection that is a client
    • @param: {String}topicThe topic
    • @param: {String}paylodThe paylod
    • @param: {Function}callbackThe callback to return the verdict

    Description

    The function that will be used to authorize clients to publish to topics.
    This default implementation authorize everybody.
    Override at will

    Source

    Server.prototype.authorizePublish = function(client, topic, payload, callback) {
      callback(null, true);
    };

    authorizeSubscribe

    method
    Server.prototype.authorizeSubscribe()
    • @param: {Object}clientThe MQTTConnection that is a client
    • @param: {String}topicThe topic
    • @param: {Function}callbackThe callback to return the verdict

    Description

    The function that will be used to authorize clients to subscribe to topics.
    This default implementation authorize everybody.
    Override at will

    Source

    Server.prototype.authorizeSubscribe = function(client, topic, callback) {
      callback(null, true);
    };

    storePacket

    method
    Server.prototype.storePacket()
    • @param: {Object}packetThe MQTT packet to store
    • @param: {Function}callback

    Description

    Store a packet for future usage, if needed.
    Only packets with the retained flag are setted, or for which
    there is an "offline" subscription".
    This is a NOP, override at will.

    Source

    Server.prototype.storePacket = function(packet, callback) {
      if (callback) {
        callback();
      }
    };

    forwardRetained

    method
    Server.prototype.forwardRetained()
    • @param: {String}patternThe topic pattern.
    • @param: {MoscaClient}clientThe client to forward the packet's to.
    • @param: {Function}callback

    Description

    Forward all the retained messages of the specified pattern to
    the client.
    This is a NOP, override at will.

    Source

    Server.prototype.forwardRetained = function(pattern, client, callback) {
      if (callback) {
        callback();
      }
    };

    restoreClientSubscriptions

    method
    Server.prototype.restoreClientSubscriptions()
    • @param: {MoscaClient}client
    • @param: {Function}callback

    Description

    Restores the previous subscriptions in the client
    This is a NOP, override at will.

    Source

    Server.prototype.restoreClientSubscriptions = function(client, callback) {
      if (callback) {
        callback();
      }
    };

    forwardOfflinePackets

    method
    Server.prototype.forwardOfflinePackets()
    • @param: {MoscaClient}client
    • @param: {Function}callback

    Description

    Forward all the offline messages the client has received when it was offline.
    This is a NOP, override at will.

    Source

    Server.prototype.forwardOfflinePackets = function(client, callback) {
      if (callback) {
        callback();
      }
    };

    persistClient

    method
    Server.prototype.persistClient()
    • @param: {MoscaClient}client
    • @param: {Function}callback

    Description

    Persist a client.
    This is a NOP, override at will.

    Source

    Server.prototype.persistClient = function(client, callback) {
      if (callback) {
        callback();
      }
    };

    close

    method
    Server.prototype.close()
    • @param: {Function}callbackThe closed callback function

    Description

    Closes the server.

    Source

    Server.prototype.close = function(callback) {
      var that = this;
      var stuffToClose = [];
    
      Object.keys(that.clients).forEach(function(i) {
        stuffToClose.push(that.clients[i]);
      });
    
      that.servers.forEach(function(server) {
        stuffToClose.push(server);
      });
    
      async.each(stuffToClose, function(toClose, cb) {
        toClose.close(cb);
      }, function() {
        that.ascoltatore.close(function () {
          that.logger.info("server closed");
          that.emit("closed");
          if (callback) {
            callback();
          }
        });
      });
    };

    attachHttpServer

    method
    Server.prototype.attachHttpServer()
    • @param: {HttpServer}server

    Description

    Attach a Mosca server to an existing http server

    Source

    Server.prototype.attachHttpServer = function(server) {
      var that = this;
      mows.attachServer(server, function(conn) {
        new Client(conn, that);
      });
    };

    serveBundle

    method
    Server.prototype.serveBundle()
    • @param: {ExpressApp}app

    Description

    Add the middleware for serving the bundle
    to an Express app.

    Source

    Server.prototype.serveBundle = function(app) {
      app.get('/mqtt.js', browserify(require.resolve('mows'), {
        standalone: 'mqtt'
      }));
    };
    
    Server.prototype.nextDedupId = function() {
      return this._dedupId++;
    };