I have several CSV files with the following structure:
{"unixTimestampMs":1609531200000,"timezoneId":"Europe/Moscow","payload":…}
I need to read all of them into a single ZStream, merged by time. For example: take 10 records from the first file, then 5 records from the second file, then 15 records from the third file, and so on in round-robin fashion, so that the resulting stream is ordered by timestamp.
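To make the intended ordering concrete, here is a minimal sketch of a k-way merge over plain Scala iterators rather than ZStream. It assumes each file is already sorted by unixTimestampMs on its own. Rec, TimeMerge and mergeByTime are hypothetical names used only for illustration, and the source field stands in for the real payload.

```scala
import scala.collection.mutable

object TimeMerge {
  // Hypothetical record type: only unixTimestampMs mirrors the file structure above.
  final case class Rec(unixTimestampMs: Long, source: String)

  // Lazily merges iterators that are each assumed to be sorted by unixTimestampMs.
  def mergeByTime(sources: Seq[Iterator[Rec]]): Iterator[Rec] = {
    val its = sources.toVector

    // PriorityQueue is a max-heap, so the ordering is reversed to dequeue
    // the smallest timestamp first. The source index breaks ties, so records
    // with equal timestamps come out in file order.
    val byTime: Ordering[(Rec, Int)] =
      Ordering.by[(Rec, Int), (Long, Int)] { case (r, i) => (r.unixTimestampMs, i) }.reverse
    val heap = mutable.PriorityQueue.empty[(Rec, Int)](byTime)

    // Seed the heap with the first record of every non-empty source.
    its.zipWithIndex.foreach { case (it, i) =>
      if (it.hasNext) heap.enqueue((it.next(), i))
    }

    new Iterator[Rec] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): Rec = {
        val (rec, i) = heap.dequeue()
        // Refill from the source that produced the record just emitted.
        if (its(i).hasNext) heap.enqueue((its(i).next(), i))
        rec
      }
    }
  }
}
```

With this, the number of records taken from each file in a row is not fixed up front; it simply falls out of the timestamps. The merged iterator could presumably be handed to ZStream.fromIterator the same way a single file's lines are, but opening and releasing several files safely is exactly the part I am unsure about.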
Previously I had a single file, and the code looked like this:
    def fileStream: Stream[Throwable, Result] =
      for {
        res <-
          fileSource(appConf.sourceFilePath)
            .mapZIO(transactionService.mkMsgFromString)
            .collectSome
            .groupAdjacentBy(_.unixTimestampMs)
            .map(evenlyDistributeMessages)
            .flatMap(ZStream.fromChunk(_))
            .mapZIO(sleepIfRequired)
            .mapZIOPar(proxyConf.maxConnections, proxyConf.maxConnections)(emitMessage(_, uriToSendMessages))
      } yield res
where
    def fileSource(file: String): Stream[Throwable, String] =
      ZStream
        .acquireReleaseWith(transactionsFileService.getFileAsSource(file))(releaseFile)
        .flatMap(f => ZStream.fromIterator(f.getLines().drop(1), 256))
So fileSource gives me the stream I expect for one file.
How can I rewrite this for the multiple-file case and merge the files into a single ZStream ordered by time?