多语言展示
当前在线:764今日阅读:11今日分享:15

如何开始第一个kafka的生产者程序

本篇将带大家开始自己的第一个kafka生产者程序,推开kafka组件的神秘大门
工具/原料
1

已经搭建好的kafka,zookeeper组件环境

2

eclipse或其他代码编辑器

方法/步骤
1

首先我们需要打开自己的代码编辑器,这里我用的是eclipse来运行的,打开编辑器后我们需要新建一个maven项目,配置好他的pom文件,把相关的kafka生产者jar包下载到我们的本地,这里如果有maven环境会自动下载并引到项目当中的,如果没有maven环境,可以新建java工程,自己下载相关包手动引到项目当中。

2

新建好项目,jar包也成功导进来后,我们随便建一个目录(按个人喜好建),在目录下新建一个生产者类,这样就可以开始编写代码了。

3

下面是我已经编写好的代码了,先得配置好你要连接的kafka与zk的连接地址端口,这里我是用主机名映射的,这里如果你没有在你的本机配置他的映射就需要写他的具体ip了。发送数据到相应的topic我是通过函数传参的形式传进来的,利用for循环发送了10条数据,具体见下图。

4

接下来就可以运行代码,查看效果了,如下图所示,开启一个kafka消费者端,就可以直接消费到我们刚才发送的10条数据了。是不是十分简单呢,你也可以自己动手来尝试感受一下哟。

5

以下是我的生产者完整的demo代码:import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class KafkaProducers { private static Logger logger = LoggerFactory.getLogger(KafkaProducers.class); private static String zkHost='node30:33623,node29:33623,node28:33623'; private static String kafkaHost='node30:19092,node29:19092,node28:19092'; private static String topic = ''; public static void sendKafkaData(String text,String mxbm) { logger.info('********开始********'); KafkaProducers producers = new KafkaProducers(); topic  = mxbm; KafkaProducer kafkaProducer =producers.KafkaSender(zkHost,kafkaHost); try { producers.dojob(kafkaProducer,text); } catch (Exception e) { e.printStackTrace(); } } //生产数据 private void dojob(KafkaProducer producer,String text) throws Exception { long a= System.currentTimeMillis(); for(int i=0;i<10;i++){ sendTopic(topic,''+i,text+i,producer); } long m= System.currentTimeMillis(); System.out.println('耗时:'+(m-a)); producer.flush(); //producer.close(); } public KafkaProducer KafkaSender(String zkhost, String kafkahost){ Properties props = new Properties(); props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); props.put('zookeeper.connect', zkHost); props.put('bootstrap.servers', kafkaHost); props.put('request.required.acks', '1'); props.put('producer.type', 'sync'); props.put('message.send.max.retries', '3'); props.put('compression.codec', 'gzip');  return new KafkaProducer(props); } public void sendTopic(String topic, String key, String value,KafkaProducer procuder){ ProducerRecord record = new ProducerRecord(topic, key, value); try { procuder.send(record); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { sendKafkaData('helloWord', 'test_topic'); }}

注意事项

注意ip映射关系,如果连接不上可以把它映射配置到自己的机器上

推荐信息