Skip to content
Snippets Groups Projects
index.js 8.48 KiB
Newer Older
clemo's avatar
clemo committed
// import path from 'path';
import debug from 'debug';
clemo's avatar
clemo committed
// import levelUp from 'levelup';
// import levelDown from 'leveldown'; //todo: leveldown?
// import leveljs from 'level-js';
import pull from 'pull-stream';
import Ssb from '../ssb';

const info = debug('helper::ssb::cache::info');
const error = debug('helper::ssb::cache::error');
const dbug = debug('helper::ssb::cache::debug');
export class ssbCache extends Ssb {

  /** ssb Cache
clemo's avatar
clemo committed
  * @param {object} opts to key;
clemo's avatar
clemo committed
  constructor(opts = {},...args){
    super(opts,...args);
    this.msgTypes = [];
clemo's avatar
clemo committed
    this.cacheDB = opts.cacheDB; // || levelUp(path.join(this.keyPath,'idea-chache'),{valueEncoding:'json',db:levelDown});
    this.ownId;
    this.lastNode;
clemo's avatar
clemo committed
    this.open = false;
    this.collectLimit = opts.collectLimit || -1; //limit on collect queries
    this.collectLimitLive = opts.collectLimitLive || -1; //limit on live queries
  }

clemo's avatar
clemo committed
  /*setReady(){
clemo's avatar
clemo committed
    //this.cacheDB.open(() => {
      this.open = true;
      for(let f = 0; f < this._onReady.length; f++){
        this._onReady[f]();
      }
    //});
clemo's avatar
clemo committed
  }*/

  /**
  * sync last node
  * @param {function} cb - `(err) => {cache.lastNode}` will be triggert when lastNode is set
  **/
  syncLastNode(cb){
    if('undefined' !== typeof this.lastNode){
      cb();
    }else{ //get last node and call collect again
      this.cacheDB.get('lastNode',(err,lastNode) => {
        if(err){
          if(err.notFound){
            this.lastNode = 0;
            this.cacheDB.put('lastNode',{value:0},(err) => {
              if(err){
                return cb(err);
              }
              this.syncLastNode(cb);
            });
          }else{
            cb(err);
          }
        }else{
          this.lastNode = lastNode.value;
          this.syncLastNode(cb);
        }
      });
    }
  }

  /**
  * fill cache from last node
  * @param {function} cb - `(err) => {...}`
  **/
  collect(cb){
    this.syncLastNode((err)=> {
      if(err){
        return cb(err);
      }
      info('beginn collect since lastNode',this.lastNode);
clemo's avatar
clemo committed
      let rs = this.sbot.createLogStream({limit:this.collectLimit,live:false,gt:this.lastNode});
      let filter = this.getFilterByTypeStream(...this.msgTypes);
      pull(rs,filter,this.msgReader((...args) => {this.addMsg(...args)},(err,...args) => {
        //update node and send back to cb
        if(err){
          cb(err,...args);
        }else{
          this.cacheDB.put('lastNode',{value:this.lastNode},(err) => {
            info('lastNode',this.lastNode);
            cb(err,...args);
          });
        }
      }));
    });


  }
  /**
  * set lastNode in db to `this.lastNode`
  * @param {function} cb - cb `(err) => {...}`
  **/
  setLastNode(cb){
    this.cacheDB.put('lastNode',{value:this.lastNode},(err) => {
      info('lastNode',this.lastNode);
      cb(err);
    });
  }
clemo's avatar
clemo committed

  /**
  * fill cache from last node like collect but live
  * @param {function} cb - `(err) => {...}`
  * @return {object} liveFeed - `liveFeed.stop();` //will trigger cb
  **/
clemo's avatar
clemo committed
  collectLive(cb,each){
clemo's avatar
clemo committed
    // let _on = {'msg':[]}
    let _stop = false;
    this.syncLastNode((err)=> {
      if(err){
        return cb(err);
      }
      info('beginn live collect since lastNode',this.lastNode);
clemo's avatar
clemo committed
      let rs = this.sbot.createLogStream({live:true,limit:this.collectLimitLive,gt:this.lastNode});
      let filter = this.getFilterByTypeStream(...this.msgTypes);
      pull(rs,filter,this.msgReader((...args) => {
        if(_stop){
          args[1](true); //next(true) = end of stream
        }else{
clemo's avatar
clemo committed
          if(typeof each === 'undefined'){
            each(...args);
          }
clemo's avatar
clemo committed
          this.addMsg(...args);
        }
      },(err,...args) => {
        if(err){
          cb(err,...args);
        }else if(_stop){
          this.syncLastNode(_stop?()=>{}:cb);
        }
      }));
    });
clemo's avatar
clemo committed
    return {
      stop(){
        _stop = true;
        cb();
      },
      /*on(type,fn){
        _on[type].push(fn);
      }*/
    };
  }

