Java 类io.vertx.core.file.AsyncFile 实例源码

项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
@Override
public FdfsStorage download(FdfsFileId fileId, String fileFullPathName, long offset, long bytes,
        Handler<AsyncResult<Void>> handler) {

    vertx.fileSystem().open(fileFullPathName, new OpenOptions().setCreate(true).setWrite(true), ar -> {
        if (ar.succeeded()) {
            AsyncFile file = ar.result();

            download(fileId, file, offset, bytes, download -> {
                file.close();
                handler.handle(download);
            });
        } else {
            handler.handle(Future.failedFuture(ar.cause()));
        }
    });

    return this;
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
public static Future<LocalFile> readFile(FileSystem fs, String filefullPathName) {
    LocalFile localFile = new LocalFile();

    return Future.<FileProps>future(future -> {
        fs.props(filefullPathName, future);
    }).compose(props -> {
        localFile.setSize(props.size());

        return Future.<AsyncFile>future(future -> {
            fs.open(filefullPathName, new OpenOptions().setRead(true).setWrite(false).setCreate(false), future);
        });
    }).compose(fileStream -> {

        localFile.setFile(fileStream);

        return Future.succeededFuture(localFile);
    });
}
项目:sfs    文件:AsyncIOTest.java   
@Test
public void copyZeroLength(TestContext context) throws IOException {
    Path tmpFile = Files.createTempFile(tmpDir, "", "");

    AsyncFile asyncFile = vertx.fileSystem().openBlocking(tmpFile.toString(), new OpenOptions());
    final BufferWriteEndableWriteStream bufferWriteStream = new BufferWriteEndableWriteStream();

    Async async = context.async();
    AsyncIO.pump(asyncFile, bufferWriteStream)
            .map(new Func1<Void, Void>() {
                @Override
                public Void call(Void aVoid) {
                    VertxAssert.assertEquals(context, 0, bufferWriteStream.toBuffer().length());
                    return null;
                }
            })
            .subscribe(new TestSubscriber(context, async));
}
项目:tentacles    文件:AsyncFileStore.java   
/**
 * Asynchronously store content from source to filePath,
 * and call onEnd when finished
 * @param source
 * @param filePath
 * @param onEnd
 */
public static void asyncStore(Vertx vertx,
                       ReadStream<Buffer> source,
                       String filePath,
                       Handler<Void> onEnd) {
    checkDir(filePath);
    source.pause();
    vertx.fileSystem().open(filePath,
            new OpenOptions().setWrite(true).setCreate(true),
            fres -> {
                AsyncFile afile = fres.result();
                Pump pump = Pump.pump(source, afile);
                source.endHandler(onEnd);
                pump.start();
                source.resume();
            });
}
项目:vertx-pairtree    文件:S3Client.java   
/**
 * Uploads the file contents to S3.
 *
 * @param aBucket An S3 bucket
 * @param aKey An S3 key
 * @param aFile A file to upload
 * @param aHandler A response handler for the upload
 */
public void put(final String aBucket, final String aKey, final AsyncFile aFile,
        final Handler<HttpClientResponse> aHandler) {
    final S3ClientRequest request = createPutRequest(aBucket, aKey, aHandler);
    final Buffer buffer = Buffer.buffer();

    aFile.endHandler(event -> {
        request.putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buffer.length()));
        request.end(buffer);
    });

    aFile.handler(data -> {
        buffer.appendBuffer(data);
    });
}
项目:sfs    文件:AsyncIO.java   
public static Observable<Void> close(AsyncFile asyncFile) {
    return Observable.defer(() -> {
        ObservableFuture<Void> rh = RxHelper.observableFuture();
        asyncFile.close(rh.toHandler());
        return rh;
    });
}
项目:sfs    文件:JournalFileTest.java   
@Test
public void testMetadataAndReadStream(TestContext context) throws IOException {

    byte[] data = new byte[256 * 1024 * 1024];
    getCurrentInstance().nextBytesBlocking(data);

    Path dataFile = path.resolve(".data");
    write(dataFile, data, CREATE_NEW);

    long size = size(dataFile);
    final byte[] expectedDataSha512 = hash(dataFile.toFile(), sha512()).asBytes();
    final AsyncFile bigFile = sfsVertx.fileSystem().openBlocking(dataFile.toString(), new OpenOptions());

    Path journalPath = path.resolve(".journal");
    JournalFile journalFile = new JournalFile(journalPath);

    Async async = context.async();

    aVoid()
            .flatMap(aVoid -> journalFile.open(sfsVertx))
            .flatMap(aVoid -> journalFile.enableWrites(sfsVertx))
            .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata0", UTF_8.toString()), size, bigFile))
            .doOnNext(aVoid -> bigFile.setReadPos(0))
            .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata1", UTF_8.toString()), size, bigFile))
            .doOnNext(aVoid -> bigFile.setReadPos(0))
            .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata2", UTF_8.toString()), size, bigFile))
            .doOnNext(aVoid -> bigFile.setReadPos(0))
            .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata3", UTF_8.toString()), size, bigFile))
            .doOnNext(aVoid -> bigFile.setReadPos(0))
            .flatMap(aVoid -> journalFile.append(sfsVertx, buffer("metadata4", UTF_8.toString()), size, bigFile))
            // assert stuff before closing
            .flatMap(aVoid -> assertScanDataReadStream(context, sfsVertx, journalFile, 5, "metadata", expectedDataSha512))
            .flatMap(aVoid -> journalFile.disableWrites(sfsVertx))
            .flatMap(aVoid -> journalFile.force(sfsVertx, true))
            .flatMap(aVoid -> journalFile.close(sfsVertx))
            // assert stuff can be read closing and opening
            .flatMap(aVoid -> journalFile.open(sfsVertx))
            .flatMap(aVoid -> assertScanDataReadStream(context, sfsVertx, journalFile, 5, "metadata", expectedDataSha512))
            .subscribe(new TestSubscriber(context, async));
}
项目:sfs    文件:PipedStreamTest.java   
@Test
public void testImmediatePumpFile(TestContext context) throws IOException {
    SfsVertx vertx = new SfsVertxImpl(rule.vertx(), backgroundPool, ioPool);

    byte[] bytes = new byte[256];
    getCurrentInstance().nextBytesBlocking(bytes);
    Path path = createTempFile("", "");

    try (OutputStream outputStream = newOutputStream(path)) {
        for (int i = 0; i < 10000; i++) {
            outputStream.write(bytes);
        }
    }

    final byte[] sha512 = hash(path.toFile(), sha512()).asBytes();

    Async async = context.async();
    aVoid()
            .flatMap(aVoid -> {
                AsyncFile asyncFile = vertx.fileSystem().openBlocking(path.toString(), new OpenOptions());
                PipedReadStream pipedReadStream = new PipedReadStream();
                PipedEndableWriteStream pipedEndableWriteStream = new PipedEndableWriteStream(pipedReadStream);
                Observable<Void> producer = pump(asyncFile, pipedEndableWriteStream);
                DigestEndableWriteStream digestWriteStream = new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512);
                Observable<Void> consumer = pump(pipedReadStream, digestWriteStream);
                return combineSinglesDelayError(producer, consumer, (aVoid1, aVoid2) -> {
                    assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
                    return (Void) null;
                });
            })
            .doOnTerminate(() -> {
                try {
                    deleteIfExists(path);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            })
            .subscribe(new TestSubscriber(context, async));
}
项目:georocket    文件:FileStore.java   
@Override
public void getOne(String path, Handler<AsyncResult<ChunkReadStream>> handler) {
  String absolutePath = Paths.get(root, path).toString();

  // check if chunk exists
  FileSystem fs = vertx.fileSystem();
  ObservableFuture<Boolean> observable = RxHelper.observableFuture();
  fs.exists(absolutePath, observable.toHandler());
  observable
    .flatMap(exists -> {
      if (!exists) {
        return Observable.error(new FileNotFoundException("Could not find chunk: " + path));
      }
      return Observable.just(exists);
    })
    .flatMap(exists -> {
      // get chunk's size
      ObservableFuture<FileProps> propsObservable = RxHelper.observableFuture();
      fs.props(absolutePath, propsObservable.toHandler());
      return propsObservable;
    })
    .map(props -> props.size())
    .flatMap(size -> {
      // open chunk
      ObservableFuture<AsyncFile> openObservable = RxHelper.observableFuture();
      OpenOptions openOptions = new OpenOptions().setCreate(false).setWrite(false);
      fs.open(absolutePath, openOptions, openObservable.toHandler());
      return openObservable.map(f -> new FileChunkReadStream(size, f));
    })
    .subscribe(readStream -> {
      // send chunk to peer
      handler.handle(Future.succeededFuture(readStream));
    }, err -> {
      handler.handle(Future.failedFuture(err));
    });
}
项目:react-streams    文件:RsFileSystem.java   
public Stream<RsAsyncFile> open(String path, OpenOptions options) {
    return Stream.asOne(subscription -> {
        fileSystem.open(path, options, new Handler<AsyncResult<AsyncFile>>() {
            @Override
            public void handle(AsyncResult<AsyncFile> event) {
                if (event.succeeded()) {
                    subscription.sendNext(new RsAsyncFile(event.result()));
                    subscription.sendComplete();
                } else {
                    subscription.sendError(event.cause());
                }                               
            }
        });
    });
}
项目:vertx-rx    文件:NativeExamples.java   
public void toObservable(Vertx vertx) {
  FileSystem fileSystem = vertx.fileSystem();
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Observable<Buffer> observable = RxHelper.toObservable(file);
    observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
  });
}
项目:vertx-rx    文件:NativeExamples.java   
public void unmarshaller(FileSystem fileSystem) {
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Observable<Buffer> observable = RxHelper.toObservable(file);
    observable.lift(RxHelper.unmarshaller(MyPojo.class)).subscribe(
        mypojo -> {
          // Process the object
        }
    );
  });
}
项目:vertx-rx    文件:NativeExamples.java   
public void toFlowable(Vertx vertx) {
  FileSystem fileSystem = vertx.fileSystem();
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
    observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
  });
}
项目:vertx-rx    文件:NativeExamples.java   
public void unmarshaller(FileSystem fileSystem) {
  fileSystem.open("/data.txt", new OpenOptions(), result -> {
    AsyncFile file = result.result();
    Flowable<Buffer> observable = FlowableHelper.toFlowable(file);
    observable.compose(FlowableHelper.unmarshaller(MyPojo.class)).subscribe(
        mypojo -> {
          // Process the object
        }
    );
  });
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
private LocalFile setFile(AsyncFile file) {
    this.file = file;
    return this;
}
项目:chlorophytum-semantics    文件:DownloadService.java   
public Future<Void> download(String target) {

        Future<Void> future = Future.future();

        HttpClientOptions options = new HttpClientOptions();
        options.setKeepAlive(false);
        options.setLogActivity(true);

        HttpClient httpClient = vertx.createHttpClient(options);

        httpClient.get(WORDNET_PORT, WORDNET_HOST, WORDNET_FILE, httpEvent -> {
            // pause the http response till we complete setting up our async file handler
                httpEvent.pause();
                // setup file open handler
                OpenOptions openOptions = new OpenOptions();
                vertx.fileSystem().open(target, openOptions, fileEvent -> {
                    if (fileEvent.failed()) {
                        fileEvent.cause().printStackTrace();
                        return;
                    }
                    final AsyncFile asynFile = fileEvent.result();
                    final Pump downloadPump = Pump.pump(httpEvent, asynFile);
                    downloadPump.start();
                    // resume the receive operation
                        httpEvent.resume();
                        httpEvent.endHandler(event -> {
                            // close the file
                            asynFile.flush().close(closeEvent -> {
                            });
                            logger.info("Downloaded size = {}", downloadPump.numberPumped());

                            future.complete();

                        });
                    });
            }).exceptionHandler(e -> {
            logger.error("Error while downloading the dictionary!", e);
            future.fail(e);
        }).end();

        return future;
    }
项目:actson    文件:VertxExample.java   
/**
 * Asynchronously parse a file and print JSON events to System.out
 * @param filename the name of the JSON file to parse
 * @param handler a handler that will be called when the file has been parsed
 * or when an error has occurred
 */
private void parseFile(String filename, Handler<AsyncResult<Void>> handler) {
  OpenOptions options = new OpenOptions()
      .setRead(true)
      .setWrite(false);

  // asynchronously open the file
  vertx.fileSystem().open(filename, options, ar -> {
    if (ar.failed()) {
      handler.handle(Future.failedFuture(ar.cause()));
      return;
    }

    JsonParser parser = new JsonParser();
    AsyncFile f = ar.result();

    Supplier<Boolean> processEvents = () -> {
      // process events from the parser until it needs more input
      int event;
      do {
        event = parser.nextEvent();

        // print all events to System.out
        if (event != JsonEvent.NEED_MORE_INPUT) {
          System.out.println("JSON event: " + event);
        }

        // handle values, errors, and end of file
        if (event == JsonEvent.VALUE_STRING) {
          System.out.println("VALUE: " + parser.getCurrentString());
        } else if (event == JsonEvent.EOF) {
          handler.handle(Future.succeededFuture());
          return false;
        } else if (event == JsonEvent.ERROR) {
          handler.handle(Future.failedFuture("Syntax error"));
          return false;
        }
      } while (event != JsonEvent.NEED_MORE_INPUT);

      return true;
    };

    f.exceptionHandler(t -> {
      handler.handle(Future.failedFuture(t));
    });

    f.handler(buf -> {
      // forward bytes read from the file to the parser
      byte[] bytes = buf.getBytes();
      int i = 0;
      while (i < bytes.length) {
        i += parser.getFeeder().feed(bytes, i, bytes.length - i);
        if (!processEvents.get()) {
          f.handler(null);
          f.endHandler(null);
          break;
        }
      }
    });

    f.endHandler(v -> {
      // process events one last time
      parser.getFeeder().done();
      processEvents.get();
    });
  });
}
项目:sfs    文件:AsyncFileEndableWriteStream.java   
public AsyncFileEndableWriteStream(AsyncFile delegate) {
    this.delegate = delegate;
}
项目:sfs    文件:AsyncFileEndableWriteStream.java   
public AsyncFile getDelegate() {
    return delegate;
}
项目:sfs    文件:RemoteBlobActionsTest.java   
@Override
public Observable<HeaderBlob> call(Void aVoid) {
    return aVoid()
            .flatMap(aVoid1 -> {
                ObservableFuture<AsyncFile> handler = RxHelper.observableFuture();
                OpenOptions openOptions = new OpenOptions();
                openOptions.setCreate(true)
                        .setRead(true)
                        .setWrite(true);
                vertx.vertx().fileSystem().open(data.toString(), openOptions, handler.toHandler());
                return handler;
            }).flatMap(asyncFile -> {
                final long size;
                final byte[] md5;
                final byte[] sha512;
                try {
                    size = size(data);
                    md5 = hash(data.toFile(), md5()).asBytes();
                    sha512 = hash(data.toFile(), sha512()).asBytes();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }

                return just(fromNullable(getFirst(vertx.verticle().nodes().volumeManager().volumes(), null)))
                        .map(volumeOptional -> volumeOptional.get())
                        .flatMap(new Func1<String, Observable<HeaderBlob>>() {
                            @Override
                            public Observable<HeaderBlob> call(String volumeId) {
                                out.println("PPPP1");
                                return remoteNode.createWriteStream(volumeId, size, SHA512, MD5)
                                        .flatMap(nodeWriteStream -> {
                                            out.println("PPPP2");
                                            return nodeWriteStream.consume(asyncFile);
                                        })
                                        .map(blob -> {
                                            out.println("PPPP3");
                                            asyncFile.close();
                                            return blob;
                                        })
                                        .map(blob -> {
                                            assertArrayEquals(testContext, md5, blob.getDigest(MD5).get());
                                            assertArrayEquals(testContext, sha512, blob.getDigest(SHA512).get());
                                            return blob;
                                        });
                            }
                        });
            });
}
项目:sfs    文件:CreateUpdateDeleteObjectTest.java   
@Test
public void testLargeUpload(TestContext context) throws IOException {

    byte[] data = new byte[256];
    getCurrentInstance().nextBytesBlocking(data);
    int dataSize = 256 * 1024;

    Path tempFile = createTempFile(tmpDir, "", "");

    int bytesWritten = 0;
    try (OutputStream out = newOutputStream(tempFile)) {
        while (bytesWritten < dataSize) {
            out.write(data);
            bytesWritten += data.length;
        }
    }

    long size = size(tempFile);
    final byte[] md5 = hash(tempFile.toFile(), md5()).asBytes();
    final byte[] sha512 = hash(tempFile.toFile(), sha512()).asBytes();
    final AsyncFile bigFile = vertx.fileSystem().openBlocking(tempFile.toString(), new OpenOptions());

    Async async = context.async();
    prepareContainer(context)

            // put an object then get/head the object
            .flatMap(new PutObjectStream(httpClient, accountName, containerName, objectName, authNonAdmin, bigFile)
                    .setHeader(CONTENT_LENGTH, valueOf(size))
                    .setHeader(ETAG, base16().lowerCase().encode(md5)))
            .map(new HttpClientResponseHeaderLogger())
            .map(new AssertHttpClientResponseStatusCode(context, HTTP_CREATED))
            .map(new AssertObjectHeaders(context, 0, false, 0, md5, sha512, 0))
            .map(new ToVoid<HttpClientResponse>())
            .flatMap(new GetObject(httpClient, accountName, containerName, objectName, authNonAdmin))
            .map(new HttpClientResponseHeaderLogger())
            .map(new AssertHttpClientResponseStatusCode(context, HTTP_OK))
            .map(new AssertObjectHeaders(context, 0, false, size, md5, sha512, 1))
            .flatMap(new Func1<HttpClientResponse, Observable<HttpClientResponse>>() {
                @Override
                public Observable<HttpClientResponse> call(final HttpClientResponse httpClientResponse) {
                    final DigestEndableWriteStream digestWriteStream = new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512, MD5);
                    return pump(httpClientResponse, digestWriteStream)
                            .map(new Func1<Void, HttpClientResponse>() {
                                @Override
                                public HttpClientResponse call(Void aVoid) {
                                    assertArrayEquals(context, md5, digestWriteStream.getDigest(MD5).get());
                                    assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
                                    return httpClientResponse;
                                }
                            });
                }
            })
            .map(new ToVoid<HttpClientResponse>())
            .subscribe(new TestSubscriber(context, async));
}
项目:sfs    文件:CreateUpdateDeleteObjectTest.java   
@Test
public void testEncryptedLargeUpload(TestContext context) throws IOException {

    byte[] data = new byte[256];
    getCurrentInstance().nextBytesBlocking(data);
    int dataSize = 256 * 1024;

    Path tempFile = createTempFile(tmpDir, "", "");

    int bytesWritten = 0;
    try (OutputStream out = newOutputStream(tempFile)) {
        while (bytesWritten < dataSize) {
            out.write(data);
            bytesWritten += data.length;
        }
    }

    long size = size(tempFile);
    final byte[] md5 = hash(tempFile.toFile(), md5()).asBytes();
    final byte[] sha512 = hash(tempFile.toFile(), sha512()).asBytes();
    final AsyncFile bigFile = vertx.fileSystem().openBlocking(tempFile.toString(), new OpenOptions());

    Async async = context.async();
    prepareContainer(context)
            // put an object then get/head the object
            .flatMap(new PutObjectStream(httpClient, accountName, containerName, objectName, authNonAdmin, bigFile)
                    .setHeader(CONTENT_LENGTH, valueOf(size))
                    .setHeader(ETAG, base16().lowerCase().encode(md5))
                    .setHeader(X_SERVER_SIDE_ENCRYPTION, "true"))
            .map(new HttpClientResponseHeaderLogger())
            .map(new AssertHttpClientResponseStatusCode(context, HTTP_CREATED))
            .map(new AssertObjectHeaders(context, 0, true, 0, md5, sha512, 0))
            .map(new ToVoid<HttpClientResponse>())
            .flatMap(new GetObject(httpClient, accountName, containerName, objectName, authNonAdmin))
            .map(new HttpClientResponseHeaderLogger())
            .map(new AssertHttpClientResponseStatusCode(context, HTTP_OK))
            .map(new AssertObjectHeaders(context, 0, true, size, md5, sha512, 1))
            .flatMap(new Func1<HttpClientResponse, Observable<HttpClientResponse>>() {
                @Override
                public Observable<HttpClientResponse> call(final HttpClientResponse httpClientResponse) {
                    final DigestEndableWriteStream digestWriteStream = new DigestEndableWriteStream(new NullEndableWriteStream(), SHA512, MD5);
                    return pump(httpClientResponse, digestWriteStream)
                            .map(new Func1<Void, HttpClientResponse>() {
                                @Override
                                public HttpClientResponse call(Void aVoid) {
                                    assertArrayEquals(context, md5, digestWriteStream.getDigest(MD5).get());
                                    assertArrayEquals(context, sha512, digestWriteStream.getDigest(SHA512).get());
                                    return httpClientResponse;
                                }
                            });
                }
            })
            .map(new ToVoid<HttpClientResponse>())
            .subscribe(new TestSubscriber(context, async));
}
项目:mewbase    文件:AFBasicFile.java   
public AFBasicFile(AsyncFile af) {
    this.af = af;
}
项目:georocket    文件:FileChunkReadStream.java   
/**
 * Constructs a new read stream
 * @param size the chunk's size
 * @param delegate the underlying read stream
 */
