Home Manual Reference Source Repository

lib/SplitSnapshotProvider.js

import fs from "fs";
import path from "path";
import parseSnapshotStream from "./parseSnapshotStream";

/**
 * An implementation for working directly with split snapshot data; not meant for direct developer consumption.
 * The accessors read from in-memory Buffers; the directory and stream helpers use asynchronous, callback-based APIs.
 */
export default class SplitSnapshotProvider {
  /**
   * Wrap already-split snapshot buffers. The node and edge buffers are checked
   * against the counts declared in `snapshot_info`.
   *
   * Every node/edge record is one 4-byte slot per field (in the order of
   * `meta.node_fields` / `meta.edge_fields`) followed by one extra 4-byte slot.
   * When built by `fromStream`, the extra node slot holds the ordinal of the
   * node's first edge and the extra edge slot holds the byte offset of the
   * owning node inside `node_buffer`.
   *
   * @param {Object} snapshot_info Meta information required for understanding a snapshot;
   *   `meta.node_fields`, `meta.edge_fields`, `node_count` and `edge_count` are read.
   * @param {Buffer} node_buffer Node records, `node_count` of them.
   * @param {Buffer} edge_buffer Edge records, `edge_count` of them.
   * @param {Buffer} samples_buffer Sample records, 8 bytes each.
   * @param {Buffer} string_indices_buffer One 8-byte record per string: big-endian
   *   uint32 byte offset into `string_buffer`, then big-endian uint32 byte length.
   * @param {Buffer} string_buffer Concatenated UTF-8 bytes of every string.
   * @throws {Error} When the node or edge buffer size does not match the declared counts.
   */
  constructor(
    snapshot_info,
    node_buffer,
    edge_buffer,
    samples_buffer,
    string_indices_buffer,
    string_buffer
  ) {
    const snapshot = snapshot_info;
    /**
     * @private
     */
    this.snapshot = snapshot;

    /**
     * Number of fields per node.
     * @private
     */
    this.node_fields_length = snapshot.meta.node_fields.length;
    /**
     * Length of the flattened node array (node_count * fields per node).
     * @private
     */
    this.node_arr_length = snapshot.node_count * this.node_fields_length;
    /**
     * Byte size of one node record: a 4-byte slot per field plus one trailing slot.
     * @private
     */
    this.node_struct_size = this.node_fields_length * 4 + 4;
    /**
     * @private
     */
    this.node_buffer = node_buffer;
    if (this.node_buffer.length !== this.node_struct_size * snapshot.node_count) {
      throw new Error('nodes buffer does not match the expected size (either number of nodes is incorrect, or structure is)');
    }

    /**
     * Number of fields per edge.
     * @private
     */
    this.edge_fields_length = snapshot.meta.edge_fields.length;
    /**
     * Length of the flattened edge array (edge_count * fields per edge).
     * @private
     */
    this.edge_arr_length = snapshot.edge_count * this.edge_fields_length;
    /**
     * Byte size of one edge record: a 4-byte slot per field plus one trailing slot.
     * @private
     */
    this.edge_struct_size = this.edge_fields_length * 4 + 4;
    /**
     * @private
     */
    this.edge_buffer = edge_buffer;
    if (this.edge_buffer.length !== this.edge_struct_size * snapshot.edge_count) {
      throw new Error('edges buffer does not match the expected size (either number of edges is incorrect, or structure is)');
    }

    /**
     * Byte size of one sample record (two 4-byte slots).
     * @private
     */
    this.sample_struct_size = 4 * 2;
    /**
     * Number of sample records.
     * @private
     */
    this.sample_arr_length = samples_buffer.length / this.sample_struct_size;
    /**
     * @private
     */
    this.sample_buffer = samples_buffer;

    /**
     * @private
     */
    this.string_indices_buffer = string_indices_buffer;
    /**
     * @private
     */
    this.strings_buffer = string_buffer;
    /**
     * Byte size of one string index record: uint32 offset + uint32 length.
     * @private
     */
    this.string_index_struct_size = 4 + 4;
  }


  /**
   * Get the length of the flattened node array (node count * number of node
   * fields). Indices accepted by `getNodeBuffer` are positions in this array.
   * @return {number}
   */
  getNodeArraySize() {
    return this.node_arr_length;
  }


  /**
   * Get the record for the node starting at the specified flattened-array index.
   * @param {number} index Must be a multiple of the number of node fields.
   * @return {Buffer} View (not a copy) of the node's record, trailing slot included.
   * @throws {RangeError} If `index` is not on a node boundary.
   */
  getNodeBuffer(index) {
    const n = index / this.node_fields_length;
    if (!Number.isInteger(n)) {
      throw new RangeError('index is not on a Node boundary');
    }
    const offset = n * this.node_struct_size;
    return this.node_buffer.slice(offset, offset + this.node_struct_size);
  }


  /**
   * Get the length of the flattened edge array (edge count * number of edge
   * fields). Indices accepted by `getEdgeBuffer` are positions in this array.
   * @return {number}
   */
  getEdgeArraySize() {
    return this.edge_arr_length;
  }


