Commit 5861c05b by Girish A Nandolkar

Spark job creation using the Scala HTTP client is done

1 parent 89948a5b
<component name="libraryTable">
<library name="SBT: com.softwaremill.sttp:core_2.12:1.1.0:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.softwaremill.sttp/core_2.12/jars/core_2.12-1.1.0.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.softwaremill.sttp/core_2.12/srcs/core_2.12-1.1.0-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: com.softwaremill.sttp:json4s_2.12:1.1.0:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.softwaremill.sttp/json4s_2.12/jars/json4s_2.12-1.1.0.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.softwaremill.sttp/json4s_2.12/srcs/json4s_2.12-1.1.0-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: com.thoughtworks.paranamer:paranamer:2.8:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.thoughtworks.paranamer/paranamer/bundles/paranamer-2.8.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/com.thoughtworks.paranamer/paranamer/srcs/paranamer-2.8-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: org.json4s:json4s-ast_2.12:3.5.3:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-ast_2.12/jars/json4s-ast_2.12-3.5.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-ast_2.12/srcs/json4s-ast_2.12-3.5.3-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: org.json4s:json4s-core_2.12:3.5.3:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-core_2.12/jars/json4s-core_2.12-3.5.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-core_2.12/srcs/json4s-core_2.12-3.5.3-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: org.json4s:json4s-native_2.12:3.5.3:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-native_2.12/jars/json4s-native_2.12-3.5.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-native_2.12/srcs/json4s-native_2.12-3.5.3-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
<component name="libraryTable">
<library name="SBT: org.json4s:json4s-scalap_2.12:3.5.3:jar">
<CLASSES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-scalap_2.12/jars/json4s-scalap_2.12-3.5.3.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES>
<root url="jar://$USER_HOME$/.ivy2/cache/org.json4s/json4s-scalap_2.12/srcs/json4s-scalap_2.12-3.5.3-sources.jar!/" />
</SOURCES>
</library>
</component>
\ No newline at end of file
...@@ -12,6 +12,8 @@ ...@@ -12,6 +12,8 @@
</set> </set>
</option> </option>
<option name="sbtVersion" value="0.13.16" /> <option name="sbtVersion" value="0.13.16" />
<option name="useAutoImport" value="true" />
<option name="useOurOwnAutoImport" value="true" />
</SbtProjectSettings> </SbtProjectSettings>
</option> </option>
</component> </component>
......
...@@ -11,6 +11,8 @@ PB.targets in Compile := Seq( ...@@ -11,6 +11,8 @@ PB.targets in Compile := Seq(
libraryDependencies ++= Seq( libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-ws" % "2.6.7", "com.typesafe.play" %% "play-ws" % "2.6.7",
"com.softwaremill.sttp" %% "core" % "1.1.0",
"com.softwaremill.sttp" %% "json4s" % "1.1.0",
"io.grpc" % "grpc-netty" % com.trueaccord.scalapb.compiler.Version.grpcJavaVersion, "io.grpc" % "grpc-netty" % com.trueaccord.scalapb.compiler.Version.grpcJavaVersion,
"com.trueaccord.scalapb" %% "scalapb-runtime" % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf", "com.trueaccord.scalapb" %% "scalapb-runtime" % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf",
"com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % com.trueaccord.scalapb.compiler.Version.scalapbVersion "com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % com.trueaccord.scalapb.compiler.Version.scalapbVersion
......
...@@ -17,6 +17,9 @@ object CreateSparkJobClient { ...@@ -17,6 +17,9 @@ object CreateSparkJobClient {
println("response message--------------->\t"+response.message) println("response message--------------->\t"+response.message)
/* val statusResponse:SparkJobReply = sparkJobStub.(request)
println("response message--------------->\t"+statusResponse.message)*/
} }
} }
...@@ -4,58 +4,62 @@ import javax.inject.Inject ...@@ -4,58 +4,62 @@ import javax.inject.Inject
import com.example.protos.sparkJob.{CreateSparkJobGrpc, SparkJobReply, SparkJobRequest} import com.example.protos.sparkJob.{CreateSparkJobGrpc, SparkJobReply, SparkJobRequest}
import com.example.protos.sparkJob.CreateSparkJobGrpc.CreateSparkJob import com.example.protos.sparkJob.CreateSparkJobGrpc.CreateSparkJob
import play.api.libs.ws import models.Models.SparkJobArgs
import play.api.libs.ws._ import play.api.libs.json._ // JSON library
import play.libs.{Json, ws}
import play.libs.ws.WSClient import com.softwaremill.sttp._
import com.softwaremill.sttp.json4s._
import models.Models._
import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.{ExecutionContext, Future}
object CreateSparkJobServer extends TestServerConnect{ object CreateSparkJobServer extends TestServerConnect{
//@Inject()(ws:WSClient)
class CreateSparkJobServer extends CreateSparkJob { class CreateSparkJobServer extends CreateSparkJob {
def createJob(req:SparkJobRequest): Future[SparkJobReply] ={ def createJob(req:SparkJobRequest): Future[SparkJobReply] ={
val strData = s"""{
"action" : "CreateSubmissionRequest", implicit val backend = HttpURLConnectionBackend()
"appArgs" : [s"${req.debug}","${req.dbName}","${req.fileName}","${req.reportType}","${req.instanceDbs}","${req.startDate}","${req.endDate}","${req.reportGenerateType}","${req.tallyCompanyName}","${req.allowedCatogery}"],
"appResource" : "file:/home/girish/mywork/rapidor/rapidorweb/order_system/reports/spark/target/scala-2.11/rapidor-spark_2.11-1.0.jar", val sparkJobArgs = SparkJobArgs(action = "CreateSubmissionRequest",appArgs = List(s"${req.debug}",s"${req.dbName}",s"${req.fileName}",s"${req.reportType}",s"${req.instanceDbs}",s"${req.startDate}",s"${req.endDate}",s"${req.reportGenerateType}",s"${req.tallyCompanyName}",s"${req.allowedCatogery}"),
"clientSparkVersion" : "1.5.0", appResource = System.getenv("SPARK_APP_RESOURCE"),clientSparkVersion = "1",
"environmentVariables" : { environmentVariables = EnvironmentVariables(SPARK_ENV_LOADED = "1", RAPIDOR_CURRENT_DB_NAME = System.getenv("RAPIDOR_CURRENT_DB_NAME"),
"SPARK_ENV_LOADED" : "1", RAPIDOR_DB_HOSTNAME = System.getenv("RAPIDOR_DB_HOSTNAME"),RAPIDOR_DB_PORT = System.getenv("RAPIDOR_DB_PORT"),RAPIDOR_PLANX_DB_PASSWORD = System.getenv("RAPIDOR_PLANX_DB_PASSWORD"),
"RAPIDOR_CURRENT_DB_NAME": "rapidor_testrealmarketing", RAPIDOR_PLANX_DB_USERNAME = System.getenv("RAPIDOR_PLANX_DB_USERNAME")),
"RAPIDOR_DB_HOSTNAME":"localhost", mainClass = "com.acelrtech.Main",sparkProperties = SparkProperties(`spark.app.name` = "MyJob",
"RAPIDOR_DB_PORT" : "5434", `spark.driver.cores` = "6",`spark.eventLog.enabled` = "true",`spark.submit.deployMode` = "cluster", `spark.driver.memory` = "4G",
"RAPIDOR_PLANX_DB_PASSWORD" : "%5Bguessme321%2Bguessme321%5D", `spark.executor.cores` = "4",`spark.executor.memory` = "2G"))
"RAPIDOR_PLANX_DB_USERNAME":"postgres",
"SPARK_CLASSPATH" : "/home/girish/SOFTWARES/postgress_jdbc_jar/postgresql-42.1.4.jre7.jar", val inputJson = Json.toJson(sparkJobArgs)
"SPARK_PATH":"/home/girish/spark/spark-2.1.0-bin-hadoop2.7" val payload = Json.stringify(inputJson)
}, val sparkResponse = sttp.contentType("application/json")
"mainClass" : "com.acelrtech.Main", .post(uri"http://192.168.1.50:6066/v1/submissions/create")
"sparkProperties" : { .body(payload)
"spark.jars" : "file:/home/girish/mywork/rapidor/rapidorweb/order_system/reports/spark/target/scala-2.11/rapidor-spark_2.11-1.0.jar", .response(asJson[ResponseJson])
"spark.driver.supervise" : "true", .send()
"spark.app.name" : "MyJob",
"spark.eventLog.enabled": "true",
"spark.driver.memory":"2G", println("----1111111---------->"+sparkResponse.body)
"spark.driver.cores":6, println("----1111111---------->"+sparkResponse)
"spark.executor.memory":"2G",
"spark.executor.cores":2,
"spark.submit.deployMode" : "cluster",
"spark.master" : "spark://girish:6066"
}
}"""
val data = Json.parse(strData)
// val response = ws.url("http://girish:6066/v1/submissions/create").addHeader("Content-Type","application/json").post(data)
// println("----1111111---------->"+response)
println("-------------->"+req) println("-------------->"+req)
Future.successful(SparkJobReply(message = s"Success fully Done")) backend.close()
Future.successful(SparkJobReply(message = sparkResponse.body.right.get.submissionId))
}
def sparkJobStatus(submisionId:String): Future[SparkJobReply] ={
implicit val backend = HttpURLConnectionBackend()
val sparkJobStatus = sttp.get(uri"http://192.168.1.50:6066/v1/submissions/status/${submisionId}")
.response(asJson[StatusResponse])
.send()
println("-------status------>"+sparkJobStatus)
backend.close()
Future.successful(SparkJobReply(message = ""))
} }
} }
......
package models

import play.api.libs.json.{Json, OFormat}

/**
 * JSON payload models for the Spark standalone cluster REST submission API
 * (the `/v1/submissions/create` and `/v1/submissions/status/:id` endpoints).
 *
 * Every case class carries a macro-derived Play-JSON `OFormat` in its
 * companion object, so (de)serializers are resolved from implicit scope
 * wherever the types are used — no explicit imports are required.
 */
object Models {

  /** Environment variables block of a CreateSubmissionRequest.
   *  Field names are deliberately UPPER_SNAKE_CASE so the derived JSON keys
   *  match what the Spark REST API expects verbatim. */
  final case class EnvironmentVariables(SPARK_ENV_LOADED: String,
                                        RAPIDOR_CURRENT_DB_NAME: String,
                                        RAPIDOR_DB_HOSTNAME: String,
                                        RAPIDOR_DB_PORT: String,
                                        RAPIDOR_PLANX_DB_PASSWORD: String,
                                        RAPIDOR_PLANX_DB_USERNAME: String)

  object EnvironmentVariables {
    // Explicit type annotation: macro-derived implicits should always be
    // typed so the derivation is checked and resolution stays predictable.
    implicit val format1: OFormat[EnvironmentVariables] = Json.format[EnvironmentVariables]
  }

  /** Spark configuration properties; backticked field names mirror the
   *  dotted JSON keys (e.g. "spark.app.name") exactly. */
  final case class SparkProperties(`spark.app.name`: String,
                                   `spark.eventLog.enabled`: String,
                                   `spark.driver.memory`: String,
                                   `spark.driver.cores`: String,
                                   `spark.executor.memory`: String,
                                   `spark.executor.cores`: String,
                                   `spark.submit.deployMode`: String)

  object SparkProperties {
    implicit val format2: OFormat[SparkProperties] = Json.format[SparkProperties]
  }

  /** Request body POSTed to /v1/submissions/create
   *  (action is expected to be "CreateSubmissionRequest"). */
  final case class SparkJobArgs(action: String,
                                appArgs: List[String],
                                appResource: String,
                                clientSparkVersion: String,
                                environmentVariables: EnvironmentVariables,
                                mainClass: String,
                                sparkProperties: SparkProperties)

  object SparkJobArgs {
    // Formats for the nested EnvironmentVariables/SparkProperties fields are
    // found in their companion objects via implicit scope — no imports needed.
    implicit val format3: OFormat[SparkJobArgs] = Json.format[SparkJobArgs]
  }

  /** Response returned by the create-submission endpoint; `submissionId`
   *  identifies the driver for later status queries. */
  final case class ResponseJson(action: String,
                                message: String,
                                serverSparkVersion: String,
                                submissionId: String,
                                success: Boolean)

  object ResponseJson {
    implicit val format4: OFormat[ResponseJson] = Json.format[ResponseJson]
  }

  /** Response returned by the submission-status endpoint. */
  final case class StatusResponse(action: String,
                                  driverState: String,
                                  serverSparkVersion: String,
                                  submissionId: String,
                                  success: Boolean,
                                  workerHostPort: String,
                                  workerId: String)

  object StatusResponse {
    implicit val formatStatus: OFormat[StatusResponse] = Json.format[StatusResponse]
  }
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!