public FileChunkReadStream(long size, AsyncFile delegate) {
  super(delegate);
  this.size = size;
  this.file = delegate;
}
项目:georocket    文件:ImportCommand.java   
/**
 * Upload a file to GeoRocket
 * @param path path to file to import
 * @param client the GeoRocket client
 * @param vertx the Vert.x instance
 * @return an observable that will emit when the file has been uploaded
 */
protected Observable<Void> importFile(String path, GeoRocketClient client, Vertx vertx) {
  // open file
  FileSystem fs = vertx.fileSystem();
  OpenOptions openOptions = new OpenOptions().setCreate(false).setWrite(false);
  return fs.rxOpen(path, openOptions)
    // get file size
    .flatMap(f -> fs.rxProps(path).map(props -> Pair.of(f, props.size())))
    // import file
    .flatMapObservable(f -> {
      ObservableFuture<Void> o = RxHelper.observableFuture();
      Handler<AsyncResult<Void>> handler = o.toHandler();
      AsyncFile file = f.getLeft().getDelegate();

      WriteStream<Buffer> out = client.getStore()
        .startImport(layer, tags, properties, Optional.of(f.getRight()),
          fallbackCRS, handler);

      AtomicBoolean fileClosed = new AtomicBoolean();

      Pump pump = Pump.pump(file, out);
      file.endHandler(v -> {
        file.close();
        out.end();
        fileClosed.set(true);
      });

      Handler<Throwable> exceptionHandler = t -> {
        if (!fileClosed.get()) {
          file.endHandler(null);
          file.close();
        }
        handler.handle(Future.failedFuture(t));
      };
      file.exceptionHandler(exceptionHandler);
      out.exceptionHandler(exceptionHandler);

      pump.start();
      return o;
  });
}
项目:mesh    文件:PropReadFileStream.java   
private PropReadFileStream(FileProps props, AsyncFile file, String path) {
    this.props = props;
    this.file = file;
    this.path = path;
}
项目:mesh    文件:RxUtil.java   
public static Observable<Buffer> toBufferObs(AsyncFile file) {
    return toBufferObs(new io.vertx.reactivex.core.file.AsyncFile(file));
}
项目:mesh    文件:RxUtil.java   
public static Observable<Buffer> toBufferObs(io.vertx.reactivex.core.file.AsyncFile file) {
    return file.toObservable().map(io.vertx.reactivex.core.buffer.Buffer::getDelegate).doOnTerminate(() -> file.close());
}
项目:mesh    文件:BinaryFieldHandler.java   
/**
 * Processes the upload and set the binary information (e.g.: image dimensions) within the provided field. The binary data will be stored in the
 * {@link BinaryStorage} if desired.
 * 
 * @param ac
 * @param ul
 *            Upload to process
 * @param field
 *            Field which will be updated with the extracted information
 * @param storeBinary
 *            Whether to store the data in the binary store
 */
