import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
import pl.edu.icm.coansys.transformers.PublicationData
import pl.edu.icm.coansys.transformers.impl.HBaseClientThrift
import pl.edu.icm.coansys.transformers.impl.PublicationAnalysisServiceImpl
import pl.edu.icm.synat.api.services.store.StatelessStore
import pl.edu.icm.synat.api.services.store.model.Record
import pl.edu.icm.synat.content.coansys.importer.HBaseWriter
import pl.edu.icm.synat.content.coansys.importer.RecordToPublicationDataProcessor
import pl.edu.icm.synat.content.coansys.importer.StoreReader

/*
 * Script: reads Record objects from the 'Store' service, converts each one with
 * RecordToPublicationDataProcessor and hands the converted items to an HBaseWriter.
 *
 * `serviceUtils` is not defined in this script; it is expected to be supplied by
 * the environment that runs it (e.g. the script binding).
 */

// Connection settings for the Thrift-based HBase client and the target table.
def destinationHost = '0.0.0.0'
def destinationPort = 9090
def serverTimeout = 20000
def tableName = 'testTableName'

// Passed to StoreReader as its second constructor argument.
def bufferSize = 100

def store = serviceUtils.getService('Store', StatelessStore.class)

ItemReader storeReader = new StoreReader(store, bufferSize)
ItemProcessor recordToPublicationDataProcessor = new RecordToPublicationDataProcessor()
ItemWriter hBaseWriter = createHBaseWriter(destinationHost, destinationPort, serverTimeout, tableName)

/**
 * Builds the writer used to store converted publications.
 *
 * @param destinationHost host passed to the Thrift client and to the writer
 * @param destinationPort port passed to the Thrift client and to the writer
 * @param serverTimeout   timeout passed to the Thrift client
 * @param tableName       table name passed to the publication analysis service
 * @return an HBaseWriter wired with a fresh client and analysis service
 */
private ItemWriter createHBaseWriter(String destinationHost, int destinationPort, int serverTimeout, String tableName) {
    def hBaseClient = new HBaseClientThrift(destinationHost, destinationPort, serverTimeout)
    // NOTE(review): 'm' and 'c' are presumably column family names - confirm
    // against PublicationAnalysisServiceImpl.
    def publicationAnalysisService = new PublicationAnalysisServiceImpl(hBaseClient, tableName, 'm', 'c')
    return new HBaseWriter(publicationAnalysisService, hBaseClient, destinationHost, destinationPort)
}

// Keep draining the reader until a pass yields no records at all.
def isPublicationsAvailable = true
while (isPublicationsAvailable) {
    def publications = []
    def recordsRead = 0
    Record record = null
    while ((record = storeReader.read()) != null) {
        recordsRead++
        def publication = recordToPublicationDataProcessor.process(record)
        // An ItemProcessor may return null to signal that the item should be
        // dropped; never forward such entries to the writer.
        if (publication != null) {
            publications.add(publication)
        }
    }
    // Skip the write when there is nothing to store (this is always the case on
    // the final pass, which only detects that the reader is exhausted).
    if (!publications.isEmpty()) {
        hBaseWriter.write(publications)
    }
    // Termination depends on records read, not on items kept, so filtering
    // does not change how many passes are made over the reader.
    isPublicationsAvailable = recordsRead > 0
}