添加按批次统计插入数据
This commit is contained in:
parent
c276846a65
commit
2e874375ef
BIN
iniDataForLinux
Executable file
BIN
iniDataForLinux
Executable file
Binary file not shown.
20
main.go
20
main.go
@ -81,6 +81,7 @@ type BatchProcessingInformation struct {
|
|||||||
ID uint `gorm:"primaryKey;autoIncrement"`
|
ID uint `gorm:"primaryKey;autoIncrement"`
|
||||||
CommunicationChannelID string `gorm:"column:communication_channel_id"`
|
CommunicationChannelID string `gorm:"column:communication_channel_id"`
|
||||||
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
|
RepeatTargetsMember int `gorm:"column:repeat_targets_member"`
|
||||||
|
InsertsTargetsMember int `gorm:"column:inserts_targets_member"`
|
||||||
DataFileName string `gorm:"column:data_file_name"`
|
DataFileName string `gorm:"column:data_file_name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -289,10 +290,11 @@ func downloadDecompression() {
|
|||||||
applogger.Fatalf("Failed to download file: %v", err)
|
applogger.Fatalf("Failed to download file: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
fmt.Println("下载完成(批次文件):" + file.Name())
|
fmt.Printf("下载完成(批次文件):%s \n", file.Name())
|
||||||
applogger.Info("下载完成(批次文件):%d", file.Name())
|
applogger.Info("下载完成(批次文件):%d", file.Name())
|
||||||
batchInsert(file.Name())
|
batchInsert(file.Name())
|
||||||
fmt.Println("入库完成(批次文件)" + file.Name())
|
fmt.Printf("入库完成(批次文件)%s \n", file.Name())
|
||||||
|
|
||||||
applogger.Info("入库完成(批次文件):%d\n", file.Name())
|
applogger.Info("入库完成(批次文件):%d\n", file.Name())
|
||||||
}
|
}
|
||||||
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
|
err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记
|
||||||
@ -407,6 +409,7 @@ func batchDataInsert(fileName string) {
|
|||||||
scanner.Scan() // skip first line
|
scanner.Scan() // skip first line
|
||||||
bi := 0
|
bi := 0
|
||||||
duplicateCount := make(map[string]int)
|
duplicateCount := make(map[string]int)
|
||||||
|
insertsCount := make(map[string]int)
|
||||||
var count int
|
var count int
|
||||||
|
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
@ -432,7 +435,6 @@ func batchDataInsert(fileName string) {
|
|||||||
key := fmt.Sprintf("%s-%s", row[2], row[3])
|
key := fmt.Sprintf("%s-%s", row[2], row[3])
|
||||||
if _, exists := hs[key]; exists {
|
if _, exists := hs[key]; exists {
|
||||||
bi++
|
bi++
|
||||||
//filtered = append(filtered, key)
|
|
||||||
// Increment duplicate count
|
// Increment duplicate count
|
||||||
if _, ok := duplicateCount[row[2]]; !ok {
|
if _, ok := duplicateCount[row[2]]; !ok {
|
||||||
duplicateCount[row[2]] = 0
|
duplicateCount[row[2]] = 0
|
||||||
@ -472,6 +474,10 @@ func batchDataInsert(fileName string) {
|
|||||||
dataBatchChan <- dataBatch
|
dataBatchChan <- dataBatch
|
||||||
dataBatch = make([]BatcheData, 0, batchSize)
|
dataBatch = make([]BatcheData, 0, batchSize)
|
||||||
}
|
}
|
||||||
|
if _, ok := insertsCount[row[2]]; !ok {
|
||||||
|
insertsCount[row[2]] = 0
|
||||||
|
}
|
||||||
|
insertsCount[row[2]]++
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,7 +488,7 @@ func batchDataInsert(fileName string) {
|
|||||||
|
|
||||||
close(dataBatchChan)
|
close(dataBatchChan)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait() //所有入库全部完成
|
||||||
|
|
||||||
//插入批次处理信息
|
//插入批次处理信息
|
||||||
bpi := []BatchProcessingInformation{}
|
bpi := []BatchProcessingInformation{}
|
||||||
@ -490,6 +496,7 @@ func batchDataInsert(fileName string) {
|
|||||||
bpi = append(bpi, BatchProcessingInformation{
|
bpi = append(bpi, BatchProcessingInformation{
|
||||||
CommunicationChannelID: key,
|
CommunicationChannelID: key,
|
||||||
RepeatTargetsMember: value,
|
RepeatTargetsMember: value,
|
||||||
|
InsertsTargetsMember: insertsCount[key],
|
||||||
DataFileName: fileName,
|
DataFileName: fileName,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -506,6 +513,8 @@ func batchDataInsert(fileName string) {
|
|||||||
}
|
}
|
||||||
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//发送提醒邮件
|
||||||
subject := "丝芙兰数据包处理完成"
|
subject := "丝芙兰数据包处理完成"
|
||||||
body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。"
|
body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。"
|
||||||
err = SendEmail(subject, body) //发送邮件
|
err = SendEmail(subject, body) //发送邮件
|
||||||
@ -513,6 +522,7 @@ func batchDataInsert(fileName string) {
|
|||||||
applogger.Info("邮件发送失:%d", err)
|
applogger.Info("邮件发送失:%d", err)
|
||||||
fmt.Print(err)
|
fmt.Print(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed := time.Since(start)
|
elapsed := time.Since(start)
|
||||||
|
|
||||||
fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi)
|
fmt.Printf("批次数据入库函数执行时间:%s;\n插入数:%d条\n过滤数:%d条\n", elapsed, count, bi)
|
||||||
@ -547,7 +557,7 @@ func SendEmail(subject string, body string) error {
|
|||||||
smtpPort := 465
|
smtpPort := 465
|
||||||
from := "chejiulong@wemediacn.com"
|
from := "chejiulong@wemediacn.com"
|
||||||
password := "hdQfpav4x8LwbJPH"
|
password := "hdQfpav4x8LwbJPH"
|
||||||
to := []string{"chejiulong@wemediacn.com", "wangyuanbing@wemediacn.com"}
|
to := []string{"chejiulong@wemediacn.com"}
|
||||||
// 邮件内容
|
// 邮件内容
|
||||||
m := gomail.NewMessage()
|
m := gomail.NewMessage()
|
||||||
m.SetHeader("From", from)
|
m.SetHeader("From", from)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user