diff --git a/iniDataForLinux b/iniDataForLinux index bf81f0f..6018114 100755 Binary files a/iniDataForLinux and b/iniDataForLinux differ diff --git a/iniDataForMacOs b/iniDataForMacOs index bb609f5..faf9b95 100755 Binary files a/iniDataForMacOs and b/iniDataForMacOs differ diff --git a/main.go b/main.go index a40373f..cc10cc5 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "os" "path" "path/filepath" + "reflect" "regexp" "sort" "strconv" @@ -53,6 +54,7 @@ func init() { applogger.Info(fmt.Sprintf("程序启动,加载%s环境,尝试执行...", env)) go downloadDecompression() // 启动立即执行一次数据下载、处理 go queryBatchState() + //go delData() } func main() { @@ -78,39 +80,78 @@ func main() { case <-notification.C: iniLog() applogger.Info("开始清除历史数据") - // 删除15天前的批次数据 - rowsAffected, err := deleteOldData(&BatcheData{}, 15) - if err != nil { - handleError(err, "删除15天前的批次数据失败") - } else { - applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据", rowsAffected)) - } - - // 删除15天前的批次数据重复日志 - rowsAffected, err = deleteOldData(&BatchDataDuplicateLog{}, 15) - if err != nil { - handleError(err, "删除15天前的批次数据重复日志失败") - } else { - applogger.Info(fmt.Sprintf("成功删除 %d 行 15天前的批次数据重复日志", rowsAffected)) - } + delData() notification.Reset(time.Until(nextNotificationTime())) applogger.Info("清除历史数据完成") } } } +// 删除历史数据和历史重复数据 +func delData() { + // 删除15天前的批次数据 + err := deleteOldData(&BatcheData{}, dataExpirationDays) + handleError(err, "删除15天前的批次数据失败") + + // 删除15天前的批次数据重复日志 + err = deleteOldData(&BatchDataDuplicateLog{}, dataExpirationDays) + handleError(err, "删除15天前的批次数据重复日志失败") + +} + // 删除15天前的数据 -func deleteOldData(model interface{}, daysAgo int) (int64, error) { +func deleteOldData(model interface{}, daysAgo int) error { + start := time.Now() + totalRowsAffected := int64(0) db, err := connectToDB() if err != nil { - return 0, fmt.Errorf("连接数据库失败: %w", err) + return fmt.Errorf("连接数据库失败: %w", err) } threshold := time.Now().AddDate(0, 0, -daysAgo) - result := db.Where("created_at < ?", threshold).Delete(model) - if result.Error != nil { - return 0, result.Error + + for { + tx := db.Begin() + defer func() { + if r := recover(); r != nil { + tx.Rollback() + } + }() + if err := tx.Error; err != nil { + return err + } + + var ids []int + result := tx.Model(model).Where("created_at < ?", threshold).Limit(delDataSize).Pluck("id", &ids) + if result.Error != nil { + tx.Rollback() + return result.Error + } + + if len(ids) == 0 { + break + } + + result = tx.Where("id IN (?)", ids).Delete(model) + if result.Error != nil { + tx.Rollback() + return result.Error + } + + if err := tx.Commit().Error; err != nil { + return err + } + + totalRowsAffected += result.RowsAffected + if len(ids) < batchSize { + break + } } - return result.RowsAffected, nil + time.Sleep(time.Second) + elapsed := time.Since(start) + modelName := reflect.TypeOf(model).Elem().Name() + + applogger.Info(fmt.Sprintf("执行删除%v用时%s成功删除数据%d", modelName, elapsed.String(), totalRowsAffected)) + return nil } // 计算下一次执行时间 @@ -1144,6 +1185,8 @@ func iniConfi() { token = "7100477930234217" lastCallPath = "RawData/LastCall" verifySignatureKey = "qZ6v1&H#Wjx+yRm2D@*sJF$tnfL83Ia" + dataExpirationDays = 14 + delDataSize = 60000 case "prod": //fmt.Print("正式环境配置已生效\n") @@ -1154,7 +1197,7 @@ func iniConfi() { sftpUser = "CHN-CRMTOWemedia-wemedia" sftpPassword = "uoWdMHEv39ZFjiOg" sftpDir = "/CN-CRMTOWemedia/SMS" - dbAddress = "rm-bp1cb0x329c1dwid5.mysql.rds.aliyuncs.com" + dbAddress = "rds0yslqyg1iuze8txux545.mysql.rds.aliyuncs.com" dbPort = "3306" dbUser = "sephora" dbPassword = "YfbGJWsFkH4pXgPY" @@ -1172,6 +1215,8 @@ func iniConfi() { token = "7100178600777091" //7100477930234217 lastCallPath = "RawData/LastCall" verifySignatureKey = "Lzxf6asgwFRTsANZ8gkXJnP3EmOyNO#&" + dataExpirationDays = 14 + delDataSize = 60000 default: panic(fmt.Errorf("无效的运行模式: %s", env)) @@ -1259,6 +1304,8 @@ var ( //初始化变量 token string lastCallPath string verifySignatureKey string + dataExpirationDays int + delDataSize int ) type Batch struct {