diff --git a/iniDataForLinux b/iniDataForLinux index 17f12ae..862d81a 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index b07d73e..4fb6086 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 8760825..d5e6885 100644 --- a/main.go +++ b/main.go @@ -55,6 +55,7 @@ var ( //初始化变量 insertChanSize int //通道缓冲数 goSize int //协程数 taskTime int + to []string ) type Batch struct { @@ -138,6 +139,7 @@ func init() { insertChanSize = 10 //通道缓冲数 goSize = 10 //协程数 taskTime = 1 + to = []string{"chejiulong@wemediacn.com"} case "prod": //fmt.Print("正式环境配置已生效\n") redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" @@ -159,6 +161,7 @@ func init() { insertChanSize = 50 //通道缓冲数 goSize = 10 //协程数 taskTime = 60 + to = []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} default: panic(fmt.Errorf("无效的运行模式: %s", env)) } @@ -201,7 +204,7 @@ func main() { } logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 applogger.SetOutput(logOutput) - applogger.Info(fmt.Sprintf("程序启动,加载%s环境....", env)) + applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试第1次执行...", env)) go downloadDecompression() // 启动立即执行一次 ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器 @@ -220,7 +223,7 @@ func main() { logOutput := io.MultiWriter(os.Stdout, logFileHook) // 将日志同时输出到控制台和文件 applogger.SetOutput(logOutput) //fmt.Printf("尝试第%d次执行....\n", tickCount) - applogger.Info(fmt.Sprintf("尝试第%d次执行....", tickCount)) + applogger.Info(fmt.Sprintf("尝试第%d次执行...", tickCount)) go downloadDecompression() // 在新协程中异步执行 tickCount++ } @@ -234,8 +237,7 @@ func downloadDecompression() { // 写入执行中标记 err := redisClient.Set("iniDataStatus", 1, 0).Err() if err != nil { - fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) - applogger.Info("写入任务执行中标记失败:%v", err) + applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) } // Connect to SFTP server sshConfig := &ssh.ClientConfig{ @@ -247,19 +249,16 @@ func downloadDecompression() { } sshClient, err := ssh.Dial("tcp", sftpAddress, sshConfig) if err != nil { - fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) - applogger.Info("写入任务执行中标记失败:%v", err) + applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) } sftpClient, err := sftp.NewClient(sshClient) if err != nil { - fmt.Printf("写入任务执行中标记失败:%s\n", err.Error()) - applogger.Info("写入任务执行中标记失败:%v", err) + applogger.Error(fmt.Sprintf("写入任务执行中标记失败:%v", err)) } defer sftpClient.Close() files, err := sftpClient.ReadDir(sftpDir) if err != nil { - fmt.Printf("sftp目录不存在: 错误信息:%s,\n", err.Error()) - applogger.Info("sftp目录不存在: 错误信息:%v", err) + applogger.Error(fmt.Sprintf("sftp目录不存在: 错误信息:%v", err)) } it := 1 fmt.Printf("共%d个文件\n", len(files)) @@ -270,7 +269,6 @@ func downloadDecompression() { fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { fmt.Println("跳过已处理过的文件:" + file.Name()) - //applogger.Info("跳过已处理过的文件:" + file.Name()) continue } @@ -279,27 +277,27 @@ func downloadDecompression() { // Download file srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) + applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)) continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)) continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)) continue } //fmt.Printf("%s(数据包)下载完成\n", file.Name()) - applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) + applogger.Error(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { - applogger.Info("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err) + applogger.Error(fmt.Sprintf("解压文件失败: 文件名:%s,错误信息:%v", file.Name(), err)) continue } defer zipReader.Close() @@ -310,11 +308,11 @@ func downloadDecompression() { //fmt.Print("系统文件.DS_Store,跳过处理", zipFile.Name) continue } else if filepath.Ext(zipFile.Name) != ".txt" { - fmt.Print("文件类型不正确,跳过处理", zipFile.Name) + applogger.Error(fmt.Sprintf("文件类型不正确,跳过处理: %v", zipFile.Name)) continue } if err != nil || zipFileReader == nil { - applogger.Info("Failed to open zip file: %v", err) + applogger.Error(fmt.Sprintf("Failed to open zip file: %v", err)) fmt.Print("压缩文件处理结束") continue } @@ -322,14 +320,14 @@ func downloadDecompression() { // Create the file unzipFile, err := os.Create(path.Join(executableDir, txtPath, zipFile.Name)) if err != nil { - applogger.Info("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err) + applogger.Error(fmt.Sprintf("创建压缩后的文件失败,文件名:%s,错误信息: %v", zipFile.Name, err)) continue } defer unzipFile.Close() // Write the unzip data to the file _, err = io.Copy(unzipFile, zipFileReader) if err != nil { - applogger.Info("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err) + applogger.Error(fmt.Sprintf("文件解压失败,文件名:%s,错误信息: %v", zipFileReader, err)) continue } applogger.Info(fmt.Sprintf("%s(数据包)解压完成", zipFile.Name)) @@ -340,28 +338,28 @@ func downloadDecompression() { } else if filepath.Ext(file.Name()) == ".txt" { srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Info("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err) + applogger.Error(fmt.Sprintf("打开sftp文件失败,文件名:%s,错误信息: %v", file.Name(), err)) continue } defer srcFile.Close() dstFile, err := os.Create(path.Join(executableDir, txtPath, file.Name())) if err != nil { - applogger.Info("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(fmt.Sprintf("创建本地文件失败,文件名: %s,错误信息:%v", file.Name(), err)) continue } defer dstFile.Close() if _, err := io.Copy(dstFile, srcFile); err != nil { - applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) + applogger.Error(fmt.Sprintf("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err)) continue } - fmt.Printf("%s(批次文件)下载完成 \n", file.Name()) + //fmt.Printf("%s(批次文件)下载完成 \n", file.Name()) applogger.Info(fmt.Sprintf("%s(批次文件)下载完成", file.Name())) batchInsert(file.Name()) } err = redisClient.Set(fileKey, 1, 0).Err() //写入下载、解压、入库完成标记 if err != nil { //fmt.Printf("写入文件处理完成标记失败文件名:%s,错误信息:%v\n", file.Name(), err) - applogger.Info("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err) + applogger.Info(fmt.Sprintf("写入文件处理完成标记失败文件名:%s,错误信息:v%\n", file.Name(), err)) } } redisClient.Del("iniDataStatus") //删除任务执行中标记 @@ -376,8 +374,7 @@ func batchInsert(fileName string) { db, _ := connectToDB() file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - //fmt.Printf("文件打开失败文件名:%s,错误信息%v\n", fileName, err) - applogger.Info("文件打开失败文件名:%s,错误信息%v", fileName, err) + applogger.Error(fmt.Sprintf("文件打开失败文件名:%s,错误信息%v", fileName, err)) } else { defer file.Close() reader := csv.NewReader(bufio.NewReader(file)) @@ -410,9 +407,7 @@ func batchInsert(fileName string) { subject := "丝芙兰批次文件处理完成" body := "批次文件:" + fileName + ";\n批次数:" + strconv.Itoa(batchRows) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 - //fmt.Printf("%s(批次文件)入库完成 \n", fileName) applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) - //fmt.Printf("%s(批次文件)执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows) applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) } } @@ -422,8 +417,7 @@ func batchDataInsert(fileName string) { // Open file file, err := os.Open(path.Join(executableDir, txtPath, fileName)) if err != nil { - //fmt.Printf("文件打开失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("文件打开失败,文件名:%s,错误信息%v", fileName, err)) } else { defer file.Close() db, _ := connectToDB() @@ -471,8 +465,7 @@ func batchDataInsert(fileName string) { line := scanner.Text() row, err := csv.NewReader(strings.NewReader(line)).Read() if err != nil { - //fmt.Printf("文件按行读取失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("文件按行读取失败,文件名:%s,错误信息%v", fileName, err)) continue } reservedFields := map[string]string{ //合并个性化字段 @@ -485,8 +478,7 @@ func batchDataInsert(fileName string) { } reservedFieldsJson, err := json.Marshal(reservedFields) //个性化字段转json if err != nil { - //fmt.Printf("个性化字段合并失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err)) continue } if _, ok := duplicateCount[row[2]]; !ok { @@ -507,8 +499,7 @@ func batchDataInsert(fileName string) { if len(dataBatchDuplicate) >= batchSize { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { - //fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = make([]BatchDataDuplicateLog, 0, batchSize) } @@ -561,16 +552,14 @@ func batchDataInsert(fileName string) { } err = db.CreateInBatches(bpi, insertSize).Error if err != nil { - //fmt.Printf("插入批次处理信息失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("插入批次处理信息失败,文件名:%s,错误信息%v", fileName, err)) } //插入批此重复数据 if len(dataBatchDuplicate) > 0 { err = db.CreateInBatches(dataBatchDuplicate, insertSize).Error if err != nil { - //fmt.Printf("插入重复数据失败,文件名:%s,错误信息%v\n", fileName, err) - applogger.Info(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) + applogger.Error(fmt.Sprintf("插入重复数据失败,文件名:%s,错误信息%v", fileName, err)) } else { dataBatchDuplicate = nil } @@ -582,9 +571,7 @@ func batchDataInsert(fileName string) { subject := "丝芙兰数据包处理完成" body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 - applogger.Info(fmt.Sprintf("%s(数据包) ,入库完成", fileName)) - //fmt.Printf("%s(数据包) 入库完成\n", fileName) - //fmt.Printf("%s(数据包) 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi) + applogger.Info(fmt.Sprintf(fmt.Sprintf("%s(数据包) ,入库完成", fileName))) applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) } } @@ -603,8 +590,7 @@ func connectToDB() (*gorm.DB, error) { break } if attempt >= maxAttempts { - fmt.Printf("数据库连接失败,错误信息%v\n", err) - applogger.Info("数据库连接失败,错误信息%v", err) + applogger.Error(fmt.Sprintf("数据库连接失败,错误信息%v", err)) return nil, err } time.Sleep(backoff) @@ -620,8 +606,6 @@ func SendEmail(subject string, body string) error { smtpPort := 465 from := "auto_system@wemediacn.com" password := "yJJYYcy5NKx2y69r" - to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} - //to := []string{"chejiulong@wemediacn.com"} // 邮件内容 m := gomail.NewMessage() @@ -634,8 +618,7 @@ func SendEmail(subject string, body string) error { d.TLSConfig = &tls.Config{InsecureSkipVerify: true} err := d.DialAndSend(m) if err != nil { - fmt.Printf("邮件发送失败,错误信息%v\n", err) - applogger.Info("邮件发送失败,错误信息%v", err) + applogger.Warn(fmt.Sprintf("邮件发送失败,错误信息%v", err)) } return nil }