protected void divideProcessing (int bufferSize, StringBuffer bf, ByteBuffer bb, GatheringByteChannel outChannel, String charset) throws IOException { int bfsCnt = bf.length()/bufferSize+1; for(int i=0;i<bfsCnt;i++) { String sub = null; if(i<bfsCnt-1) { sub = bf.substring(i*bufferSize, (i+1)*bufferSize); } else { sub = bf.substring(i*bufferSize, i*bufferSize+(bf.length()%bufferSize)); } if(sub != null){ bb.put(sub.getBytes(charset)); bb.flip(); outChannel.write(bb); bb.clear(); } } bf.setLength(0); }
@Override public void writeTo(GatheringByteChannel channel) throws IOException { // nothing to do if (length == 0) { return; } int currentLength = length; int currentOffset = offset; BytesRef ref = new BytesRef(); while (currentLength > 0) { // try to align to the underlying pages while writing, so no new arrays will be created. int fragmentSize = Math.min(currentLength, PAGE_SIZE - (currentOffset % PAGE_SIZE)); boolean newArray = bytearray.get(currentOffset, fragmentSize, ref); assert !newArray : "PagedBytesReference failed to align with underlying bytearray. offset [" + currentOffset + "], size [" + fragmentSize + "]"; Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel); currentLength -= ref.length; currentOffset += ref.length; } assert currentLength == 0; }
@Override public long writeTo(GatheringByteChannel channel) throws IOException { if (completed()) throw new KafkaException("This operation cannot be invoked on a complete request."); int totalWrittenPerCall = 0; boolean sendComplete; do { long written = current.writeTo(channel); totalWrittenPerCall += written; sendComplete = current.completed(); if (sendComplete) nextSendOrDone(); } while (!completed() && sendComplete); totalWritten += totalWrittenPerCall; if (completed() && totalWritten != size) log.error("mismatch in sending bytes over socket; expected: " + size + " actual: " + totalWritten); log.trace("Bytes written as part of multi-send call: {}, total bytes written so far: {}, expected bytes to write: {}", totalWrittenPerCall, totalWritten, size); return totalWrittenPerCall; }
@Override public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException { long newSize = Math.min(channel.size(), end) - start; int oldSize = sizeInBytes(); if (newSize < oldSize) throw new KafkaException(String.format( "Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), oldSize, newSize)); long position = start + offset; int count = Math.min(length, oldSize); final long bytesTransferred; if (destChannel instanceof TransportLayer) { TransportLayer tl = (TransportLayer) destChannel; bytesTransferred = tl.transferFrom(channel, position, count); } else { bytesTransferred = channel.transferTo(position, count, destChannel); } return bytesTransferred; }
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = 0; if (remaining > 0) { written = records.writeTo(channel, size() - remaining, remaining); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; } pending = TransportLayers.hasPendingWrites(channel); if (remaining <= 0 && pending) channel.write(EMPTY_BYTE_BUFFER); return written; }
@Override public long writeTo(GatheringByteChannel channel) throws IOException { if (completed()) throw new KafkaException("This operation cannot be completed on a complete request."); int totalWrittenPerCall = 0; boolean sendComplete = false; do { long written = current.writeTo(channel); totalWritten += written; totalWrittenPerCall += written; sendComplete = current.completed(); if (sendComplete) nextSendOrDone(); } while (!completed() && sendComplete); if (log.isTraceEnabled()) log.trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + size); return totalWrittenPerCall; }
protected long transfer(ByteBuf source, WritableByteChannel target, MultipartState sourceFullyWrittenState) throws IOException { int transferred = 0; if (target instanceof GatheringByteChannel) { transferred = source.readBytes((GatheringByteChannel) target, (int) source.readableBytes()); } else { for (ByteBuffer byteBuffer : source.nioBuffers()) { int len = byteBuffer.remaining(); int written = target.write(byteBuffer); transferred += written; if (written != len) { // couldn't write full buffer, exit loop break; } } // assume this is a basic single ByteBuf source.readerIndex(source.readerIndex() + transferred); } if (source.isReadable()) { slowTarget = true; } else { state = sourceFullyWrittenState; } return transferred; }
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { checkIndex(index, length); if (length == 0) { return 0; } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = memory.duplicate(); } index = idx(index); tmpBuf.clear().position(index).limit(index + length); return out.write(tmpBuf); }
public static void gather() { ByteBuffer header = ByteBuffer.allocate(14); ByteBuffer body = ByteBuffer.allocate(100); header.put(headers.getBytes()); body.put(bodys.getBytes()); GatheringByteChannel channel = getChannel(); try { header.flip(); body.flip(); channel.write(new ByteBuffer[] { header, body }); } catch (IOException e) { e.printStackTrace(); } }
private void sendBuffer (ByteBuffer data, GatheringByteChannel channel, String contentType) throws Exception { ByteBuffer [] buffers = { staticHdr, dynHdr, data }; staticHdr.rewind(); cbtemp.clear(); cbtemp.put ("Content-Length: " + data.limit()); cbtemp.put (LINE_SEP); cbtemp.put ("Content-Type: "); cbtemp.put (contentType); cbtemp.put (LINE_SEP); cbtemp.put (LINE_SEP); cbtemp.flip(); buffers [1] = utf8.encode (cbtemp); while (channel.write (buffers) != 0) { // nothing } }
public static void main (String [] argv) throws Exception { int reps = 10; if (argv.length > 0) { reps = Integer.parseInt (argv [0]); } FileOutputStream fos = new FileOutputStream (DEMOGRAPHIC); GatheringByteChannel gatherChannel = fos.getChannel(); // generate some brilliant marcom, er, repurposed content ByteBuffer [] bs = utterBS (reps); // deliver the message to the waiting market while (gatherChannel.write (bs) > 0) { // empty body // loop until write() returns zero } System.out.println ("Mindshare paradigms synergized to " + DEMOGRAPHIC); fos.close(); }
/** * Write some of this set to the given channel. * * @param destChannel The channel to write to. * @param writePosition The position in the message set to begin writing from. * @param size The maximum number of bytes to write * @return The number of bytes actually written. */ @Override public int writeTo(final GatheringByteChannel destChannel, final long writePosition, final int size) throws IOException { // Ensure that the underlying size has not changed. int newSize = Math.min((int) channel.size(), end) - start; if (newSize < _size.get()) { throw new SamsaException(String.format( "Size of FileMessageSet %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), _size.get(), newSize)); } int bytesTransferred = (int) channel.transferTo(start + writePosition, Math.min(size, sizeInBytes()), destChannel); if (LOG.isTraceEnabled()) { LOG.trace("FileMessageSet {} : bytes transferred : {} bytes requested for transfer : {}", file.getAbsolutePath(), bytesTransferred, Math.min(size, sizeInBytes())); } return bytesTransferred; }
private static Sink newSink(Object out) { if (out instanceof OutputStream) { return new OioSink((OutputStream) out); } if (out instanceof GatheringByteChannel) { return new NioSink((GatheringByteChannel) out); } throw new FastdfsException("unknown sink output type " + out.getClass().getName()); }
public void getBox(WritableByteChannel writableByteChannel) throws IOException { ByteBuffer bb = ByteBuffer.allocate(16); long size = getSize(); if (isSmallBox(size)) { IsoTypeWriter.writeUInt32(bb, size); } else { IsoTypeWriter.writeUInt32(bb, 1); } bb.put(IsoFile.fourCCtoBytes("mdat")); if (isSmallBox(size)) { bb.put(new byte[8]); } else { IsoTypeWriter.writeUInt64(bb, size); } bb.rewind(); writableByteChannel.write(bb); if (writableByteChannel instanceof GatheringByteChannel) { List<ByteBuffer> nuSamples = unifyAdjacentBuffers(samples); for (int i = 0; i < Math.ceil((double) nuSamples.size() / STEPSIZE); i++) { List<ByteBuffer> sublist = nuSamples.subList( i * STEPSIZE, // start (i + 1) * STEPSIZE < nuSamples.size() ? (i + 1) * STEPSIZE : nuSamples.size()); // end ByteBuffer sampleArray[] = sublist.toArray(new ByteBuffer[sublist.size()]); do { ((GatheringByteChannel) writableByteChannel).write(sampleArray); } while (sampleArray[sampleArray.length - 1].remaining() > 0); } //System.err.println(bytesWritten); } else { for (ByteBuffer sample : samples) { sample.rewind(); writableByteChannel.write(sample); } } }
/** * Copies bytes from source {@link org.jboss.netty.buffer.ChannelBuffer} to a {@link java.nio.channels.GatheringByteChannel} * * @param source ChannelBuffer to copy from * @param sourceIndex index in <i>source</i> to start copying from * @param length how many bytes to copy * @param channel target GatheringByteChannel */ public static void writeToChannel(ChannelBuffer source, int sourceIndex, int length, GatheringByteChannel channel) throws IOException { while (length > 0) { int written = source.getBytes(sourceIndex, channel, length); sourceIndex += written; length -= written; } assert length == 0; }
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; pending = TransportLayers.hasPendingWrites(channel); return written; }
@Override public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException { if (position > Integer.MAX_VALUE) throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position); if (position + length > buffer.limit()) throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: " + position + ", length: " + length + ", buffer.limit(): " + buffer.limit()); int pos = (int) position; ByteBuffer dup = buffer.duplicate(); dup.position(pos); dup.limit(pos + length); return channel.write(dup); }
/** * Write all records to the given channel (including partial records). * @param channel The channel to write to * @return The number of bytes written * @throws IOException For any IO errors writing to the channel */ public int writeFullyTo(GatheringByteChannel channel) throws IOException { buffer.mark(); int written = 0; while (written < sizeInBytes()) written += channel.write(buffer); buffer.reset(); return written; }
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { if (useGathering()) { return (int) out.write(toByteBuffers(index, length)); } // XXX Gathering write is not supported because of a known issue. // See http://bugs.sun.com/view_bug.do?bug_id=6210541 // This issue appeared in 2004 and is still unresolved!? return out.write(toByteBuffer(index, length)); }
@Override public long writeTo(GatheringByteChannel channel) throws IOException { long written = channel.write(buffers); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; // This is temporary workaround. As Send , Receive interfaces are being used by BlockingChannel. // Once BlockingChannel is removed we can make Send, Receive to work with transportLayer rather than // GatheringByteChannel or ScatteringByteChannel. if (channel instanceof TransportLayer) pending = ((TransportLayer) channel).hasPendingWrites(); return written; }
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { int length=0; Buffer buf0 = header==null?null:header.buffer(); Buffer buf1 = buffer==null?null:buffer.buffer(); if (_channel instanceof GatheringByteChannel && header!=null && header.length()!=0 && buf0 instanceof NIOBuffer && buffer!=null && buffer.length()!=0 && buf1 instanceof NIOBuffer) { length = gatheringFlush(header,((NIOBuffer)buf0).getByteBuffer(),buffer,((NIOBuffer)buf1).getByteBuffer()); } else { // flush header if (header!=null && header.length()>0) length=flush(header); // flush buffer if ((header==null || header.length()==0) && buffer!=null && buffer.length()>0) length+=flush(buffer); // flush trailer if ((header==null || header.length()==0) && (buffer==null || buffer.length()==0) && trailer!=null && trailer.length()>0) length+=flush(trailer); } return length; }
protected int gatheringFlush(Buffer header, ByteBuffer bbuf0, Buffer buffer, ByteBuffer bbuf1) throws IOException { int length; synchronized(this) { // Adjust position indexs of buf0 and buf1 bbuf0=bbuf0.asReadOnlyBuffer(); bbuf0.position(header.getIndex()); bbuf0.limit(header.putIndex()); bbuf1=bbuf1.asReadOnlyBuffer(); bbuf1.position(buffer.getIndex()); bbuf1.limit(buffer.putIndex()); _gather2[0]=bbuf0; _gather2[1]=bbuf1; // do the gathering write. length=(int)((GatheringByteChannel)_channel).write(_gather2); int hl=header.length(); if (length>hl) { header.clear(); buffer.skip(length-hl); } else if (length>0) { header.skip(length); } } return length; }
@Override public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { // adapted from UnpooledDirectByteBuf: checkIndex(index, length); if (length == 0) { return 0; } ByteBuffer tmpBuf = memorySegment.wrap(index, length); return out.write(tmpBuf); }
public static void gatherWrite(ByteBuffer[] byteBuffers,String file) throws IOException { FileOutputStream fos = new FileOutputStream(file); GatheringByteChannel gatherChannel = fos.getChannel(); while(gatherChannel.write(byteBuffers)>0){ } System.out.println("全部写入文件:"+file); gatherChannel.close(); fos.close(); }
public static void gatherWrite (String[] strs,String file) throws IOException { FileOutputStream fos = new FileOutputStream("d:\\aa.txt"); GatheringByteChannel gatherChannel = fos.getChannel(); ByteBuffer[] byteBuffers = utterBS(); while(gatherChannel.write(byteBuffers)>0){ } System.out.println("全部写入文件:"+file); fos.close(); }
/** * Drain pending buffers one at a time into the socket * @param channel * @return * @throws IOException */ @Override int drainTo (final GatheringByteChannel channel) throws IOException { int bytesWritten = 0; long rc = 0; do { /* * Nothing to write */ if (m_currentWriteBuffer == null && m_queuedBuffers.isEmpty()) { break; } ByteBuffer buffer = null; if (m_currentWriteBuffer == null) { m_currentWriteBuffer = m_queuedBuffers.poll(); buffer = m_currentWriteBuffer.b(); buffer.flip(); } else { buffer = m_currentWriteBuffer.b(); } rc = channel.write(buffer); //Discard the buffer back to a pool if no data remains if (!buffer.hasRemaining()) { m_currentWriteBuffer.discard(); m_currentWriteBuffer = null; m_messagesWritten++; } bytesWritten += rc; } while (rc > 0); m_bytesWritten += bytesWritten; return bytesWritten; }
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { ensureAccessible(); if (length == 0) { return 0; } ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = buffer.duplicate(); } tmpBuf.clear().position(index).limit(index + length); return out.write(tmpBuf); }
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { checkIndex(index, length); index = idx(index); ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = ByteBuffer.wrap(memory); } return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length)); }
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException { ensureAccessible(); ByteBuffer tmpBuf; if (internal) { tmpBuf = internalNioBuffer(); } else { tmpBuf = ByteBuffer.wrap(array); } return out.write((ByteBuffer) tmpBuf.clear().position(index).limit(index + length)); }