sqlstore.go 11 KB


  1. package sqlstore
  import (
  	"context"
  	"fmt"
  	"net/url"
  	"os"
  	"path"
  	"path/filepath"
  	"sort"
  	"strings"
  	"testing"
  	"time"

  	"github.com/grafana/grafana/pkg/bus"
  	"github.com/grafana/grafana/pkg/log"
  	m "github.com/grafana/grafana/pkg/models"
  	"github.com/grafana/grafana/pkg/registry"
  	"github.com/grafana/grafana/pkg/services/annotations"
  	"github.com/grafana/grafana/pkg/services/cache"
  	"github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  	"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  	"github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  	"github.com/grafana/grafana/pkg/setting"

  	"github.com/go-sql-driver/mysql"
  	"github.com/go-xorm/xorm"
  	_ "github.com/grafana/grafana/pkg/tsdb/mssql"
  	_ "github.com/lib/pq"
  	sqlite3 "github.com/mattn/go-sqlite3"
  )
  28. var (
  29. x *xorm.Engine
  30. dialect migrator.Dialect
  31. sqlog log.Logger = log.New("sqlstore")
  32. )
  33. const ContextSessionName = "db-session"
  34. func init() {
  35. registry.Register(&registry.Descriptor{
  36. Name: "SqlStore",
  37. Instance: &SqlStore{},
  38. InitPriority: registry.High,
  39. })
  40. }
  41. type SqlStore struct {
  42. Cfg *setting.Cfg `inject:""`
  43. Bus bus.Bus `inject:""`
  44. CacheService *cache.CacheService `inject:""`
  45. dbCfg DatabaseConfig
  46. engine *xorm.Engine
  47. log log.Logger
  48. Dialect migrator.Dialect
  49. skipEnsureAdmin bool
  50. }
  51. // NewSession returns a new DBSession
  52. func (ss *SqlStore) NewSession() *DBSession {
  53. return &DBSession{Session: ss.engine.NewSession()}
  54. }
  55. // WithDbSession calls the callback with an session attached to the context.
  56. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  57. sess, err := startSession(ctx, ss.engine, false)
  58. if err != nil {
  59. return err
  60. }
  61. return callback(sess)
  62. }
  63. // WithTransactionalDbSession calls the callback with an session within a transaction
  64. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  65. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  66. }
  67. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  68. sess, err := startSession(ctx, ss.engine, true)
  69. if err != nil {
  70. return err
  71. }
  72. defer sess.Close()
  73. err = callback(sess)
  74. // special handling of database locked errors for sqlite, then we can retry 3 times
  75. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  76. if sqlError.Code == sqlite3.ErrLocked {
  77. sess.Rollback()
  78. time.Sleep(time.Millisecond * time.Duration(10))
  79. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  80. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  81. }
  82. }
  83. if err != nil {
  84. sess.Rollback()
  85. return err
  86. } else if err = sess.Commit(); err != nil {
  87. return err
  88. }
  89. if len(sess.events) > 0 {
  90. for _, e := range sess.events {
  91. if err = bus.Publish(e); err != nil {
  92. log.Error(3, "Failed to publish event after commit. error: %v", err)
  93. }
  94. }
  95. }
  96. return nil
  97. }
  98. func (ss *SqlStore) Init() error {
  99. ss.log = log.New("sqlstore")
  100. ss.readConfig()
  101. engine, err := ss.getEngine()
  102. if err != nil {
  103. return fmt.Errorf("Fail to connect to database: %v", err)
  104. }
  105. ss.engine = engine
  106. ss.Dialect = migrator.NewDialect(ss.engine)
  107. // temporarily still set global var
  108. x = engine
  109. dialect = ss.Dialect
  110. migrator := migrator.NewMigrator(x)
  111. migrations.AddMigrations(migrator)
  112. for _, descriptor := range registry.GetServices() {
  113. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  114. if ok {
  115. sc.AddMigration(migrator)
  116. }
  117. }
  118. if err := migrator.Start(); err != nil {
  119. return fmt.Errorf("Migration failed err: %v", err)
  120. }
  121. // Init repo instances
  122. annotations.SetRepository(&SqlAnnotationRepo{})
  123. ss.Bus.SetTransactionManager(ss)
  124. // Register handlers
  125. ss.addUserQueryAndCommandHandlers()
  126. // ensure admin user
  127. if ss.skipEnsureAdmin {
  128. return nil
  129. }
  130. return ss.ensureAdminUser()
  131. }
  132. func (ss *SqlStore) ensureAdminUser() error {
  133. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  134. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  135. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  136. if err != nil {
  137. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  138. }
  139. if systemUserCountQuery.Result.Count > 0 {
  140. return nil
  141. }
  142. cmd := m.CreateUserCommand{}
  143. cmd.Login = setting.AdminUser
  144. cmd.Email = setting.AdminUser + "@localhost"
  145. cmd.Password = setting.AdminPassword
  146. cmd.IsAdmin = true
  147. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  148. return fmt.Errorf("Failed to create admin user: %v", err)
  149. }
  150. ss.log.Info("Created default admin", "user", setting.AdminUser)
  151. return nil
  152. })
  153. return err
  154. }
  155. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  156. if ss.dbCfg.UrlQueryParams == nil {
  157. return ""
  158. }
  159. var sb strings.Builder
  160. for key, values := range ss.dbCfg.UrlQueryParams {
  161. for _, value := range values {
  162. sb.WriteRune(sep)
  163. sb.WriteString(key)
  164. sb.WriteRune('=')
  165. sb.WriteString(value)
  166. }
  167. }
  168. return sb.String()
  169. }
  170. func (ss *SqlStore) buildConnectionString() (string, error) {
  171. cnnstr := ss.dbCfg.ConnectionString
  172. // special case used by integration tests
  173. if cnnstr != "" {
  174. return cnnstr, nil
  175. }
  176. switch ss.dbCfg.Type {
  177. case migrator.MYSQL:
  178. protocol := "tcp"
  179. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  180. protocol = "unix"
  181. }
  182. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  183. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  184. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  185. tlsCert, err := makeCert("custom", ss.dbCfg)
  186. if err != nil {
  187. return "", err
  188. }
  189. mysql.RegisterTLSConfig("custom", tlsCert)
  190. cnnstr += "&tls=custom"
  191. }
  192. cnnstr += ss.buildExtraConnectionString('&')
  193. case migrator.POSTGRES:
  194. var host, port = "127.0.0.1", "5432"
  195. fields := strings.Split(ss.dbCfg.Host, ":")
  196. if len(fields) > 0 && len(strings.TrimSpace(fields[0])) > 0 {
  197. host = fields[0]
  198. }
  199. if len(fields) > 1 && len(strings.TrimSpace(fields[1])) > 0 {
  200. port = fields[1]
  201. }
  202. if ss.dbCfg.Pwd == "" {
  203. ss.dbCfg.Pwd = "''"
  204. }
  205. if ss.dbCfg.User == "" {
  206. ss.dbCfg.User = "''"
  207. }
  208. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  209. cnnstr += ss.buildExtraConnectionString(' ')
  210. case migrator.SQLITE:
  211. // special case for tests
  212. if !filepath.IsAbs(ss.dbCfg.Path) {
  213. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  214. }
  215. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  216. cnnstr += ss.buildExtraConnectionString('&')
  217. default:
  218. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  219. }
  220. return cnnstr, nil
  221. }
  222. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  223. connectionString, err := ss.buildConnectionString()
  224. if err != nil {
  225. return nil, err
  226. }
  227. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  228. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  229. if err != nil {
  230. return nil, err
  231. }
  232. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  233. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  234. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  235. // configure sql logging
  236. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  237. if !debugSql {
  238. engine.SetLogger(&xorm.DiscardLogger{})
  239. } else {
  240. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  241. engine.ShowSQL(true)
  242. engine.ShowExecTime(true)
  243. }
  244. return engine, nil
  245. }
  246. func (ss *SqlStore) readConfig() {
  247. sec := ss.Cfg.Raw.Section("database")
  248. cfgURL := sec.Key("url").String()
  249. if len(cfgURL) != 0 {
  250. dbURL, _ := url.Parse(cfgURL)
  251. ss.dbCfg.Type = dbURL.Scheme
  252. ss.dbCfg.Host = dbURL.Host
  253. pathSplit := strings.Split(dbURL.Path, "/")
  254. if len(pathSplit) > 1 {
  255. ss.dbCfg.Name = pathSplit[1]
  256. }
  257. userInfo := dbURL.User
  258. if userInfo != nil {
  259. ss.dbCfg.User = userInfo.Username()
  260. ss.dbCfg.Pwd, _ = userInfo.Password()
  261. }
  262. ss.dbCfg.UrlQueryParams = dbURL.Query()
  263. } else {
  264. ss.dbCfg.Type = sec.Key("type").String()
  265. ss.dbCfg.Host = sec.Key("host").String()
  266. ss.dbCfg.Name = sec.Key("name").String()
  267. ss.dbCfg.User = sec.Key("user").String()
  268. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  269. ss.dbCfg.Pwd = sec.Key("password").String()
  270. }
  271. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  272. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  273. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  274. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  275. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  276. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  277. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  278. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  279. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  280. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  281. }
  282. func InitTestDB(t *testing.T) *SqlStore {
  283. t.Helper()
  284. sqlstore := &SqlStore{}
  285. sqlstore.skipEnsureAdmin = true
  286. sqlstore.Bus = bus.New()
  287. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  288. dbType := migrator.SQLITE
  289. // environment variable present for test db?
  290. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  291. dbType = db
  292. }
  293. // set test db config
  294. sqlstore.Cfg = setting.NewCfg()
  295. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  296. sec.NewKey("type", dbType)
  297. switch dbType {
  298. case "mysql":
  299. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  300. case "postgres":
  301. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  302. default:
  303. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  304. }
  305. // need to get engine to clean db before we init
  306. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  307. if err != nil {
  308. t.Fatalf("Failed to init test database: %v", err)
  309. }
  310. sqlstore.Dialect = migrator.NewDialect(engine)
  311. // temp global var until we get rid of global vars
  312. dialect = sqlstore.Dialect
  313. if err := dialect.CleanDB(); err != nil {
  314. t.Fatalf("Failed to clean test db %v", err)
  315. }
  316. if err := sqlstore.Init(); err != nil {
  317. t.Fatalf("Failed to init test database: %v", err)
  318. }
  319. sqlstore.engine.DatabaseTZ = time.UTC
  320. sqlstore.engine.TZLocation = time.UTC
  321. return sqlstore
  322. }
  323. func IsTestDbMySql() bool {
  324. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  325. return db == migrator.MYSQL
  326. }
  327. return false
  328. }
  329. func IsTestDbPostgres() bool {
  330. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  331. return db == migrator.POSTGRES
  332. }
  333. return false
  334. }
  // DatabaseConfig holds the database settings read from the "database"
  // configuration section (or from its "url" key).
  type DatabaseConfig struct {
  	// Connection target and credentials.
  	Type string // database type; selects the connection string format
  	Host string
  	Name string
  	User string
  	Pwd  string
  	Path string // "path" key; defaults to "data/grafana.db"

  	// TLS-related settings.
  	SslMode        string
  	CaCertPath     string
  	ClientKeyPath  string
  	ClientCertPath string
  	ServerCertName string

  	// ConnectionString, when non-empty, is used as-is instead of a built one.
  	ConnectionString string

  	// Connection pool settings.
  	MaxOpenConn     int
  	MaxIdleConn     int
  	ConnMaxLifetime int // seconds

  	CacheMode string // "cache_mode" key; defaults to "private"

  	// UrlQueryParams holds the query parameters of the database URL, if any.
  	UrlQueryParams map[string][]string
  }