Project

General

Profile

« Previous | Next » 

Revision 3a4ffe5e

Added by François ARMAND about 7 years ago

Fixes #10119: Scala actors are deprecated in scala 2.11 and removed in 2.12: update inventory-endpoint

View differences:

inventory-provisioning-web/pom.xml
<version>${rudder-version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-actors</artifactId>
<version>${scala-version}</version>
</dependency>
<!-- Compile dependencies -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.monix</groupId>
<artifactId>monix-reactive_${scala-binary-version}</artifactId>
<version>${monix-version}</version>
</dependency>
<!-- Needed for fileupload -->
<dependency>
<groupId>commons-io</groupId>
inventory-provisioning-web/src/main/scala/com/normation/inventory/provisioning/endpoint/FusionReportEndpoint.scala
package com.normation.inventory.provisioning.endpoint
import scala.util.control.NonFatal
import com.normation.inventory.services.core.FullInventoryRepository
import com.unboundid.ldif.LDIFChangeRecord
import org.joda.time.Duration
import org.joda.time.format.PeriodFormat
import org.springframework.http.{ HttpStatus, ResponseEntity }
import org.springframework.http.HttpHeaders
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.multipart.MultipartFile
import org.springframework.web.multipart.support.DefaultMultipartHttpServletRequest
import org.springframework.http.{HttpStatus,ResponseEntity}
import com.normation.inventory.domain._
import com.normation.inventory.services.provisioning._
import javax.servlet.http.HttpServletRequest
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.execution.atomic.AtomicInt
import monix.reactive.Observer
import monix.reactive.OverflowStrategy.Unbounded
import monix.reactive.observers.BufferedSubscriber
import monix.reactive.observers.Subscriber
import net.liftweb.common._
import scala.collection.JavaConversions._
import org.joda.time.Duration
import org.joda.time.format.PeriodFormat
import org.slf4j.LoggerFactory
import java.io.{IOException, File, FileInputStream, InputStream, FileOutputStream}
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.RequestMethod.POST
import FusionReportEndpoint._
import com.unboundid.ldif.LDIFChangeRecord
import javax.servlet.http.HttpServletRequest
import com.normation.inventory.services.core.FullInventoryRepository
import org.springframework.util.MultiValueMap
import org.springframework.util.CollectionUtils.MultiValueMapAdapter
import org.springframework.http.HttpHeaders
import scala.util.control.NonFatal
import com.normation.inventory.services.provisioning.ReportSaver
import com.normation.inventory.domain.CertifiedKey
import com.normation.inventory.services.provisioning.ReportUnmarshaller
import com.normation.inventory.domain.UndefinedKey
import com.normation.inventory.services.provisioning.InventoryDigestServiceV1
import com.normation.inventory.domain.InventoryReport
object FusionReportEndpoint{
val printer = PeriodFormat.getDefault
}
//messages to know if the backend accepted to process the report
final case object OkToSave
final case object TooManyInQueue
final case class QueueInfo(max: Int, current: Int)
final case object GetQueueInfo
@Controller
class FusionReportEndpoint(
unmarshaller:ReportUnmarshaller
, reportSaver:ReportSaver[Seq[LDIFChangeRecord]]
, queueSize: Int
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
, digestService : InventoryDigestServiceV1
unmarshaller : ReportUnmarshaller
, reportSaver : ReportSaver[Seq[LDIFChangeRecord]]
, maxQueueSize : Int
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
, digestService: InventoryDigestServiceV1
) extends Loggable {
//start the report processor actor
ReportProcessor.start
// current queue size
lazy val queueSize = AtomicInt(0)
// the asynchrone, bufferised processor
lazy val reportProcess = {
// the synchronize report processor
// we use the i/o scheduler given the kind of task for report processor
val syncReportProcess = Subscriber(new ProcessInventoryObserver(queueSize), Scheduler.io())
// and we async/buf it. The "overflow" strategy is unbound, because we
// will manage it by hand with the queuesize
BufferedSubscriber[InventoryReport](syncReportProcess, Unbounded)
}
/**
......
* that the endpoint is alive
*/
@RequestMapping(
value = Array("/api/status"),
value = Array("/api/status"),
method = Array(RequestMethod.GET)
)
def checkStatus() = new ResponseEntity("OK", HttpStatus.OK)
......
* Info on current number of elements in queue
*/
@RequestMapping(
value = Array("/api/info"),
value = Array("/api/info"),
method = Array(RequestMethod.GET)
)
def queueInfo() = {
(ReportProcessor !? GetQueueInfo) match {
case QueueInfo(max, current) =>
val saturated = (current+1) >= max
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
val json = s"""{"queueMaxSize":$max, "queueFillCount":$current, "queueSaturated":$saturated}"""
val headers = new HttpHeaders()
headers.add("content-type", "application/json")
new ResponseEntity(json, headers, code)
case x =>
new ResponseEntity(s"Internal error: the queue info query answered with the unknown message: '${x}'", HttpStatus.INTERNAL_SERVER_ERROR)
}
// get the current size, the remaining of the answer is based on that
val current = queueSize.get
val saturated = (current+1) >= maxQueueSize
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
val json = s"""{"queueMaxSize":$maxQueueSize, "queueFillCount":$current, "queueSaturated":$saturated}"""
val headers = new HttpHeaders()
headers.add("content-type", "application/json")
new ResponseEntity(json, headers, code)
}
/**
......
* @return
*/
@RequestMapping(
value = Array("/upload"),
value = Array("/upload"),
method = Array(RequestMethod.POST)
)
def onSubmit(request: HttpServletRequest) = {
......
}
def save(report: InventoryReport) = {
(ReportProcessor !? report) match {
case OkToSave =>
//release connection
val canDo = synchronized {
if(queueSize.incrementAndGet(1) <= maxQueueSize) {
// the decrement will be done by the report processor
true
} else {
// clean the not used increment
queueSize.decrement(1)
false
}
}
if(canDo) {
//queue the inventory processing
reportProcess.onNext(report)
new ResponseEntity("Inventory correctly received and sent to inventory processor.\n", HttpStatus.ACCEPTED)
case TooManyInQueue =>
} else {
new ResponseEntity("Too many inventories waiting to be saved.\n", HttpStatus.SERVICE_UNAVAILABLE)
}
}
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]) = {
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]): ResponseEntity[String]= {
val inventory = inventoryFile.getOriginalFilename()
//copy the session file somewhere where it won't be deleted on that method return
logger.info(s"New input inventory: '${inventory}'")
......
val signatureStream = sig.getInputStream()
val inventoryStream = inventoryFile.getInputStream()
val response = for {
digest <- digestService.parse(signatureStream)
digest <- digestService.parse(signatureStream)
(key,_) <- digestService.getKey(report)
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
} yield {
if (checked) {
// Signature is valid, send it to save engine
......
request match {
case multipart: DefaultMultipartHttpServletRequest =>
import scala.collection.JavaConversions._
val params : Map[String,MultipartFile]= multipart.getFileMap.toMap
import scala.collection.JavaConverters._
val params : scala.collection.mutable.Map[String, MultipartFile] = multipart.getFileMap.asScala
/*
* params are :
......
val inventoryParam = "file"
val signatureParam = "signature"
(params.get(inventoryParam),params.get(signatureParam)) match {
(params.get(inventoryParam), params.get(signatureParam)) match {
// No inventory, error
case (None,_) =>
defaultBadAnswer("No inventory sent")
......
}
}
import scala.actors.Actor
import Actor._
/**
* An asynchronous actor process the query
* Encapsulate the logic to process new incoming inventories.
*
* The processing is purelly synchrone and monotheaded,
* asynchronicity and multithreading are managed in the caller.
*
* It is not the consumer that manage the queue size, it only
* decrease it when it terminates a processing.
*
*/
private object ReportProcessor extends Actor {
self =>
//a message from the processor to self
//saying it finish processing
case object Processed
var inQueue = 0
//that actor only check the number of queued elements and decide to
//queue it or not
override def act = {
loop { react {
case GetQueueInfo =>
reply(QueueInfo(queueSize, inQueue))
case i:InventoryReport =>
if(inQueue < queueSize) {
actualProcessor ! i
reply(OkToSave)
inQueue += 1
} else {
logger.warn(s"Not processing inventory ${i.name} because there is already the maximum number (${inQueue}) of inventory waiting to be processed")
reply(TooManyInQueue)
}
class ProcessInventoryObserver(queueSize: AtomicInt) extends Observer.Sync[InventoryReport] {
case Processed =>
inQueue -= 1
} }
}
/*
* The only caller. It is not allowed to throw (non fatal) exceptions
*/
def onNext(report: InventoryReport): Ack = {
//this is the actor that process the report
val actualProcessor = new Actor {
override def act = {
loop { react {
case i:InventoryReport =>
saveReport(i)
self ! Processed
} }
try {
saveReport(report)
} catch {
case NonFatal(ex) =>
logger.error(s"Error when processing inventory report '${report.name}': ${ex.getMessage}", ex)
}
// always tell the feeder that the report is processed
queueSize.decrement(1)
// this is the contract of observer
Continue
}
/*
* That one should not be called. Log the error.
*/
def onError(ex: Throwable): Unit = {
logger.error(s"The async inventory proccessor got an 'error' message with the following exception: ${ex.getMessage}", ex)
}
actualProcessor.start
/*
* That one should not happens, as the normal usage is to wait
* for the closure of web server
*/
def onComplete(): Unit = {
logger.error(s"The async inventory proccessor got an 'termination' message.")
}
}

Also available in: Unified diff