  /**
   * Get the record for the edge starting at the specified flattened-array index.
   * @param {number} index Must be a multiple of the number of edge fields.
   * @return {Buffer} View (not a copy) of the edge's record, trailing slot included.
   * @throws {RangeError} If `index` is not on an edge boundary.
   */
  getEdgeBuffer(index) {
    const n = index / this.edge_fields_length;
    if (!Number.isInteger(n)) {
      throw new RangeError('index is not on a Edge boundary');
    }
    const offset = n * this.edge_struct_size;
    return this.edge_buffer.slice(offset, offset + this.edge_struct_size);
  }


  /**
   * Get total number of Samples
   * @return {number}
   */
  getSampleArraySize() {
    return this.sample_arr_length;
  }


  /**
   * Gets a sample at the specified index.
   * @param {number} index Index of the sample we wish to get (a sample ordinal, not a byte offset).
   * @return {Buffer} View (not a copy) of the 8-byte sample record.
   */
  getSampleBuffer(index) {
    const index_offset = index * this.sample_struct_size;
    return this.sample_buffer.slice(index_offset, index_offset + this.sample_struct_size);
  }


  /**
   * Gets a string at the specified index.
   * @param {number} index Index of the string we wish to get.
   * @return {string} The string decoded as UTF-8.
   * @throws {RangeError} If `index` lies outside the string index table.
   */
  getString(index) {
    const record = index * this.string_index_struct_size;
    const offset = this.string_indices_buffer.readUInt32BE(record);
    const length = this.string_indices_buffer.readUInt32BE(record + 4);
    return this.strings_buffer.toString('utf8', offset, offset + length);
  }

  /**
   * @return {Object} Metadata from the snapshot data required to find Node and Field structure.
   */
  getMeta() {
    return this.snapshot.meta;
  }

  /**
   * Convenience method for saving snapshot data in a faster loading format to a
   * directory. The files written are exactly those read by `fromDirectory`:
   * `snapshot.json` (the snapshot info as JSON), `nodes`, `edges`, `samples`,
   * `strings` and `string_indices`. The directory must already exist.
   * @param {string} outdir Path of the directory to save to
   * @param {function(err: *)} callback Called exactly once: with the first error, or with `null` once every file is written
   * @return {undefined}
   */
  writeToDirectory(outdir, callback) {
    const files = [
      ['snapshot.json', JSON.stringify(this.snapshot)],
      ['nodes', this.node_buffer],
      ['edges', this.edge_buffer],
      ['samples', this.sample_buffer],
      ['strings', this.strings_buffer],
      ['string_indices', this.string_indices_buffer],
    ];
    let pending = files.length;
    let finished = false;

    const onWritten = (err) => {
      if (finished) {
        return;
      }
      if (err) {
        finished = true;
        callback(err);
        return;
      }
      pending--;
      if (pending === 0) {
        finished = true;
        callback(null);
      }
    };

    for (const [name, data] of files) {
      fs.writeFile(path.join(outdir, name), data, onWritten);
    }
  }


  /**
   * Convenience method for loading snapshot data from a directory written by
   * `writeToDirectory`.
   * @param {string} outdir Path of the directory to load
   * @param {function(err: *, provider: SplitSnapshotProvider)} callback Called exactly once:
   *   with the first read/parse/validation error, or with `(null, provider)`
   * @return {undefined}
   */
  static fromDirectory(outdir, callback) {
    const files = ['snapshot.json', 'nodes', 'edges', 'samples', 'strings', 'string_indices'];
    const contents = {};
    let pending = files.length;
    let finished = false;

    const onRead = (name, err, data) => {
      if (finished) {
        return;
      }
      if (err) {
        finished = true;
        callback(err);
        return;
      }
      contents[name] = data;
      pending--;
      if (pending > 0) {
        return;
      }
      finished = true;
      // Parse and validate outside of any fs callback bookkeeping so a bad
      // file is reported through `callback` instead of being thrown or lost.
      let provider;
      try {
        provider = new SplitSnapshotProvider(
          JSON.parse(String(contents['snapshot.json'])),
          contents.nodes,
          contents.edges,
          contents.samples,
          contents.string_indices,
          contents.strings
        );
      } catch (e) {
        callback(e);
        return;
      }
      callback(null, provider);
    };

    for (const name of files) {
      fs.readFile(path.join(outdir, name), (err, data) => onRead(name, err, data));
    }
  }


