import path from 'path';
import debug from 'debug';
import levelUp from 'levelup';
import levelDown from 'memdown'; // TODO: switch to leveldown?
import pull from 'pull-stream';
import Ssb from '../ssb';

const info = debug('helper::ssb::cache::info');
const error = debug('helper::ssb::cache::error');
const dbug = debug('helper::ssb::cache::debug');

// Directory name of the cache store below `keyPath`. The historical spelling is
// kept on purpose so that an already existing store is still found; every place
// that mentions the location (including error messages) derives it from here.
const CACHE_DIR = 'idea-chache';

// Key under which the position of the last processed log entry is stored.
const LAST_NODE_KEY = 'lastNode';

/**
 * Return `map[key]`, creating an empty object first when `map` has no OWN
 * property of that name. Keys come from message content, so names such as
 * `__proto__` or `constructor` must not resolve to inherited members.
 * @param {object} map - dictionary to look into
 * @param {string} key - entry name
 * @return {object} the (possibly freshly created) entry
 */
function ownEntry(map, key) {
  if (!Object.prototype.hasOwnProperty.call(map, key)) {
    Object.defineProperty(map, key, {
      value: {},
      writable: true,
      enumerable: true,
      configurable: true,
    });
  }
  return map[key];
}

/**
 * Record that `author` now suggests `choice`: every earlier suggestion of the
 * same author is set to 0, the new one to 1.
 * @param {object} suggestions - `{choice: {authorId: 0|1}}`
 * @param {string} choice - suggested value (a name or an image link)
 * @param {string} author - id of the suggesting feed
 */
function recordSuggestion(suggestions, choice, author) {
  for (const candidate of Object.keys(suggestions)) {
    const voters = suggestions[candidate];
    if (Object.prototype.hasOwnProperty.call(voters, author)) {
      voters[author] = 0;
    }
  }
  ownEntry(suggestions, choice)[author] = 1;
}

export class ssbCache extends Ssb {
  /**
   * ssb cache
   * @param {object} [opts] - options, also passed on to the `Ssb` constructor
   * @param {number} [opts.collectLimit] - limit on collect queries (falsy: -1)
   * @param {number} [opts.collectLimitLive] - limit on live queries (falsy: -1)
   * @param {...*} args - passed on to the `Ssb` constructor
   */
  constructor(opts = {}, ...args) {
    super(opts, ...args);
    this.msgTypes = [];
    this.cacheDB = levelUp(path.join(this.keyPath, CACHE_DIR), {
      valueEncoding: 'json',
      db: levelDown,
    });
    // `this.lastNode` is intentionally left unset: `syncLastNode` treats
    // `undefined` as "not loaded from the store yet".
    this.open = false;
    this.collectLimit = opts.collectLimit || -1; // limit on collect queries
    this.collectLimitLive = opts.collectLimitLive || -1; // limit on live queries
  }

  /**
   * Mark the cache as open and run every callback queued in `this._onReady`.
   */
  setReady() {
    // NOTE: an explicit `this.cacheDB.open(...)` wrapper used to be here and
    // was disabled; the callbacks run synchronously.
    this.open = true;
    for (let f = 0; f < this._onReady.length; f++) {
      this._onReady[f]();
    }
  }

  /**
   * sync last node
   * @param {function} cb - `(err) => {cache.lastNode}` is triggered once
   *   `lastNode` is set (synchronously when it is already known)
   */
  syncLastNode(cb) {
    if (typeof this.lastNode !== 'undefined') {
      return cb();
    }
    this.cacheDB.get(LAST_NODE_KEY, (err, lastNode) => {
      if (!err) {
        // Report back directly instead of re-entering syncLastNode: a stored
        // record without `value` would otherwise loop forever.
        this.lastNode = lastNode.value;
        return cb();
      }
      if (!err.notFound) {
        return cb(err);
      }
      // First run: start at 0 and persist that position.
      this.lastNode = 0;
      this.cacheDB.put(LAST_NODE_KEY, { value: 0 }, (putErr) => {
        if (putErr) {
          return cb(putErr);
        }
        cb();
      });
    });
  }

  /**
   * fill cache from last node
   * @param {function} cb - `(err, ...rest) => {...}` called when the log
   *   stream ended; `lastNode` has been persisted when `err` is falsy
   */
  collect(cb) {
    this.syncLastNode((err) => {
      if (err) {
        return cb(err);
      }
      info('beginn collect since lastNode', this.lastNode);
      const source = this.sbot.createLogStream({
        limit: this.collectLimit,
        live: false,
        gt: this.lastNode,
      });
      const typeFilter = this.getFilterByTypeStream(...this.msgTypes);
      const sink = this.msgReader(
        (...args) => {
          this.addMsg(...args);
        },
        (endErr, ...rest) => {
          if (endErr) {
            return cb(endErr, ...rest);
          }
          // update node and send back to cb
          this.setLastNode((putErr) => cb(putErr, ...rest));
        }
      );
      pull(source, typeFilter, sink);
    });
  }

  /**
   * set lastNode in db to `this.lastNode`
   * @param {function} cb - cb `(err) => {...}`
   */
  setLastNode(cb) {
    this.cacheDB.put(LAST_NODE_KEY, { value: this.lastNode }, (err) => {
      info('lastNode', this.lastNode);
      cb(err);
    });
  }

