03 . Go开发一个日志平台之Elasticsearch使用及kafka消费消息发送到Elasticsearch
时间:2022-07-25
本文章向大家介绍03 . Go开发一个日志平台之Elasticsearch使用及kafka消费消息发送到Elasticsearch,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
Elasticsearch使用
详细使用请看我写的Go操作Elasticsearch专篇
https://www.cnblogs.com/you-men/p/13391265.html
example1
package main
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)
var eshost = "http://192.168.43.176:9200"
var client *elastic.Client
type Tyweet struct {
User string
Message string
}
//创建
func main() {
var err error
client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(eshost))
if err !=nil{
fmt.Println("connect es error",err)
}
//使用结构体
tweet := Tyweet{User: "youmen",Message: "Take Five"}
_,err = client.Index().
Index("user").
Type("tweet").
Id("1").
BodyJson(tweet).
Do(context.Background())
if err != nil{
// Handle error
panic(err)
return
}
fmt.Println("Insert index success")
}
example2
package main
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)
var client *elastic.Client
var host = "http://192.168.43.176:9200"
type Employee struct {
FirstName string `json:"first_name"`
LastName string `json:"last_name"`
Age int `json:"age"`
About string `json:"about"`
Interests []string `json:"interests"`
}
//初始化
func init() {
//errorlog := log.New(os.Stdout, "APP", log.LstdFlags)
var err error
//这个地方有个小坑 不加上elastic.SetSniff(false) 会连接不上
client, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(host))
if err != nil {
panic(err)
}
_,_,err = client.Ping(host).Do(context.Background())
if err != nil {
panic(err)
}
//fmt.Printf("Elasticsearch returned with code %d and version %sn", code, info.Version.Number)
_,err = client.ElasticsearchVersion(host)
if err != nil {
panic(err)
}
//fmt.Printf("Elasticsearch version %sn", esversion)
}
//创建
func create() {
//使用结构体
e1 := Employee{"Jane", "Smith", 32, "I like to collect rock albums", []string{"music"}}
put1, err := client.Index().
Index("megacorp").
Type("employee").
Id("1").
BodyJson(e1).
Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Indexed tweet %s to index s%s, type %sn", put1.Id, put1.Index, put1.Type)
//使用字符串
e2 := `{"first_name":"John","last_name":"Smith","age":25,"about":"I love to go rock climbing","interests":["sports","music"]}`
put2, err := client.Index().
Index("megacorp").
Type("employee").
Id("2").
BodyJson(e2).
Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Indexed tweet %s to index s%s, type %sn", put2.Id, put2.Index, put2.Type)
e3 := `{"first_name":"Douglas","last_name":"Fir","age":35,"about":"I like to build cabinets","interests":["forestry"]}`
put3, err := client.Index().
Index("megacorp").
Type("employee").
Id("3").
BodyJson(e3).
Do(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("Indexed tweet %s to index s%s, type %sn", put3.Id, put3.Index, put3.Type)
}
func main() {
create()
}
kafka消费消息发送ES
kafka消费消息
package Initial
import (
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
"time"
)
func Run() (err error) {
partitionList, err := kafkaClient.client.Partitions(kafkaClient.topic)
if err != nil {
logs.Error("Failed to get the list of partitions: ", err)
return
}
for partition := range partitionList {
pc, errRet := kafkaClient.client.ConsumePartition(kafkaClient.topic, int32(partition), sarama.OffsetNewest)
if errRet != nil {
err = errRet
logs.Error("Failed to start consumer for partition %d: %sn", partition, err)
return
}
defer pc.AsyncClose()
go func(pc sarama.PartitionConsumer) {
kafkaClient.wg.Add(1)
for msg := range pc.Messages() {
logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
// 发送日志到es
err = sendToES(kafkaClient.topic,msg.Value)
if err != nil{
logs.Warn("send to es failed, err:%v",err)
}
}
kafkaClient.wg.Done()
}(pc)
}
kafkaClient.wg.Wait()
time.Sleep(time.Hour)
return
}
发送到es
package Initial
import (
"context"
"fmt"
"github.com/olivere/elastic/v7"
)
var esclient *elastic.Client
type LogMessage struct {
App string
Topic string
Message string
}
type Tyweet struct {
User string
Message string
}
//创建
func InitEs(addr string) (err error) {
esclient, err = elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(addr))
if err != nil {
fmt.Println("connect es error", err)
}
return
}
func sendToES(topic string, data []byte) (err error) {
msg := &LogMessage{}
msg.Topic = topic
msg.Message = string(data)
_, err = esclient.Index().
Index(topic).
Type(topic).
//Id(fmt.Sprintf("%d", i)).
BodyJson(msg).
Do(context.Background())
if err != nil {
// Handle error
panic(err)
return
}
return
}
验证数据是否kafka消息被消费并发送到es
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- React Hooks-useTypescript!
- spring-boot项目优雅的http客户端工具,真香!
- 人人都可以学会生存分析(学徒数据挖掘)
- 谈谈const跟Object.freeze()
- Java String类源码阅读笔记
- 别再用JSON配置文件了
- 什么,你一定要基于FPKM标准化表达矩阵做单细胞差异分析
- Tomcat 9最新版安装与使用手册,tomcat更改端口号,tomcat控制台乱码问题解决方法
- Python 技术篇-读取遍历指定路径的文件,区分文件和文件夹
- MySQL蜜罐获取攻击者微信ID
- PyQt5 技巧篇-增加一个类级变量,类级变量的设置方法,类级"常量"设置方法
- PyQt5 技巧篇-按钮隐藏并保留位置,设置按钮的可见度,设置按钮透明度
- PyQt5 技巧篇-复选框绑定行内容,全选、清空、展示选中的内容功能实现演示,设置复选框选中,检查复选框选中状态
- PyQt5 技巧篇-QWidget、Dialog界面固定大小设置
- 力扣:地下城游戏,手把手教你做困难题