[Technical Practice] Spark SQL Job Query API
1. Requirement
Query the execution history of all Spark SQL statements run by the current user.
Technical approach:
Step 1: In YARN, find the current user's apps (name = "Thrift JDBC/ODBC Server", user = current user) and take each app's tracking URL, which points to the Spark UI or the Spark History Server UI.
Step 2: Query the jobs from that UI's REST API.
2. Web UI
YARN apps:
Running app, Tracking UI: http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/
Finished app, History Server UI: http://slave5.cluster.local:18081/history/application_1541409376608_0044/jobs/
The two UIs are essentially the same.
3. YARN API
List apps:
curl "http://slave5.cluster.local:8088/ws/v1/cluster/apps?user=hdfs&name=Thrift%20JDBC/ODBC%20Server"
However, the name parameter is not supported; the API only accepts the following query parameters:
* state [deprecated] - state of the application
* states - applications matching the given application states, specified as a comma-separated list.
* finalStatus - the final status of the application - reported by the application itself
* user - user name
* queue - queue name
* limit - total number of app objects to be returned
* startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch
* startedTimeEnd - applications with start time ending with this time, specified in ms since epoch
* finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch
* finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch
* applicationTypes - applications matching the given application types, specified as a comma-separated list.
* applicationTags - applications matching any of the given application tags, specified as a comma-separated list.
* deSelects - generic fields which will be skipped in the result.
Moreover, the application type is hard-coded in the Spark source: https://github.com/apache/spark/blob/v2.4.0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
  appContext.setQueue(sparkConf.get(QUEUE_NAME))
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType("SPARK")
So the only option is to fetch the list first and then filter by name="Thrift JDBC/ODBC Server" on the client side, as in the sketch after the URLs below.
The app list returned by the query:
http://slave5.cluster.local:8088/proxy/application_1541409376608_0041/
http://slave2:8042/node/containerlogs/container_e82_1541409376608_0041_01_000001/hdfs
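Since the name filter cannot be pushed down to the API, the filtering has to happen on the client. Below is a minimal Python sketch using the requests library; the hostname comes from the examples above, and the response layout (apps -> app, with fields such as name, state, and trackingUrl) follows the standard YARN ResourceManager REST API:
import requests

RM = "http://slave5.cluster.local:8088"  # ResourceManager address from the examples above

def thrift_server_apps(user):
    """Fetch the user's SPARK apps from YARN, then filter by name on the client side."""
    params = {"user": user, "applicationTypes": "SPARK"}  # only supported parameters
    resp = requests.get(RM + "/ws/v1/cluster/apps", params=params)
    resp.raise_for_status()
    apps = (resp.json().get("apps") or {}).get("app", [])  # "apps" is null when empty
    # The name parameter is unsupported by the API, so filter locally:
    return [a for a in apps if a.get("name") == "Thrift JDBC/ODBC Server"]

for app in thrift_server_apps("hdfs"):
    print(app["id"], app["state"], app["trackingUrl"])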
4. Spark API
Take the trackingUrl attribute from the YARN app and use it to query the Spark SQL jobs.
4.1 Viewing app information
For a finished app (via the History Server):
curl http://slave5.cluster.local:18081/api/v1/applications
[
{
"id": "local-1543304247490",
"name": "Thrift JDBC/ODBC Server",
"attempts": [
{
"startTime": "2018-11-27T07:37:25.240GMT",
"endTime": "2018-11-27T07:37:30.094GMT",
"lastUpdated": "2018-11-27T07:37:30.886GMT",
"duration": 4854,
"sparkUser": "hdfs",
"completed": true,
"appSparkVersion": "2.3.2",
"startTimeEpoch": 1543304245240,
"endTimeEpoch": 1543304250094,
"lastUpdatedEpoch": 1543304250886
}
]
}
]
For the currently running app (via the YARN proxy):
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications
[ {
"id" : "application_1541409376608_0046",
"name" : "Thrift JDBC/ODBC Server",
"attempts" : [ {
"startTime" : "2018-11-27T07:40:28.712GMT",
"endTime" : "1969-12-31T23:59:59.999GMT",
"lastUpdated" : "2018-11-27T07:40:28.712GMT",
"duration" : 0,
"sparkUser" : "spark",
"completed" : false,
"appSparkVersion" : "2.3.1.3.0.0.0-1634",
"lastUpdatedEpoch" : 1543304428712,
"startTimeEpoch" : 1543304428712,
"endTimeEpoch" : -1
} ]
} ]
Note: we access the Spark UI REST API through the YARN proxy port, which hides the address of the backend Spark UI.
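As a small sketch of this step, reusing the trackingUrl obtained from YARN above (the helper name is ours; the /api/v1/applications endpoint and the id field are exactly the ones shown in the responses):
import requests

def spark_application_ids(tracking_url):
    """List the applications served by this Spark UI / History Server instance."""
    resp = requests.get(tracking_url.rstrip("/") + "/api/v1/applications")
    resp.raise_for_status()
    return [app["id"] for app in resp.json()]

# Going through the YARN proxy, the backend Spark UI address never appears:
# spark_application_ids("http://slave5.cluster.local:8088/proxy/application_1541409376608_0046")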
4.2 Viewing SQL jobs
Job list:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs
[ {
"jobId" : 0,
"name" : "run at AccessController.java:0",
"submissionTime" : "2018-11-27T07:41:50.904GMT",
"completionTime" : "2018-11-27T07:42:31.746GMT",
"stageIds" : [ 0 ],
"jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
"status" : "SUCCEEDED",
"numTasks" : 2,
"numActiveTasks" : 0,
"numCompletedTasks" : 2,
"numSkippedTasks" : 0,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 2,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0,
"killedTasksSummary" : { }
} ]
A single job:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs/0
{
"jobId" : 0,
"name" : "run at AccessController.java:0",
"submissionTime" : "2018-11-27T07:41:50.904GMT",
"completionTime" : "2018-11-27T07:42:31.746GMT",
"stageIds" : [ 0 ],
"jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
"status" : "SUCCEEDED",
"numTasks" : 2,
"numActiveTasks" : 0,
"numCompletedTasks" : 2,
"numSkippedTasks" : 0,
"numFailedTasks" : 0,
"numKilledTasks" : 0,
"numCompletedIndices" : 2,
"numActiveStages" : 0,
"numCompletedStages" : 1,
"numSkippedStages" : 0,
"numFailedStages" : 0,
"killedTasksSummary" : { }
}
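A minimal sketch for collecting jobs, assuming the proxy-based URL layout shown above. The jobGroup field is worth keeping: the Thrift server appears to run each SQL statement under its own job group (the UUID seen above), so it can be used to collect the jobs that belong to one statement:
import requests

def jobs_by_statement(base, app_id):
    """Group the app's jobs by jobGroup, e.g. base = the YARN proxy URL above."""
    resp = requests.get(f"{base}/api/v1/applications/{app_id}/jobs")
    resp.raise_for_status()
    grouped = {}
    for job in resp.json():
        # One job group per SQL statement (assumption based on the UUID-style
        # jobGroup in the response above); group the jobs accordingly.
        grouped.setdefault(job.get("jobGroup"), []).append(job)
    return grouped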
The job response does not include the executed SQL text or its timing, so we need to drill down into the stage data referenced by stageIds. A job may contain multiple stages, and the execution times of all its stages must be aggregated:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/stages/0
[
{
"status": "COMPLETE",
"stageId": 0,
"attemptId": 0,
"numTasks": 2,
"numActiveTasks": 0,
"numCompleteTasks": 2,
"numFailedTasks": 0,
"numKilledTasks": 0,
"numCompletedIndices": 2,
"executorRunTime": 23209,
"executorCpuTime": 3730752861,
"submissionTime": "2018-11-27T07:41:51.030GMT",
"firstTaskLaunchedTime": "2018-11-27T07:42:07.414GMT",
"completionTime": "2018-11-27T07:42:31.741GMT",
"inputBytes": 8718,
"inputRecords": 500,
"outputBytes": 0,
"outputRecords": 0,
"shuffleReadBytes": 0,
"shuffleReadRecords": 0,
"shuffleWriteBytes": 0,
"shuffleWriteRecords": 0,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0,
"name": "run at AccessController.java:0",
"description": "select * from spokes",
"details": "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171)\njava.security.AccessController.doPrivileged(Native Method)\njavax.security.auth.Subject.doAs(Subject.java:422)\norg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688)\norg.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185)\njava.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\njava.util.concurrent.FutureTask.run(FutureTask.java:266)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\njava.lang.Thread.run(Thread.java:745)",
"schedulingPool": "default",
"rddIds": [
4,
3,
2,
0,
1
],
"accumulatorUpdates": [
],
"tasks": {
"0": {
"taskId": 0,
"index": 0,
"attempt": 0,
"launchTime": "2018-11-27T07:42:07.414GMT",
"duration": 23071,
"executorId": "1",
"host": "slave3",
"status": "SUCCESS",
"taskLocality": "NODE_LOCAL",
"speculative": false,
"accumulatorUpdates": [
],
"taskMetrics": {
"executorDeserializeTime": 4366,
"executorDeserializeCpuTime": 426890376,
"executorRunTime": 16589,
"executorCpuTime": 1993215589,
"resultSize": 3787,
"jvmGcTime": 1626,
"resultSerializationTime": 3,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0,
"peakExecutionMemory": 0,
"inputMetrics": {
"bytesRead": 5812,
"recordsRead": 251
},
"outputMetrics": {
"bytesWritten": 0,
"recordsWritten": 0
},
"shuffleReadMetrics": {
"remoteBlocksFetched": 0,
"localBlocksFetched": 0,
"fetchWaitTime": 0,
"remoteBytesRead": 0,
"remoteBytesReadToDisk": 0,
"localBytesRead": 0,
"recordsRead": 0
},
"shuffleWriteMetrics": {
"bytesWritten": 0,
"writeTime": 0,
"recordsWritten": 0
}
}
},
"1": {
"taskId": 1,
"index": 1,
"attempt": 0,
"launchTime": "2018-11-27T07:42:22.386GMT",
"duration": 9350,
"executorId": "2",
"host": "slave3",
"status": "SUCCESS",
"taskLocality": "NODE_LOCAL",
"speculative": false,
"accumulatorUpdates": [
],
"taskMetrics": {
"executorDeserializeTime": 1449,
"executorDeserializeCpuTime": 369738428,
"executorRunTime": 6620,
"executorCpuTime": 1737537272,
"resultSize": 3770,
"jvmGcTime": 1133,
"resultSerializationTime": 1,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0,
"peakExecutionMemory": 0,
"inputMetrics": {
"bytesRead": 2906,
"recordsRead": 249
},
"outputMetrics": {
"bytesWritten": 0,
"recordsWritten": 0
},
"shuffleReadMetrics": {
"remoteBlocksFetched": 0,
"localBlocksFetched": 0,
"fetchWaitTime": 0,
"remoteBytesRead": 0,
"remoteBytesReadToDisk": 0,
"localBytesRead": 0,
"recordsRead": 0
},
"shuffleWriteMetrics": {
"bytesWritten": 0,
"writeTime": 0,
"recordsWritten": 0
}
}
}
},
"executorSummary": {
"1": {
"taskTime": 23071,
"failedTasks": 0,
"succeededTasks": 1,
"killedTasks": 0,
"inputBytes": 5812,
"inputRecords": 251,
"outputBytes": 0,
"outputRecords": 0,
"shuffleRead": 0,
"shuffleReadRecords": 0,
"shuffleWrite": 0,
"shuffleWriteRecords": 0,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0
},
"2": {
"taskTime": 9350,
"failedTasks": 0,
"succeededTasks": 1,
"killedTasks": 0,
"inputBytes": 2906,
"inputRecords": 249,
"outputBytes": 0,
"outputRecords": 0,
"shuffleRead": 0,
"shuffleReadRecords": 0,
"shuffleWrite": 0,
"shuffleWriteRecords": 0,
"memoryBytesSpilled": 0,
"diskBytesSpilled": 0
}
},
"killedTasksSummary": {
}
}
]
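Putting the pieces together, here is a minimal sketch that, for one job object from the jobs endpoint, fetches every stage listed in stageIds, reads the SQL text from the stage's description field, and sums executorRunTime across stages. Wall-clock duration could instead be derived from the earliest submissionTime and the latest completionTime; the endpoints and field names are the ones shown in the responses above:
import requests

def job_sql_summary(base, app_id, job):
    """Summarize one job: the SQL it ran and its runtime aggregated over all stages."""
    descriptions, total_run_time_ms = set(), 0
    for stage_id in job["stageIds"]:
        resp = requests.get(f"{base}/api/v1/applications/{app_id}/stages/{stage_id}")
        resp.raise_for_status()
        for attempt in resp.json():          # the endpoint returns one entry per attempt
            if attempt.get("description"):   # the SQL text, e.g. "select * from spokes"
                descriptions.add(attempt["description"])
            total_run_time_ms += attempt.get("executorRunTime", 0)
    return {
        "jobId": job["jobId"],
        "status": job["status"],
        "sql": " | ".join(descriptions),
        "executorRunTimeMs": total_run_time_ms,
    }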
Author: Jia Dexing
Title: Senior Architect, Cloud Computing Product Center, Cloud Service Group
Specialty: Big data
Profile: System software architect with more than twenty years of hands-on software development experience. Led the development of Inspur's big data platform product Yunhai InsightHD, focusing on the research, application, and development of big data components such as Hadoop, Spark, stream computing, machine learning, and deep learning. Participated in drafting more than twenty national information technology standards, twelve of which have been officially published, and holds nine granted national patents.