Revision 3a4ffe5e
Added by François ARMAND about 7 years ago
inventory-provisioning-web/pom.xml | ||
---|---|---|
<version>${rudder-version}</version>
|
||
</dependency>
|
||
|
||
<dependency>
|
||
<groupId>org.scala-lang</groupId>
|
||
<artifactId>scala-actors</artifactId>
|
||
<version>${scala-version}</version>
|
||
</dependency>
|
||
|
||
<!-- Compile dependencies -->
|
||
<dependency>
|
||
<groupId>commons-fileupload</groupId>
|
||
<artifactId>commons-fileupload</artifactId>
|
||
<version>1.3.2</version>
|
||
</dependency>
|
||
<dependency>
|
||
<groupId>io.monix</groupId>
|
||
<artifactId>monix-reactive_${scala-binary-version}</artifactId>
|
||
<version>${monix-version}</version>
|
||
</dependency>
|
||
<!-- Needed for fileupload -->
|
||
<dependency>
|
||
<groupId>commons-io</groupId>
|
inventory-provisioning-web/src/main/scala/com/normation/inventory/provisioning/endpoint/FusionReportEndpoint.scala | ||
---|---|---|
package com.normation.inventory.provisioning.endpoint
|
||
|
||
|
||
import scala.util.control.NonFatal
|
||
|
||
import com.normation.inventory.services.core.FullInventoryRepository
|
||
import com.unboundid.ldif.LDIFChangeRecord
|
||
|
||
import org.joda.time.Duration
|
||
import org.joda.time.format.PeriodFormat
|
||
import org.springframework.http.{ HttpStatus, ResponseEntity }
|
||
import org.springframework.http.HttpHeaders
|
||
import org.springframework.stereotype.Controller
|
||
import org.springframework.web.bind.annotation.RequestMapping
|
||
import org.springframework.web.bind.annotation.RequestMethod
|
||
import org.springframework.web.multipart.MultipartFile
|
||
import org.springframework.web.multipart.support.DefaultMultipartHttpServletRequest
|
||
import org.springframework.http.{HttpStatus,ResponseEntity}
|
||
import com.normation.inventory.domain._
|
||
import com.normation.inventory.services.provisioning._
|
||
|
||
import javax.servlet.http.HttpServletRequest
|
||
import monix.execution.Ack
|
||
import monix.execution.Ack.Continue
|
||
import monix.execution.Scheduler
|
||
import monix.execution.atomic.AtomicInt
|
||
import monix.reactive.Observer
|
||
import monix.reactive.OverflowStrategy.Unbounded
|
||
import monix.reactive.observers.BufferedSubscriber
|
||
import monix.reactive.observers.Subscriber
|
||
import net.liftweb.common._
|
||
import scala.collection.JavaConversions._
|
||
import org.joda.time.Duration
|
||
import org.joda.time.format.PeriodFormat
|
||
import org.slf4j.LoggerFactory
|
||
import java.io.{IOException, File, FileInputStream, InputStream, FileOutputStream}
|
||
import org.springframework.web.bind.annotation.RequestMethod
|
||
import org.springframework.web.bind.annotation.RequestMethod.GET
|
||
import org.springframework.web.bind.annotation.RequestMethod.POST
|
||
|
||
import FusionReportEndpoint._
|
||
import com.unboundid.ldif.LDIFChangeRecord
|
||
import javax.servlet.http.HttpServletRequest
|
||
import com.normation.inventory.services.core.FullInventoryRepository
|
||
import org.springframework.util.MultiValueMap
|
||
import org.springframework.util.CollectionUtils.MultiValueMapAdapter
|
||
import org.springframework.http.HttpHeaders
|
||
import scala.util.control.NonFatal
|
||
import com.normation.inventory.services.provisioning.ReportSaver
|
||
import com.normation.inventory.domain.CertifiedKey
|
||
import com.normation.inventory.services.provisioning.ReportUnmarshaller
|
||
import com.normation.inventory.domain.UndefinedKey
|
||
import com.normation.inventory.services.provisioning.InventoryDigestServiceV1
|
||
import com.normation.inventory.domain.InventoryReport
|
||
|
||
object FusionReportEndpoint{
|
||
val printer = PeriodFormat.getDefault
|
||
}
|
||
|
||
//messages to know if the backend accepted to process the report
|
||
final case object OkToSave
|
||
final case object TooManyInQueue
|
||
final case class QueueInfo(max: Int, current: Int)
|
||
final case object GetQueueInfo
|
||
|
||
|
||
@Controller
|
||
class FusionReportEndpoint(
|
||
unmarshaller:ReportUnmarshaller
|
||
, reportSaver:ReportSaver[Seq[LDIFChangeRecord]]
|
||
, queueSize: Int
|
||
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
|
||
, digestService : InventoryDigestServiceV1
|
||
unmarshaller : ReportUnmarshaller
|
||
, reportSaver : ReportSaver[Seq[LDIFChangeRecord]]
|
||
, maxQueueSize : Int
|
||
, repo : FullInventoryRepository[Seq[LDIFChangeRecord]]
|
||
, digestService: InventoryDigestServiceV1
|
||
) extends Loggable {
|
||
|
||
//start the report processor actor
|
||
ReportProcessor.start
|
||
|
||
|
||
// current queue size
|
||
lazy val queueSize = AtomicInt(0)
|
||
|
||
// the asynchrone, bufferised processor
|
||
lazy val reportProcess = {
|
||
// the synchronize report processor
|
||
// we use the i/o scheduler given the kind of task for report processor
|
||
val syncReportProcess = Subscriber(new ProcessInventoryObserver(queueSize), Scheduler.io())
|
||
|
||
// and we async/buf it. The "overflow" strategy is unbound, because we
|
||
// will manage it by hand with the queuesize
|
||
BufferedSubscriber[InventoryReport](syncReportProcess, Unbounded)
|
||
}
|
||
|
||
|
||
/**
|
||
... | ... | |
* that the endpoint is alive
|
||
*/
|
||
@RequestMapping(
|
||
value = Array("/api/status"),
|
||
value = Array("/api/status"),
|
||
method = Array(RequestMethod.GET)
|
||
)
|
||
def checkStatus() = new ResponseEntity("OK", HttpStatus.OK)
|
||
... | ... | |
* Info on current number of elements in queue
|
||
*/
|
||
@RequestMapping(
|
||
value = Array("/api/info"),
|
||
value = Array("/api/info"),
|
||
method = Array(RequestMethod.GET)
|
||
)
|
||
def queueInfo() = {
|
||
(ReportProcessor !? GetQueueInfo) match {
|
||
case QueueInfo(max, current) =>
|
||
val saturated = (current+1) >= max
|
||
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
|
||
val json = s"""{"queueMaxSize":$max, "queueFillCount":$current, "queueSaturated":$saturated}"""
|
||
val headers = new HttpHeaders()
|
||
headers.add("content-type", "application/json")
|
||
new ResponseEntity(json, headers, code)
|
||
case x =>
|
||
new ResponseEntity(s"Internal error: the queue info query answered with the unknown message: '${x}'", HttpStatus.INTERNAL_SERVER_ERROR)
|
||
}
|
||
// get the current size, the remaining of the answer is based on that
|
||
val current = queueSize.get
|
||
val saturated = (current+1) >= maxQueueSize
|
||
val code = if(saturated) HttpStatus.TOO_MANY_REQUESTS else HttpStatus.OK
|
||
val json = s"""{"queueMaxSize":$maxQueueSize, "queueFillCount":$current, "queueSaturated":$saturated}"""
|
||
val headers = new HttpHeaders()
|
||
headers.add("content-type", "application/json")
|
||
new ResponseEntity(json, headers, code)
|
||
}
|
||
|
||
/**
|
||
... | ... | |
* @return
|
||
*/
|
||
@RequestMapping(
|
||
value = Array("/upload"),
|
||
value = Array("/upload"),
|
||
method = Array(RequestMethod.POST)
|
||
)
|
||
def onSubmit(request: HttpServletRequest) = {
|
||
... | ... | |
}
|
||
|
||
def save(report: InventoryReport) = {
|
||
(ReportProcessor !? report) match {
|
||
case OkToSave =>
|
||
//release connection
|
||
|
||
val canDo = synchronized {
|
||
if(queueSize.incrementAndGet(1) <= maxQueueSize) {
|
||
// the decrement will be done by the report processor
|
||
true
|
||
} else {
|
||
// clean the not used increment
|
||
queueSize.decrement(1)
|
||
false
|
||
}
|
||
}
|
||
|
||
if(canDo) {
|
||
//queue the inventory processing
|
||
reportProcess.onNext(report)
|
||
new ResponseEntity("Inventory correctly received and sent to inventory processor.\n", HttpStatus.ACCEPTED)
|
||
case TooManyInQueue =>
|
||
} else {
|
||
new ResponseEntity("Too many inventories waiting to be saved.\n", HttpStatus.SERVICE_UNAVAILABLE)
|
||
}
|
||
}
|
||
|
||
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]) = {
|
||
def parseInventory(inventoryFile : MultipartFile, signatureFile : Option[MultipartFile]): ResponseEntity[String]= {
|
||
val inventory = inventoryFile.getOriginalFilename()
|
||
//copy the session file somewhere where it won't be deleted on that method return
|
||
logger.info(s"New input inventory: '${inventory}'")
|
||
... | ... | |
val signatureStream = sig.getInputStream()
|
||
val inventoryStream = inventoryFile.getInputStream()
|
||
val response = for {
|
||
digest <- digestService.parse(signatureStream)
|
||
digest <- digestService.parse(signatureStream)
|
||
(key,_) <- digestService.getKey(report)
|
||
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
|
||
checked <- digestService.check(key, digest, inventoryFile.getInputStream)
|
||
} yield {
|
||
if (checked) {
|
||
// Signature is valid, send it to save engine
|
||
... | ... | |
|
||
request match {
|
||
case multipart: DefaultMultipartHttpServletRequest =>
|
||
import scala.collection.JavaConversions._
|
||
val params : Map[String,MultipartFile]= multipart.getFileMap.toMap
|
||
import scala.collection.JavaConverters._
|
||
val params : scala.collection.mutable.Map[String, MultipartFile] = multipart.getFileMap.asScala
|
||
|
||
/*
|
||
* params are :
|
||
... | ... | |
val inventoryParam = "file"
|
||
val signatureParam = "signature"
|
||
|
||
(params.get(inventoryParam),params.get(signatureParam)) match {
|
||
(params.get(inventoryParam), params.get(signatureParam)) match {
|
||
// No inventory, error
|
||
case (None,_) =>
|
||
defaultBadAnswer("No inventory sent")
|
||
... | ... | |
}
|
||
}
|
||
|
||
import scala.actors.Actor
|
||
import Actor._
|
||
|
||
/**
|
||
* An asynchronous actor process the query
|
||
* Encapsulate the logic to process new incoming inventories.
|
||
*
|
||
* The processing is purelly synchrone and monotheaded,
|
||
* asynchronicity and multithreading are managed in the caller.
|
||
*
|
||
* It is not the consumer that manage the queue size, it only
|
||
* decrease it when it terminates a processing.
|
||
*
|
||
*/
|
||
private object ReportProcessor extends Actor {
|
||
self =>
|
||
|
||
//a message from the processor to self
|
||
//saying it finish processing
|
||
case object Processed
|
||
|
||
var inQueue = 0
|
||
|
||
//that actor only check the number of queued elements and decide to
|
||
//queue it or not
|
||
override def act = {
|
||
loop { react {
|
||
case GetQueueInfo =>
|
||
reply(QueueInfo(queueSize, inQueue))
|
||
|
||
case i:InventoryReport =>
|
||
|
||
if(inQueue < queueSize) {
|
||
actualProcessor ! i
|
||
reply(OkToSave)
|
||
inQueue += 1
|
||
} else {
|
||
logger.warn(s"Not processing inventory ${i.name} because there is already the maximum number (${inQueue}) of inventory waiting to be processed")
|
||
reply(TooManyInQueue)
|
||
}
|
||
class ProcessInventoryObserver(queueSize: AtomicInt) extends Observer.Sync[InventoryReport] {
|
||
|
||
case Processed =>
|
||
inQueue -= 1
|
||
} }
|
||
}
|
||
/*
|
||
* The only caller. It is not allowed to throw (non fatal) exceptions
|
||
*/
|
||
def onNext(report: InventoryReport): Ack = {
|
||
|
||
//this is the actor that process the report
|
||
val actualProcessor = new Actor {
|
||
override def act = {
|
||
loop { react {
|
||
case i:InventoryReport =>
|
||
saveReport(i)
|
||
self ! Processed
|
||
} }
|
||
try {
|
||
saveReport(report)
|
||
} catch {
|
||
case NonFatal(ex) =>
|
||
logger.error(s"Error when processing inventory report '${report.name}': ${ex.getMessage}", ex)
|
||
}
|
||
// always tell the feeder that the report is processed
|
||
queueSize.decrement(1)
|
||
// this is the contract of observer
|
||
Continue
|
||
}
|
||
|
||
/*
|
||
* That one should not be called. Log the error.
|
||
*/
|
||
def onError(ex: Throwable): Unit = {
|
||
logger.error(s"The async inventory proccessor got an 'error' message with the following exception: ${ex.getMessage}", ex)
|
||
}
|
||
|
||
actualProcessor.start
|
||
/*
|
||
* That one should not happens, as the normal usage is to wait
|
||
* for the closure of web server
|
||
*/
|
||
def onComplete(): Unit = {
|
||
logger.error(s"The async inventory proccessor got an 'termination' message.")
|
||
}
|
||
|
||
}
|
||
|
Also available in: Unified diff
Fixes #10119: Scala actors are deprecated in scala 2.11 and removed in 2.12: update inventory-endpoint