diff --git a/iniData b/iniData.exe similarity index 63% rename from iniData rename to iniData.exe index ed02175..ef97ab0 100755 Binary files a/iniData and b/iniData.exe differ diff --git a/iniDataForLinux b/iniDataForLinux index 2b1025e..8e8c8a7 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/main.go b/main.go index 32b08bd..9e7967c 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "io" "log" "os" + "os/signal" "path" "path/filepath" "strconv" @@ -29,7 +30,9 @@ import ( "gorm.io/gorm/logger" ) +/* var ( + applogger *logrus.Logger redisClient *redis.Client executableDir string @@ -39,6 +42,7 @@ var ( sftpAddress = "192.168.10.86:49156" sftpUser = "demo" sftpPassword = "demo" + sftpDir = "/sftp/test" dbAddress = "192.168.10.18:1433" dbUser = "sa" dbPassword = "Aa123123" @@ -50,6 +54,30 @@ var ( insertSize = 500 //一次性入库 cSize = 10 //入库协程数 +) +*/ +var ( //正式环境 + applogger *logrus.Logger + redisClient *redis.Client + executableDir string + redisAddress = "r-bp11564d96842414128.redis.rds.aliyuncs.com:6379" + redisPassword = "3Nsb4Pmsl9bcLs24mL12l" + redisDB = 233 + sftpAddress = "10.11.0.63:22" + sftpUser = "wyeth2018" + sftpPassword = "nYydNHOu" + sftpDir = "/wyeth2018/sephora_sms" + dbAddress = "rm-bp16l424ln96q1ouk.sqlserver.rds.aliyuncs.com:3433" + dbUser = "sephora_sms" + dbPassword = "5ORiiLmgkniC0EqF" + dbName = "sephora_sms" + zipPath = "RawData/Zip/" + txtPath = "RawData/Txt/" + logPath = "logs/" + batchSize = 5000 //提交数据 + insertSize = 500 //一次性入库 + cSize = 10 //入库协程数 + ) type Batch struct { @@ -138,38 +166,42 @@ func init() { } func main() { - // 打开一个文件作为锁 - lockFile, err := os.OpenFile(".lock", os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - fmt.Println("打开锁文件失败:", err) - os.Exit(1) - } - defer lockFile.Close() - // 尝试获取文件的独占锁 - err = syscall.Flock(int(lockFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) - if err != nil { - fmt.Println("程序已经在运行,本程序无法同时运行多个") - os.Exit(1) - } + // 创建一个channel用于接收信号 + signalChan := make(chan os.Signal, 1) + + // 注册SIGINT, SIGTERM信号处理函数 + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + + // 启动goroutine等待信号的到来 + go func() { + for { + select { + case <-signalChan: + // 当信号到来时,执行特定的代码 + fmt.Println("程序已经在运行,本程序无法同时运行多个") + os.Exit(1) + } + } + }() ticker := time.NewTicker(time.Minute) // 创建一个定时器,每隔一分钟触发一次任务函数 - //tickCount := 1 //记录循环次数 - defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 + tickCount := 1 //记录循环次数 + defer ticker.Stop() // 延迟关闭定时器,确保所有的定时器事件都被处理完毕 // 循环处理任务 for { select { case <-ticker.C: // 定时器触发时执行的任务函数 - //fmt.Printf("尝试第%d执行....\n", tickCount) + fmt.Printf("尝试第%d执行....\n", tickCount) go downloadDecompression() // 在新协程中异步执行 - //tickCount++ + tickCount++ } } } func downloadDecompression() { if exists, _ := redisClient.Exists("iniDataStatus").Result(); exists == 1 { - //fmt.Println("批次执行中,跳过本次") + fmt.Println("批次执行中,跳过本次") } else { // 写入执行中标记 err := redisClient.Set("iniDataStatus", 1, 0).Err() @@ -193,15 +225,15 @@ func downloadDecompression() { panic(err) } defer sftpClient.Close() - files, err := sftpClient.ReadDir("/sftp/test") + files, err := sftpClient.ReadDir(sftpDir) if err != nil { - applogger.Fatalf("Failed to read SFTP directory: %v", err) + applogger.Fatalf("sftp目录不存在: %v", err) } - //it := 1 - //fmt.Printf("共%d个文件\n", len(files)) + it := 1 + fmt.Printf("共%d个文件\n", len(files)) for _, file := range files { - //fmt.Printf("第%d个文件\n", it) - //it++ + fmt.Printf("第%d个文件\n", it) + it++ // Check if file has been downloaded before fileKey := fmt.Sprintf("downloaded:%s", file.Name()) if exists, _ := redisClient.Exists(fileKey).Result(); exists == 1 { @@ -211,7 +243,7 @@ func downloadDecompression() { if filepath.Ext(file.Name()) == ".zip" { //fmt.Println("下载开始(数据包):" + file.Name()) // Download file - srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name())) + srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { applogger.Fatalf("Failed to download file: %v", err) continue @@ -274,9 +306,9 @@ func downloadDecompression() { fmt.Print("入库完成(数据包)::" + zipFile.Name) } } else if filepath.Ext(file.Name()) == ".txt" { - srcFile, err := sftpClient.Open(path.Join("/sftp/test", file.Name())) + srcFile, err := sftpClient.Open(path.Join(sftpDir, file.Name())) if err != nil { - applogger.Fatalf("Failed to download file: %v", err) + applogger.Fatalf("sftp目录不存在: %v", err) continue } defer srcFile.Close() @@ -484,6 +516,7 @@ func batchDataInsert(fileName string) { if len(dataBatch) > 0 { dataBatchChan <- dataBatch dataBatch = make([]BatcheData, 0, batchSize) + fmt.Print("文件读取完成,最后一批提交至通道") } close(dataBatchChan) @@ -535,9 +568,9 @@ func connectToDB() (*gorm.DB, error) { attempt := 1 maxAttempts := 5 backoff := time.Second - logger := logger.Default.LogMode(logger.Silent) + logger := logger.Default.LogMode(logger.Error) //Silent、Error、Warn、Info for { - db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger}) + db, err = gorm.Open(sqlserver.Open(dsn), &gorm.Config{Logger: logger, SkipDefaultTransaction: true}) if err == nil { break }