
05 May 2015

Playing with hadoop/mapreduce and htsjdk/VCF : my notebook.

The aim of this test is to get a count of each type of variant/genotype in a VCF file using Apache Hadoop and htsjdk, the Java library for NGS data. My source code is available at: https://github.com/lindenb/hadoop-sandbox/blob/master/src/main/java/com/github/lindenb/hadoop/Test.java.

First, and this is my main problem, I needed to create a class 'VcfRow' that would contain the whole data about a variant. As I need to keep the semantics declared in the VCF header, each record carries the whole VCF header (!). I asked on StackOverflow whether there was an elegant way to keep the header around in the Hadoop workflow, but it currently seems that there is no such solution (http://stackoverflow.com/questions/30052859/hadoop-mapreduce-handling-a-text-file-with-a-header). This class
VcfRow must implement WritableComparable to be serialized by the Hadoop pipeline. It's awfully slow, since the htsjdk.variant.vcf.VCFHeader has to be re-parsed with a htsjdk.variant.vcf.VCFCodec for each new variant.

public static class VcfRow
implements WritableComparable<VcfRow>
 {
 private List<String> headerLines;
 private String line;
 private VariantContext ctx=null;
 private VCFHeader header =null;
 private VCFCodec codec=new VCFCodec();
 public VcfRow()
   {
 this.headerLines = Collections.emptyList();
 this.line="";
   }
 public VcfRow(List<String> headerLines,String line)
  {
 this.headerLines=headerLines; 
 this.line=line;
  }
 
@Override
public void write(DataOutput out) throws IOException
 {
 out.writeInt(this.headerLines.size());
 for(int i=0;i< this.headerLines.size();++i)
  {
  out.writeUTF(this.headerLines.get(i));
  }
 byte array[]=line.getBytes();
 out.writeInt(array.length);
 out.write(array);
 }

@Override
public void readFields(DataInput in) throws IOException
 {
 int n= in.readInt();
 this.headerLines=new ArrayList<String>(n);
 for(int i=0;i<n;++i) this.headerLines.add(in.readUTF());
 n = in.readInt();
 byte array[]=new byte[n];
 in.readFully(array);
 this.line=new String(array);
 this.codec=new VCFCodec();
 this.ctx=null;
 this.header=null;
 }

public VCFHeader getHeader()
 {
 if(this.header==null)
  {
  this.header = (VCFHeader)this.codec.readActualHeader(new MyLineIterator());
  }
 return this.header;
 }

public VariantContext getVariantContext()
 {
 if(this.ctx==null)
  {
  if(this.header==null) getHeader();//force decode header
  this.ctx=this.codec.decode(this.line);
  }
 return this.ctx;
 }

@Override
public int compareTo(VcfRow o)
 {
 int i = this.getVariantContext().getContig().compareTo(o.getVariantContext().getContig());
 if(i!=0) return i;
 i = this.getVariantContext().getStart() - o.getVariantContext().getStart();
 if(i!=0) return i;
 i =  this.getVariantContext().getReference().compareTo( o.getVariantContext().getReference());
 if(i!=0) return i;
 return this.line.compareTo(o.line);
 }

   private  class MyLineIterator
 extends AbstractIterator<String>
 implements LineIterator
 { 
 int index=0;
 @Override
 protected String advance()
  {
  if(index>= headerLines.size()) return null;
  return headerLines.get(index++);
  }
 }
}
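For illustration, the Writable contract of this class can be exercised outside Hadoop with a simple serialization round-trip. This is just a sketch, assuming some header lines and one VCF data line are at hand; it is not part of the original code:

// hypothetical round-trip through DataOutput/DataInput, as the hadoop pipeline would do
VcfRow row = new VcfRow(headerLines, line);
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
row.write(new DataOutputStream(buffer));

VcfRow copy = new VcfRow();
copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
// the copy lazily re-decodes its header and variant from the serialized lines
System.out.println(copy.getVariantContext().getContig());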

Then a special InputFormat is created for the VCF format. As we need to keep track of the header, this format declares `isSplitable==false`. The class VcfInputFormat creates an instance of RecordReader that reads the whole VCF header the first time its method `initialize` is invoked. This 'VcfRecordReader' creates a new VcfRow for each line.

public static class VcfInputFormat extends FileInputFormat<LongWritable, VcfRow>
   {
 private List<String> headerLines=new ArrayList<String>();
 
 @Override
 public RecordReader<LongWritable, VcfRow> createRecordReader(InputSplit split,
   TaskAttemptContext context) throws IOException,
   InterruptedException {
  return new VcfRecordReader();
  }  
 @Override
 protected boolean isSplitable(JobContext context, Path filename) {
  return false;
  }
  
 //LineRecordReader
  private class VcfRecordReader extends RecordReader<LongWritable, VcfRow>
    {
  private LineRecordReader delegate=new LineRecordReader();
  public VcfRecordReader() throws IOException
    {
    }
  
   @Override
  public void initialize(InputSplit genericSplit,
    TaskAttemptContext context) throws IOException {
    delegate.initialize(genericSplit, context);
   while( delegate.nextKeyValue())
    {
    String row = delegate.getCurrentValue().toString();
    if(!row.startsWith("#")) throw new IOException("Bad VCF header");
    headerLines.add(row);
    if(row.startsWith("#CHROM")) break;
    }
    }
   @Override
  public LongWritable getCurrentKey() throws IOException,
    InterruptedException {
   return delegate.getCurrentKey();
    }
   
   @Override
  public VcfRow getCurrentValue() throws IOException,
    InterruptedException {
   Text row = this.delegate.getCurrentValue();
   return new VcfRow(headerLines,row.toString());
    }
   
   @Override
  public float getProgress() throws IOException, InterruptedException {
   return this.delegate.getProgress();
    }
   
   @Override
  public boolean nextKeyValue() throws IOException,
    InterruptedException {
   return this.delegate.nextKeyValue();
    }
   
   @Override
  public void close() throws IOException {
    delegate.close();
   }
     }
   }

The Hadoop Mapper uses the information of each VcfRow and emits a count of one for each matching category:
public static class VariantMapper
   extends Mapper<LongWritable, VcfRow, Text, IntWritable>{

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public void map(LongWritable key, VcfRow vcfRow, Context context ) throws IOException, InterruptedException {
  VariantContext ctx = vcfRow.getVariantContext();
  if( ctx.isIndel())
   { 
   word.set("ctx_indel");
      context.write(word, one);
   }
  if( ctx.isBiallelic())
   { 
   word.set("ctx_biallelic");
      context.write(word, one);
   }
  if( ctx.isSNP())
   { 
   word.set("ctx_snp");
   context.write(word, one);
   } 
  if( ctx.hasID())
   { 
   word.set("ctx_id");
   context.write(word, one);
   } 
  word.set("ctx_total");
  context.write(word, one);
 
  for(String sample: vcfRow.getHeader().getSampleNamesInOrder())
   {
   Genotype g =vcfRow.getVariantContext().getGenotype(sample);
   word.set(sample+" "+ctx.getType()+" "+g.getType().name());
   context.write(word, one);
   }

  }
 }

The Reducer computes the sum of each category:
public static class IntSumReducer
   extends Reducer<Text,IntWritable,Text,IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
  sum += val.get();
    }
   result.set(sum);
   context.write(key, result);
 }
}

and here is the main program:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "snp count");
    job.setJarByClass(Test.class);
    job.setMapperClass(VariantMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    Path inputPath=new Path(args[0]);
    job.setInputFormatClass(VcfInputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Download, compile, run:
lindenb@hardyweinberg:~/src/hadoop-sandbox$ make -Bn
rm -rf hadoop-2.7.0
curl -L -o hadoop-2.7.0.tar.gz "http://apache.spinellicreations.com/hadoop/common/hadoop-2.7.0/hadoop-2.7.0.tar.gz"
tar xvfz hadoop-2.7.0.tar.gz
rm hadoop-2.7.0.tar.gz
touch -c hadoop-2.7.0/bin/hadoop
rm -rf htsjdk-1.130
curl -L -o 1.130.tar.gz "https://github.com/samtools/htsjdk/archive/1.130.tar.gz"
tar xvfz 1.130.tar.gz
rm 1.130.tar.gz
(cd htsjdk-1.130 && ant )
mkdir -p tmp dist
javac -d tmp -cp hadoop-2.7.0/share/hadoop/common/hadoop-common-2.7.0.jar:hadoop-2.7.0/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/hadoop-annotations-2.7.0.jar:hadoop-2.7.0/share/hadoop/common/lib/log4j-1.2.17.jar:htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar -sourcepath src/main/java src/main/java/com/github/lindenb/hadoop/Test.java 
jar cvf dist/test01.jar -C tmp .
rm -rf tmp
mkdir -p input
curl -o input/CEU.exon.2010_09.genotypes.vcf.gz "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/pilot_data/paper_data_sets/a_map_of_human_variation/exon/snps/CEU.exon.2010_09.genotypes.vcf.gz"
gunzip -f input/CEU.exon.2010_09.genotypes.vcf.gz
rm -rf output
HADOOP_CLASSPATH=htsjdk-1.130/dist/commons-logging-1.1.1.jar:htsjdk-1.130/dist/htsjdk-1.130.jar:htsjdk-1.130/dist/commons-jexl-2.1.1.jar:htsjdk-1.130/dist/snappy-java-1.0.3-rc3.jar hadoop-2.7.0/bin/hadoop jar dist/test01.jar com.github.lindenb.hadoop.Test \
   input/CEU.exon.2010_09.genotypes.vcf output
cat output/*

Here is the output of the last command:

15/05/05 17:18:34 INFO input.FileInputFormat: Total input paths to process : 1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: number of splits:1
15/05/05 17:18:34 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/05/05 17:18:34 INFO mapreduce.Job: Running job: job_local1186897577_0001
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Waiting for map tasks
15/05/05 17:18:34 INFO mapred.LocalJobRunner: Starting task: attempt_local1186897577_0001_m_000000_0
15/05/05 17:18:34 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/05/05 17:18:34 INFO mapred.Task:  Using ResourceCalculatorProcessTree : [ ]
15/05/05 17:18:34 INFO mapred.MapTask: Processing split: file:/home/lindenb/src/hadoop-sandbox/input/CEU.exon.2010_09.genotypes.vcf:0+2530564
15/05/05 17:18:34 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/05/05 17:18:34 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/05/05 17:18:34 INFO mapred.MapTask: soft limit at 83886080
15/05/05 17:18:34 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/05/05 17:18:34 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/05/05 17:18:34 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/05/05 17:18:35 INFO mapreduce.Job: Job job_local1186897577_0001 running in uber mode : false
15/05/05 17:18:35 INFO mapreduce.Job:  map 0% reduce 0%
15/05/05 17:18:36 INFO mapred.LocalJobRunner: 
15/05/05 17:18:36 INFO mapred.MapTask: Starting flush of map output
15/05/05 17:18:36 INFO mapred.MapTask: Spilling map output
15/05/05 17:18:36 INFO mapred.MapTask: bufstart = 0; bufend = 7563699; bufvoid = 104857600
15/05/05 17:18:36 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 24902536(99610144); length = 1311861/6553600
15/05/05 17:18:38 INFO mapred.MapTask: Finished spill 0
15/05/05 17:18:38 INFO mapred.Task: Task:attempt_local1186897577_0001_m_000000_0 is done. And is in the process of committing
(...)
NA12843 SNP HOM_REF 2515
NA12843 SNP HOM_VAR 242
NA12843 SNP NO_CALL 293
NA12872 SNP HET 394
NA12872 SNP HOM_REF 2282
NA12872 SNP HOM_VAR 188
NA12872 SNP NO_CALL 625
NA12873 SNP HET 336
NA12873 SNP HOM_REF 2253
NA12873 SNP HOM_VAR 184
NA12873 SNP NO_CALL 716
NA12874 SNP HET 357
NA12874 SNP HOM_REF 2395
NA12874 SNP HOM_VAR 229
NA12874 SNP NO_CALL 508
NA12878 SNP HET 557
NA12878 SNP HOM_REF 2631
NA12878 SNP HOM_VAR 285
NA12878 SNP NO_CALL 16
NA12889 SNP HET 287
NA12889 SNP HOM_REF 2110
NA12889 SNP HOM_VAR 112
NA12889 SNP NO_CALL 980
NA12890 SNP HET 596
NA12890 SNP HOM_REF 2587
NA12890 SNP HOM_VAR 251
NA12890 SNP NO_CALL 55
NA12891 SNP HET 609
NA12891 SNP HOM_REF 2591
NA12891 SNP HOM_VAR 251
NA12891 SNP NO_CALL 38
NA12892 SNP HET 585
NA12892 SNP HOM_REF 2609
NA12892 SNP HOM_VAR 236
NA12892 SNP NO_CALL 59
ctx_biallelic 3489
ctx_id 3489
ctx_snp 3489
ctx_total 3489


that's it,
Pierre

14 August 2012

Apache Pig: first contact with some 'bio' data.

via wikipedia: "Apache Pig is a high-level platform for creating MapReduce programs used with Hadoop. The language for this platform is called Pig Latin. Pig Latin abstracts the programming from the Java MapReduce idiom into a notation which makes MapReduce programming high level, similar to that of SQL for RDBMS systems.".

Here I've played with Pig using a local datastore (that is, not a distributed environment) to handle some data from the UCSC database.

Download and Install

Install Pig:
$ wget "http://mirror.cc.columbia.edu/pub/software/apache/pig/pig-0.10.0/pig-0.10.0.tar.gz"
$ tar xvfz pig-0.10.0.tar.gz
$ rm pig-0.10.0.tar.gz
$ cd pig-0.10.0
$ export PIG_INSTALL=${PWD}
$ export JAVA_HOME=/your/path/to/jdk1.7

Download 'knownGene' from the UCSC:
$ wget "http://hgdownload.cse.ucsc.edu/goldenPath/hg19/database/knownGene.txt.gz"
$ gunzip knownGene.txt.gz

Start the command line interface

Run Pig’s Grunt shell in local mode:
$ pig -x local 
2012-08-15 10:37:25,247 [main] INFO  org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-08-15 10:37:25,248 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/lindenb/tmp/HADOOP/pig-0.10.0/pig_1344933445243.log
2012-08-15 10:37:25,622 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///

Getting the number of Genes by Chromosome

knownGenes = LOAD '/home/lindenb/tmp/HADOOP/knownGene.txt' as 
  ( name:chararray ,  chrom:chararray ,  strand:chararray ,  txStart:int , 
    txEnd:int ,  cdsStart:int ,  cdsEnd:int ,  exonCount:chararray ,
     exonStarts:chararray ,  exonEnds:chararray ,  proteinID:chararray , 
     alignID:chararray );
Keep the genes coding for a protein:
coding = FILTER knownGenes BY cdsStart < cdsEnd ;
Remove some columns:
coding = FOREACH coding GENERATE chrom,cdsStart,cdsEnd,strand,name;
Group by chromosome:
C = GROUP coding by chrom;
Filter out some chromosomes:
C = FILTER C by NOT(
  group matches  '.*_random' OR
  group matches  'chrUn_.*' OR
  group matches '.*hap.*'
  );
Get the count of genes for each chromosome:
D= FOREACH C GENERATE
 group as CHROM,
 COUNT(coding.name) as numberOfGenes
 ;
And dump the data:
dump D;
Result:
(chr1,6139)
(chr2,3869)
(chr3,3406)
(chr4,2166)
(chr5,2515)
(chr6,3021)
(chr7,2825)
(chr8,1936)
(chr9,2338)
(chrM,1)
(chrX,2374)
(chrY,318)
(chr10,2470)
(chr11,3579)
(chr12,3066)
(chr13,924)
(chr14,2009)
(chr15,1819)
(chr16,2510)
(chr17,3396)
(chr18,862)
(chr19,3784)
(chr20,1570)
(chr21,663)
(chr22,1348)
Interestingly, nothing seems to happen until you ask to dump the data: Pig only builds an execution plan and runs it when an output statement such as DUMP or STORE is reached.

Finding the overlapping genes

I want to get a list of pairs of overlapping genes located on opposite strands. I haven't been able to find a quick way to join two tables using complex criteria.

Create two identical lists of genes, E1 and E2, and add an extra column "1" that will be used to join both tables.

E1 = FOREACH coding GENERATE 1 as pivot , $0 , $1 , $2 , $3, $4;
E2 = FOREACH coding GENERATE 1 as pivot , $0 , $1 , $2 , $3, $4;
Join the tables using the extra column.
E3 = join E1 by pivot, E2 by pivot;
Extract and rename the fields from the join:
E3 = FOREACH E3 generate 
 $1 as chrom1, $2 as start1, $3 as end1, $4 as strand1, $5 as name1,
 $7 as chrom2, $8 as start2, $9 as end2, $10 as strand2, $11 as name2
 ;
At this point, the data in E3 look like this:
(...)
(chr1,664484,665108,-,uc009vjm.3,chr1,324342,325605,+,uc001aau.3)
(chr1,664484,665108,-,uc009vjm.3,chr1,664484,665108,-,uc001abe.4)
(chr1,664484,665108,-,uc009vjm.3,chr1,324342,325605,+,uc009vjk.2)
(chr1,664484,665108,-,uc009vjm.3,chr1,664484,665108,-,uc009vjm.3)
(chr1,664484,665108,-,uc009vjm.3,chr1,12189,13639,+,uc010nxq.1)
(chr1,664484,665108,-,uc009vjm.3,chr1,367658,368597,+,uc010nxu.2)
(chr1,664484,665108,-,uc009vjm.3,chr1,621095,622034,-,uc010nxv.2)
(chr1,664484,665108,-,uc009vjm.3,chr1,324514,325605,+,uc021oeh.1)
(chr1,664484,665108,-,uc009vjm.3,chr1,327745,328213,+,uc021oei.1)
(...)
Extract the overlapping genes:
E3= FILTER E3 BY
    name1 < name2 AND
    chrom1==chrom2 AND
    strand1!=strand2 AND
    NOT(end1 < start2 OR end2 < start1);
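The last condition is the usual interval-overlap test; expressed in plain Java (an illustration only, not part of the Pig script) it reads:

// two intervals overlap unless one of them ends before the other starts
static boolean overlaps(int start1, int end1, int start2, int end2)
 {
 return !(end1 < start2 || end2 < start1);
 }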
and dump the result:
dump E3;
After a few hours the result is computed:
(...)
(chr9,119188129,120177216,-,uc004bjt.2,chr9,119460021,119461983,+,uc004bjw.2)
(chr9,119188129,120177216,-,uc004bjt.2,chr9,119460021,119461983,+,uc004bjx.2)
(chr9,119188129,120177216,-,uc004bjt.2,chr9,119460021,119461983,+,uc022bmo.1)
(chr9,119460021,119461983,+,uc004bjw.2,chr9,119188129,119903719,-,uc022bml.1)
(chr9,119460021,119461983,+,uc004bjw.2,chr9,119188129,119903719,-,uc022bmm.1)
(chr9,119460021,119461983,+,uc004bjx.2,chr9,119188129,119903719,-,uc022bml.1)
(chr9,119460021,119461983,+,uc004bjx.2,chr9,119188129,119903719,-,uc022bmm.1)
(chr9,129724568,129981048,+,uc004bqo.2,chr9,129851217,129871010,-,uc004bqr.1)
(chr9,129724568,129981048,+,uc004bqo.2,chr9,129851217,129856116,-,uc010mxg.1)
(chr9,129724568,129979280,+,uc004bqq.4,chr9,129851217,129871010,-,uc004bqr.1)
(chr9,129724568,129979280,+,uc004bqq.4,chr9,129851217,129856116,-,uc010mxg.1)
(chr9,129851217,129871010,-,uc004bqr.1,chr9,129724568,129940183,+,uc022bno.1)
(chr9,129851217,129871010,-,uc004bqr.1,chr9,129724568,129946390,+,uc011mab.2)
(chr9,129851217,129871010,-,uc004bqr.1,chr9,129724568,129979280,+,uc011mac.2)
(chr9,129851217,129856116,-,uc010mxg.1,chr9,129724568,129940183,+,uc022bno.1)
(chr9,129851217,129856116,-,uc010mxg.1,chr9,129724568,129946390,+,uc011mab.2)
(chr9,129851217,129856116,-,uc010mxg.1,chr9,129724568,129979280,+,uc011mac.2)
(chr9,130455527,130477918,-,uc004brm.3,chr9,130469310,130476184,+,uc004brn.1)
(chr9,131703812,131719311,+,uc004bwq.1,chr9,131707965,131709582,-,uc004bwr.3)
(chrX,11156982,11445715,-,uc004cun.1,chrX,11312908,11318732,+,uc004cus.3)
(chrX,11156982,11445715,-,uc004cun.1,chrX,11312908,11318732,+,uc004cut.3)
(chrX,11156982,11445715,-,uc004cun.1,chrX,11312908,11318732,+,uc004cuu.3)
(chrX,11156982,11682948,-,uc004cup.1,chrX,11312908,11318732,+,uc004cus.3)
(chrX,11156982,11682948,-,uc004cup.1,chrX,11312908,11318732,+,uc004cut.3)
(chrX,11156982,11682948,-,uc004cup.1,chrX,11312908,11318732,+,uc004cuu.3)
(...)
That was very slow. There might be a better way to do this, and I wonder whether using a Hadoop filesystem (instead of the local mode used here) would really speed up the computation. For now I'll continue to use a SQL database for such a small amount of data.

That's it.

Pierre




15 June 2011

Sorting and Joining some Genotypes with Hadoop.



This post describes how I've used Hadoop to merge two large files produced by our Affymetrix Axiom genotyping platform.
(Note: working with the latest version of Hadoop has been a headache: it contains some classes with the same name ('org.apache.hadoop.mapreduce.Reducer' and 'org.apache.hadoop.mapred.Reducer'), many classes have been deprecated (org.apache.hadoop.mapred.JobConf) but are still needed to run some specific algorithms (`jobConf.setInputFormat(CompositeInputFormat.class)`), etc.)

The Input Files


The annotation file

This file describes the genotyped markers (chrom, position, alleleA, alleleB, etc.).

(... header ...)
"Probe Set ID","dbSNP RS ID","Chromosome","Physical Position","Strand","ChrX pseudo-autosomal region 1","Cytoband","Flank","Allele A","Allele B","Associated Gene","Genetic Map","Microsatellite","Allele Frequencies","Heterozygous Allele Frequencies","Number of individuals/Number of chromosomes","In Hapmap","Strand Versus dbSNP","Probe Count","ChrX pseudo-autosomal region 2","Minor Allele","Minor Allele Frequency","OMIM"
"AX-11086612","rs10001348","4","29912308","+","0","p15.1","gtattcagttgaacacaaatcagtgcatgt[A/G]","A","G","ENST00000467087 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// ENST00000361762 // upstream // 809728 // Hs.479439 // PCDH7 // 5099 // protocadherin 7 /// NM_001169117 // downstream // 2885305 // Hs.724550 // STIM2 // 57620 // stromal interaction molecule 2 /// NM_032456 // upstream // 809728 // Hs.724529 // PCDH7 // 5099 // protocadherin 7","50.0229786923511 // D4S418 // D4S2408 // --- // --- /// 44.2128449948474 // D4S2397 // D4S2430 // ATA27C07 // GCT6F03 /// 42.4637111703432 // --- // --- // 226002 // 46437","D4S333 // upstream // 47955 /// D4S605 // downstream // 97312","0.872881356 // 0.127118644 // Caucasian /// 0.833333333 // 0.166666667 // Han Chinese /// 0.777777778 // 0.222222222 // Japanese /// 0.775 // 0.225 // Yoruban","0.254237288 // Caucasian /// 0.288888889 // Han Chinese /// 0.355555556 // Japanese /// 0.35 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","G // Caucasian /// G // Han Chinese /// G // Japanese /// G // Yoruban","0.127118644 // Caucasian /// 0.166666667 // Han Chinese /// 0.222222222 // Japanese /// 0.225 // Yoruban","---"
"AX-11086611","rs10001340","4","130341127","+","0","q28.2","[A/C]agggcattcatctcagcttactatttgggaaaaat","A","C","ENST00000281146 // downstream // 306645 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// ENST00000394248 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10 /// NM_173487 // downstream // 307285 // Hs.567679 // C4orf33 // 132321 // chromosome 4 open reading frame 33 /// NM_020815 // upstream // 3729342 // Hs.192859 // PCDH10 // 57575 // protocadherin 10","127.864057946266 // D4S1615 // D4S2365 // --- // --- /// 129.756132396152 // D4S2938 // D4S2365 // AFMA284WG5 // GATA10A12 /// 124.03426335901 // D4S2394 // --- // --- // 55218","D4S3198 // upstream // 331274 /// D4S2394 // downstream // 43310","0.815789474 // 0.184210526 // Caucasian /// 1.0 // 0.0 // Han Chinese /// 1.0 // 0.0 // Japanese /// 0.816666667 // 0.183333333 // Yoruban","0.368421053 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.266666667 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","C // Caucasian /// C // Han Chinese /// C // Japanese /// C // Yoruban","0.184210526 // Caucasian /// 0.0 // Han Chinese /// 0.0 // Japanese /// 0.183333333 // Yoruban","---"
"AX-11086610","rs10001337","4","54351529","+","0","q12","atgaggagtagccacatgatctaagcacct[C/T]","T","C","ENST00000306888 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// ENST00000263925 // --- // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_001126328 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1 /// NM_032622 // intron // 0 // Hs.518760 // LNX1 // 84708 // ligand of numb-protein X 1","67.4182086016315 // D4S2971 // D4S1594 // --- // --- /// 62.2091955728879 // D4S2971 // UNKNOWN // AFMB312YG1 // GATA61B02 /// 61.6059658777947 // --- // GATA61B02 // 149925 // ---","D4S461 // upstream // 151923 /// D4S2583E // downstream // 24481","0.118644068 // 0.881355932 // Caucasian /// 0.111111111 // 0.888888889 // Han Chinese /// 0.122222222 // 0.877777778 // Japanese /// 0.025 // 0.975 // Yoruban","0.203389831 // Caucasian /// 0.133333333 // Han Chinese /// 0.244444444 // Japanese /// 0.05 // Yoruban","60.0 // Caucasian /// 45.0 // Han Chinese /// 45.0 // Japanese /// 60.0 // Yoruban","YES","same","1","0","T // Caucasian /// T // Han Chinese /// T // Japanese /// T // Yoruban","0.118644068 // Caucasian /// 0.111111111 // Han Chinese /// 0.122222222 // Japanese /// 0.025 // Yoruban","---"
(...)

Calls

This file is returned by the Axiom machine. Each number encodes a genotype (AA/AB/BB/nil).
(..header...)
probeset_id A05.CEL A06.CEL A03.CEL A04.CEL A01.CEL A10.CEL A02.CEL A11.CEL A12.CEL A09.CEL A07.CEL A08.CEL B01.CEL B02.CEL B06.CEL B03.CEL B04.CEL B05.CEL B07.CEL B08.CEL B09.CEL B10.CEL C02.CEL B11.CEL B12.CEL C01.CEL C03.CEL C04.CEL C05.CEL C06.CEL C07.CEL C08.CEL C09.CEL C10.CEL C11.CEL C12.CEL D01.CEL D02.CEL D03.CEL D04.CEL D05.CEL D06.CEL D07.CEL D08.CEL D09.CEL D10.CEL D11.CEL D12.CEL E01.CEL E02.CEL E03.CEL E04.CEL E05.CEL E06.CEL E07.CEL E08.CEL E09.CEL E10.CEL E11.CEL E12.CEL F01.CEL F02.CEL F03.CEL F04.CEL F05.CEL F06.CEL F07.CEL F08.CEL H08.CEL H07.CEL G07.CEL G08.CEL H03.CEL H04.CEL H06.CEL H05.CEL G09.CEL G10.CEL H01.CEL H02.CEL G11.CEL G12.CEL G03.CEL G04.CEL H10.CEL H09.CEL F10.CEL F09.CEL G01.CEL G02.CEL F11.CEL F12.CEL G05.CEL G06.CEL H11.CEL H12.CEL
AX-11086525 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 0 1 0 1 0 0 1 1 2 1 1 0 0 0 0 1 0 0 1 0 0 1 1 0 1 0 1 1 2 0 1 0 0 1 2 0 2
AX-11086526 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 1 2 2 0 2 2 2 1 1 1 2 2 2 2 2 2 2 2 2 1 1 1 2 1 2 1 2 2 0 2 1 2 2 1 2 2 2 2 1 2 2 2 2 2 1 2 1 1 2 2 1 2 2 2 2 2 2 2 2 2 1 2 2 2
AX-11086527 2 0 2 2 2 1 2 2 2 2 2 2 2 2 2 2 2 2 2 2 1 2 2 2 2 2 2 2 2 1 2 2 1 1 0 2 1 2 1 2 2 1 1 1 1 2 2 1 0 2 1 2 1 2 2 2 1 2 1 1 1 1 2 2 2 2 2 1 1 0 2 0 1 0 1 0 1 2 1 1 0 1 1 0 2 1 1 1 0 1 1 1 2 2 1 1
(...)

Joined file

... and this is what I want to obtain at the end:
AX-11086525 3 165621955 rs10000341 T T T T T T T T T T T T T T T G T T T T T T T T T T T T T T T T T T T T T T T G T G T T T T T G T T T T T T T T T T T T T T T T T T T T T T T T T T T G T T T T T G T T T G T T T T T T T T T T T T T T T T T T T T T G T T T T T T T T T G T T T G T T T G T T T T T G T G G G T G T G T T T T T T T T T G T T T T T G T T T T T G T G T T T G T T T G T G G G T T T G T T T T T G G G T T G G
AX-11086526 3 5237152 rs10000142 C C T C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C C T C C C C C C C C C C C C C C C C C C C C C C C T C C C C C T T C C C C C C T C T C T C C C C C C C C C C C C C C C C C C C T C T C T C C C T C C C T C C C C C T T C C T C C C C C T C C C C C C C C C T C C C C C C C C C C C T C C C T C T C C C C C T C C C C C C C C C C C C C C C C C C C T C C C C C C C
(...)

Setting up Hadoop and HDFS


export JAVA_HOME=/your/path/to/JDK
cd hadoop-0.20.203.0

Change the config
in "conf/core-site.xml":
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

in "conf/hdfs-site.xml":
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

in "conf/mapred-site.xml":
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>


Set up ssh so that no password is required:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

Change the permissions for ssh:
$ chmod 700 ~/.ssh/
$ chmod 640 ~/.ssh/authorized_keys

If needed, add an alias in '/etc/hosts' and restart the network: "sudo /etc/rc.d/init.d/network restart"
Format HDFS:
bin/hadoop namenode -format
11/06/15 08:32:13 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 0.20.203.0
STARTUP_MSG: build = http://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-203 -r 1099333; compiled by 'oom' on Wed May 4 07:57:50 PDT 2011
************************************************************/
11/06/15 08:32:13 INFO util.GSet: VM type = 64-bit
11/06/15 08:32:13 INFO util.GSet: 2% max memory = 19.1675 MB
11/06/15 08:32:13 INFO util.GSet: capacity = 2^21 = 2097152 entries
11/06/15 08:32:13 INFO util.GSet: recommended=2097152, actual=2097152
11/06/15 08:32:13 INFO namenode.FSNamesystem: fsOwner=lindenb
11/06/15 08:32:13 INFO namenode.FSNamesystem: supergroup=supergroup
11/06/15 08:32:13 INFO namenode.FSNamesystem: isPermissionEnabled=true
11/06/15 08:32:13 INFO namenode.FSNamesystem: dfs.block.invalidate.limit=100
11/06/15 08:32:13 INFO namenode.FSNamesystem: isAccessTokenEnabled=false accessKeyUpdateInterval=0 min(s), accessTokenLifetime=0 min(s)
11/06/15 08:32:13 INFO namenode.NameNode: Caching file names occuring more than 10 times
11/06/15 08:32:13 INFO common.Storage: Image file of size 113 saved in 0 seconds.
11/06/15 08:32:14 INFO common.Storage: Storage directory /home/lindenb/tmp/HADOOP/dfs/name has been successfully formatted.
11/06/15 08:32:14 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at srv-clc-04.u915.irt.univ-nantes.prive3/127.0.0.1

... and start HDFS:
$ ./bin/start-all.sh 
namenode running as process 1151. Stop it first.
localhost: starting datanode, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-datanode-srv-clc-04.u915.irt.univ-nantes.prive3.out
localhost: secondarynamenode running as process 1490. Stop it first.
jobtracker running as process 1588. Stop it first.
localhost: starting tasktracker, logging to /home/lindenb/package/hadoop-0.20.203.0/bin/../logs/hadoop-lindenb-tasktracker-srv-clc-04.u915.irt.univ-nantes.prive3.out

Copy the Axiom data to HDFS:
$ bin/hadoop fs -mkdir myfolder
$ bin/hadoop fs -copyFromLocal ~/Axiom_GW_Hu_SNP.r2.na31.annot.csv myfolder/
$ bin/hadoop fs -copyFromLocal ~/AxiomGT1.calls.txt myfolder/

The Classes

The annotation file and the genotypes file will each be sorted, and then both results will be merged. The classes implement Writable in order to be (de)serialized from/to HDFS.

Genotypes

This class contains the four possible codes for the genotypes (0, 1, 2 or -1):
 static public class Genotypes implements Writable
{
private byte array[];
public Genotypes()
{
this(0);
}
public Genotypes(int n)
{
this.array=new byte[n];
}
@Override
public void readFields(DataInput in) throws IOException
{
int n=in.readInt();
this.array=new byte[n];
in.readFully(this.array);
}
@Override
public void write(DataOutput out) throws IOException
{
out.writeInt(this.array.length);
out.write(this.array);
}
}
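When the calls are later joined with the markers, those numeric codes have to be translated back into the marker's alleles to produce the joined output shown earlier. A possible helper for that step (hypothetical, not part of the original classes):

// hypothetical mapping of an Axiom call code to a pair of alleles: 0=AA, 1=AB, 2=BB, -1=no call
static String decodeCall(byte call, String alleleA, String alleleB)
 {
 switch(call)
  {
  case 0: return alleleA + " " + alleleA;
  case 1: return alleleA + " " + alleleB;
  case 2: return alleleB + " " + alleleB;
  default: return "0 0"; // -1: no call; use whatever convention the downstream format expects
  }
 }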

Position

A position on the genome.
static public class Position implements WritableComparable<Position>
{
private String chrom="N/A";
private int position=-1;

public Position()
{
}
public Position(String chrom,int position)
{
this.chrom=chrom;
this.position=position;
}
public String getChrom()
{
return chrom;
}

public int getPosition()
{
return position;
}

@Override
public void readFields(DataInput in) throws IOException {
this.chrom=in.readUTF();
this.position=in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(chrom);
out.writeInt(position);
}
@Override
public int compareTo(Position o) {
int i=chrom.compareTo(o.chrom);
if(i!=0) return i;
return position - o.position;
}
@Override
public String toString() {
return chrom+":"+position;
}
}

Marker

This class contains the data available in the annotation file:
static public class Marker implements WritableComparable<Marker>
{
private Position position=new Position();
private String alleleA;
private String alleleB;
private String probeSetId;
private String rsid;
public Marker()
{
}

public Position getPosition() {
return position;
}

public String getAlleleA() {
return alleleA;
}

public String getAlleleB() {
return alleleB;
}

public String getProbeSetId() {
return probeSetId;
}

public String getRsid() {
return rsid;
}

@Override
public void readFields(DataInput in) throws IOException {
this.position.readFields(in);
this.alleleA=in.readUTF();
this.alleleB=in.readUTF();
this.probeSetId=in.readUTF();
this.rsid=in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException {
this.position.write(out);
out.writeUTF(this.alleleA);
out.writeUTF(this.alleleB);
out.writeUTF(this.probeSetId);
out.writeUTF(this.rsid);
}
@Override
public int compareTo(Marker o) {
return position.compareTo(o.position);
}
@Override
public String toString() {
return this.probeSetId+" "+position;
}
}

MapReduce for sorting the Annotation file.

We need to split each CSV line and sort the file on the probeSetId. The output will be a key/value [probeSetId(String)/Marker].
Mapper
public static class SortMarkerByProbIdMapper extends Mapper<LongWritable, Text, Text, Marker>
{
(...)
@Override
protected void map(
LongWritable key,
Text value,
Context context)
throws java.io.IOException ,InterruptedException
{
//ignore header and comment
if( value.find("\"Probe Set ID\"")==0 ||
value.find("#")==0 )
{
return;
}

List<String> array=splitCSV(new String(value.getBytes(),0,value.getLength()));
if(array.get(this.chromCol).equals("---")) return;//undefined chromosome

Marker m=new Marker();
m.position=new Position(
array.get(this.chromCol),
Integer.parseInt(array.get(this.posCol))
);
m.alleleA=array.get(this.alleleACol);
m.alleleB=array.get(this.alleleBCol);
m.probeSetId=array.get(this.probeSetIdCol);
m.rsid=array.get(this.rsIdCol);
context.write(new Text(m.probeSetId),m);
}
}
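The splitCSV helper is elided above; a minimal quote-aware splitter such a mapper could rely on (a sketch assuming comma-separated, optionally double-quoted fields without embedded escaped quotes; not the author's implementation) would be:

private static List<String> splitCSV(String line)
 {
 List<String> tokens = new ArrayList<String>();
 StringBuilder current = new StringBuilder();
 boolean inQuotes = false;
 for(int i=0; i< line.length(); ++i)
  {
  char c = line.charAt(i);
  if(c=='"')
   {
   inQuotes = !inQuotes; // toggle the quoted state, drop the quote itself
   }
  else if(c==',' && !inQuotes)
   {
   tokens.add(current.toString());
   current.setLength(0);
   }
  else
   {
   current.append(c);
   }
  }
 tokens.add(current.toString());
 return tokens;
 }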

Reducer
public static class SortMarkerByProbIdReducer extends Reducer<Text, Marker, Text, Marker>
{
@Override
protected void reduce(
Text key,
Iterable<Marker> values,
Context context
) throws java.io.IOException ,InterruptedException
{
Marker marker=null;
for(Marker i:values)
{
if(marker!=null) throw new IOException("Duplicate marker id "+key);
marker=i;
}
if(marker==null) return;
context.write(key,marker);
}
}

Job
private void sortMarkersByProbeid(Configuration conf) throws Exception
{
//JobConf jobConf=new JobConf(conf);

FileSystem fs=FileSystem.get(conf);
final Path outPath=new Path(SORTED_MARKERS_PATH);
final Path inPath=new Path("myfolder/Axiom_GW_Hu_SNP.r2.na31.annot.csv");

List<String> header=null;
(...)
/*
(...)
open the file and find the column indexes
(...)
*/

Job job = new Job(conf, Hadoop01.class.getName());
job.setJarByClass(Hadoop01.class);
job.setMapperClass(SortMarkerByProbIdMapper.class);
job.setReducerClass(SortMarkerByProbIdReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Marker.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Marker.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);


if(fs.exists(outPath))
{
fs.delete(outPath, true);
}

FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);


if(!job.waitForCompletion(true) )
{
throw new IOException("Cannot complete job");
}
}
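The part that opens the file and finds the column indexes is elided above; one way to do it (a sketch with hypothetical variable names matching the mapper fields) is to read the header line of the annotation CSV straight from HDFS:

// hypothetical header scan: skip the '#' comments, then locate the columns by name
BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(inPath)));
String headerLine;
while((headerLine = r.readLine()) != null && headerLine.startsWith("#")) { /* skip comments */ }
r.close();
List<String> cols = splitCSV(headerLine);
int probeSetIdCol = cols.indexOf("Probe Set ID");
int rsIdCol = cols.indexOf("dbSNP RS ID");
int chromCol = cols.indexOf("Chromosome");
int posCol = cols.indexOf("Physical Position");
int alleleACol = cols.indexOf("Allele A");
int alleleBCol = cols.indexOf("Allele B");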

MapReduce for sorting the 'Genotypes' file.

The output will be a key/value [probeSetId(String)/Genotypes].
Mapper
public static class SortCallByProbIdMapper extends Mapper<LongWritable, Text, Text, Genotypes>
{
final private Pattern tab=Pattern.compile("[\t]");
@Override
protected void map(
LongWritable key,
Text value,
Context context)
throws java.io.IOException ,InterruptedException
{
//ignore header and comment
if( value.find("probeset_id")==0 ||
value.find("%")==0 )
{
return;
}

String tokens[]=tab.split(new String(value.getBytes(),0,value.getLength()));
Genotypes genotypes=new Genotypes(tokens.length-1);
for(int i=1;i< tokens.length;++i)
{
genotypes.array[i-1]=Byte.parseByte(tokens[i]);
}
context.write(new Text(tokens[0]),genotypes);
}
}

Reducer
public static class SortCallByProbIdReducer extends Reducer<Text, Genotypes, Text, Genotypes>
{
@Override
protected void reduce(
Text key,
Iterable<Genotypes> values,
Context context
) throws java.io.IOException ,InterruptedException
{
Genotypes array=null;
for(Genotypes i:values)
{
if(array!=null) throw new IOException("Duplicate marker id "+key);
array=i;
}
if(array==null) return;
context.write(key,array);
}
}

Job
public void sortCallsByProbeid(Configuration conf) throws Exception
{
//JobConf jobConf=new JobConf(conf);

FileSystem fs=FileSystem.get(conf);
final Path outPath=new Path(SORTED_CALLS_PATH);
final Path inPath=new Path("myfolder/AxiomGT1.calls.txt");

Job job = new Job(conf, Hadoop01.class.getName());
job.setJarByClass(Hadoop01.class);
job.setMapperClass(SortCallByProbIdMapper.class);
job.setReducerClass(SortCallByProbIdReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Genotypes.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Genotypes.class);

job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);



if(fs.exists(outPath))
{
fs.delete(outPath, true);
}

FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);


if(!job.waitForCompletion(true) )
{
throw new IOException("Cannot complete job");
}
}

Joining the two sorted files

This was the most complicated part, as I didn't find much documentation about how to 'join' two files in HDFS with the latest Hadoop API. The following solution seems to work, and it only needs a reducer:
Reducer
public static class SortCallByProbIdReducer extends Reducer<Text, Genotypes, Text, Genotypes>
{
@Override
protected void reduce(
Text key,
Iterable<Genotypes> values,
Context context
) throws java.io.IOException ,InterruptedException
{
Genotypes array=null;
for(Genotypes i:values)
{
if(array!=null) throw new IOException("Duplicate marker id "+key);
array=i;
}
if(array==null) return;
context.write(key,array);
}
}

Job
private void join(Configuration conf) throws Exception
{
FileSystem fs=FileSystem.get(conf);
Path outPath=new Path(JOIN_PATH);
if(fs.exists(outPath))
{
fs.delete(outPath, true);
}

final String compose=CompositeInputFormat.compose(
"inner",
SequenceFileInputFormat.class,
new Path(SORTED_MARKERS_PATH),
new Path(SORTED_CALLS_PATH)
);
System.err.println(compose);
JobConf jobConf = new JobConf(conf, getClass());
jobConf.setJobName("join");
jobConf.setInputFormat(CompositeInputFormat.class);
jobConf.set("mapred.join.expr",
compose);

jobConf.setMapOutputKeyClass(Text.class);
jobConf.setMapOutputValueClass(TupleWritable.class);
jobConf.setOutputValueClass(Text.class);//TupleWritable ?
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setReducerClass(JoinReducer.class);

//jobConf.setMapOutputValueClass(Text.class);
org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(jobConf, outPath);
JobClient.runJob(jobConf);
}
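The JoinReducer wired into this job is not reproduced here; with the old mapred API it has to consume the TupleWritable values produced by CompositeInputFormat. A minimal sketch of what such a reducer could look like (an assumption about its shape, not the original class):

// hypothetical reducer over the composite input: tuple.get(0) is the Marker record,
// tuple.get(1) is the Genotypes record sharing the same probeSetId key
public static class JoinReducer extends MapReduceBase
 implements org.apache.hadoop.mapred.Reducer<Text, TupleWritable, Text, Text>
 {
 @Override
 public void reduce(Text key, Iterator<TupleWritable> values,
   OutputCollector<Text, Text> output, Reporter reporter) throws IOException
  {
  while(values.hasNext())
   {
   TupleWritable tuple = values.next();
   // real code would format the marker (chrom, position, rs id) and translate each
   // genotype code into alleles; here both sides are simply written out as text
   output.collect(key, new Text(tuple.get(0) + "\t" + tuple.get(1)));
   }
  }
 }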

Full source code



That's it,

Pierre

21 April 2009

Hadoop, my notebook: HDFS

This post is about Apache Hadoop, an open-source framework implementing the MapReduce algorithm. This first notebook focuses on HDFS, the Hadoop Distributed File System, and follows the great Yahoo! Hadoop Tutorial. Forget the clusters: I'm running this Hadoop engine on my one and only laptop.

Downloading & Installing


~/tmp/HADOOP> wget "http://apache.multidist.com/hadoop/core/hadoop-0.19.1/hadoop-0.19.1.tar.gz"
Saving to: `hadoop-0.19.1.tar.gz'

100%[======================================>] 55,745,146 487K/s in 1m 53s

2009-04-21 20:52:04 (480 KB/s) - `hadoop-0.19.1.tar.gz' saved [55745146/55745146]
~/tmp/HADOOP> tar xfz hadoop-0.19.1.tar.gz
~/tmp/HADOOP> rm hadoop-0.19.1.tar.gz
~/tmp/HADOOP> mkdir -p hdfs/data
~/tmp/HADOOP> mkdir -p hdfs/name
# hum... this step was not clear as I'm not an ssh guru. I had to give my root password to make the server start
~/tmp/HADOOP> ssh-keygen -t rsa -P 'password' -f ~/.ssh/id_rsa
Generating public/private rsa key pair.
Your identification has been saved in /home/pierre/.ssh/id_rsa.
Your public key has been saved in /home/pierre/.ssh/id_rsa.pub.
The key fingerprint is:
17:c0:29:b4:56:d1:d3:dd:ae:d5:ba:3e:5b:33:b0:99 pierre@linux-zfgk
~/tmp/HADOOP> cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Editing the Cluster configuration


Edit the file hadoop-0.19.1/conf/hadoop-site.xml.
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
<description>This is the URI (protocol specifier, hostname, and port) that describes the NameNode (main Node) for the cluster.</description>
</property>
<property>
<name>dfs.data.dir</name>
<value>/home/pierre/tmp/HADOOP/hdfs/data</value>
<description>This is the path on the local file system in which the DataNode instance should store its data</description>
</property>
<property>
<name>dfs.name.dir</name>
<value>/home/pierre/tmp/HADOOP/hdfs/name</value>
<description>This is the path on the local file system of the NameNode instance where the NameNode metadata is stored.</description>
</property>
</configuration>

Formatting HDFS


HDFS is the Hadoop Distributed File System: "HDFS is a block-structured file system: individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity. A file can be made of several blocks, and they are not necessarily stored on the same machine (...) If several machines must be involved in the serving of a file, then a file could be rendered unavailable by the loss of any one of those machines. HDFS combats this problem by replicating each block across a number of machines (3, by default)."
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop namenode -format
09/04/21 21:11:18 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG: host = linux-zfgk.site/127.0.0.2
STARTUP_MSG: args = [-format]
STARTUP_MSG: version = 0.19.1
STARTUP_MSG: build = https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19 -r 745977; compiled by 'ndaley' on Fri Feb 20 00:16:34 UTC 2009
************************************************************/
Re-format filesystem in /home/pierre/tmp/HADOOP/hdfs/name ? (Y or N) Y
09/04/21 21:11:29 INFO namenode.FSNamesystem: fsOwner=pierre,users,dialout,video
09/04/21 21:11:29 INFO namenode.FSNamesystem: supergroup=supergroup
09/04/21 21:11:29 INFO namenode.FSNamesystem: isPermissionEnabled=true
09/04/21 21:11:29 INFO common.Storage: Image file of size 96 saved in 0 seconds.
09/04/21 21:11:29 INFO common.Storage: Storage directory /home/pierre/tmp/HADOOP/hdfs/name has been successfully formatted.
09/04/21 21:11:29 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at linux-zfgk.site/127.0.0.2
************************************************************/

Starting HDFS


~/tmp/HADOOP> hadoop-0.19.1/bin/start-dfs.sh
starting namenode, logging to /home/pierre/tmp/HADOOP/hadoop-0.19.1/bin/../logs/hadoop-pierre-namenode-linux-zfgk.out
Password:
localhost: starting datanode, logging to /home/pierre/tmp/HADOOP/hadoop-0.19.1/bin/../logs/hadoop-pierre-datanode-linux-zfgk.out
Password:
localhost: starting secondarynamenode, logging to /home/pierre/tmp/HADOOP/hadoop-0.19.1/bin/../logs/hadoop-pierre-secondarynamenode-linux-zfgk.out

Playing with HDFS


First, download a few SNPs from UCSC/dbSNP into ~/local.xls.
~/tmp/HADOOP> mysql -N --user=genome --host=genome-mysql.cse.ucsc.edu -A -D hg18 -e 'select name,chrom,chromStart,avHet from snp129 where avHet!=0 and name like "rs12345%" ' > ~/local.xls

Creating directories
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -mkdir /user
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -mkdir /user/pierre

Copying a file "local.xls" from your local file system to HDFS
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -put ~/local.xls stored.xls

Recursive listing of HDFS
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -lsr /
drwxr-xr-x - pierre supergroup 0 2009-04-21 21:45 /user
drwxr-xr-x - pierre supergroup 0 2009-04-21 21:45 /user/pierre
-rw-r--r-- 3 pierre supergroup 308367 2009-04-21 21:45 /user/pierre/stored.xls

'cat' the first lines of the SNP file stored on HDFS:
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -cat /user/pierre/stored.xls | head
rs12345003 chr9 1765426 0.02375
rs12345004 chr9 2962430 0.055768
rs12345006 chr9 74304094 0.009615
rs12345007 chr9 73759324 0.112463
rs12345008 chr9 88421765 0.014184
rs12345013 chr9 78951530 0.104463
rs12345014 chr9 78542260 0.490608
rs12345015 chr9 10121973 0.201446
rs12345016 chr9 2698257 0.456279
rs12345027 chr9 8399632 0.04828

Removing a file. Note: "On startup, the NameNode enters a special state called Safemode." I could not delete a file before I used "dfsadmin -safemode leave".
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfsadmin -safemode leave
Safe mode is OFF
~/tmp/HADOOP> hadoop-0.19.1/bin/hadoop dfs -rm /user/pierre/stored.xls
Deleted hdfs://localhost:9000/user/pierre/stored.xls

Check that there is NO file named stored.xls in the local file system!
~/tmp/HADOOP> find hdfs/
hdfs/
hdfs/data
hdfs/data/detach
hdfs/data/in_use.lock
hdfs/data/tmp
hdfs/data/current
hdfs/data/current/blk_3340572659657793789
hdfs/data/current/dncp_block_verification.log.curr
hdfs/data/current/blk_3340572659657793789_1002.meta
hdfs/data/current/VERSION
hdfs/data/storage
hdfs/name
hdfs/name/in_use.lock
hdfs/name/current
hdfs/name/current/edits
hdfs/name/current/VERSION
hdfs/name/current/fsimage
hdfs/name/current/fstime
hdfs/name/image
hdfs/name/image/fsimage


Stop HDFS


~/tmp/HADOOP> hadoop-0.19.1/bin/stop-dfs.sh
stopping namenode
Password:
localhost: stopping datanode
Password:
localhost: stopping secondarynamenode



Pierre