/**
 * Declares the {@code etl} job, which consists of a single chunk-oriented
 * step named {@code file-to-database}.
 *
 * @param jobBuilderFactory factory that supplies the job builder
 * @param stepBuilderFactory factory that supplies the step builder
 * @param reader reader the step pulls {@link Person} items from
 * @param writer writer the step hands {@link Person} items to
 * @return the assembled job
 */
@Bean
Job personEtl(JobBuilderFactory jobBuilderFactory,
        StepBuilderFactory stepBuilderFactory,
        FlatFileItemReader<Person> reader,
        JdbcBatchItemWriter<Person> writer) {
    // Reader feeds the writer directly (no processor), in chunks of 5.
    final Step fileToDatabase = stepBuilderFactory.get("file-to-database")
            .<Person, Person>chunk(5)
            .reader(reader)
            .writer(writer)
            .build();

    final Job etl = jobBuilderFactory.get("etl")
            .start(fileToDatabase)
            .build();
    return etl;
}
/**
 * Declares the {@code importTicketStep} step: a chunk-oriented step that
 * reads, processes and writes {@link Ticket} items.
 *
 * @param stepBuilderFactory factory that supplies the step builder
 * @param jpaTransactionManager transaction manager set on the step
 *        (bean qualified as {@code jpaTransactionManagerForBatch})
 * @param chunkSize chunk size, resolved from {@code ticket.chunk.size}
 * @param ticketReader reader of the step
 * @param ticketWriter writer of the step
 * @param importTicketProcessor processor of the step
 * @return the assembled step
 */
@Bean
public Step importTicketStep(final StepBuilderFactory stepBuilderFactory,
        @Qualifier("jpaTransactionManagerForBatch") final PlatformTransactionManager jpaTransactionManager,
        final @Value("${ticket.chunk.size}") int chunkSize,
        final ItemReader<Ticket> ticketReader,
        final ItemWriter<Ticket> ticketWriter,
        final ItemProcessor<Ticket, Ticket> importTicketProcessor) {
    final Step importStep = stepBuilderFactory.get("importTicketStep")
            .<Ticket, Ticket>chunk(chunkSize)
            .reader(ticketReader)
            .processor(importTicketProcessor)
            .writer(ticketWriter)
            .transactionManager(jpaTransactionManager)
            .build();
    return importStep;
}
/**
 * Declares the {@code chunkStep} step, which moves {@link Department} items
 * from {@code reader()} through {@code processor()} to {@code writer()} in
 * chunks of 5.
 *
 * @return the assembled step
 */