private void processUpload(ActionContext ac, FileUpload ul, BinaryGraphField field, boolean storeBinary) {
    AsyncFile asyncFile = Mesh.vertx().fileSystem().openBlocking(ul.uploadedFileName(), new OpenOptions());
    Binary binary = field.getBinary();
    String hash = binary.getSHA512Sum();
    String binaryUuid = binary.getUuid();
    String contentType = ul.contentType();
    boolean isImage = contentType.startsWith("image/");

    // Calculate how many streams will connect to the data stream
    int neededDataStreams = 0;
    if (isImage) {
        neededDataStreams++;
    }
    if (storeBinary) {
        neededDataStreams++;
    }

    if (neededDataStreams > 0) {
        Observable<Buffer> stream = RxUtil.toBufferObs(asyncFile).publish().autoConnect(neededDataStreams);

        // Only gather image info for actual images. Otherwise return an empty image info object.
        Single<Optional<ImageInfo>> imageInfo = Single.just(Optional.empty());
        if (isImage) {
            imageInfo = processImageInfo(ac, stream);
        }

        // Store the data
        Single<Long> store = Single.just(ul.size());
        if (storeBinary) {
            store = binaryStorage.store(stream, binaryUuid).andThen(Single.just(ul.size()));
        }

        // Handle the data in parallel

        TransformationResult info = Single.zip(imageInfo, store, (imageinfoOpt, size) -> {
            ImageInfo iinfo = null;
            if (imageinfoOpt.isPresent()) {
                iinfo = imageinfoOpt.get();
            }
            return new TransformationResult(hash, 0, iinfo, null);
        }).blockingGet();
        // Only add image information if image properties were found
        if (info.getImageInfo() != null) {
            binary.setImageHeight(info.getImageInfo().getHeight());
            binary.setImageWidth(info.getImageInfo().getWidth());
            field.setImageDominantColor(info.getImageInfo().getDominantColor());
        }

    }

    field.setFileName(ul.fileName());
    field.getBinary().setSize(ul.size());
    field.setMimeType(contentType);
}
项目:raml-module-builder    文件:ProcessUploads.java   
/**
 * Main work done by the FileDataHandler which reads in line by line and passes on that line
 * to the correct Importer implementation for line processing
 * @param fileSize
 * @param conf
 * @param replyHandler - the handler returns a job object with success and error counter parameters
 * to be persisted by the job runner
 */
