bulkcopy_sql.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package mssql
  2. import (
  3. "database/sql/driver"
  4. "encoding/json"
  5. "errors"
  6. )
  7. type copyin struct {
  8. cn *MssqlConn
  9. bulkcopy *MssqlBulk
  10. closed bool
  11. }
  12. type SerializableBulkConfig struct {
  13. TableName string
  14. ColumnsName []string
  15. Options MssqlBulkOptions
  16. }
  17. func (d *MssqlDriver) OpenConnection(dsn string) (*MssqlConn, error) {
  18. return d.open(dsn)
  19. }
  20. func (c *MssqlConn) prepareCopyIn(query string) (_ driver.Stmt, err error) {
  21. config_json := query[11:]
  22. bulkconfig := SerializableBulkConfig{}
  23. err = json.Unmarshal([]byte(config_json), &bulkconfig)
  24. if err != nil {
  25. return
  26. }
  27. bulkcopy := c.CreateBulk(bulkconfig.TableName, bulkconfig.ColumnsName)
  28. bulkcopy.Options = bulkconfig.Options
  29. ci := &copyin{
  30. cn: c,
  31. bulkcopy: bulkcopy,
  32. }
  33. return ci, nil
  34. }
  35. func CopyIn(table string, options MssqlBulkOptions, columns ...string) string {
  36. bulkconfig := &SerializableBulkConfig{TableName: table, Options: options, ColumnsName: columns}
  37. config_json, err := json.Marshal(bulkconfig)
  38. if err != nil {
  39. panic(err)
  40. }
  41. stmt := "INSERTBULK " + string(config_json)
  42. return stmt
  43. }
  44. func (ci *copyin) NumInput() int {
  45. return -1
  46. }
  47. func (ci *copyin) Query(v []driver.Value) (r driver.Rows, err error) {
  48. return nil, errors.New("ErrNotSupported")
  49. }
  50. func (ci *copyin) Exec(v []driver.Value) (r driver.Result, err error) {
  51. if ci.closed {
  52. return nil, errors.New("errCopyInClosed")
  53. }
  54. if len(v) == 0 {
  55. rowCount, err := ci.bulkcopy.Done()
  56. ci.closed = true
  57. return driver.RowsAffected(rowCount), err
  58. }
  59. t := make([]interface{}, len(v))
  60. for i, val := range v {
  61. t[i] = val
  62. }
  63. err = ci.bulkcopy.AddRow(t)
  64. if err != nil {
  65. return
  66. }
  67. return driver.RowsAffected(0), nil
  68. }
  69. func (ci *copyin) Close() (err error) {
  70. return nil
  71. }