diff --git a/iniDataForLinux b/iniDataForLinux index 6992c85..3f6b8d6 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniData b/iniDataForMacOs similarity index 66% rename from iniData rename to iniDataForMacOs index 69b8f44..6784c81 100755 Binary files a/iniData and b/iniDataForMacOs differ diff --git a/main.go b/main.go index 7664275..867a2d8 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/csv" "encoding/json" + "flag" "fmt" "io" "log" @@ -30,47 +31,30 @@ import ( "gorm.io/gorm/logger" ) -/* -var ( - applogger *logrus.Logger - redisClient *redis.Client - executableDir string - redisAddress = "mysql5.weu.me:6379" - redisPassword = "" - redisDB = 1 - sftpAddress = "192.168.10.86:49156" - sftpUser = "demo" - sftpPassword = "demo" - sftpDir = "/sftp/test" - dbAddress = "192.168.10.18:1433" - dbUser = "sa" - dbPassword = "Aa123123" - dbName = "sephora" - zipPath = "RawData/Zip/" - txtPath = "RawData/Txt/" - logPath = "logs/" -) -*/ - -var ( //正式环境 - - applogger *logrus.Logger - redisClient *redis.Client - executableDir string - redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" - redisPassword = "3Nsb4Pmsl9bcLs24mL12l" - redisDB = 233 - sftpAddress = "esftp.sephora.com.cn:20981" - sftpUser = "CHN-CRMTOWemedia-wemedia" - sftpPassword = "uoWdMHEv39ZFjiOg" - sftpDir = "/CN-CRMTOWemedia/SMS" - dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" - dbUser = "sephora_sms" - dbPassword = "5ORiiLmgkniC0EqF" - dbName = "sephora_sms" - zipPath = "RawData/Zip/" - txtPath = "RawData/Txt/" - logPath = "logs/" +var ( //初始化变量 + env string + applogger *logrus.Logger + redisClient *redis.Client + executableDir string + redisAddress string + redisPassword string + redisDB int + sftpAddress string + sftpUser string + sftpPassword string + sftpDir string + dbAddress string + dbUser string + dbPassword string + dbName string + zipPath string + txtPath string + logPath string + batchSize int //提交数据 + insertSize int //一次性入库 + insertChanSize int //通道缓冲数 + goSize int //协程数 + taskTime int ) type Batch struct { @@ -137,6 +121,55 @@ func init() { log.Fatal(err) } + flag.StringVar(&env, "env", "dev", "运行模式") + flag.Parse() + switch env { + case "dev": + fmt.Print("测试环境配置以生效\n") + redisAddress = "mysql5.weu.me:6379" + redisPassword = "" + redisDB = 1 + sftpAddress = "192.168.10.86:49156" + sftpUser = "demo" + sftpPassword = "demo" + sftpDir = "/sftp/test" + dbAddress = "192.168.10.18:1433" + dbUser = "sa" + dbPassword = "Aa123123" + dbName = "sephora" + zipPath = "RawData/Zip/" + txtPath = "RawData/Txt/" + logPath = "logs/" + batchSize = 5000 //提交数据 + insertSize = 500 //一次性入库 + insertChanSize = 10 //通道缓冲数 + goSize = 10 //协程数 + taskTime = 1 + case "prod": + fmt.Print("正式环境配置以生效\n") + redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" + redisPassword = "3Nsb4Pmsl9bcLs24mL12l" + redisDB = 233 + sftpAddress = "esftp.sephora.com.cn:20981" + sftpUser = "CHN-CRMTOWemedia-wemedia" + sftpPassword = "uoWdMHEv39ZFjiOg" + sftpDir = "/CN-CRMTOWemedia/SMS" + dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" + dbUser = "sephora_sms" + dbPassword = "5ORiiLmgkniC0EqF" + dbName = "sephora_sms" + zipPath = "RawData/Zip/" + txtPath = "RawData/Txt/" + logPath = "logs/" + batchSize = 5000 //提交数据 + insertSize = 500 //一次性入库 + insertChanSize = 50 //通道缓冲数 + goSize = 150 //协程数 + taskTime = 60 + default: + panic(fmt.Errorf("无效的运行模式: %s", env)) + } + redisClient = redis.NewClient(&redis.Options{ Addr: redisAddress, Password: redisPassword, @@ -160,7 +193,7 @@ func main() { } applogger = logrus.New() logPath := filepath.Join(executableDir, "logs") - logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" + logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log" logFileHook := &lumberjack.Logger{ Filename: filepath.Join(logPath, logFileName), } @@ -168,18 +201,21 @@ func main() { go downloadDecompression() // 启动立即执行一次 - ticker := time.NewTicker(60 * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 - tickCount := 1 //记录循环次数 - defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 + ticker := time.NewTicker(time.Duration(taskTime) * time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 + tickCount := 2 //记录循环次数 + defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 // 循环处理任务 for { select { case <-ticker.C: // 定时器触发时执行的任务函数 - logFileName := "sms_processing_" + time.Now().Format("2006-01-02") + ".log" - logFileHook.Filename = filepath.Join(logPath, logFileName) - fmt.Printf("尝试第%d此执行....\n", tickCount) - applogger.Info(fmt.Sprintf("尝试第%d此执行....\n", tickCount)) + logFileName := "sms_processing_" + time.Now().Format("2006_01_02") + ".log" + logFileHook := &lumberjack.Logger{ + Filename: filepath.Join(logPath, logFileName), + } + applogger.SetOutput(logFileHook) + fmt.Printf("尝试第%d次执行....\n", tickCount) + applogger.Info(fmt.Sprintf("尝试第%d次执行....", tickCount)) go downloadDecompression() // 在新协程中异步执行 tickCount++ } @@ -252,8 +288,8 @@ func downloadDecompression() { applogger.Info("下载文件失败,文件名: %s,错误信息:%v", file.Name(), err) continue } - fmt.Printf("%s(数据包):下载完成\n", file.Name()) - applogger.Info(fmt.Sprintf("%s(数据包):下载完成", file.Name())) + fmt.Printf("%s(数据包)下载完成\n", file.Name()) + applogger.Info(fmt.Sprintf("%s(数据包)下载完成", file.Name())) // Unzip file zipReader, err := zip.OpenReader(path.Join(executableDir, zipPath, file.Name())) if err != nil { @@ -370,8 +406,8 @@ func batchInsert(fileName string) { SendEmail(subject, body) //发送邮件 fmt.Printf("%s(批次文件)入库完成 \n", fileName) applogger.Info(fmt.Sprintf("%s(批次文件)入库完成", fileName)) - fmt.Printf("执行时间%s 插入批次次数:%d\n\n\n", elapsed, batchRows) - applogger.Info(fmt.Sprintf("执行时间%s 插入批次次数:%d", elapsed, batchRows)) + fmt.Printf("%s(批次文件)执行时间%s 插入批次次数:%d\n\n\n", fileName, elapsed, batchRows) + applogger.Info(fmt.Sprintf("%s(批次文件)执行时间%s 插入批次次数:%d", fileName, elapsed, batchRows)) } } @@ -386,10 +422,6 @@ func batchDataInsert(fileName string) { defer file.Close() db, _ := connectToDB() // Insert data in batches using multiple goroutines - batchSize := 5000 //提交数据 - insertSize := 500 //一次性入库 - insertChanSize := 50 //通道缓冲数 - goSize := 150 //协程数 var wg sync.WaitGroup dataBatchChan := make(chan []BatcheData, insertChanSize) for i := 0; i < goSize; i++ { @@ -451,14 +483,14 @@ func batchDataInsert(fileName string) { applogger.Info("个性化字段合并失败,文件名:%s,错误信息%v", fileName, err) continue } + if _, ok := duplicateCount[row[2]]; !ok { + duplicateCount[row[2]] = 0 + } // Check if record exists in hashset key := fmt.Sprintf("%s-%s", row[2], row[3]) if _, exists := hs[key]; exists { //如果批次数据重复 bi++ // Increment duplicate count - if _, ok := duplicateCount[row[2]]; !ok { - duplicateCount[row[2]] = 0 - } duplicateCount[row[2]]++ dataBatchDuplicate = append(dataBatchDuplicate, BatchDataDuplicateLog{ CommunicationChannelID: row[2], @@ -544,10 +576,10 @@ func batchDataInsert(fileName string) { subject := "丝芙兰数据包处理完成" body := "数据包:" + fileName + ";\n总数:" + strconv.Itoa(count+bi) + ";\n过滤重复数:" + strconv.Itoa(bi) + ";\n过滤后总数:" + strconv.Itoa(count) + ";\n处理完成,请前往管理平台查看处理。" SendEmail(subject, body) //发送邮件 - applogger.Info(fmt.Sprintf(":%s(数据包) ,入库完成", fileName)) + applogger.Info(fmt.Sprintf("%s(数据包) ,入库完成", fileName)) fmt.Printf("%s(数据包) 入库完成\n", fileName) - fmt.Printf("执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", elapsed, count, bi) - applogger.Info(fmt.Sprintf("执行时间:%s 插入数据:%d条 过滤数数:%d条", elapsed, count, bi)) + fmt.Printf("%s(数据包) 执行时间:%s 插入数据:%d 过滤数:%d\n\n\n", fileName, elapsed, count, bi) + applogger.Info(fmt.Sprintf("%s(数据包)执行时间:%s 插入数据:%d条 过滤数数:%d条", fileName, elapsed, count, bi)) } } @@ -582,8 +614,8 @@ func SendEmail(subject string, body string) error { smtpPort := 465 from := "auto_system@wemediacn.com" password := "yJJYYcy5NKx2y69r" - //to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} - to := []string{"chejiulong@wemediacn.com"} + to := []string{"chejiulong@wemediacn.com", "xiayujuan@wemediacn.com", "wangyuanbing@wemediacn.com", "tangweiqi@wemediacn.com"} + //to := []string{"chejiulong@wemediacn.com"} // 邮件内容 m := gomail.NewMessage()