  /**add types of msgs you are interested in(t1,t2,...)
  @param {string} type1 e.g. `post`
  @param {string} type2 e.g. `àbout`
  @param {string} type3 e.g. `git-repo`
  ...
  **/
  addMsgType(...types) {
    for (let t in types) {
      if (this.msgTypes.indexOf(types[t]) === -1) {
        this.msgTypes.push(types[t]);
      }
    }
  }

  /**
  * newMsg
  * @param {object} msg - obj from ssb-stream
  * @param {function} next - continue cb `(err) => {}`
  **/
  addMsg(data,next){
    //check errors
    if('undefined' === typeof this.id){
      return next("id not defined");
    }
    if(data.timestamp < this.lastNode){
clemo's avatar
clemo committed
      return next(`timestamp mismatch \`rm -r ${this.keyPath}/idea-cache\` datatime: ${data.timestamp}, lastnode: ${this.lastNode}`);
    }
    if('undefined' === typeof data.value.content){
      dbug('addMsg content not defined',data);
      return next();
    }
    if('undefined' === typeof data.value.content.type){
      dbug('addMsg type not defined',data);
      return next();
    }

    //continue with valid data
    this.lastNode = data.timestamp;
    if('about' === data.value.content.type){
      this.getUserById(data.value.content.about,(err,user) => {
        if(err){
          return next(err===true?null:err);
        }
        this.updateUserProfile(user,data.value,next);
      });
    }else if('contact' === data.value.content.type){
      this.getUserById(data.value.content.contact,(err,user) => {
        if(err){
          return next(err===true?null:err);
        }
        this.updateUserContact(user,data.value,next);
      });
    }else if(this.msgTypes.indexOf(data.value.content.type) !== -1){
      this.getUserById(data.value.author,(err,user) => {
        if(err){
          return next(err===true?null:err);
        }
        this.updateUserType(user,data,next);
      });
    }else{
      next();
    }

  }

  /**
  * updateUserType
  * @param {object} author - db obj
  * @param {object} msg - {key,value} of new msg
  * @param {function} next - cb
  **/
  updateUserType(author,data,next){
    const value = data.value;
    if('undefined' === typeof author.types[value.content.type]){
      author.types[value.content.type] = {};
    }
    author.types[value.content.type][data.key] = {key:data.key}
    this.cacheDB.put(author.key,author,next);

  }

  /**
  * updateUserProfile
  * @param {object} user - db obj
  * @param {object} value - new value msg
  * @param {function} next - cb
  **/
  updateUserProfile(user,value,next){
    if('undefined' !== typeof value.content.name){
      //rm previous suggestion
      for(let name in user.name){
        for(let author in user.name[name]){
          if(author === value.author){
            user.name[name][author] = 0;
          }
        }
      }
      //add new suggestion
      if('undefined' === typeof user.name[value.content.name]){
        user.name[value.content.name] = {};
      }
      user.name[value.content.name][value.author] = 1;
    }
    if('object' === typeof value.content.image
    && 'undefined' !== typeof value.content.image.link){
      for(let image in user.image){
        for(let author in user.image[image]){
          if(author === value.author){
            user.image[image][author] = 0;
          }
        }
      }
      if('undefined' === typeof user.image[value.content.image.link]){
        user.image[value.content.image.link] = {};
      }
      user.image[value.content.image.link][value.author] = 1;
    }
    this.cacheDB.put(user.key,user,next);
  }

  /**
  * updateUserContact
  * @param {object} user - db obj
  * @param {object} value - new value msg
  * @param {function} next - cb
  */
clemo's avatar
clemo committed
  updateUserContact(contactUser,value,next){
    this.getUserById(value.author,(err,author) => { //get follower
      if(err){
        return next(err);
      }
clemo's avatar
clemo committed
      // dbug('contact:',value);
      author.following[contactUser.id] = value.content.following?1:0;
      this.cacheDB.put(author.key,author,(err) => { //save author
        if(err){
          return next(err);
        }
clemo's avatar
clemo committed
        contactUser.follower[value.author] = 1;
        this.cacheDB.put(contactUser.key,contactUser,next); //save follower
      });
    });
  }

  /**
  * get User by Id
  * @param {string} id - @....
  * @param {function} cb -`(err,value)=>{...}`
  **/
  getUserById(id,cb){
    if('undefined' !== typeof id && id[0] == "@"){
      const key = 'user.'+id;
      this.cacheDB.get(key,(err,user) => {
        if(err && err.notFound){
          cb(null,{id,key,name:{},image:{},types:{},following:{},follower:{}});
        }else{
          cb(err,user);
        }
      });
    }else{
      cb(true,null);
    }
  }

}
export default ssbCache;