Spark Streaming troubleshooting notes
If a required class is missing on the executors:
18/12/11 16:24:29 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka010.KafkaRDDPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
Set conf.setJars. Note that the jars must be placed somewhere the whole cluster can access, e.g. HDFS:
conf.setJars(new String[] {
"hdfs://master:9000/home/jars/kafka-clients-2.0.0.jar",
"hdfs://master:9000/home/jars/spark-streaming-kafka-0-10_2.11-2.4.0.jar"
});
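For context, here is a minimal sketch of how this configuration might fit into a complete job. The topic name test comes from the log below; the master URL, broker address, group id, and batch interval are hypothetical assumptions, not from the original setup.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaStreamingJob {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("kafka-streaming")
                .setMaster("spark://master:7077"); // hypothetical master URL
        // Ship the Kafka dependencies to the executors from HDFS, as above.
        conf.setJars(new String[] {
                "hdfs://master:9000/home/jars/kafka-clients-2.0.0.jar",
                "hdfs://master:9000/home/jars/spark-streaming-kafka-0-10_2.11-2.4.0.jar"
        });

        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "master:9092"); // hypothetical broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "demo-group"); // hypothetical group id

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Arrays.asList("test"), kafkaParams));

        // Printing the raw ConsumerRecords sends them to the driver,
        // which is exactly what triggers the serialization error below.
        stream.print();

        jssc.start();
        jssc.awaitTermination();
    }
}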
Then the following error appears:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 28, CreateTime = 1544517476606, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = d d d))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 1)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
This happens because the ConsumerRecord class does not implement the Serializable interface. Switch to the Kryo serializer:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
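As a sketch, continuing the hypothetical job above: besides switching the serializer, you can optionally register the class with Kryo, or sidestep the problem entirely by extracting the plain values before anything leaves the executors.

// Continuing the hypothetical job sketched above.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// Optional: registering the class makes Kryo's serialized output more compact.
conf.registerKryoClasses(new Class<?>[] {
        org.apache.kafka.clients.consumer.ConsumerRecord.class });

// Alternative fix: never ship ConsumerRecord objects to the driver at all;
// extract the fields you need on the executors first.
stream.map(record -> record.value()).print();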