Commit 988e94c9 by Girish A Nandolkar

synchronus job creation and getting the result back is done and asynchronus response is done

1 parent 3d6f5032
......@@ -5,6 +5,9 @@ version := "0.1"
scalaVersion := "2.12.4"
//resolvers += "theatr.us" at "http://repo.theatr.us"
PB.targets in Compile := Seq(
scalapb.gen() -> (sourceManaged in Compile).value
)
......@@ -13,6 +16,8 @@ libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-ws" % "2.6.7",
"com.softwaremill.sttp" %% "core" % "1.1.0",
"com.softwaremill.sttp" %% "json4s" % "1.1.0",
"com.typesafe.akka" %% "akka-actor" % "2.4.8",
// "us.theatr" %% "akka-quartz" % "0.3.0",
"io.grpc" % "grpc-netty" % com.trueaccord.scalapb.compiler.Version.grpcJavaVersion,
"com.trueaccord.scalapb" %% "scalapb-runtime" % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf",
"com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % com.trueaccord.scalapb.compiler.Version.scalapbVersion
......
......@@ -4,7 +4,9 @@ package com.example.protos;
service CreateSparkJob {
rpc CreateJob (SparkJobRequest) returns (SparkJobReply) {}
rpc SparkJobStatus (SparkJobStatusReq) returns (SparkJobReply) {}
rpc SparkJobStatus (SparkJobStatusReq) returns (SparkJobStatusReply) {}
rpc SparkJobResponse (SparkJobResponseReq) returns (SparkJobResponseReply) {}
rpc CreateJobAndGetResult(SparkJobRequest) returns (SparkJobResponseReply) {}
}
// The request message containing the debug, dbName, filename, reporttype, instanceDbs, startDate,endDate, reportGenerateType,tallyCompanyName,allowCatogery.
......@@ -18,15 +20,28 @@ message SparkJobRequest {
string endDate = 7;
string reportGenerateType = 8;
string tallyCompanyName = 9;
string allowedCatogery = 10;
}
// The response message containing the greetings
message SparkJobReply {
string message = 1;
string submissionId = 1;
}
message SparkJobStatusReq{
string submitionId = 1;
string submissionId = 1;
}
message SparkJobStatusReply{
string status = 1;
}
message SparkJobResponseReq{
string submissionId = 1;
string filePath = 2;
}
message SparkJobResponseReply{
string status = 1;
string message = 2;
}
import com.example.protos.sparkJob.CreateSparkJobGrpc.{CreateSparkJob, CreateSparkJobBlockingStub}
import com.example.protos.sparkJob.{CreateSparkJobGrpc, SparkJobReply, SparkJobRequest, SparkJobStatusReq}
import com.example.protos.sparkJob._
import io.grpc.ManagedChannelBuilder
......@@ -9,17 +9,25 @@ object CreateSparkJobClient {
val channel = ManagedChannelBuilder.forAddress("localhost",50051)
.usePlaintext(true).build()
val request = SparkJobRequest("true","rapidor_realmarketing","abc.csv","payment_collection","none","01/02/2017","15/02/2017","","","")
val request = SparkJobRequest("true","rapidor_realmarketing","/tmp/abc.csv","consolidated_report","none","01/02/2017","15/02/2017","","")
val sparkJobStub:CreateSparkJobBlockingStub = CreateSparkJobGrpc.blockingStub(channel)
val sparkJobStub:CreateSparkJobBlockingStub = CreateSparkJobGrpc.blockingStub(channel)
val response:SparkJobReply = sparkJobStub.createJob(request)
/* val response:SparkJobReply = sparkJobStub.createJob(request)
println("response message--------------->\t"+response.message)
println("response message--------------->\t"+response.submissionId)
val statusResponse:SparkJobReply = sparkJobStub.sparkJobStatus(SparkJobStatusReq(response.message))
val statusResponse:SparkJobStatusReply = sparkJobStub.sparkJobStatus(SparkJobStatusReq(response.submissionId))
println("Status response message--------------->\t"+statusResponse.message)
println("Status response message--------------->\t"+statusResponse.status)
val getResponse:SparkJobResponseReply = sparkJobStub.sparkJobResponse(SparkJobResponseReq(response.submissionId))
println("response message--------------->\t"+getResponse.message + "------>" +getResponse.status)
*/
val getSyncResponse:SparkJobResponseReply = sparkJobStub.createJobAndGetResult(request)
println("response message--------------->\t"+getSyncResponse.message + "------>" +getSyncResponse.status)
}
}
import javax.inject.Inject
import com.example.protos.sparkJob.{CreateSparkJobGrpc, SparkJobReply, SparkJobRequest,SparkJobStatusReq}
import com.example.protos.sparkJob._
import com.example.protos.sparkJob.CreateSparkJobGrpc.CreateSparkJob
import models.Models.SparkJobArgs
import play.api.libs.json._ // JSON library
import play.api.libs.json._
import com.softwaremill.sttp._
import com.softwaremill.sttp.json4s._
import models.Models._
import java.io.File
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
object CreateSparkJobServer extends TestServerConnect{
class CreateSparkJobServer extends CreateSparkJob {
// val system = ActorSystem("mySystem")
/*
* This method is use to create the spark job in the spark stand alone cluster and it returns the response of
* parameters ----> action:String,message:String,serverSparkVersion:String,submissionId:String,success:Boolean
*/
def createJob(req:SparkJobRequest): Future[SparkJobReply] ={
implicit val backend = HttpURLConnectionBackend()
val sparkJobArgs = SparkJobArgs(action = "CreateSubmissionRequest",appArgs = List(s"${req.debug}",s"${req.dbName}",s"${req.fileName}",s"${req.reportType}",s"${req.instanceDbs}",s"${req.startDate}",s"${req.endDate}",s"${req.reportGenerateType}",s"${req.tallyCompanyName}",s"${req.allowedCatogery}"),
val sparkJobArgs = SparkJobArgs(action = "CreateSubmissionRequest",appArgs = List(s"${req.debug}",s"${req.dbName}",s"${req.fileName}",s"${req.reportType}",s"${req.instanceDbs}",s"${req.startDate}",s"${req.endDate}",s"${req.reportGenerateType}",s"${req.tallyCompanyName}"),
appResource = System.getenv("SPARK_APP_RESOURCE"),clientSparkVersion = "1",
environmentVariables = EnvironmentVariables(SPARK_ENV_LOADED = "1", RAPIDOR_CURRENT_DB_NAME = System.getenv("RAPIDOR_CURRENT_DB_NAME"),
RAPIDOR_DB_HOSTNAME = System.getenv("RAPIDOR_DB_HOSTNAME"),RAPIDOR_DB_PORT = System.getenv("RAPIDOR_DB_PORT"),RAPIDOR_PLANX_DB_PASSWORD = System.getenv("RAPIDOR_PLANX_DB_PASSWORD"),
......@@ -40,27 +50,151 @@ object CreateSparkJobServer extends TestServerConnect{
.send()
println("----1111111---------->"+sparkResponse.body)
println("----1111111---------->"+sparkResponse)
// println("----1111111---------->"+sparkResponse.body)
// println("----1111111---------->"+sparkResponse)
println("-------------->"+req)
backend.close()
Future.successful(SparkJobReply(message = sparkResponse.body.right.get.submissionId))
Future.successful(SparkJobReply(submissionId = sparkResponse.body.right.get.submissionId))
}
def sparkJobStatus(submisionId:SparkJobStatusReq): Future[SparkJobReply] ={
/*
* This method is used to check the status of the given submissionId
* this will give the response of These
* parameters ----> action:String,driverState:String,serverSparkVersion:String,submissionId:String,success:Boolean,workerHostPort:String,workerId:String
* And the driverState are ---> `RUNNING`, `FINISHED`
*/
def sparkJobStatus(submissionId:SparkJobStatusReq): Future[SparkJobStatusReply] ={
implicit val backend = HttpURLConnectionBackend()
val sparkJobStatus = sttp.get(uri"http://192.168.1.50:6066/v1/submissions/status/${submisionId.submitionId}")
val sparkJobStatusRes = sttp.get(uri"http://192.168.1.50:6066/v1/submissions/status/${submissionId.submissionId}")
.response(asJson[StatusResponse])
.send()
println("-------status------>"+sparkJobStatusRes)
backend.close()
Future.successful(SparkJobStatusReply(status = sparkJobStatusRes.body.right.get.driverState))
}
/*
* This method is used to get the response of finished job
* and this will return the stauts is true/false and message
*/
def sparkJobResponse(resArgs:SparkJobResponseReq): Future[SparkJobResponseReply] ={
var responseData = SparkJobResponseReply(status = "",message = "")
implicit val backend = HttpURLConnectionBackend()
val sparkJobResp = sttp.get(uri"http://192.168.1.50:6066/v1/submissions/status/${resArgs.submissionId}")
.response(asJson[StatusResponse])
.send()
println("-------status------>"+sparkJobStatus)
val jobResponse = sparkJobResp.body.right.get
val file = if(resArgs.filePath.contains("/tmp")) resArgs.filePath else "/tmp/"+resArgs.filePath
val fileExist = new File(file).exists()
if(jobResponse.driverState == "FINISHED" && fileExist) {
responseData =SparkJobResponseReply(status = "true",message = "")
}else{
responseData =SparkJobResponseReply(status = "false",message = "getting error when generating the report")
}
backend.close()
Future.successful(SparkJobReply(message = sparkJobStatus.body.right.get.driverState))
Future.successful(responseData)
}
/*
* This method is used to get the synchronous method to get the all information of job and it will get the FINISHED job response
*/
def createJobAndGetResult(reqArgs:SparkJobRequest): Future[SparkJobResponseReply] ={
var sparkJobRes = SparkJobResponseReply(status = "",message = "")
implicit val backend = HttpURLConnectionBackend()
println("inside the syncronous createJobAndGetResult method")
val result:Future[SparkJobResponseReply] = Future {
val sparkJobArgs = SparkJobArgs(action = "CreateSubmissionRequest", appArgs = List(s"${reqArgs.debug}", s"${reqArgs.dbName}", s"${reqArgs.fileName}", s"${reqArgs.reportType}", s"${reqArgs.instanceDbs}", s"${reqArgs.startDate}", s"${reqArgs.endDate}", s"${reqArgs.reportGenerateType}", s"${reqArgs.tallyCompanyName}"),
appResource = System.getenv("SPARK_APP_RESOURCE"), clientSparkVersion = "1",
environmentVariables = EnvironmentVariables(SPARK_ENV_LOADED = "1", RAPIDOR_CURRENT_DB_NAME = System.getenv("RAPIDOR_CURRENT_DB_NAME"),
RAPIDOR_DB_HOSTNAME = System.getenv("RAPIDOR_DB_HOSTNAME"), RAPIDOR_DB_PORT = System.getenv("RAPIDOR_DB_PORT"), RAPIDOR_PLANX_DB_PASSWORD = System.getenv("RAPIDOR_PLANX_DB_PASSWORD"),
RAPIDOR_PLANX_DB_USERNAME = System.getenv("RAPIDOR_PLANX_DB_USERNAME")),
mainClass = "com.acelrtech.Main", sparkProperties = SparkProperties(`spark.app.name` = "MyJob",
`spark.driver.cores` = "6", `spark.eventLog.enabled` = "true", `spark.submit.deployMode` = "cluster", `spark.driver.memory` = "4G",
`spark.executor.cores` = "4", `spark.executor.memory` = "2G"))
val inputJson = Json.toJson(sparkJobArgs)
val payload = Json.stringify(inputJson)
val jobCreateResponse = sttp.contentType("application/json")
.post(uri"http://192.168.1.50:6066/v1/submissions/create")
.body(payload)
.response(asJson[ResponseJson])
.send()
var statusRes=(submissionId:String) => {
implicit val backend = HttpURLConnectionBackend()
val x = sttp.get(uri"http://192.168.1.50:6066/v1/submissions/status/${submissionId}")
.response(asJson[StatusResponse])
.send()
backend.close()
x
}
var sparkJobStatusRes: Response[StatusResponse] = statusRes(jobCreateResponse.body.right.get.submissionId)
var timeShedule = System.currentTimeMillis() + System.getenv("SHEDULE_TIME_FOR_STATUS").toLong
var currentTime = System.currentTimeMillis()
while (sparkJobStatusRes.body.right.get.driverState == "RUNNING" && currentTime <= timeShedule) {
sparkJobStatusRes = statusRes(jobCreateResponse.body.right.get.submissionId)
println(sparkJobStatusRes.body.right.get.submissionId + "---------->" + sparkJobStatusRes.body.right.get.driverState + "-------------->" + currentTime)
Thread.sleep(5000)
currentTime = System.currentTimeMillis()
}
/* val sparkJobActor = system.actorOf(Props[Test],"newActor")
implicit val ec = system.dispatcher
val act = system.scheduler.schedule(5 seconds,30 seconds, sparkJobActor,jobCreateResponse.body.right.get.submissionId)
act*/
val jobResponse = sparkJobStatusRes.body.right.get
val file = if(reqArgs.fileName.contains("/tmp")) reqArgs.fileName else "/tmp/"+reqArgs.fileName
val fileExist = new File(file).exists()
if(jobResponse.driverState == "FINISHED" && fileExist) {
sparkJobRes =SparkJobResponseReply(status = "true",message = "")
}else{
sparkJobRes =SparkJobResponseReply(status = "false",message = "getting error when generating the report")
}
backend.close()
println("---Finished--->")
sparkJobRes
}
//Future.successful(sparkJobRes)
result
}
}
/*class Test extends Actor{
def receive={
case s:String=> sender() ! new CreateSparkJobServer().statusRes(s)
}
}*/
def main(args:Array[String]): Unit ={
val ssd = CreateSparkJobGrpc.bindService(new CreateSparkJobServer, ExecutionContext.global)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!