private void parseFile(long fileSize, Job conf, Handler<AsyncResult<Job>> replyHandler) {
  String file = conf.getParameters().get(0).getValue();
  vertx.fileSystem().open(file, new OpenOptions(), ar -> {
    if (ar.succeeded()) {
      AsyncFile rs = ar.result();
      rs.handler(new FileDataHandler(vertx, conf, fileSize, importerCache.get(conf.getName()), reply -> {
        if(reply.failed()){
          if(reply.cause().getMessage().contains(RTFConsts.STATUS_ERROR_THRESHOLD)){
            log.error("Stopping import... Error threshold exceeded for file " + file);
            try{
              //can throw an exception if the error threshold is met at
              //the last bulk where the endHandler is called before the stop on error is called
              rs.pause().close();
            }
            catch(Exception e){
              log.error("Error threshold hit on last block of data ", e);
            }
            replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR_THRESHOLD));
          }
        }
        else{
          replyHandler.handle(io.vertx.core.Future.succeededFuture(reply.result()));
        }
      }));
      rs.exceptionHandler(t -> {
        log.error("Error reading from file " + file, t);
        replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR));
      });
      rs.endHandler(v -> {
        rs.close(ar2 -> {
          if (ar2.failed()) {
            log.error("Error closing file " + file, ar2.cause());
          }
        });
      });
    } else {
      log.error("Error opening file " + file, ar.cause());
      replyHandler.handle(io.vertx.core.Future.failedFuture(RTFConsts.STATUS_ERROR));
    }
  });
}
项目:react-streams    文件:RsAsyncFile.java   
public RsAsyncFile(AsyncFile asyncFile) {
    this.asyncFile = asyncFile;
}
项目:vertx-web    文件:WebClientTest.java   
private void testRequestWithBody(HttpMethod method, boolean chunked) throws Exception {
  String expected = TestUtils.randomAlphaString(1024 * 1024);
  File f = File.createTempFile("vertx", ".data");
  f.deleteOnExit();
  Files.write(f.toPath(), expected.getBytes());
  waitFor(2);
  server.requestHandler(req -> req.bodyHandler(buff -> {
    assertEquals(method, req.method());
    assertEquals(Buffer.buffer(expected), buff);
    complete();
    req.response().end();
  }));
  startServer();
  vertx.runOnContext(v -> {
    AsyncFile asyncFile = vertx.fileSystem().openBlocking(f.getAbsolutePath(), new OpenOptions());

    HttpRequest<Buffer> builder = null;

    switch (method) {
      case POST:
        builder = client.post(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      case PUT:
        builder = client.put(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      case PATCH:
        builder = client.patch(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST, "/somepath");
        break;
      default:
        fail("Invalid HTTP method");
    }

    if (!chunked) {
      builder = builder.putHeader("Content-Length", "" + expected.length());
    }
    builder.sendStream(asyncFile, onSuccess(resp -> {
          assertEquals(200, resp.statusCode());
          complete();
        }));
  });
  await();
}
项目:vertx-fastdfs-client    文件:FdfsStorageImpl.java   
public AsyncFile getFile() {
    return file;
}
项目:mesh    文件:PropReadFileStream.java   
/**
 * Gets the opened file, ready to read.
 * 
 * @return
 */
public AsyncFile getFile() {
    return file;
}