Spark and Spark streaming output difference for the same job

Stack Overflow Asked by Guru on November 13, 2020

I am doing a POC with Spark and Spark Streaming for my project. All I am doing is reading a file name from a Kafka topic, loading the file from "src/main/resources", and running the usual "WordCount" frequency application.

 
@KafkaListener(topics = Constants.ABCWordTopic, groupId = Constants.ABC_WORD_COMSUMER_GROUP_ID) 
public void processTask(@Payload String fileResourcePath) {
        log.info("ABC Receiving task from WordProducer filepath {} at time {}", fileResourcePath,
                LocalDateTime.now());
        // Spark job
        /*
         * JavaRDD<String> wordRDD =
         * sparkContext.parallelize(Arrays.asList(extractFile(fileResourcePath).split(" ")));
         * log.info("ABC Map Contents : {}", wordRDD.countByValue().toString());
         * wordRDD.coalesce(1,
         * true).saveAsTextFile("ResultSparklog_"+ System.currentTimeMillis());
         */
        // Spark Streaming job
        JavaPairDStream<String, Integer> wordPairStream = streamingContext
                .textFileStream(extractFile(fileResourcePath))
                .flatMap(line -> Arrays.asList(SPACE.split(line)).iterator())
                .mapToPair(s -> new Tuple2<>(s, 1)).reduceByKey((i1, i2) -> i1 + i2);
        wordPairStream.foreachRDD(wordRDD -> {
        //  javaFunctions(wordRDD).writerBuilder("vocabulary", "words", mapToRow(String.class))
        //                  .saveToCassandra();
            log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString());
            wordRDD.coalesce(1, true)
                    .saveAsTextFile("SparkStreamResultlog_" + System.currentTimeMillis());
        });
        streamingContext.start();
        try {
            streamingContext.awaitTerminationOrTimeout(-1);
        } catch (InterruptedException e) {
            log.error("Terminated streaming context", e);
        }
    }

  • In the above code I am listening on a Kafka topic ("ABCtopic") and
    processing it. The commented-out "Spark job" code works perfectly
    fine: it counts the words and gives the expected results. The "Spark
    streaming job" code, however, does not behave as expected and
    produces empty output.
  • The line log.info("ABC Map Contents : {}", wordRDD.keys().countByValue().toString()); prints '{}',
    and the file written to disk is empty. Being new to Spark Streaming,
    from what little I know, "Spark Streaming" is an additional library
    for continuously processing data in real time from any source, such
    as a file or a topic.
  • What is missing in the above code that makes Spark Streaming produce
    empty output, both in the highlighted log line and in the data file
    written to disk, while the plain Spark job does the same work
    perfectly fine?

One Answer

Adding this answer for other folks who may be stuck on this point. At first glance it looks like it should work; however, after reading the Spark documentation, here is the conclusion.
The "streamingContext.textFileStream(..)" API does not read static content from any directory. Therefore, it is unable to read the files from the directory or rather process it. It is meant to read streaming data therefore data has to be added or updated in the monitoring directory. Therefore a quick hack from what I have read on the web is to move the files or update the files into the windows directory (Iam using windows 10) once the program execution has begun (i.e StreamingContext.start has executed).
Please note that I was NOT able to get it to work even after trying all those hacks, but given that this is arguably not the right use case for streaming (reading from a folder and processing it can easily be done with a plain Spark job, which is what my commented-out code demonstrates), I have to leave it at that.

Answered by Guru on November 13, 2020