@Bean
public Step chunkStep() {
    final Step departmentStep = stepCreators.get("chunkStep")
            .<Department, Department>chunk(5)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    return departmentStep;
}
private void handleTransition(Deque<Flow> executionDeque, TaskAppNode taskAppNode) { String beanName = getBeanName(taskAppNode); Step currentStep = this.context.getBean(beanName, Step.class); FlowBuilder<Flow> builder = new FlowBuilder<Flow>(beanName) .from(currentStep); boolean wildCardPresent = false; for (TransitionNode transitionNode : taskAppNode.getTransitions()) { String transitionBeanName = getBeanName(transitionNode); wildCardPresent = transitionNode.getStatusToCheck().equals(WILD_CARD); Step transitionStep = this.context.getBean(transitionBeanName, Step.class); builder.on(transitionNode.getStatusToCheck()).to(transitionStep) .from(currentStep); } if (wildCardPresent && !executionDeque.isEmpty()) { throw new IllegalStateException( "Invalid flow following '*' specifier."); } else { //if there are nodes are in the execution Deque. Make sure that //they are processed as a target of the wildcard instead of the //whole transition. if (!executionDeque.isEmpty()) { Deque<Flow> resultDeque = new LinkedList<>(); handleFlowForSegment(executionDeque, resultDeque); builder.on(WILD_CARD).to(resultDeque.pop()).from(currentStep); } } executionDeque.push(builder.end()); }
/**
 * Creates the step that launches the task named by {@code taskName}.
 *
 * <p>The step wraps a {@link TaskLauncherTasklet} configured with this
 * factory's arguments and task-specific properties, uses the transaction
 * attribute from {@code getTransactionAttribute()} and registers the
 * composed-task step execution listener. The step is named after the task.
 *
 * @return the assembled step
 * @throws Exception declared by the overridden method
 */
@Override
public Step getObject() throws Exception {
    final TaskLauncherTasklet launcher = new TaskLauncherTasklet(
            this.taskOperations,
            taskConfigurer.getTaskExplorer(),
            this.composedTaskProperties,
            this.taskName);
    launcher.setArguments(this.arguments);
    launcher.setProperties(this.taskSpecificProps);

    return this.steps.get(this.taskName)
            .tasklet(launcher)
            .transactionAttribute(getTransactionAttribute())
            .listener(this.composedTaskStepExecutionListener)
            .build();
}
@Bean public Step step1() { return stepBuilderFactory.get("step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { // get path of file in src/main/resources Path xmlDocPath = Paths.get(getFilePath()); // process the file to json String json = processXML2JSON(xmlDocPath); // insert json into mongodb insertToMongo(json); return RepeatStatus.FINISHED; } }).build(); }
/**
 * Declares a step-less job named {@code discreteRegisteredJob} whose
 * {@code doExecute} only sets the execution status to
 * {@link BatchStatus#COMPLETED}.
 *
 * @return the job, wired with this configuration's job repository
 */
@Bean
public Job discreteJob() {
    final AbstractJob stepless = new AbstractJob("discreteRegisteredJob") {

        @Override
        protected void doExecute(JobExecution execution) throws JobExecutionException {
            // There is nothing to run; flag the execution as completed.
            execution.setStatus(BatchStatus.COMPLETED);
        }

        @Override
        public Step getStep(String stepName) {
            // The job has no steps, so every lookup yields null.
            return null;
        }

        @Override
        public Collection<String> getStepNames() {
            return Collections.emptySet();
        }
    };
    stepless.setJobRepository(this.jobRepository);
    return stepless;
}
/**
 * Declares a step-less job named {@code discreteLocalJob} whose
 * {@code doExecute} only sets the execution status to
 * {@link BatchStatus#COMPLETED}.
 *
 * @return the job, wired with this configuration's job repository
 */
@Bean
public Job discreteJob() {
    final AbstractJob stepless = new AbstractJob("discreteLocalJob") {

        @Override
        protected void doExecute(JobExecution execution) throws JobExecutionException {
            // There is nothing to run; flag the execution as completed.
            execution.setStatus(BatchStatus.COMPLETED);
        }

        @Override
        public Step getStep(String stepName) {
            // The job has no steps, so every lookup yields null.
            return null;
        }

        @Override
        public Collection<String> getStepNames() {
            return Collections.emptySet();
        }
    };
    stepless.setJobRepository(this.jobRepository);
    return stepless;
}
/**
 * Declares the job-scoped {@code step1} step, which reads RDF triples from a
 * file and writes them through an {@link RdfTripleItemWriter} in chunks
 * of 10.
 *
 * @param stepBuilderFactory factory that supplies the step builder
 * @param databaseClientProvider provider of the database client handed to the writer
 * @param inputFilePath value of the {@code input_file_path} job parameter;
 *        set as the reader's file name
 * @param graphName value of the {@code graph_name} job parameter; passed to the writer
 * @return the assembled step
 */
@Bean
@JobScope
public Step step1(
        StepBuilderFactory stepBuilderFactory,
        DatabaseClientProvider databaseClientProvider,
        @Value("#{jobParameters['input_file_path']}") String inputFilePath,
        @Value("#{jobParameters['graph_name']}") String graphName) {

    RdfTripleItemReader<Map<String, Object>> tripleReader = new RdfTripleItemReader<Map<String, Object>>();
    tripleReader.setFileName(inputFilePath);

    RdfTripleItemWriter tripleWriter =
            new RdfTripleItemWriter(databaseClientProvider.getDatabaseClient(), graphName);

    Step tripleStep = stepBuilderFactory.get("step1")
            .<Map<String, Object>, Map<String, Object>>chunk(10)
            .reader(tripleReader)
            .writer(tripleWriter)
            .build();
    return tripleStep;
}
/**
 * Declares the {@code createInventoryEntryStep} step, which turns each
 * {@link ProductProjection} into a list of {@link InventoryEntryDraft}s,
 * one item per chunk.
 *
 * <p>The step is fault tolerant: {@link ErrorResponseException} is
 * skippable, with a skip limit of 1.
 *
 * @param sphereClient injected client; not referenced in this method body
 * @param inventoryEntryReader reader of the step
 * @param inventoryEntryProcessor processor of the step
 * @param inventoryEntryWriter writer of the step
 * @return the assembled step
 */
@Bean
public Step createInventoryEntryStep(final BlockingSphereClient sphereClient,
        final ItemReader<ProductProjection> inventoryEntryReader,
        final ItemProcessor<ProductProjection, List<InventoryEntryDraft>> inventoryEntryProcessor,
        final ItemWriter<List<InventoryEntryDraft>> inventoryEntryWriter) {
    return stepBuilderFactory.get("createInventoryEntryStep")
            .<ProductProjection, List<InventoryEntryDraft>>chunk(1)
            .reader(inventoryEntryReader)
            .processor(inventoryEntryProcessor)
            .writer(inventoryEntryWriter)
            .faultTolerant()
            .skip(ErrorResponseException.class)
            .skipLimit(1)
            .build();
}
/**
 * Declares the {@code deleteProductsStep} step, which reads products through
 * a sorted-by-id query reader and hands them to the delete writer in chunks
 * of 50.
 *
 * @param sphereClient client the query reader is created with
 * @param productDeleteWriter writer that receives the products read
 * @return the assembled step
 */
@Bean
protected Step deleteProductsStep(final BlockingSphereClient sphereClient,
        final ItemWriter<Versioned<Product>> productDeleteWriter) {
    final Step deleteStep = stepBuilderFactory.get("deleteProductsStep")
            .<Product, Product>chunk(50)
            .reader(ItemReaderFactory.sortedByIdQueryReader(sphereClient, ProductQuery.of()))
            .writer(productDeleteWriter)
            .build();
    return deleteStep;
}
/**
 * Verifies that the handler reports a {@link NoSuchStepException} with the
 * expected message when the environment names a step execution (job
 * execution id 1, step execution id 2) for which no {@code StepExecution}
 * has been stubbed.
 *
 * @throws Exception if the handler fails in an unexpected way
 */
@Test
public void testMissingStepExecution() throws Exception {
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true);
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("foo");
    when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"foo", "bar", "baz"});
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2");
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1");

    try {
        this.handler.run();
    }
    catch (NoSuchStepException nsse) {
        assertEquals("No StepExecution could be located for step execution id 2 within job execution 1", nsse.getMessage());
        return;
    }

    // Without this the test would pass silently when run() throws nothing.
    throw new AssertionError("Expected a NoSuchStepException because no StepExecution exists for the requested ids");
}
/**
 * Verifies the happy path: when the environment identifies the
 * {@code workerStep} step execution, the handler executes the matching step
 * bean with it and never touches the job repository.
 *
 * @throws Exception if the handler fails unexpectedly
 */
@Test
public void testRunSuccessful() throws Exception {
    StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L);

    // The environment advertises all three worker properties and their values.
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true);
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep");
    when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"});
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2");
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1");

    // The step execution and the step bean can both be resolved.
    when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep);
    when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step);

    handler.run();

    verify(this.step).execute(workerStep);
    verifyZeroInteractions(this.jobRepository);
}
/**
 * Verifies that when the step throws a {@link JobInterruptedException}, the
 * handler updates the job repository with a step execution whose status is
 * {@link BatchStatus#STOPPED}.
 *
 * @throws Exception if the handler fails unexpectedly
 */