  /**
   * Convenience method for loading snapshot data from a stream.
   *
   * The stream is piped through `parseSnapshotStream`. This method assumes the
   * snapshot info arrives first, every node arrives before any edge, and edges
   * arrive grouped by owning node in node order (each node owning `edge_count`
   * consecutive edges).
   *
   * @param {Stream} stream Readable stream of snapshot data to parse
   * @param {function(err: *, provider: SplitSnapshotProvider)} callback Called exactly once:
   *   with the first error from the source stream, the parser or validation, or with `(null, provider)`
   * @return {undefined}
   */
  static fromStream(stream, callback) {
    let finished = false;
    const finish = (err, provider) => {
      if (finished) {
        return;
      }
      finished = true;
      if (err) {
        callback(err);
      } else {
        callback(null, provider);
      }
    };

    // Return `buffer` if it has room for `extra` more bytes after the first
    // `used` bytes, otherwise a larger copy. Growing geometrically keeps
    // appends amortised O(1), and sizing to at least `used + extra` ensures a
    // large record is never truncated by Buffer#copy.
    const ensureCapacity = (buffer, used, extra) => {
      const required = used + extra;
      if (buffer.length >= required) {
        return buffer;
      }
      const grown = Buffer.alloc(Math.max(required, buffer.length * 2, 4096));
      buffer.copy(grown, 0, 0, used);
      return grown;
    };

    let snapshot_info = null;
    let node_struct_size = 0;
    let edge_count_field_offset = 0;

    let nodes_buffer = null;
    let nodes_buffer_index = 0;
    // Ordinal of the first edge of the next node (running sum of edge_count).
    let next_first_edge = 0;

    let edges_buffer = null;
    let edges_buffer_index = 0;
    // Node owning the current edge, and how many of its edges are still to come.
    let owner_node = -1;
    let owner_edges_remaining = 0;

    let samples_buffer = Buffer.alloc(0);
    let samples_buffer_length = 0;
    let strings_indices_buffer = Buffer.alloc(0);
    let strings_indices_buffer_length = 0;
    let strings_buffer = Buffer.alloc(0);
    let strings_buffer_length = 0;

    // `pipe` does not forward errors from the source stream.
    stream.on('error', err => finish(err));

    stream.pipe(parseSnapshotStream({
      onsnapshot_info(snapshot) {
        edge_count_field_offset = snapshot.meta.node_fields.indexOf('edge_count') * 4;
        snapshot_info = snapshot;
        // One slot per field plus a trailing slot for the first edge ordinal.
        node_struct_size = snapshot.meta.node_fields.length * 4 + 4;
        nodes_buffer = Buffer.alloc(node_struct_size * snapshot.node_count);
        // One slot per field plus a trailing slot for the owning node's byte offset.
        const edge_struct_size = snapshot.meta.edge_fields.length * 4 + 4;
        edges_buffer = Buffer.alloc(edge_struct_size * snapshot.edge_count);
      },
      onnode(buffer) {
        if (buffer == null) {
          return;
        }
        buffer.copy(nodes_buffer, nodes_buffer_index);
        nodes_buffer_index += buffer.length;
        nodes_buffer.writeUInt32BE(next_first_edge, nodes_buffer_index);
        nodes_buffer_index += 4;
        next_first_edge += buffer.readUInt32BE(edge_count_field_offset);
      },
      onedge(buffer) {
        if (buffer == null) {
          return;
        }
        buffer.copy(edges_buffer, edges_buffer_index);
        edges_buffer_index += buffer.length;
        // Advance to the next node that still owns edges (skipping nodes with
        // edge_count 0), then consume one of its edges.
        while (owner_edges_remaining === 0) {
          owner_node++;
          owner_edges_remaining = nodes_buffer.readUInt32BE(
            owner_node * node_struct_size + edge_count_field_offset
          );
        }
        owner_edges_remaining--;
        edges_buffer.writeUInt32BE(owner_node * node_struct_size, edges_buffer_index);
        edges_buffer_index += 4;
      },
      onsample(buffer) {
        if (buffer == null) {
          return;
        }
        samples_buffer = ensureCapacity(samples_buffer, samples_buffer_length, buffer.length);
        buffer.copy(samples_buffer, samples_buffer_length);
        samples_buffer_length += buffer.length;
      },
      onstring(str) {
        if (str == null) {
          return;
        }
        // enforce utf8 encoding
        const str_buf = Buffer.from(str, 'utf8');

        strings_indices_buffer = ensureCapacity(strings_indices_buffer, strings_indices_buffer_length, 8);
        strings_indices_buffer.writeUInt32BE(strings_buffer_length, strings_indices_buffer_length);
        strings_indices_buffer.writeUInt32BE(str_buf.length, strings_indices_buffer_length + 4);
        strings_indices_buffer_length += 8;

        strings_buffer = ensureCapacity(strings_buffer, strings_buffer_length, str_buf.length);
        str_buf.copy(strings_buffer, strings_buffer_length);
        strings_buffer_length += str_buf.length;
      }
    }))
    .on('error', err => finish(err))
    .on('end', () => {
      // Validation failures (including a stream that never delivered snapshot
      // info) are reported through the callback rather than thrown.
      let provider;
      try {
        provider = new SplitSnapshotProvider(
          snapshot_info,
          nodes_buffer,
          edges_buffer,
          samples_buffer.slice(0, samples_buffer_length),
          strings_indices_buffer.slice(0, strings_indices_buffer_length),
          strings_buffer.slice(0, strings_buffer_length)
        );
      } catch (err) {
        finish(err);
        return;
      }
      finish(null, provider);
    });
  }
}