Revision 3a4ffe5e

Added by François ARMAND about 7 years ago

Fixes #10119: Scala actors are deprecated in scala 2.11 and removed in 2.12: update inventory-endpoint

package com.normation.inventory.provisioning.endpoint
import scala.util.control.NonFatal
import com.unboundid.ldif.LDIFChangeRecord
import org.joda.time.Duration
import org.joda.time.format.PeriodFormat
import org.springframework.http.{ HttpStatus, ResponseEntity }
import org.springframework.http.HttpHeaders
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.multipart.MultipartFile
import org.springframework.http.{HttpStatus,ResponseEntity}
import com.normation.inventory.domain._
import javax.servlet.http.HttpServletRequest
import monix.execution.Ack
import monix.execution.Ack.Continue
import monix.execution.Scheduler
import monix.execution.atomic.AtomicInt
import monix.reactive.Observer
import monix.reactive.OverflowStrategy.Unbounded
import monix.reactive.observers.BufferedSubscriber
import monix.reactive.observers.Subscriber
import net.liftweb.common._
import scala.collection.JavaConversions._
import org.joda.time.Duration
import org.joda.time.format.PeriodFormat
import org.slf4j.LoggerFactory
import{IOException, File, FileInputStream, InputStream, FileOutputStream}
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RequestMethod.GET
import org.springframework.web.bind.annotation.RequestMethod.POST
import FusionReportEndpoint._
import com.unboundid.ldif.LDIFChangeRecord
import javax.servlet.http.HttpServletRequest
import org.springframework.util.MultiValueMap
import org.springframework.util.CollectionUtils.MultiValueMapAdapter
import org.springframework.http.HttpHeaders
import scala.util.control.NonFatal
import com.normation.inventory.domain.CertifiedKey
import com.normation.inventory.domain.UndefinedKey
import com.normation.inventory.domain.InventoryReport
object FusionReportEndpoint{
val printer = PeriodFormat.getDefault
//messages to know if the backend accepted to process the report
final case object OkToSave
final case object TooManyInQueue
final case class QueueInfo(max: Int, current: Int)
final case object GetQueueInfo
class FusionReportEndpoint(
, reportSaver:ReportSaver[Seq[LDIFChangeRecord]]
, queueSize: Int
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
, digestService : InventoryDigestServiceV1
unmarshaller : ReportUnmarshaller
, reportSaver : ReportSaver[Seq[LDIFChangeRecord]]
, maxQueueSize : Int
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
, digestService: InventoryDigestServiceV1
) extends Loggable {
//start the report processor actor
// current queue size
lazy val queueSize = AtomicInt(0)
// the asynchrone, bufferised processor
lazy val reportProcess = {
// the synchronize report processor
// we use the i/o scheduler given the kind of task for report processor
val syncReportProcess = Subscriber(new ProcessInventoryObserver(queueSize),
// and we async/buf it. The "overflow" strategy is unbound, because we
// will manage it by hand with the queuesize
BufferedSubscriber[InventoryReport](syncReportProcess, Unbounded)
* that the endpoint is alive
value = Array("/api/status"),
value = Array("/api/status"),
method = Array(RequestMethod.GET)
def checkStatus() = new ResponseEntity("OK", HttpStatus.OK)
* Info on current number of elements in queue
value = Array("/api/info"),
value = Array("/api/info"),
method = Array(RequestMethod.GET)
def queueInfo() = {
(ReportProcessor !? GetQueueInfo) match {
case QueueInfo(max, current) =>
val saturated = (current+1) >= max
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
val json = s"""{"queueMaxSize":$max, "queueFillCount":$current, "queueSaturated":$saturated}"""
val headers = new HttpHeaders()
headers.add("content-type", "application/json")
new ResponseEntity(json, headers, code)
case x =>
new ResponseEntity(s"Internal error: the queue info query answered with the unknown message: '${x}'", HttpStatus.INTERNAL_SERVER_ERROR)
// get the current size, the remaining of the answer is based on that
val current = queueSize.get
val saturated = (current+1) >= maxQueueSize
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
val json = s"""{"queueMaxSize":$maxQueueSize, "queueFillCount":$current, "queueSaturated":$saturated}"""
val headers = new HttpHeaders()
headers.add("content-type", "application/json")
new ResponseEntity(json, headers, code)
* @return
value = Array("/upload"),
value = Array("/upload"),
method = Array(RequestMethod.POST)
def onSubmit(request: HttpServletRequest) = {
def save(report: InventoryReport) = {
(ReportProcessor !? report) match {
case OkToSave =>
//release connection
val canDo = synchronized {
if(queueSize.incrementAndGet(1) <= maxQueueSize) {
// the decrement will be done by the report processor
} else {
// clean the not used increment
if(canDo) {
//queue the inventory processing
new ResponseEntity("Inventory correctly received and sent to inventory processor.\n", HttpStatus.ACCEPTED)
case TooManyInQueue =>
} else {
new ResponseEntity("Too many inventories waiting to be saved.\n", HttpStatus.SERVICE_UNAVAILABLE)
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]) = {
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]): ResponseEntity[String]= {
val inventory = inventoryFile.getOriginalFilename()
//copy the session file somewhere where it won't be deleted on that method return"New input inventory: '${inventory}'")
val signatureStream = sig.getInputStream()
val inventoryStream = inventoryFile.getInputStream()
val response = for {
digest <- digestService.parse(signatureStream)
digest <- digestService.parse(signatureStream)
(key,_) <- digestService.getKey(report)
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
} yield {
if (checked) {
// Signature is valid, send it to save engine
request match {
case multipart: DefaultMultipartHttpServletRequest =>
import scala.collection.JavaConversions._
val params : Map[String,MultipartFile]= multipart.getFileMap.toMap
import scala.collection.JavaConverters._
val params : scala.collection.mutable.Map[String, MultipartFile] = multipart.getFileMap.asScala
* params are :
val inventoryParam = "file"
val signatureParam = "signature"
(params.get(inventoryParam),params.get(signatureParam)) match {
(params.get(inventoryParam), params.get(signatureParam)) match {
// No inventory, error
case (None,_) =>
defaultBadAnswer("No inventory sent")
import scala.actors.Actor
import Actor._
* An asynchronous actor process the query
* Encapsulate the logic to process new incoming inventories.
* The processing is purelly synchrone and monotheaded,
* asynchronicity and multithreading are managed in the caller.
* It is not the consumer that manage the queue size, it only
* decrease it when it terminates a processing.
private object ReportProcessor extends Actor {
self =>
//a message from the processor to self
//saying it finish processing
case object Processed
var inQueue = 0
//that actor only check the number of queued elements and decide to
//queue it or not
override def act = {
loop { react {
case GetQueueInfo =>
reply(QueueInfo(queueSize, inQueue))
case i:InventoryReport =>
if(inQueue < queueSize) {
actualProcessor ! i
inQueue += 1
} else {
logger.warn(s"Not processing inventory ${} because there is already the maximum number (${inQueue}) of inventory waiting to be processed")
class ProcessInventoryObserver(queueSize: AtomicInt) extends Observer.Sync[InventoryReport] {
case Processed =>
inQueue -= 1
} }
* The only caller. It is not allowed to throw (non fatal) exceptions
def onNext(report: InventoryReport): Ack = {
//this is the actor that process the report
val actualProcessor = new Actor {
override def act = {
loop { react {
case i:InventoryReport =>
self ! Processed
} }
try {
} catch {
case NonFatal(ex) =>
logger.error(s"Error when processing inventory report '${}': ${ex.getMessage}", ex)
// always tell the feeder that the report is processed
// this is the contract of observer
* That one should not be called. Log the error.
def onError(ex: Throwable): Unit = {
logger.error(s"The async inventory proccessor got an 'error' message with the following exception: ${ex.getMessage}", ex)
* That one should not happens, as the normal usage is to wait
* for the closure of web server
def onComplete(): Unit = {
logger.error(s"The async inventory proccessor got an 'termination' message.")