  /**
   * fill cache from last node like collect but live
   * @param {function} cb - `(err) => {...}`
   * @return {object} liveFeed - `liveFeed.stop();` //will trigger cb
   */
  collectLive(cb) {
    let stopped = false;
    this.syncLastNode((err) => {
      if (err) {
        return cb(err);
      }
      info('beginn live collect since lastNode', this.lastNode);
      const source = this.sbot.createLogStream({
        live: true,
        limit: this.collectLimitLive,
        gt: this.lastNode,
      });
      const typeFilter = this.getFilterByTypeStream(...this.msgTypes);
      const sink = this.msgReader(
        (...args) => {
          if (stopped) {
            args[1](true); // next(true) = end of stream
          } else {
            this.addMsg(...args);
          }
        },
        (endErr, ...rest) => {
          if (endErr) {
            cb(endErr, ...rest);
          } else if (stopped) {
            // `cb` was already triggered by stop(); only persist how far the
            // live feed got so the next collect resumes from there.
            this.setLastNode((putErr) => {
              if (putErr) {
                error('could not persist lastNode', putErr);
              }
            });
          }
        }
      );
      pull(source, typeFilter, sink);
    });
    return {
      stop() {
        if (stopped) {
          return; // a repeated stop() must not trigger cb again
        }
        stopped = true;
        cb();
      },
    };
  }

  /**
   * add types of msgs you are interested in (t1,t2,...)
   * @param {...string} types - e.g. `post`, `about`, `git-repo`
   */
  addMsgType(...types) {
    for (const type of types) {
      if (this.msgTypes.indexOf(type) === -1) {
        this.msgTypes.push(type);
      }
    }
  }

  /**
   * newMsg
   * @param {object} data - obj from ssb-stream (`key`, `value`, `timestamp`)
   * @param {function} next - continue cb `(err) => {}`
   */
  addMsg(data, next) {
    // check errors
    if (typeof this.id === 'undefined') {
      return next('id not defined');
    }
    if (data.timestamp < this.lastNode) {
      const cachePath = path.join(this.keyPath, CACHE_DIR);
      return next(`timestamp mismatch \`rm -r ${cachePath}\` datatime: ${data.timestamp}, lastnode: ${this.lastNode}`);
    }
    const content = data.value.content;
    if (typeof content === 'undefined' || content === null) {
      dbug('addMsg content not defined', data);
      return next();
    }
    const type = content.type;
    if (typeof type === 'undefined') {
      dbug('addMsg type not defined', data);
      return next();
    }

    // continue with valid data
    this.lastNode = data.timestamp;

    // Load the record for `id`, then hand it to `update`. getUserById signals
    // an unusable id with `true`; such a message is skipped, not an error.
    const withUser = (id, update) => {
      this.getUserById(id, (err, user) => {
        if (err) {
          return next(err === true ? null : err);
        }
        update(user);
      });
    };

    if (type === 'about') {
      withUser(content.about, (user) => this.updateUserProfile(user, data.value, next));
    } else if (type === 'contact') {
      withUser(content.contact, (user) => this.updateUserContact(user, data.value, next));
    } else if (this.msgTypes.indexOf(type) !== -1) {
      withUser(data.value.author, (user) => this.updateUserType(user, data, next));
    } else {
      next();
    }
  }

  /**
   * updateUserType
   * @param {object} author - db obj
   * @param {object} data - {key,value} of new msg
   * @param {function} next - cb
   */
  updateUserType(author, data, next) {
    const messagesOfType = ownEntry(author.types, data.value.content.type);
    messagesOfType[data.key] = { key: data.key };
    this.cacheDB.put(author.key, author, next);
  }

  /**
   * updateUserProfile
   * @param {object} user - db obj
   * @param {object} value - new value msg
   * @param {function} next - cb
   */
  updateUserProfile(user, value, next) {
    const name = value.content.name;
    const image = value.content.image;
    if (typeof name !== 'undefined') {
      recordSuggestion(user.name, name, value.author);
    }
    // `typeof null` is 'object' as well, so null has to be excluded explicitly.
    if (image !== null && typeof image === 'object' && typeof image.link !== 'undefined') {
      recordSuggestion(user.image, image.link, value.author);
    }
    this.cacheDB.put(user.key, user, next);
  }

  /**
   * updateUserContact
   * @param {object} contactUser - db obj of the feed the msg is about
   * @param {object} value - new value msg
   * @param {function} next - cb
   */
  updateUserContact(contactUser, value, next) {
    this.getUserById(value.author, (err, author) => { // get follower
      if (err) {
        return next(err === true ? null : err);
      }
      // Both sides of the relation carry the same state, so an unfollow
      // clears the follower entry as well.
      const state = value.content.following ? 1 : 0;
      author.following[contactUser.id] = state;
      if (author.key === contactUser.key) {
        // Self-contact: both objects describe one record. Writing the second
        // copy afterwards would overwrite the `following` update.
        author.follower[value.author] = state;
        return this.cacheDB.put(author.key, author, next);
      }
      this.cacheDB.put(author.key, author, (putErr) => { // save author
        if (putErr) {
          return next(putErr);
        }
        contactUser.follower[value.author] = state;
        this.cacheDB.put(contactUser.key, contactUser, next); // save follower
      });
    });
  }

  /**
   * get User by Id
   * @param {string} id - @....
   * @param {function} cb - `(err,value)=>{...}`; `err` is `true` when `id` is
   *   not a string starting with `@`; an unknown id yields a fresh empty record
   */
  getUserById(id, cb) {
    if (typeof id !== 'string' || id[0] !== '@') {
      return cb(true, null);
    }
    const key = `user.${id}`;
    this.cacheDB.get(key, (err, user) => {
      if (err && err.notFound) {
        return cb(null, { id, key, name: {}, image: {}, types: {}, following: {}, follower: {} });
      }
      cb(err, user);
    });
  }
}

export default ssbCache;