hadoop-common-user mailing list archives

From: Dennis Kubes <ku...@apache.org>
Subject: Re: To solve the checksum errors on the non-ecc mem machines.
Date: Tue, 14 Aug 2007 14:35:15 GMT
How does this fix the non-ECC memory errors?

Dennis Kubes

Daeseong Kim wrote:
> To solve the checksum errors on the non-ECC memory machines, I
> modified some code in DFSClient.java and DataNode.java.
> 
> The idea is very simple.
> The original CHUNK structure is
> {chunk size}{chunk data}{chunk size}{chunk data}...
> 
> The modified CHUNK structure is
> {chunk size}{chunk data}{chunk crc}{chunk size}{chunk data}{chunk crc}...
> 
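> As a minimal standalone sketch of that framing (the class and method names
> below are illustrative only, not part of the patch), the sender appends a
> CRC32 of each chunk and the receiver recomputes it before accepting the data:
> 
> import java.io.*;
> import java.util.zip.CRC32;
> 
> class ChunkCrcSketch {
>   // Sender side: write {chunk size}{chunk data}{chunk crc}.
>   static void writeChunk(DataOutputStream out, byte[] buf, int len)
>       throws IOException {
>     CRC32 sum = new CRC32();
>     sum.update(buf, 0, len);
>     out.writeLong(len);
>     out.write(buf, 0, len);
>     out.writeInt((int) sum.getValue());
>     out.flush();
>   }
> 
>   // Receiver side: read one chunk back and report whether the crc matches.
>   // buf is assumed to be at least as large as the chunk.
>   static boolean readChunk(DataInputStream in, byte[] buf)
>       throws IOException {
>     int len = (int) in.readLong();
>     in.readFully(buf, 0, len);
>     int crc = in.readInt();
>     CRC32 sum = new CRC32();
>     sum.update(buf, 0, len);
>     return crc == (int) sum.getValue();
>   }
> }
> 
> A receiver that sees a mismatch can then ask the sender to retransmit the
> chunk, which is what the 0x00 / 0x01 reply bytes in the patch below do.
> 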
> Here is the code.
> ------------------------
> DFSClient.java
> 
> import java.util.zip.*;
> 
> private synchronized void endBlock() throws IOException {
>       long sleeptime = 400;
>       //
>       // Done with local copy
>       //
>       closeBackupStream();
>       Checksum sum = new CRC32();
> 
>       //
>       // Send it to datanode
>       //
>       boolean sentOk = false;
>       int remainingAttempts =
>         conf.getInt("dfs.client.block.write.retries", 3);
>       while (!sentOk) {
>         nextBlockOutputStream();
>         InputStream in = new FileInputStream(backupFile);
>         try {
>           byte buf[] = new byte[BUFFER_SIZE];
>           int bytesRead = in.read(buf);
>           while (bytesRead > 0) {
> 
>             boolean checked = false;
>             while (!checked) {
>               blockStream.writeLong((long) bytesRead);
>               blockStream.write(buf, 0, bytesRead);
> 
>               // here we will send crc data
>               sum.reset();
>               sum.update(buf, 0, bytesRead);
>               int crc = (int) sum.getValue();
>               blockStream.writeInt(crc);
>               blockStream.flush();
> 
>               // wait for the datanode's reply; 0x00 means the crc matched,
>               // otherwise resend the chunk
>               byte re = (byte) blockReplyStream.read();
>               if (re == 0x00) checked = true;
>             }
> 
>             if (progress != null) { progress.progress(); }
>             bytesRead = in.read(buf);
>           }
>           internalClose();
>           sentOk = true;
>         } catch (IOException ie) {
>           handleSocketException(ie);
>           remainingAttempts -= 1;
>           if (remainingAttempts == 0) {
>             throw ie;
>           }
>           try {
>             Thread.sleep(sleeptime);
>           } catch (InterruptedException e) {
>           }
>         } finally {
>           in.close();
>         }
>       }
> 
>       bytesWrittenToBlock = 0;
>       //
>       // Delete local backup, start new one
>       //
>       deleteBackupFile();
>       File tmpFile = newBackupFile();
>       bytesWrittenToBlock = 0;
>       backupStream = new FileOutputStream(tmpFile);
>       backupFile = tmpFile;
>     }
> 
> 
> DataNode.java
> 
> import java.util.zip.*;
> 
> private void writeBlock(DataInputStream in) throws IOException {
>       //
>       // Read in the header
>       //
>       DataOutputStream reply =
>         new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));
>       try {
>         boolean shouldReportBlock = in.readBoolean();
>         Block b = new Block();
>         b.readFields(in);
>         int numTargets = in.readInt();
>         if (numTargets <= 0) {
>           throw new IOException("Mislabelled incoming datastream.");
>         }
>         DatanodeInfo targets[] = new DatanodeInfo[numTargets];
>         for (int i = 0; i < targets.length; i++) {
>           DatanodeInfo tmp = new DatanodeInfo();
>           tmp.readFields(in);
>           targets[i] = tmp;
>         }
>         byte encodingType = (byte) in.read();
>         long len = in.readLong();
> 
>         //
>         // Make sure curTarget is equal to this machine
>         //
>         DatanodeInfo curTarget = targets[0];
> 
>         //
>         // Track all the places we've successfully written the block
>         //
>         Vector<DatanodeInfo> mirrors = new Vector<DatanodeInfo>();
> 
>         //
>         // Open local disk out
>         //
>         OutputStream o;
>         try {
>           o = data.writeToBlock(b);
>         } catch( IOException e ) {
>           checkDiskError( e );
>           throw e;
>         }
>         DataOutputStream out =
>           new DataOutputStream(new BufferedOutputStream(o));
>         InetSocketAddress mirrorTarget = null;
>         String mirrorNode = null;
>         try {
>           //
>           // Open network conn to backup machine, if
>           // appropriate
>           //
>           DataInputStream in2 = null;
>           DataOutputStream out2 = null;
>           if (targets.length > 1) {
>             // Connect to backup machine
>             mirrorNode = targets[1].getName();
>             mirrorTarget = createSocketAddr(mirrorNode);
>             try {
>               Socket s2 = new Socket();
>               s2.connect(mirrorTarget, READ_TIMEOUT);
>               s2.setSoTimeout(READ_TIMEOUT);
>               out2 = new DataOutputStream(
>                   new BufferedOutputStream(s2.getOutputStream()));
>               in2 = new DataInputStream(
>                   new BufferedInputStream(s2.getInputStream()));
> 
>               // Write connection header
>               out2.write(OP_WRITE_BLOCK);
>               out2.writeBoolean(shouldReportBlock);
>               b.write(out2);
>               out2.writeInt(targets.length - 1);
>               for (int i = 1; i < targets.length; i++) {
>                 targets[i].write(out2);
>               }
>               out2.write(encodingType);
>               out2.writeLong(len);
>               myMetrics.replicatedBlocks(1);
>             } catch (IOException ie) {
>               if (out2 != null) {
>                 LOG.info("Exception connecting to mirror " + mirrorNode
>                          + "\n" + StringUtils.stringifyException(ie));
>                 try {
>                   out2.close();
>                   in2.close();
>                 } catch (IOException out2close) {
>                 } finally {
>                   out2 = null;
>                   in2 = null;
>                 }
>               }
>             }
>           }
> 
>           //
>           // Process incoming data, copy to disk and
>           // maybe to network. First copy to the network before
>           // writing to local disk so that all datanodes might
>           // write to local disk in parallel.
>           //
>           boolean anotherChunk = len != 0;
>           byte buf[] = new byte[BUFFER_SIZE];
> 
>           Checksum sum = new CRC32();
> 
>           while (anotherChunk) {
> 
>             if (encodingType == CHUNKED_ENCODING) {
>               // read the chunk data fully
>               int pos = 0;
>               int remain = (int) len;
>               while (remain > 0) {
>                 int bytesRead = in.read(buf, pos, remain);
>                 if (bytesRead < 0) {
>                   throw new EOFException("EOF reading from " + s.toString());
>                 }
>                 pos += bytesRead;
>                 remain -= bytesRead;
>               }
> 
>               // read the crc and verify it against the received data
>               int crc = in.readInt();
>               sum.reset();
>               sum.update(buf, 0, (int) len);
>               int res = (int) sum.getValue();
> 
>               if (crc == res) {
>                 reply.write(0x00);
>                 reply.flush();
> 
>                 if (out2 != null) {
>                   try {
>                     out2.write(buf, 0, (int) len);
>                     out2.writeInt(crc);
>                     out2.flush();
>                     byte re = (byte) in2.read();
>                     if (re != 0x00)
>                       throw new IOException("fail to copy data to replica");
>                   } catch (IOException out2e) {
>                     LOG.info("Exception writing to mirror " + mirrorNode
>                              + "\n" + StringUtils.stringifyException(out2e));
>                     //
>                     // If stream-copy fails, continue
>                     // writing to disk.  We shouldn't
>                     // interrupt client write.
>                     //
>                     try {
>                       out2.close();
>                       in2.close();
>                     } catch (IOException out2close) {
>                     } finally {
>                       out2 = null;
>                       in2 = null;
>                     }
>                   }
>                 }
>                 try {
>                   out.write(buf, 0, (int) len);
>                   myMetrics.wroteBytes((int) len);
>                 } catch (IOException iex) {
>                   checkDiskError(iex);
>                   throw iex;
>                 }
> 
>                 len = in.readLong();
>                 if (out2 != null) {
>                   try {
>                     out2.writeLong(len);
>                   } catch (IOException ie) {
>                     LOG.info("Exception writing to mirror " + mirrorNode
>                              + "\n" + StringUtils.stringifyException(ie));
>                     try {
>                       out2.close();
>                       in2.close();
>                     } catch (IOException ie2) {
>                       // NOTHING
>                     } finally {
>                       out2 = null;
>                       in2 = null;
>                     }
>                   }
>                 }
> 
>               } else {
>                 // crc mismatch: ask the client to resend this chunk
>                 reply.write(0x01);
>                 reply.flush();
>                 len = in.readLong();
>               }
> 
>               if (len == 0) {
>                 anotherChunk = false;
>               }
>             } else if (encodingType == RUNLENGTH_ENCODING) {
> 
>               while (len > 0) {
>                 int bytesRead = in.read(buf, 0, (int) Math.min(buf.length, len));
>                 if (bytesRead < 0) {
>                   throw new EOFException("EOF reading from " + s.toString());
>                 }
>                 if (bytesRead > 0) {
>                   if (out2 != null) {
>                     try {
>                       out2.write(buf, 0, bytesRead);
>                     } catch (IOException out2e) {
>                       LOG.info("Exception writing to mirror " + mirrorNode
>                                + "\n" + StringUtils.stringifyException(out2e));
>                       //
>                       // If stream-copy fails, continue
>                       // writing to disk.  We shouldn't
>                       // interrupt client write.
>                       //
>                       try {
>                         out2.close();
>                         in2.close();
>                       } catch (IOException out2close) {
>                       } finally {
>                         out2 = null;
>                         in2 = null;
>                       }
>                     }
>                   }
>                   try {
>                     out.write(buf, 0, bytesRead);
>                     myMetrics.wroteBytes(bytesRead);
>                   } catch (IOException iex) {
>                     checkDiskError(iex);
>                     throw iex;
>                   }
>                   len -= bytesRead;
>                 }
>               }
> 
>               anotherChunk = false;
>             }
>           }
> 
>           if (out2 != null) {
>             try {
>               out2.flush();
>               long complete = in2.readLong();
>               if (complete != WRITE_COMPLETE) {
>                 LOG.info("Conflicting value for WRITE_COMPLETE: " + complete);
>               }
>               LocatedBlock newLB = new LocatedBlock();
>               newLB.readFields(in2);
>               in2.close();
>               out2.close();
>               DatanodeInfo mirrorsSoFar[] = newLB.getLocations();
>               for (int k = 0; k < mirrorsSoFar.length; k++) {
>                 mirrors.add(mirrorsSoFar[k]);
>               }
>             } catch (IOException ie) {
>               LOG.info("Exception writing to mirror " + mirrorNode
>                        + "\n" + StringUtils.stringifyException(ie));
>               try {
>                 out2.close();
>                 in2.close();
>               } catch (IOException ie2) {
>                 // NOTHING
>               } finally {
>                 out2 = null;
>                 in2 = null;
>               }
>             }
>           }
>           if (out2 == null) {
>             LOG.info("Received block " + b + " from " +
>                      s.getInetAddress());
>           } else {
>             LOG.info("Received block " + b + " from " +
>                      s.getInetAddress() +
>                      " and mirrored to " + mirrorTarget);
>           }
>         } finally {
>           try {
>             out.close();
>           } catch (IOException iex) {
>             checkDiskError(iex);
>             throw iex;
>           }
>         }
>         data.finalizeBlock(b);
>         myMetrics.wroteBlocks(1);
> 
>         //
>         // Tell the namenode that we've received this block
>         // in full, if we've been asked to.  This is done
>         // during NameNode-directed block transfers, but not
>         // client writes.
>         //
>         if (shouldReportBlock) {
>           synchronized (receivedBlockList) {
>             receivedBlockList.add(b);
>             receivedBlockList.notifyAll();
>           }
>         }
> 
>         //
>         // Tell client job is done, and reply with
>         // the new LocatedBlock.
>         //
>         reply.writeLong(WRITE_COMPLETE);
>         mirrors.add(curTarget);
>         LocatedBlock newLB = new LocatedBlock(b,
>             mirrors.toArray(new DatanodeInfo[mirrors.size()]));
>         newLB.write(reply);
>       } finally {
>         reply.close();
>       }
>     }
>   }
