1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| @Override public synchronized boolean append(TxnHeader hdr, Record txn, TxnDigest digest) throws IOException { if (hdr == null) { return false; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn( "Current zxid {} is <= {} for {}", hdr.getZxid(), lastZxidSeen, Request.op2String(hdr.getType())); } else { lastZxidSeen = hdr.getZxid(); } if (logStream == null) { LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid())); fos = new FileOutputStream(logFileWrite); logStream = new BufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId); fhdr.serialize(oa, "fileheader"); logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); } filePadding.padFile(fos.getChannel()); byte[] buf = Util.marshallTxnEntry(hdr, txn, digest); if (buf == null || buf.length == 0) { throw new IOException("Faulty serialization for header " + "and txn"); } Checksum crc = makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);
return true; }
|