task.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package main
import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/jmoiron/sqlx"
	"gopkg.in/redis.v2"
	"xiaoniaokuaiyan.com/xiaoniao/config"
	"xiaoniaokuaiyan.com/xiaoniao/constants"
	"xiaoniaokuaiyan.com/xiaoniao/entity"
	"xiaoniaokuaiyan.com/xiaoniao/util"
)
  17. //type DetectItem struct {
  18. // Name string `db:"name" json:"name"`
  19. // //MarketPrice float32 `db:"market_price" json:"marketPrice"`
  20. //}
  21. //
  22. //type ProductDB struct {
  23. // Id string `db:"id" json:"id"`
  24. // Name string `db:"name" json:"name"`
  25. // Price int `db:"price" json:"price"`
  26. // IsRecommend int `db:"is_recommend" json:"isRecommend"`
  27. // Tags string `db:"tags" json:"tags"`
  28. // Picture string `db:"picture" json:"picture"`
  29. // Items []string `db:"-" json:"items"`
  30. //}
  31. func main() {
  32. var (
  33. client *redis.Client
  34. nowStr string
  35. orderPayKey = "order_unpay"
  36. ticker = time.NewTicker(time.Second)
  37. db *sqlx.DB
  38. oid string
  39. timeStr string
  40. mId int64
  41. delaySec int
  42. )
  43. fs := flag.NewFlagSet("", flag.ExitOnError)
  44. configFile := fs.String("f", "config.ini", "app config file")
  45. config.Reload(*configFile)
  46. defer ticker.Stop()
  47. db = util.GetWriteSqlDB()
  48. strSql := "select id, created_at, m_id, delay_deadtime from t_order where status = 1"
  49. rows, err := db.Query(strSql)
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. client = util.GetRedis()
  54. client.Select(12)
  55. for rows.Next() {
  56. rows.Scan(&oid, &timeStr, &mId, &delaySec)
  57. client.HSet(orderPayKey, oid, fmt.Sprintf("%s;%d;%d", timeStr, mId, delaySec))
  58. }
  59. var delayTimeSecs int64
  60. for {
  61. select {
  62. case <-ticker.C:
  63. client = util.GetRedis()
  64. client.Select(12)
  65. msCmd := client.HGetAllMap(orderPayKey)
  66. if err := msCmd.Err(); err != nil {
  67. log.Panicln(err)
  68. }
  69. kvs := msCmd.Val()
  70. for key, val := range kvs {
  71. temp := strings.Split(val, ";")
  72. if len(temp) < 2 {
  73. continue
  74. }
  75. delayTimeSecs = int64(time.Second) * 30 * 60
  76. if len(temp) == 3 {
  77. delaySec, err = strconv.Atoi(temp[2])
  78. if err != nil {
  79. log.Println(err, val)
  80. continue
  81. }
  82. delayTimeSecs = int64(time.Second) * int64(delaySec)
  83. }
  84. nowStr = time.Now().Add(-time.Duration(delayTimeSecs)).Format("2006-01-02 15:04:05")
  85. if temp[0] < nowStr {
  86. tx, _ := db.Begin()
  87. //todo set order status to unused
  88. sr, _ := tx.Exec("update t_order set status = 9, m_id = 0 where id = ? and status = 1", key)
  89. if er, _ := sr.RowsAffected(); er <= 0 {
  90. log.Println("update order status of ", key, " failed")
  91. client.HDel(orderPayKey, key)
  92. tx.Rollback()
  93. continue
  94. }
  95. if temp[1] != "0" {
  96. sr, _ = tx.Exec("update t_producer_info set remain_num = remain_num + 1 where id = ?", temp[1])
  97. if er, _ := sr.RowsAffected(); er <= 0 {
  98. log.Println("failed to update producer info of ", key, temp[1])
  99. client.HDel(orderPayKey, key)
  100. tx.Rollback()
  101. continue
  102. }
  103. }
  104. orderItem := entity.OrderDB{}
  105. err1 := db.Get(&orderItem, "select * FROM t_order WHERE id = ?", key)
  106. //回滚库存
  107. var ops = []entity.OrderProduct{}
  108. db.Select(&ops, "select t1.*, t2.stock_switch from t_order_product t1 left join t_product t2 on t1.product_id = t2.id where order_id = ?", key)
  109. for _, op := range ops {
  110. if op.StockSwitch == "ON" {
  111. sql := "update t_product set stock = stock + ? where id = ?"
  112. if orderItem.Source == "sp_zfb" {
  113. sql = "update t_product set zfb_stock = zfb_stock + ? where id = ?"
  114. }
  115. result, _ := tx.Exec(sql, op.Quantity, op.ProductId)
  116. if ra, _ := result.RowsAffected(); ra <= 0 {
  117. tx.Rollback()
  118. log.Println(key + "更新库存失败")
  119. }
  120. }
  121. }
  122. db.Exec("update t_discount_ticket set order_id = null, status = 0 where order_id = ?;", key)
  123. intCmd := client.HDel(orderPayKey, key)
  124. if err := intCmd.Err(); err != nil {
  125. tx.Rollback()
  126. log.Println(err)
  127. continue
  128. }
  129. tx.Commit()
  130. if err1 == nil {
  131. //20210303 添加状态通知
  132. if util.SourceCheck(orderItem.Source) {
  133. log.Printf("%s,send status 9", key)
  134. msg := map[string]interface{}{
  135. "user": "system",
  136. "orderid": orderItem.Id,
  137. "source": orderItem.Source,
  138. "status": constants.ORDERSTATUS_UNUSED,
  139. }
  140. buf, _ := json.Marshal(msg)
  141. client.Publish("order-status-change", string(buf))
  142. }
  143. }
  144. }
  145. }
  146. }
  147. }
  148. }