@Test
public void testJobInterruptedException() throws Exception {
    StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L);

    // The environment advertises all three worker properties and their values.
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true);
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep");
    when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"});
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2");
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1");

    // The step execution and step bean resolve, but executing the step is interrupted.
    when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep);
    when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step);
    doThrow(new JobInterruptedException("expected")).when(this.step).execute(workerStep);

    handler.run();

    verify(this.jobRepository).update(this.stepExecutionArgumentCaptor.capture());
    assertEquals(BatchStatus.STOPPED, this.stepExecutionArgumentCaptor.getValue().getStatus());
}
/**
 * Verifies that when the step throws a {@link RuntimeException}, the handler
 * updates the job repository with a step execution whose status is
 * {@link BatchStatus#FAILED}.
 *
 * @throws Exception if the handler fails unexpectedly
 */
@Test
public void testRuntimeException() throws Exception {
    StepExecution workerStep = new StepExecution("workerStep", new JobExecution(1L), 2L);

    // The environment advertises all three worker properties and their values.
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn(true);
    when(this.environment.containsProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn(true);
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_NAME)).thenReturn("workerStep");
    when(this.beanFactory.getBeanNamesForType(Step.class)).thenReturn(new String[] {"workerStep", "foo", "bar"});
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_STEP_EXECUTION_ID)).thenReturn("2");
    when(this.environment.getProperty(DeployerPartitionHandler.SPRING_CLOUD_TASK_JOB_EXECUTION_ID)).thenReturn("1");

    // The step execution and step bean resolve, but executing the step blows up.
    when(this.jobExplorer.getStepExecution(1L, 2L)).thenReturn(workerStep);
    when(this.beanFactory.getBean("workerStep", Step.class)).thenReturn(this.step);
    doThrow(new RuntimeException("expected")).when(this.step).execute(workerStep);

    handler.run();

    verify(this.jobRepository).update(this.stepExecutionArgumentCaptor.capture());
    assertEquals(BatchStatus.FAILED, this.stepExecutionArgumentCaptor.getValue().getStatus());
}
@Bean public Step chunkStep() { final int chunkSize = 100; final ItemProcessor<String, String> processor = (item) -> { Thread.sleep(100); return item; }; final ItemWriter<String> writer = (items) -> { Thread.sleep(1000); }; return steps.get("Chunk Step") // .<String, String> chunk(chunkSize) // .reader(itemReader()) // .processor(processor) // .writer(writer) // .build(); }
/**
 * Builds the step that runs each {@link TaxCalculation} through the
 * web-service processor followed by the send-paycheck processor, producing
 * {@link PayCheck} items in chunks of 5.
 *
 * <p>The step is fault tolerant with the max-consecutive-exceptions skip
 * policy, does not roll back on {@link TaxWebServiceNonFatalException} or
 * {@link EmailSenderException}, may be restarted even if complete, and runs
 * on the configured task executor.
 *
 * @param stepName name given to the step
 * @return the assembled step
 */
protected Step wsCallAndGenerateAndSendPaycheckStep(String stepName) {
    // Delegates run in list order: web-service call first, then paycheck sending.
    CompositeItemProcessor<TaxCalculation, PayCheck> callThenSend = new CompositeItemProcessor<>();
    callThenSend.setDelegates(Arrays.asList(
            callWebserviceProcessor,
            sendPaycheckProcessor
    ));

    Step paycheckStep = stepBuilders.get(stepName)
            .<TaxCalculation, PayCheck>chunk(5)
            .faultTolerant()
            .skipPolicy(maxConsecutiveExceptionsSkipPolicy)
            .noRollback(TaxWebServiceNonFatalException.class)
            .noRollback(EmailSenderException.class)
            .reader(wsCallItemReader)
            .processor(callThenSend)
            .writer(wsCallItemWriter)
            .listener(createMonthlyTaxForEmployeeListener)
            .listener(maxConsecutiveExceptionsSkipPolicy)
            .listener(failedStepStepExecutionListener)
            .listener(singleJVMJobProgressListener)
            .allowStartIfComplete(true)
            .taskExecutor(taskExecutor)
            .build();
    return paycheckStep;
}
/**
 * Declares the {@code step} step: {@link Item}s in chunks of 3, with the
 * reader marked as a transactional queue.
 *
 * <p>Read, process and write listeners are registered before the step is
 * made fault tolerant ({@link MetricsTestException} skippable, skip limit
 * 4); skip and chunk listeners are registered afterwards.
 *
 * @return the assembled step
 */
@Bean
public Step step() {
    final Step itemStep = stepBuilders.get("step")
            .<Item, Item>chunk(3)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .readerIsTransactionalQueue()
            .listener(readListener())
            .listener(processListener())
            .listener(writeListener())
            .faultTolerant()
            .skip(MetricsTestException.class)
            .skipLimit(4)
            .listener(skipListener())
            .listener(chunkListener())
            .build();
    return itemStep;
}
/**
 * Declares the {@code step} step: {@link Item}s in chunks of 3, with the
 * processor marked as non-transactional.
 *
 * <p>Read, process and write listeners are registered before the step is
 * made fault tolerant ({@link MetricsTestException} skippable, skip limit
 * 4); skip and chunk listeners are registered afterwards.
 *
 * @return the assembled step
 */
@Bean
public Step step() {
    final Step itemStep = stepBuilders.get("step")
            .<Item, Item>chunk(3)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(readListener())
            .listener(processListener())
            .listener(writeListener())
            .faultTolerant()
            .processorNonTransactional()
            .skip(MetricsTestException.class)
            .skipLimit(4)
            .listener(skipListener())
            .listener(chunkListener())
            .build();
    return itemStep;
}
/**
 * Declares the {@code step} step: {@link Item}s in chunks of 3.
 *
 * <p>Read, process and write listeners are registered before the step is
 * made fault tolerant ({@link MetricsTestException} skippable, skip limit
 * 4); skip and chunk listeners are registered afterwards.
 *
 * @return the assembled step
 */
@Bean
public Step step() {
    final Step itemStep = stepBuilders.get("step")
            .<Item, Item>chunk(3)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .listener(readListener())
            .listener(processListener())
            .listener(writeListener())
            .faultTolerant()
            .skip(MetricsTestException.class)
            .skipLimit(4)
            .listener(skipListener())
            .listener(chunkListener())
            .build();
    return itemStep;
}
/**
 * Declares the step registered under the {@code readCsvFileIntoTableStep}
 * name constant: {@link Customer} items are read, processed and written in
 * chunks of 3 using the given transaction manager.
 *
 * @param stepBuilderFactory factory that supplies the step builder
 * @param platformTransactionManager transaction manager set on the step
 * @param ir reader bean qualified by {@code readCsvFileIntoTableStepReader}
 * @param itemProcessor processor bean qualified by {@code readCsvFileIntoTableStepProcessor}
 * @param iw writer bean qualified by {@code readCsvFileIntoTableStepWriter}
 * @return the assembled step
 */
@Bean(name = readCsvFileIntoTableStep)
public Step readCsvFileIntoTableStep(
        StepBuilderFactory stepBuilderFactory,
        PlatformTransactionManager platformTransactionManager,
        @Qualifier(readCsvFileIntoTableStepReader) ItemReader<Customer> ir,
        @Qualifier(readCsvFileIntoTableStepProcessor) ItemProcessor<Customer, Customer> itemProcessor,
        @Qualifier(readCsvFileIntoTableStepWriter) ItemWriter<Customer> iw) {
    return stepBuilderFactory.get(readCsvFileIntoTableStep)
            .<Customer, Customer>chunk(3)
            .reader(ir)
            .processor(itemProcessor)
            .writer(iw)
            .transactionManager(platformTransactionManager)
            .build();
}
/**
 * Declares the {@code importProductsJob} job: a {@code decompress} tasklet
 * step followed by the chunk-oriented {@code readWriteProducts} step.
 *
 * <p>The second step handles {@link Product} items in chunks of 3 and is
 * fault tolerant, skipping {@link FlatFileParseException} up to a skip
 * limit of 5.
 *
 * @param decompressTasklet tasklet run by the first step
 * @param reader reader of the second step
 * @return the assembled job
 */
@Bean
public Job importProductsJob(Tasklet decompressTasklet, ItemReader<Product> reader) {
    // Step 1: tasklet with explicitly set repository and transaction manager.
    final Step unpack = stepBuilders.get("decompress")
            .tasklet(decompressTasklet)
            .repository(jobRepository)
            .transactionManager(transactionManager)
            .build();

    // Step 2: chunked read/write of products.
    final Step loadProducts = stepBuilders.get("readWriteProducts")
            .<Product, Product>chunk(3)
            .reader(reader)
            .writer(writer())
            .faultTolerant()
            .skipLimit(5)
            .skip(FlatFileParseException.class)
            .build();

    final Job importJob = jobBuilders.get("importProductsJob")
            .repository(jobRepository)
            .listener(loggerListener)
            .start(unpack)
            .next(loadProducts)
            .build();
    return importJob;
}
/**
 * Declares the {@code importProductsJob} job, made of two tasklet steps run
 * in sequence: {@code decompressStep}, then {@code readWriteProductsStep}.
 *
 * @return the assembled job
 */
@Bean
public Job importProductsJob() {
    // The decompress step is allowed to start again even if already complete.
    final Step unpack = stepBuilders.get("decompressStep")
            .tasklet(decompressTasklet())
            .allowStartIfComplete(true)
            .build();

    final Step loadProducts = stepBuilders.get("readWriteProductsStep")
            .tasklet(readWriteProductsTasklet())
            .build();

    final Job importJob = jobBuilders.get("importProductsJob")
            .start(unpack)
            .next(loadProducts)
            .build();
    return importJob;
}
@Bean public Job importProductsLimitJob() { Step decompressStepLimit = stepBuilders.get("decompressStepLimit") .tasklet(decompressTaskletLimit()) .build(); // NOTE: Start Limit is 3 Step readWriteProductsStepLimit = stepBuilders.get("readWriteProductsStepLimit") .tasklet(readWriteProductsTaskletLimit()) .startLimit(3) .build(); return jobBuilders.get("importProductsLimitJob") .start(decompressStepLimit) .next(readWriteProductsStepLimit) .build(); }
/**
 * Declares the {@code importProductsJob} job, consisting of the single
 * chunk-oriented {@code importProductsStep} step.
 *
 * <p>The step handles strings in chunks of 5 and is fault tolerant:
 * {@link FlatFileParseException} and
 * {@link DataIntegrityViolationException} are skippable up to a skip limit
 * of 5, and the skip listener is registered on the step.
 *
 * @return the assembled job
 */
@Bean
public Job importProductsJob() {
    final Step importStep = stepBuilders.get("importProductsStep")
            .<String, String>chunk(5)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipLimit(5)
            .skip(FlatFileParseException.class)
            .skip(DataIntegrityViolationException.class)
            .listener(skipListener)
            .build();

    final Job importJob = jobBuilders.get("importProductsJob")
            .start(importStep)
            .build();
    return importJob;
}
/**
 * Declares the {@code job} job, consisting of the single chunk-oriented
 * {@code step} step.
 *
 * <p>The step handles strings in chunks of 5 and is fault tolerant with a
 * retry limit of 3 and a skip limit of 3.
 * {@link OptimisticLockingFailureException} and
 * {@link DeadlockLoserDataAccessException} are retryable; the latter is
 * also skippable. Two retry listeners are registered.
 *
 * @return the assembled job
 */
@Bean
public Job job() {
    final Step retryingStep = stepBuilders.get("step")
            .<String, String>chunk(5)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .retryLimit(3)
            .skipLimit(3)
            .retry(OptimisticLockingFailureException.class)
            .retry(DeadlockLoserDataAccessException.class)
            .skip(DeadlockLoserDataAccessException.class)
            .listener(mockRetryListener())
            .listener(retryListener())
            .build();

    final Job retryingJob = jobBuilders.get("job")
            .start(retryingStep)
            .build();
    return retryingJob;
}
/**
 * Declares the {@code taskletStep} step, which runs {@code tasklet()} on the
 * executor returned by {@code getAsyncExecutor()}.
 *
 * @return the assembled step
 */
@Bean
public Step taskletStep() {
    final Step asyncTaskletStep = stepCreators.get("taskletStep")
            .tasklet(tasklet())
            .taskExecutor(getAsyncExecutor())
            .build();
    return asyncTaskletStep;
}
/**
 * Declares the {@code chunkStep} step, which handles {@link Department}
 * items in chunks of 5 on the executor returned by
 * {@code getAsyncExecutor()}.
 *
 * @return the assembled step
 */
@Bean
public Step chunkStep() {
    final Step asyncChunkStep = stepCreators.get("chunkStep")
            .<Department, Department>chunk(5)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(getAsyncExecutor())
            .build();
    return asyncChunkStep;
}
/**
 * Declares the {@code step1} step, which converts {@link Employee} items to
 * {@link Permanent} items in chunks of 5 and writes them with
 * {@code writer()}.
 *
 * @param stepBuilderFactory factory that supplies the step builder
 * @param reader reader of the step
 * @param processor processor of the step
 * @return the assembled step
 */
@Bean("step1")
public Step step1(StepBuilderFactory stepBuilderFactory,
        ItemReader<Employee> reader,
        ItemProcessor<Employee, Permanent> processor) {
    final Step firstStep = stepBuilderFactory.get("step1")
            .<Employee, Permanent>chunk(5)
            .reader(reader)
            .processor(processor)
            .writer(writer())
            .build();
    return firstStep;
}
/**
 * Declares the {@code step2} step, which converts {@link Employee} items to
 * {@link Permanent} items in chunks of 2 and writes them with
 * {@code xmlWriter()}.
 *
 * @param stepBuilderFactory factory that supplies the step builder
 * @param reader reader of the step
 * @param processor processor of the step
 * @return the assembled step
 */
@Bean("step2")
public Step step2(StepBuilderFactory stepBuilderFactory,
        ItemReader<Employee> reader,
        ItemProcessor<Employee, Permanent> processor) {
    final Step secondStep = stepBuilderFactory.get("step2")
            .<Employee, Permanent>chunk(2)
            .reader(reader)
            .processor(processor)
            .writer(xmlWriter())
            .build();
    return secondStep;
}
/**
 * Builds a tasklet step whose tasklet does nothing but report
 * {@link RepeatStatus#FINISHED}, and registers the given listener on it.
 *
 * @param taskName name given to the step
 * @param stepExecutionListener listener registered on the step
 * @return the assembled step
 */
private Step createTaskletStepWithListener(final String taskName,
        StepExecutionListener stepExecutionListener) {
    final Tasklet finishImmediately = new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                throws Exception {
            return RepeatStatus.FINISHED;
        }
    };
    return this.steps.get(taskName)
            .tasklet(finishImmediately)
            .transactionAttribute(getTransactionAttribute())
            .listener(stepExecutionListener)
            .build();
}
/**
 * Builds a tasklet step whose tasklet does nothing but report
 * {@link RepeatStatus#FINISHED}.
 *
 * @param taskName name given to the step
 * @return the assembled step
 */
private Step createTaskletStep(final String taskName) {
    final Tasklet finishImmediately = new Tasklet() {
        @Override
        public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
                throws Exception {
            return RepeatStatus.FINISHED;
        }
    };
    return this.steps.get(taskName)
            .tasklet(finishImmediately)
            .transactionAttribute(getTransactionAttribute())
            .build();
}
public Step step2(){ return stepBuilderFactory.get("step2") .tasklet(new Tasklet(){ @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception{ // checks if our collection exists Boolean doesexist = mongoTemplate.collectionExists("foo"); System.out.println("Status of collection returns :::::::::::::::::::::" + doesexist); // show all DBObjects in foo collection DBCursor alldocs = mongoTemplate.getCollection("foo").find(); List<DBObject> dbarray = alldocs.toArray(); System.out.println("list of db objects returns:::::::::::::::::::::" + dbarray); // execute the three methods we defined for querying the foo collection String result = doCollect(); String resultTwo = doCollectTwo(); String resultThree = doCollectThree(); System.out.println(" RESULT:::::::::::::::::::::" + result); System.out.println(" RESULT:::::::::::::::::::::" + resultTwo); System.out.println(" RESULT:::::::::::::::::::::" + resultThree); return RepeatStatus.FINISHED; } }).build(); }
@Bean public Step githubStep1() throws Exception { return stepBuilderFactory.get("githubStep1") .tasklet(new Tasklet() { public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { // GitHubDTO gitHubDTO = githubJobUI.getDummyStats(); GitHubDTO gitHubDTO = githubJobUI.getGitHubStats(); long currentStatId = githubJobUI.getCurrentGithubId(); gitHubDTO.setStatId(currentStatId); gitHubDTO.setStatDate(new Date()); githubJobUI.saveGithubStats(gitHubDTO); chunkContext .getStepContext() .getStepExecution() .getJobExecution() .getExecutionContext() .put("statId", currentStatId); logger.info("Working with GitHubDTO: " + gitHubDTO.toString()); return RepeatStatus.FINISHED; } }) .listener(githubPromotionListener()) .build(); }
/**
 * Declares the {@code chunkStep} step, which moves {@link Customer} items
 * from {@code reader()} through {@code processor()} to {@code writer()} in
 * chunks of 20.
 *
 * @return the assembled step
 */
@Bean
public Step chunkStep() {
    final Step customerStep = stepBuilders.get("chunkStep")
            .<Customer, Customer>chunk(20)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    return customerStep;
}
/** * Create job one, step one. * * @return Step step one */ @Bean public Step oneStep() { return stepBuilderFactory.get("one-step") // .<Integer, SampleObject>chunk(STEP_ONE_CHUNK) // .reader(reader) .processor(processor) .writer(writter) // .build(); }