Parsing stops with Akka Streams mapAsync

Cassie, published 2018-02-13 14:20:14Z

I am parsing 50,000 records (each with a title and a URL) from web pages and writing them to a PostgreSQL database as I go. The application is deployed with docker-compose. However, parsing keeps stopping at some page for no apparent reason. I added logging to figure out what is happening, but there is no connection error or any other error in the output.

Here is my code for parsing and writing to the database:

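import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import net.ruippeixotog.scalascraper.browser.JsoupBrowser
import slick.jdbc.PostgresProfile.api._

object App {
  val db = Database.forURL("jdbc:postgresql://db:5432/toloka?user=user&password=password")
  val browser = JsoupBrowser()
  val catRepo = new CategoryRepo(db)
  val torrentRepo = new TorrentRepo(db)
  val torrentForParseRepo = new TorrentForParseRepo(db)
  val parallelismFactor = 10
  val groupFactor = 10
  implicit val system = ActorSystem("TolokaParser")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  // Writes the parsed records to PostgreSQL in batches of groupFactor,
  // with up to parallelismFactor batch inserts in flight at once.
  def parseAndWriteTorrentsForParseToDb(doc: App.browser.DocumentType) = {
    Source(getRecordsLists(doc))
      .grouped(groupFactor)
      .mapAsync(parallelismFactor) { torrentForParse: Seq[TorrentForParse] =>
        torrentForParseRepo.createInBatch(torrentForParse)
      }
      .runWith(Sink.ignore)
  }

  // Fetches every list page and collects its (title, link) records.
  // The repo classes, generatePagesFromHomePage, getTitlesList, getLinksList
  // and getTitlesAndLinksTuple are not shown here.
  def getRecordsLists(doc: App.browser.DocumentType) = {
    val pages = generatePagesFromHomePage(doc)
    println("torrent links generated")
    println(pages.size)
    val result = for {
      page <- pages
    } yield {
      println(s"Parsing torrent list...$page")
      // Fetch the page once and reuse the document for both titles and links
      val pageDoc = browser.get(page)
      val tmp = getTitlesAndLinksTuple(getTitlesList(pageDoc), getLinksList(pageDoc))
      println(tmp.size)
      tmp
    }
    println("torrent links and names tupled")
    result.flatten
  }
}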

What may be the cause of such problems?
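A possible reason nothing shows up in the logs: runWith(Sink.ignore) returns a Future[Done], and when any stage fails (for example, a failed Future from createInBatch) the stream stops and that future completes with the exception. Unless the caller inspects it, the failure is never printed. A minimal sketch, assuming Akka Streams 2.5 (ActorMaterializer, as in the question); writeBatch is a hypothetical stand-in for torrentForParseRepo.createInBatch that fails on one batch:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success, Try}

object ObserveStreamResult {
  // Hypothetical stand-in for torrentForParseRepo.createInBatch: fails for the batch containing 3.
  def writeBatch(batch: Seq[Int]): Future[Int] =
    if (batch.contains(3)) Future.failed(new RuntimeException(s"insert failed for $batch"))
    else Future.successful(batch.size)

  def run(): Try[Done] = {
    implicit val system: ActorSystem = ActorSystem("observe-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for onComplete

    val done: Future[Done] =
      Source(1 to 6)
        .grouped(2)                              // batches (1, 2), (3, 4), (5, 6)
        .mapAsync(2)(batch => writeBatch(batch)) // the failed batch fails the whole stream
        .runWith(Sink.ignore)

    // Without a callback like this, a failed stream leaves no trace in the output.
    done.onComplete {
      case Success(_)  => println("stream completed")
      case Failure(ex) => println(s"stream failed: ${ex.getMessage}")
    }

    try Await.ready(done, 10.seconds).value.get
    finally system.terminate()
  }
}
```

In the question's code, the same onComplete callback could be attached to the Future returned by parseAndWriteTorrentsForParseToDb(doc).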

EmiCareOfCell44, replied 2018-02-13 22:11:52Z

Add a supervision strategy so that an error does not terminate the stream. For example:

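import akka.Done
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

// Resume drops the failing element and keeps the stream running
val decider: Supervision.Decider = {
  case _ => Supervision.Resume
}

// Assumes an implicit ActorMaterializer and ExecutionContext in scope, as in the question
def parseAndWriteTorrentsForParseToDb: Future[Done] = {
  Source.fromIterator(() => List(1, 2, 3).iterator)
    .grouped(1)
    .mapAsync(1) { torrentForParse: Seq[Int] =>
      Future { 0 } // stands in for torrentForParseRepo.createInBatch(...)
    }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.ignore)
}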

With this supervision strategy, a failed Future in the async stage no longer stops the stream: the failing element is dropped and processing continues.
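The decider only handles failures raised inside stream stages, and Resume drops the failing element without logging it. In the question's code, Source(getRecordsLists(doc)) runs getRecordsLists(doc), including every browser.get call, before the stream is built, so an exception while fetching a page is thrown directly from parseAndWriteTorrentsForParseToDb and the supervision strategy never sees it. A sketch under these assumptions: the fetch is moved into a mapAsync stage, fetchPage is a hypothetical stand-in for browser.get plus the title/link extraction, and the decider records each failure (a real application would use a logger) before resuming:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ResumeWithLogging {
  // Records every failure the decider sees (stands in for a real logger).
  val failures: ListBuffer[String] = ListBuffer.empty

  // Record the cause, then drop the failing element and keep the stream running.
  val loggingDecider: Supervision.Decider = { ex =>
    failures.synchronized { failures += ex.getMessage }
    Supervision.Resume
  }

  def run(): Seq[(String, String)] = {
    implicit val system: ActorSystem = ActorSystem("resume-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val pages = List("page-1", "page-2", "page-3")

    // Hypothetical stand-in for browser.get(page) plus title/link extraction.
    def fetchPage(page: String): Seq[(String, String)] =
      if (page == "page-2") throw new RuntimeException(s"cannot fetch $page")
      else Seq(s"title of $page" -> s"https://example.org/$page")

    val result: Future[Seq[(String, String)]] =
      Source(pages)
        // The fetch runs inside the stream, so its failure reaches the decider.
        .mapAsync(2)(page => Future(fetchPage(page)))
        .mapConcat(_.toList)
        .withAttributes(ActorAttributes.supervisionStrategy(loggingDecider))
        .runWith(Sink.seq)

    try Await.result(result, 10.seconds)
    finally system.terminate()
  }
}
```

Here page-2 fails, is recorded and skipped, and the stream still emits the records of page-1 and page-3 in order. Future(fetchPage(page)) runs blocking work on the actor system's dispatcher for brevity; for real HTTP fetches, a dedicated ExecutionContext for blocking calls would likely be the safer choice.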
