sqlstore.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/go-sql-driver/mysql"
  23. "github.com/go-xorm/xorm"
  24. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  25. _ "github.com/lib/pq"
  26. sqlite3 "github.com/mattn/go-sqlite3"
  27. )
  28. var (
  29. x *xorm.Engine
  30. dialect migrator.Dialect
  31. sqlog log.Logger = log.New("sqlstore")
  32. )
  33. const ContextSessionName = "db-session"
  34. func init() {
  35. registry.Register(&registry.Descriptor{
  36. Name: "SqlStore",
  37. Instance: &SqlStore{},
  38. InitPriority: registry.High,
  39. })
  40. }
  41. type SqlStore struct {
  42. Cfg *setting.Cfg `inject:""`
  43. Bus bus.Bus `inject:""`
  44. CacheService *cache.CacheService `inject:""`
  45. dbCfg DatabaseConfig
  46. engine *xorm.Engine
  47. log log.Logger
  48. Dialect migrator.Dialect
  49. skipEnsureAdmin bool
  50. }
  51. // NewSession returns a new DBSession
  52. func (ss *SqlStore) NewSession() *DBSession {
  53. return &DBSession{Session: ss.engine.NewSession()}
  54. }
  55. // WithDbSession calls the callback with an session attached to the context.
  56. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  57. sess, err := startSession(ctx, ss.engine, false)
  58. if err != nil {
  59. return err
  60. }
  61. return callback(sess)
  62. }
  63. // WithTransactionalDbSession calls the callback with an session within a transaction
  64. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  65. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  66. }
  67. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  68. sess, err := startSession(ctx, ss.engine, true)
  69. if err != nil {
  70. return err
  71. }
  72. defer sess.Close()
  73. err = callback(sess)
  74. // special handling of database locked errors for sqlite, then we can retry 3 times
  75. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  76. if sqlError.Code == sqlite3.ErrLocked {
  77. sess.Rollback()
  78. time.Sleep(time.Millisecond * time.Duration(10))
  79. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  80. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  81. }
  82. }
  83. if err != nil {
  84. sess.Rollback()
  85. return err
  86. } else if err = sess.Commit(); err != nil {
  87. return err
  88. }
  89. if len(sess.events) > 0 {
  90. for _, e := range sess.events {
  91. if err = bus.Publish(e); err != nil {
  92. log.Error(3, "Failed to publish event after commit. error: %v", err)
  93. }
  94. }
  95. }
  96. return nil
  97. }
  98. func (ss *SqlStore) Init() error {
  99. ss.log = log.New("sqlstore")
  100. ss.readConfig()
  101. engine, err := ss.getEngine()
  102. if err != nil {
  103. return fmt.Errorf("Fail to connect to database: %v", err)
  104. }
  105. ss.engine = engine
  106. ss.Dialect = migrator.NewDialect(ss.engine)
  107. // temporarily still set global var
  108. x = engine
  109. dialect = ss.Dialect
  110. migrator := migrator.NewMigrator(x)
  111. migrations.AddMigrations(migrator)
  112. for _, descriptor := range registry.GetServices() {
  113. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  114. if ok {
  115. sc.AddMigration(migrator)
  116. }
  117. }
  118. if err := migrator.Start(); err != nil {
  119. return fmt.Errorf("Migration failed err: %v", err)
  120. }
  121. // Init repo instances
  122. annotations.SetRepository(&SqlAnnotationRepo{})
  123. ss.Bus.SetTransactionManager(ss)
  124. // Register handlers
  125. ss.addUserQueryAndCommandHandlers()
  126. // ensure admin user
  127. if ss.skipEnsureAdmin {
  128. return nil
  129. }
  130. return ss.ensureAdminUser()
  131. }
  132. func (ss *SqlStore) ensureAdminUser() error {
  133. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  134. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  135. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  136. if err != nil {
  137. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  138. }
  139. if systemUserCountQuery.Result.Count > 0 {
  140. return nil
  141. }
  142. cmd := m.CreateUserCommand{}
  143. cmd.Login = setting.AdminUser
  144. cmd.Email = setting.AdminUser + "@localhost"
  145. cmd.Password = setting.AdminPassword
  146. cmd.IsAdmin = true
  147. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  148. return fmt.Errorf("Failed to create admin user: %v", err)
  149. }
  150. ss.log.Info("Created default admin", "user", setting.AdminUser)
  151. return nil
  152. })
  153. return err
  154. }
  155. func (ss *SqlStore) buildConnectionString() (string, error) {
  156. cnnstr := ss.dbCfg.ConnectionString
  157. // special case used by integration tests
  158. if cnnstr != "" {
  159. return cnnstr, nil
  160. }
  161. switch ss.dbCfg.Type {
  162. case migrator.MYSQL:
  163. protocol := "tcp"
  164. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  165. protocol = "unix"
  166. }
  167. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  168. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  169. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  170. tlsCert, err := makeCert("custom", ss.dbCfg)
  171. if err != nil {
  172. return "", err
  173. }
  174. mysql.RegisterTLSConfig("custom", tlsCert)
  175. cnnstr += "&tls=custom"
  176. }
  177. case migrator.POSTGRES:
  178. var host, port = "127.0.0.1", "5432"
  179. fields := strings.Split(ss.dbCfg.Host, ":")
  180. if len(fields) > 0 && len(strings.TrimSpace(fields[0])) > 0 {
  181. host = fields[0]
  182. }
  183. if len(fields) > 1 && len(strings.TrimSpace(fields[1])) > 0 {
  184. port = fields[1]
  185. }
  186. if ss.dbCfg.Pwd == "" {
  187. ss.dbCfg.Pwd = "''"
  188. }
  189. if ss.dbCfg.User == "" {
  190. ss.dbCfg.User = "''"
  191. }
  192. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  193. case migrator.SQLITE:
  194. // special case for tests
  195. if !filepath.IsAbs(ss.dbCfg.Path) {
  196. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  197. }
  198. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  199. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  200. default:
  201. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  202. }
  203. return cnnstr, nil
  204. }
  205. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  206. connectionString, err := ss.buildConnectionString()
  207. if err != nil {
  208. return nil, err
  209. }
  210. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  211. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  212. if err != nil {
  213. return nil, err
  214. }
  215. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  216. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  217. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  218. // configure sql logging
  219. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  220. if !debugSql {
  221. engine.SetLogger(&xorm.DiscardLogger{})
  222. } else {
  223. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  224. engine.ShowSQL(true)
  225. engine.ShowExecTime(true)
  226. }
  227. return engine, nil
  228. }
  229. func (ss *SqlStore) readConfig() {
  230. sec := ss.Cfg.Raw.Section("database")
  231. cfgURL := sec.Key("url").String()
  232. if len(cfgURL) != 0 {
  233. dbURL, _ := url.Parse(cfgURL)
  234. ss.dbCfg.Type = dbURL.Scheme
  235. ss.dbCfg.Host = dbURL.Host
  236. pathSplit := strings.Split(dbURL.Path, "/")
  237. if len(pathSplit) > 1 {
  238. ss.dbCfg.Name = pathSplit[1]
  239. }
  240. userInfo := dbURL.User
  241. if userInfo != nil {
  242. ss.dbCfg.User = userInfo.Username()
  243. ss.dbCfg.Pwd, _ = userInfo.Password()
  244. }
  245. } else {
  246. ss.dbCfg.Type = sec.Key("type").String()
  247. ss.dbCfg.Host = sec.Key("host").String()
  248. ss.dbCfg.Name = sec.Key("name").String()
  249. ss.dbCfg.User = sec.Key("user").String()
  250. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  251. ss.dbCfg.Pwd = sec.Key("password").String()
  252. }
  253. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  254. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  255. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  256. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  257. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  258. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  259. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  260. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  261. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  262. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  263. }
  264. func InitTestDB(t *testing.T) *SqlStore {
  265. t.Helper()
  266. sqlstore := &SqlStore{}
  267. sqlstore.skipEnsureAdmin = true
  268. sqlstore.Bus = bus.New()
  269. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  270. dbType := migrator.SQLITE
  271. // environment variable present for test db?
  272. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  273. dbType = db
  274. }
  275. // set test db config
  276. sqlstore.Cfg = setting.NewCfg()
  277. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  278. sec.NewKey("type", dbType)
  279. switch dbType {
  280. case "mysql":
  281. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  282. case "postgres":
  283. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  284. default:
  285. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  286. }
  287. // need to get engine to clean db before we init
  288. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  289. if err != nil {
  290. t.Fatalf("Failed to init test database: %v", err)
  291. }
  292. sqlstore.Dialect = migrator.NewDialect(engine)
  293. // temp global var until we get rid of global vars
  294. dialect = sqlstore.Dialect
  295. if err := dialect.CleanDB(); err != nil {
  296. t.Fatalf("Failed to clean test db %v", err)
  297. }
  298. if err := sqlstore.Init(); err != nil {
  299. t.Fatalf("Failed to init test database: %v", err)
  300. }
  301. sqlstore.engine.DatabaseTZ = time.UTC
  302. sqlstore.engine.TZLocation = time.UTC
  303. return sqlstore
  304. }
  305. func IsTestDbMySql() bool {
  306. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  307. return db == migrator.MYSQL
  308. }
  309. return false
  310. }
  311. func IsTestDbPostgres() bool {
  312. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  313. return db == migrator.POSTGRES
  314. }
  315. return false
  316. }
  317. type DatabaseConfig struct {
  318. Type string
  319. Host string
  320. Name string
  321. User string
  322. Pwd string
  323. Path string
  324. SslMode string
  325. CaCertPath string
  326. ClientKeyPath string
  327. ClientCertPath string
  328. ServerCertName string
  329. ConnectionString string
  330. MaxOpenConn int
  331. MaxIdleConn int
  332. ConnMaxLifetime int
  333. CacheMode string
  334. }