search.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. package search
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "net"
  7. "strings"
  8. "time"
  9. "xiaoniaokuaiyan.com/xiaoniao/config"
  10. "github.com/blevesearch/bleve"
  11. _ "github.com/blevesearch/bleve/search/highlight/highlighter/html"
  12. bq "github.com/blevesearch/bleve/search/query"
  13. "gopkg.in/guregu/null.v3"
  14. "gopkg.in/redis.v2"
  15. _ "xiaoniaokuaiyan.com/xiaoniao/search/analyzer"
  16. "xiaoniaokuaiyan.com/xiaoniao/util"
  17. )
  18. var engine bleve.Index
  19. type SearchResult struct {
  20. Total uint64 `json:"total"`
  21. PageIndex int `json:"pageIndex"`
  22. Hits []interface{} `json:"hits"`
  23. Size int `json:"size"`
  24. Took time.Duration `json:"took"`
  25. }
  26. type HitItem struct {
  27. Locations interface{} `json:"-"`
  28. Fragments interface{} `json:"-"`
  29. Fields map[string]interface{} `json:"fields"`
  30. }
  31. type QueryParam struct {
  32. Text string `json:"text"`
  33. CityId uint `json:"cityId"`
  34. PageIndex int `json:"pageIndex"`
  35. Size int `json"size"`
  36. IsHighlight bool `json:"isHighlight"`
  37. CateIds []string `json:"cate_ids"`
  38. SortBy string `json:"sort_by"`
  39. IsSale bool `json:"is_sale"`
  40. IsZFB bool `json:"is_zfb"`
  41. }
  42. const DEFAULT_PAGE_SIZE = 10
  43. func Query(param *QueryParam) (*SearchResult, error) {
  44. //if param.CityId <= 0 {
  45. // return nil, fmt.Errorf("param city id is invalid, passed %v", param.CityId)
  46. //}
  47. text := strings.ToLower(param.Text)
  48. text = strings.Trim(text, " ")
  49. var (
  50. textQ bq.Query
  51. )
  52. if text != "" {
  53. query := bleve.NewQueryStringQuery(fmt.Sprintf("name:%s items:%s keywords:%s", text, text, text))
  54. pyQuery := bleve.NewRegexpQuery(".*" + text + ".*")
  55. pyQuery.SetField("py")
  56. jpQuery := bleve.NewRegexpQuery(".*" + text + ".*")
  57. jpQuery.SetField("jp")
  58. tQ := bleve.NewDisjunctionQuery(
  59. query,
  60. jpQuery,
  61. pyQuery,
  62. )
  63. tQ.SetMin(1)
  64. textQ = tQ
  65. } else {
  66. textQ = bleve.NewMatchAllQuery()
  67. }
  68. var q bq.Query = textQ
  69. cityQ := bleve.NewTermQuery(fmt.Sprintf("%d", param.CityId))
  70. cityQ.SetField("cityIds")
  71. var conjunctionQuery *bq.ConjunctionQuery = bleve.NewConjunctionQuery(textQ)
  72. if param.CityId > 0 {
  73. conjunctionQuery.AddQuery(cityQ)
  74. }
  75. if len(param.CateIds) > 0 {
  76. cateQ := bleve.NewTermQuery(param.CateIds[0])
  77. cateQ.SetField("cates")
  78. conjunctionQuery.AddQuery(cateQ)
  79. }
  80. if param.IsSale {
  81. saleQ := bleve.NewBoolFieldQuery(true)
  82. saleQ.SetField("is_sale")
  83. conjunctionQuery.AddQuery(saleQ)
  84. }
  85. if param.IsZFB {
  86. zfbQ := bleve.NewBoolFieldQuery(true)
  87. zfbQ.SetField("is_zfb")
  88. conjunctionQuery.AddQuery(zfbQ)
  89. }
  90. if len(conjunctionQuery.Conjuncts) > 0 {
  91. q = conjunctionQuery
  92. }
  93. request := bleve.NewSearchRequest(q)
  94. if param.SortBy != "" {
  95. request.SortBy([]string{param.SortBy})
  96. }
  97. //request.Fields = []string{"*"}
  98. if param.IsHighlight {
  99. request.Highlight = bleve.NewHighlightWithStyle("custom")
  100. request.Highlight.Fields = []string{"name", "items"}
  101. }
  102. request.Fields = []string{"id", "name", "price", "picture", "isRecommend", "items", "cityIds", "market_price", "zfb_price"}
  103. request.From = (param.PageIndex - 1) * param.Size
  104. if param.Size <= 0 {
  105. param.Size = DEFAULT_PAGE_SIZE
  106. }
  107. request.Size = param.Size
  108. result, err := engine.Search(request)
  109. if err != nil {
  110. return nil, err
  111. }
  112. searchResult := &SearchResult{
  113. Total: result.Total,
  114. PageIndex: param.PageIndex,
  115. Size: param.Size,
  116. Took: result.Took,
  117. }
  118. for _, hit := range result.Hits {
  119. hitem := &HitItem{
  120. Locations: hit.Locations,
  121. Fragments: hit.Fragments,
  122. Fields: hit.Fields,
  123. }
  124. //替换搜索匹配项
  125. for k, v := range hit.Fragments {
  126. hitem.Fields[k] = v
  127. }
  128. if param.IsZFB {
  129. hitem.Fields["price"] = hitem.Fields["zfb_price"]
  130. }
  131. searchResult.Hits = append(searchResult.Hits, hitem)
  132. }
  133. return searchResult, nil
  134. }
  135. func GetSearchTips(text string, cityId int, isZFB bool) ([]TipItem, error) {
  136. text = strings.ToLower(text)
  137. query := bleve.NewQueryStringQuery(fmt.Sprintf("name:%s items:%s keywords:%s", text, text, text))
  138. cityQ := bleve.NewTermQuery(fmt.Sprintf("%d", cityId))
  139. cityQ.SetField("cityIds")
  140. pyQuery := bleve.NewRegexpQuery(".*" + text + ".*")
  141. pyQuery.SetField("py")
  142. jpQuery := bleve.NewRegexpQuery(".*" + text + ".*")
  143. jpQuery.SetField("jp")
  144. textQ := bleve.NewDisjunctionQuery(
  145. query,
  146. jpQuery,
  147. pyQuery,
  148. )
  149. zfbQ := bleve.NewBoolFieldQuery(true)
  150. zfbQ.SetField("is_zfb")
  151. //var q bq.Query
  152. //if cityId > 0 {
  153. // q = bleve.NewConjunctionQuery(
  154. // textQ,
  155. // cityQ,
  156. // )
  157. //} else {
  158. // q = textQ
  159. //}
  160. q := bleve.NewConjunctionQuery(textQ)
  161. if cityId > 0 {
  162. q.AddQuery(cityQ)
  163. }
  164. if isZFB {
  165. q.AddQuery(zfbQ)
  166. }
  167. request := bleve.NewSearchRequest(q)
  168. request.Fields = []string{"name", "items"}
  169. request.Size = 10
  170. result, err := engine.Search(request)
  171. if err != nil {
  172. return nil, err
  173. }
  174. tipList := []TipItem{}
  175. for _, hit := range result.Hits {
  176. //fmt.Println(hit.Fields["name"])
  177. tipList = append(tipList, TipItem{hit.Fields["name"]})
  178. }
  179. return tipList, nil
  180. }
  181. type TipItem struct {
  182. Tip interface{} `json:"tip"`
  183. }
  184. type productDB struct {
  185. Id string `db:"id" json:"id"`
  186. Name string `db:"name" json:"name"`
  187. Price int `db:"price" json:"price"`
  188. IsRecommend int `db:"is_recommend" json:"isRecommend"`
  189. Keywords string `db:"-" json:"keywords"`
  190. TKeywords null.String `db:"keywords" json:"tkeywords"`
  191. Picture string `db:"picture" json:"picture"`
  192. Items string `db:"-" json:"items"`
  193. CityIds []string `db:"-" json:"cityIds"`
  194. Pinyin string `db:"-" json:"py"`
  195. JP string `db:"-" json:"jp"`
  196. Cates []string `db:"-" json:"cates"`
  197. SaleNum int `db:"sale_num" json:"sale_num"`
  198. SortNo int `db:"sort_no" json:"sort_no"`
  199. CreatedAt string `db:"created_at" json:"created_at"`
  200. PutawayTime string `db:"-" json:"putaway_time"`
  201. TPutawayTime null.String `db:"putaway_time" json:"-"`
  202. MarketPrice int `db:"market_price" json:"market_price"`
  203. IsSale bool `db:"-" json:"is_sale"`
  204. IsZFB bool `db:"is_zfb" json:"is_zfb"`
  205. ZPrice int `db:"zfb_price" json:"zfb_price"`
  206. }
  207. func UpdateIndex() {
  208. var (
  209. client *redis.Client
  210. //productIndex = engine
  211. )
  212. //defer ps.Close()
  213. RESETPUBSUB:
  214. client = util.GetRedis()
  215. ps := client.PubSub()
  216. err := ps.Subscribe("product_trace")
  217. if err != nil {
  218. log.Println("Subscribe error:", err)
  219. return
  220. }
  221. for {
  222. payload, err := ps.ReceiveTimeout(time.Second * 5)
  223. if err != nil {
  224. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  225. continue
  226. }
  227. if err == io.EOF {
  228. ps.Close()
  229. time.Sleep(time.Second * 5)
  230. goto RESETPUBSUB
  231. }
  232. log.Println(err)
  233. continue
  234. }
  235. if payload == nil {
  236. continue
  237. }
  238. switch payload.(type) {
  239. case *redis.Subscription:
  240. case *redis.Message:
  241. msg := payload.(*redis.Message)
  242. if msg.Channel == "product_trace" {
  243. pieces := strings.Split(msg.Payload, "-")
  244. if len(pieces) < 2 {
  245. log.Println("error:wrong product_change message")
  246. break
  247. }
  248. if pieces[1] == "UPDATE" {
  249. err = addProductIdx(pieces[0])
  250. if err != nil {
  251. log.Println(err)
  252. break
  253. }
  254. } else if pieces[1] == "DELETE" {
  255. err = deleteProductIdx(pieces[0])
  256. if err != nil {
  257. log.Println(err)
  258. break
  259. }
  260. } else if pieces[1] == "ALL" {
  261. err = allProductIdx()
  262. if err != nil {
  263. log.Println(err)
  264. break
  265. }
  266. }
  267. }
  268. default:
  269. fmt.Printf("redis: unknown message: %v\n", payload)
  270. }
  271. }
  272. }
  273. func addProductIdx(productId string) error {
  274. strSql := "select id, name,price, is_recommend, picture,keywords, sale_num,putaway_time,sort_no,created_at,market_price,is_zfb,zfb_price from t_product where id = ? and is_putaway=0"
  275. var product = productDB{}
  276. db := util.GetWriteSqlDB()
  277. err := db.Get(&product, strSql, productId)
  278. if err != nil {
  279. return fmt.Errorf("query product %d error: %v", productId, err)
  280. }
  281. strSql = "select t1.name from t_detect_product t1 left join t_product_detect_product t2 on t1.id = t2.detect_product_id where t2.product_id = ?"
  282. itemList := []string{}
  283. db.Select(&itemList, strSql, product.Id)
  284. product.Items = strings.Join(itemList, " ")
  285. /*strSql = "select tag_name from t_product_tag t1 left join t_tag t2 on t1.tag_id = t2.id where t1.product_id = ?"
  286. tagList := []string{}
  287. db.Select(&tagList, strSql, product.Id)
  288. product.Tags = strings.Join(tagList, " ")*/
  289. product.Keywords = product.TKeywords.String
  290. product.PutawayTime = product.TPutawayTime.String
  291. product.IsSale = product.Price != product.MarketPrice
  292. strSql = "select city_id from v_product_city where product_id = ?"
  293. cityIds := []string{}
  294. db.Select(&cityIds, strSql, product.Id)
  295. product.CityIds = cityIds
  296. product.Pinyin, product.JP, _ = util.GetPinyin(product.Name)
  297. strSql = "SELECT cat_id from t_product_category_product where product_id = ?"
  298. var cateIds = []string{}
  299. db.Select(&cateIds, strSql, product.Id)
  300. product.Cates = cateIds
  301. return engine.Index(product.Id, product)
  302. }
  303. func deleteProductIdx(productId string) error {
  304. return engine.Delete(productId)
  305. }
  306. func Init() {
  307. var err error
  308. var INDEX_NAME = config.IniConf.Section("search").Key("IndexName").Value()
  309. fmt.Println(INDEX_NAME)
  310. engine, err = bleve.Open(INDEX_NAME)
  311. if err != nil {
  312. log.Fatal(err)
  313. }
  314. _, err = bleve.Config.Cache.DefineFragmenter("size300", map[string]interface{}{
  315. "type": "simple",
  316. "size": 300.0,
  317. })
  318. if err != nil {
  319. log.Fatal(err)
  320. }
  321. _, err = bleve.Config.Cache.DefineHighlighter("custom", map[string]interface{}{
  322. "type": "simple",
  323. "fragmenter": "size300",
  324. "formatter": "html",
  325. })
  326. if err != nil {
  327. log.Fatal(err)
  328. }
  329. }
  330. func init() {
  331. config.RegistChangeCallback(Init)
  332. }
  333. func allProductIdx() error {
  334. strSql := "select id, name,price, is_recommend, picture, keywords,sale_num,putaway_time,sort_no,created_at,market_price,is_zfb,zfb_price from t_product where is_putaway = 0 and is_delete = 0;"
  335. db := util.GetSqlDB()
  336. productList := []productDB{}
  337. err := db.Select(&productList, strSql)
  338. if err != nil {
  339. log.Fatal(err)
  340. }
  341. for _, product := range productList {
  342. strSql := "select t1.name from t_detect_product t1 left join t_product_detect_product t2 on t1.id = t2.detect_product_id where t2.product_id = " + product.Id
  343. itemList := []string{}
  344. db.Select(&itemList, strSql)
  345. //strSql = "select tag_name from t_product_tag t1 left join t_tag t2 on t1.tag_id = t2.id where t1.product_id = ?"
  346. //tagList := []string{}
  347. //db.Select(&tagList, strSql, product.Id)
  348. strSql = "select city_id from v_product_city where product_id = ?"
  349. cityIds := []string{}
  350. db.Select(&cityIds, strSql, product.Id)
  351. product.Items = strings.Join(itemList, " ")
  352. //product.Tags = strings.Join(tagList, " ")
  353. product.CityIds = cityIds
  354. product.Pinyin, product.JP, _ = util.GetPinyin(product.Name)
  355. product.Keywords = product.TKeywords.String
  356. product.PutawayTime = product.TPutawayTime.String
  357. product.IsSale = product.Price != product.MarketPrice
  358. strSql = "SELECT cat_id from t_product_category_product where product_id = ?"
  359. var cateIds = []string{}
  360. db.Select(&cateIds, strSql, product.Id)
  361. product.Cates = cateIds
  362. err = engine.Index(product.Id, product)
  363. }
  364. return err
  365. }