sqlstore.go 11 KB


  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/grafana/grafana/pkg/util"
  23. "github.com/go-sql-driver/mysql"
  24. "github.com/go-xorm/xorm"
  25. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  26. _ "github.com/lib/pq"
  27. sqlite3 "github.com/mattn/go-sqlite3"
  28. )
  29. var (
  30. x *xorm.Engine
  31. dialect migrator.Dialect
  32. sqlog log.Logger = log.New("sqlstore")
  33. )
  34. const ContextSessionName = "db-session"
  35. func init() {
  36. registry.Register(&registry.Descriptor{
  37. Name: "SqlStore",
  38. Instance: &SqlStore{},
  39. InitPriority: registry.High,
  40. })
  41. }
  42. type SqlStore struct {
  43. Cfg *setting.Cfg `inject:""`
  44. Bus bus.Bus `inject:""`
  45. CacheService *cache.CacheService `inject:""`
  46. dbCfg DatabaseConfig
  47. engine *xorm.Engine
  48. log log.Logger
  49. Dialect migrator.Dialect
  50. skipEnsureAdmin bool
  51. }
  52. // NewSession returns a new DBSession
  53. func (ss *SqlStore) NewSession() *DBSession {
  54. return &DBSession{Session: ss.engine.NewSession()}
  55. }
  56. // WithDbSession calls the callback with an session attached to the context.
  57. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  58. sess, err := startSession(ctx, ss.engine, false)
  59. if err != nil {
  60. return err
  61. }
  62. return callback(sess)
  63. }
  64. // WithTransactionalDbSession calls the callback with an session within a transaction
  65. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  66. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  67. }
  68. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  69. sess, err := startSession(ctx, ss.engine, true)
  70. if err != nil {
  71. return err
  72. }
  73. defer sess.Close()
  74. err = callback(sess)
  75. // special handling of database locked errors for sqlite, then we can retry 3 times
  76. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  77. if sqlError.Code == sqlite3.ErrLocked {
  78. sess.Rollback()
  79. time.Sleep(time.Millisecond * time.Duration(10))
  80. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  81. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  82. }
  83. }
  84. if err != nil {
  85. sess.Rollback()
  86. return err
  87. } else if err = sess.Commit(); err != nil {
  88. return err
  89. }
  90. if len(sess.events) > 0 {
  91. for _, e := range sess.events {
  92. if err = bus.Publish(e); err != nil {
  93. log.Error(3, "Failed to publish event after commit. error: %v", err)
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. func (ss *SqlStore) Init() error {
  100. ss.log = log.New("sqlstore")
  101. ss.readConfig()
  102. engine, err := ss.getEngine()
  103. if err != nil {
  104. return fmt.Errorf("Fail to connect to database: %v", err)
  105. }
  106. ss.engine = engine
  107. ss.Dialect = migrator.NewDialect(ss.engine)
  108. // temporarily still set global var
  109. x = engine
  110. dialect = ss.Dialect
  111. migrator := migrator.NewMigrator(x)
  112. migrations.AddMigrations(migrator)
  113. for _, descriptor := range registry.GetServices() {
  114. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  115. if ok {
  116. sc.AddMigration(migrator)
  117. }
  118. }
  119. if err := migrator.Start(); err != nil {
  120. return fmt.Errorf("Migration failed err: %v", err)
  121. }
  122. // Init repo instances
  123. annotations.SetRepository(&SqlAnnotationRepo{})
  124. ss.Bus.SetTransactionManager(ss)
  125. // Register handlers
  126. ss.addUserQueryAndCommandHandlers()
  127. // ensure admin user
  128. if ss.skipEnsureAdmin {
  129. return nil
  130. }
  131. return ss.ensureAdminUser()
  132. }
  133. func (ss *SqlStore) ensureAdminUser() error {
  134. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  135. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  136. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  137. if err != nil {
  138. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  139. }
  140. if systemUserCountQuery.Result.Count > 0 {
  141. return nil
  142. }
  143. cmd := m.CreateUserCommand{}
  144. cmd.Login = setting.AdminUser
  145. cmd.Email = setting.AdminUser + "@localhost"
  146. cmd.Password = setting.AdminPassword
  147. cmd.IsAdmin = true
  148. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  149. return fmt.Errorf("Failed to create admin user: %v", err)
  150. }
  151. ss.log.Info("Created default admin", "user", setting.AdminUser)
  152. return nil
  153. })
  154. return err
  155. }
  156. func (ss *SqlStore) buildExtraConnectionString(sep rune) string {
  157. if ss.dbCfg.UrlQueryParams == nil {
  158. return ""
  159. }
  160. var sb strings.Builder
  161. for key, values := range ss.dbCfg.UrlQueryParams {
  162. for _, value := range values {
  163. sb.WriteRune(sep)
  164. sb.WriteString(key)
  165. sb.WriteRune('=')
  166. sb.WriteString(value)
  167. }
  168. }
  169. return sb.String()
  170. }
  171. func (ss *SqlStore) buildConnectionString() (string, error) {
  172. cnnstr := ss.dbCfg.ConnectionString
  173. // special case used by integration tests
  174. if cnnstr != "" {
  175. return cnnstr, nil
  176. }
  177. switch ss.dbCfg.Type {
  178. case migrator.MYSQL:
  179. protocol := "tcp"
  180. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  181. protocol = "unix"
  182. }
  183. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  184. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  185. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  186. tlsCert, err := makeCert("custom", ss.dbCfg)
  187. if err != nil {
  188. return "", err
  189. }
  190. mysql.RegisterTLSConfig("custom", tlsCert)
  191. cnnstr += "&tls=custom"
  192. }
  193. cnnstr += ss.buildExtraConnectionString('&')
  194. case migrator.POSTGRES:
  195. host, port := util.SplitHostPortDefault(ss.dbCfg.Host, "127.0.0.1", "5432")
  196. if ss.dbCfg.Pwd == "" {
  197. ss.dbCfg.Pwd = "''"
  198. }
  199. if ss.dbCfg.User == "" {
  200. ss.dbCfg.User = "''"
  201. }
  202. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  203. cnnstr += ss.buildExtraConnectionString(' ')
  204. case migrator.SQLITE:
  205. // special case for tests
  206. if !filepath.IsAbs(ss.dbCfg.Path) {
  207. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  208. }
  209. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  210. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  211. cnnstr += ss.buildExtraConnectionString('&')
  212. default:
  213. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  214. }
  215. return cnnstr, nil
  216. }
  217. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  218. connectionString, err := ss.buildConnectionString()
  219. if err != nil {
  220. return nil, err
  221. }
  222. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  223. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  224. if err != nil {
  225. return nil, err
  226. }
  227. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  228. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  229. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  230. // configure sql logging
  231. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  232. if !debugSql {
  233. engine.SetLogger(&xorm.DiscardLogger{})
  234. } else {
  235. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  236. engine.ShowSQL(true)
  237. engine.ShowExecTime(true)
  238. }
  239. return engine, nil
  240. }
  241. func (ss *SqlStore) readConfig() {
  242. sec := ss.Cfg.Raw.Section("database")
  243. cfgURL := sec.Key("url").String()
  244. if len(cfgURL) != 0 {
  245. dbURL, _ := url.Parse(cfgURL)
  246. ss.dbCfg.Type = dbURL.Scheme
  247. ss.dbCfg.Host = dbURL.Host
  248. pathSplit := strings.Split(dbURL.Path, "/")
  249. if len(pathSplit) > 1 {
  250. ss.dbCfg.Name = pathSplit[1]
  251. }
  252. userInfo := dbURL.User
  253. if userInfo != nil {
  254. ss.dbCfg.User = userInfo.Username()
  255. ss.dbCfg.Pwd, _ = userInfo.Password()
  256. }
  257. ss.dbCfg.UrlQueryParams = dbURL.Query()
  258. } else {
  259. ss.dbCfg.Type = sec.Key("type").String()
  260. ss.dbCfg.Host = sec.Key("host").String()
  261. ss.dbCfg.Name = sec.Key("name").String()
  262. ss.dbCfg.User = sec.Key("user").String()
  263. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  264. ss.dbCfg.Pwd = sec.Key("password").String()
  265. }
  266. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  267. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  268. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  269. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  270. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  271. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  272. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  273. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  274. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  275. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  276. }
  277. func InitTestDB(t *testing.T) *SqlStore {
  278. t.Helper()
  279. sqlstore := &SqlStore{}
  280. sqlstore.skipEnsureAdmin = true
  281. sqlstore.Bus = bus.New()
  282. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  283. dbType := migrator.SQLITE
  284. // environment variable present for test db?
  285. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  286. dbType = db
  287. }
  288. // set test db config
  289. sqlstore.Cfg = setting.NewCfg()
  290. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  291. sec.NewKey("type", dbType)
  292. switch dbType {
  293. case "mysql":
  294. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  295. case "postgres":
  296. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  297. default:
  298. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  299. }
  300. // need to get engine to clean db before we init
  301. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  302. if err != nil {
  303. t.Fatalf("Failed to init test database: %v", err)
  304. }
  305. sqlstore.Dialect = migrator.NewDialect(engine)
  306. // temp global var until we get rid of global vars
  307. dialect = sqlstore.Dialect
  308. if err := dialect.CleanDB(); err != nil {
  309. t.Fatalf("Failed to clean test db %v", err)
  310. }
  311. if err := sqlstore.Init(); err != nil {
  312. t.Fatalf("Failed to init test database: %v", err)
  313. }
  314. sqlstore.engine.DatabaseTZ = time.UTC
  315. sqlstore.engine.TZLocation = time.UTC
  316. return sqlstore
  317. }
  318. func IsTestDbMySql() bool {
  319. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  320. return db == migrator.MYSQL
  321. }
  322. return false
  323. }
  324. func IsTestDbPostgres() bool {
  325. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  326. return db == migrator.POSTGRES
  327. }
  328. return false
  329. }
  330. type DatabaseConfig struct {
  331. Type string
  332. Host string
  333. Name string
  334. User string
  335. Pwd string
  336. Path string
  337. SslMode string
  338. CaCertPath string
  339. ClientKeyPath string
  340. ClientCertPath string
  341. ServerCertName string
  342. ConnectionString string
  343. MaxOpenConn int
  344. MaxIdleConn int
  345. ConnMaxLifetime int
  346. CacheMode string
  347. UrlQueryParams map[string][]string
  348. }