Apache Kafka Issues

Good morning, currently I’m having trouble with following through on the tutorial for the Apache Kafka setup in the Maker’s blog. Right now, I wanted to insert this code to replace it in the DumpProcessor java file but currently I’m coming up with the compiling error that the variables in this code are undefined. What steps do I need to take to fix this?

> 
    return new SourceRecord[]{new SourceRecord(null, //sourcePartition
                   null,                //sourceOffset
                   kafkaTopic,          //topic
                   null,                //partition
                   null,                //keySchema
                   mTopic,              //key
                   null,                //valueSchema
                   mMessage.toString(), //value
                   new Long(123L))};    //long timestamp
} 
Best Regards, Keith Springs

Good morning,

unfortunately, I cannot reproduce the described error. I followed the steps in my tutorial and replaced the method in the DumbProcessor class. The variables should be known from the other method “process”.  Here is the complete class code:

package com.evokly.kafka.connect.mqtt.sample;

import com.evokly.kafka.connect.mqtt.MqttMessageProcessor;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Copyright 2016 Evokly S.A.
 *
 * See LICENSE file for License
 **/
public class DumbProcessor implements MqttMessageProcessor {
    private static final Logger log = LoggerFactory.getLogger(DumbProcessor.class);
    private MqttMessage mMessage;
    private Object mTopic;

    @Override
    public MqttMessageProcessor process(String topic, MqttMessage message) {
        log.debug("processing data for topic: {}; with message {}", topic, message);
        this.mTopic = topic;
        this.mMessage = message;
        return this;
    }

    @Override
    public SourceRecord[] getRecords(String kafkaTopic) {

    return new SourceRecord[]{new SourceRecord(null,                //sourcePartition
                                               null,                //sourceOffset
                                               kafkaTopic,          //topic
                                               null,                //partition
                                               null,                //keySchema
                                               mTopic,              //key
                                               null,                //valueSchema
                                               mMessage.toString(), //value
                                               new Long(123L))};    //long timestamp
    }
}

Hope that helps. 
Did you try to compile the code without the modifications to the discussed part? Probably, this could help to identify the root cause.
Nevertheless, if you need further support don’t hesitate to ask.

Best regards

Nils Hettig