Фильтрация неактивных сайтов на Scala

09.01.2017
11
6
#1
Есть текстовый файл с базой сайтов > 100k, может кто-нибудь подсказать как грамотно организовать параллельную обработку в функциональном стиле с целью отфильтровать неактивные? С помощью Future или AkkaStreams

Мои попытки:
Код:
@throws(classOf[java.io.IOException])
def get(url: String) = io.Source.fromURL(url).mkString

def isUp(url: String):Future[Boolean] =  Future {
  try {
    get(url)
    true
  } catch {
    case e: java.io.IOException => {
      println("URL " + url + " is down")
      false
    }
  }
}

val list = Source.fromFile("sites.txt").getLines().toList.map("http://" + _)
val futures= list.map(isUp)
Тут есть 2 проблемы:
  1. Создаю список из фьючеров, которые мгновенно запускаются параллельно, а учитывая размер коллекции, через минуту мой комп просто зависает, нужно что-то аналогичное тред пулу, только в функциональном мире
  2. Производное от 1, нужно подождать пока все фьючеры отработают, у меня есть мысли, постоянно проходить по списку и смотреть isCompleted, но это какой-то костыль
 

Kmet

Java Team
25.05.2006
1 036
8
#2
1) в Future можно передать свой ExecutionContext
по умолчанию используется ForkJoinPool, у которого, опять же по умолчанию, по потоку на ядро.
т.е. у вас не фьючеры стартуют мгновенно, а вы просто загружаете все ядра.


2) последовательно пройти список фьючеров, блокируясь до получения результата на каждом
или, собственно, Future.traverse и компания
[doublepost=1484228290,1484228209][/doublepost]но да, стримы тут были бы уместнее
 
Симпатии: Понравилось cutoff
09.01.2017
11
6
#3
Спасибо, заработало, вышел вот такой код:
Код:
import java.io.File
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source
import scalaj.http.Http

object FilterSites {
  def main(args: Array[String]): Unit = {
    def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) {
      val p = new java.io.PrintWriter(f)
      try { op(p) } finally { p.close() }
    }

    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(50))

    val list = Source.fromFile("sites.txt").getLines().toList.map("http://"+ _)

    def isUp(url: String):Future[Boolean] =  Future {
      val success = try { Http(url).timeout(3000,1000).asString.isSuccess} catch {case e: Exception => false}
      success match {
        case true => println(Console.GREEN + url + " is up ")
        case false => println(Console.RED + url + " is down ")
      }
      success
    }

    val futures: List[Future[Boolean]] = list.map(isUp)
    Future.sequence(futures).map(result => {
      println("Success!")
      printToFile(new File("result.txt")) { p =>
        list.filter(url => result(list.indexOf(url))).foreach(p.println)
      }
    })
  }
}
}
 
Последнее редактирование: